Example

Overview

The inlong-dataproxy-sdk provides access APIs for the TCP and HTTP protocols. When using TCP or HTTP access, make sure that the DataProxy server has the access configuration for the corresponding protocol enabled (i.e. the TCP or HTTP source service). To use the UDP protocol, you need to pack the data into the same byte arrays that are transmitted over TCP and send them to the DataProxy server over UDP yourself; the DataProxy server handles these messages in the same way as TCP messages. Demo code for TCP, HTTP, and UDP access is provided in the example directory of the InLong DataProxy SDK for reference.

For detailed API information, see the overview.

Add Dependency

    <dependency>
        <groupId>org.apache.inlong</groupId>
        <artifactId>dataproxy-sdk</artifactId>
        <version>${inlong_version}</version>
    </dependency>

TCP Example

Create a messageSender

    public DefaultMessageSender getMessageSender(String localIP, String inLongManagerAddr, String inLongManagerPort,
            String netTag, String dataProxyGroup, boolean isLocalVisit, boolean isReadProxyIPFromLocal,
            String configBasePath, int msgType) {
        ProxyClientConfig dataProxyConfig = null;
        DefaultMessageSender messageSender = null;
        try {
            dataProxyConfig = new ProxyClientConfig(localIP, isLocalVisit, inLongManagerAddr,
                    Integer.valueOf(inLongManagerPort), dataProxyGroup, netTag);
            // Override the default configuration directory only when a path is given.
            if (StringUtils.isNotEmpty(configBasePath)) {
                dataProxyConfig.setConfStoreBasePath(configBasePath);
            }
            dataProxyConfig.setReadProxyIPFromLocal(isReadProxyIPFromLocal);
            messageSender = DefaultMessageSender.generateSenderByClusterId(dataProxyConfig);
            messageSender.setMsgtype(msgType);
        } catch (Exception e) {
            // Pass the exception as the last argument so that the stack trace is logged.
            logger.error("getMessageSender has exception", e);
        }
        // null is returned when the sender could not be created.
        return messageSender;
    }

The parameter description is as follows:

| Parameter | Type | Description |
| --- | --- | --- |
| inLongManagerAddr | String | InLong admin server address |
| inLongManagerPort | String | InLong admin server port |
| netTag | String | Network label; not used yet, an empty string can be passed |
| dataProxyGroup | String | DataProxy group name; also used as the name of the local configuration file when local configuration is enabled |
| isLocalVisit | boolean | Whether to use local configuration; true accesses the console over HTTPS, false over HTTP |
| isReadProxyIPFromLocal | boolean | Whether to read the DataProxy server addresses from the local configuration file; can be set to true for local self-tests when the management console cannot be accessed |
| configBasePath | String | Directory of the local configuration file, ./inlong by default; when isReadProxyIPFromLocal is true, the configuration file is looked up in this directory |
| msgType | int | Message assembly type, one of 3, 5, 7, or 8 (7 is recommended); each type represents a message assembly protocol used during transmission, see the SDK code for details |
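
Because msgType accepts only a small set of values, it can help to check the value before building the sender. The following is a minimal sketch of such a check. `MsgTypeCheck` and `requireSupported` are hypothetical names that are not part of the SDK, and the accepted values are simply the ones listed in the table above.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper (not part of the SDK): validates a msgType value. */
final class MsgTypeCheck {
    // Values taken from the parameter table: 3, 5, 7, 8.
    static final List<Integer> SUPPORTED = Arrays.asList(3, 5, 7, 8);
    // The table recommends 7.
    static final int RECOMMENDED = 7;

    /** Returns msgType unchanged if it is supported, otherwise throws. */
    static int requireSupported(int msgType) {
        if (!SUPPORTED.contains(msgType)) {
            throw new IllegalArgumentException(
                    "Unsupported msgType " + msgType + ", expected one of " + SUPPORTED);
        }
        return msgType;
    }

    public static void main(String[] args) {
        System.out.println(requireSupported(RECOMMENDED));
    }
}
```

The checked value could then be passed as the msgType argument of getMessageSender, so that a wrong value fails early instead of surfacing during transmission.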

When isReadProxyIPFromLocal is true, the DataProxy configuration is read from the local configuration file.

The path to the local file is:

    ${configBasePath}

The file name is:

    ${dataProxyGroup}.local

For example:

    configBasePath = /data/inlong
    dataProxyGroup = inlong_test

Then the full path of the local file is:

    /data/inlong/inlong_test.local
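
The path rule above can be sketched in a few lines of Java. This is only an illustration of how the two values combine; `LocalConfigPath` and `resolve` are hypothetical names, not SDK classes, and the fallback to ./inlong mirrors the default stated in the parameter table rather than the SDK's actual lookup code.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

/** Hypothetical helper (not part of the SDK): computes the local config file path. */
final class LocalConfigPath {
    // Default directory named in the parameter table.
    static final String DEFAULT_BASE_PATH = "./inlong";

    /** Joins ${configBasePath} and ${dataProxyGroup}.local into one path. */
    static Path resolve(String configBasePath, String dataProxyGroup) {
        String base = (configBasePath == null || configBasePath.isEmpty())
                ? DEFAULT_BASE_PATH
                : configBasePath;
        return Paths.get(base, dataProxyGroup + ".local");
    }

    public static void main(String[] args) {
        System.out.println(resolve("/data/inlong", "inlong_test"));
    }
}
```

Checking that this file exists before creating the sender is an easy way to catch a wrong directory or group name during a local self-test.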

The file content is in JSON format, where host is the address of the DataProxy server and port is the corresponding port. At least two address entries are required (the same entry may be listed twice):

    {"isInterVisit":1,"cluster_id":"1","size":1,"switch":1,"address":[{"host":"127.0.0.1","port":"46802"},{"host":"127.0.0.1","port":"46802"}]}
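
For local self-tests it can be convenient to generate this file content instead of editing it by hand. The sketch below builds the JSON string from a list of host and port pairs and repeats a single entry so that the two-entry requirement is met. `LocalProxyConfig` and `toJson` are hypothetical names, not SDK classes. The values of isInterVisit, cluster_id, size, and switch are copied verbatim from the sample above because this page does not describe them, and the sketch assumes that hosts and ports contain no characters that need JSON escaping.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper (not part of the SDK): builds the local config file content. */
final class LocalProxyConfig {

    /** Each element of addresses is a two-element array: {host, port}. */
    static String toJson(List<String[]> addresses) {
        if (addresses.isEmpty()) {
            throw new IllegalArgumentException("at least one DataProxy address is required");
        }
        List<String[]> entries = new ArrayList<>(addresses);
        if (entries.size() == 1) {
            // The file needs at least two entries; repeating the same one is allowed.
            entries.add(entries.get(0));
        }
        // Fixed fields copied as-is from the sample configuration.
        StringBuilder json = new StringBuilder(
                "{\"isInterVisit\":1,\"cluster_id\":\"1\",\"size\":1,\"switch\":1,\"address\":[");
        for (int i = 0; i < entries.size(); i++) {
            if (i > 0) {
                json.append(',');
            }
            json.append("{\"host\":\"").append(entries.get(i)[0])
                .append("\",\"port\":\"").append(entries.get(i)[1]).append("\"}");
        }
        return json.append("]}").toString();
    }

    public static void main(String[] args) {
        List<String[]> one = new ArrayList<>();
        one.add(new String[] {"127.0.0.1", "46802"});
        System.out.println(toJson(one));
    }
}
```

The resulting string can be written to the ${dataProxyGroup}.local file under ${configBasePath}.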

Send Message

    public void sendTcpMessage(DefaultMessageSender sender, String inlongGroupId,
            String inlongStreamId, String messageBody, long dt) throws Exception {
        // Send the UTF-8 encoded body and wait up to 20 seconds for the result.
        SendResult result = sender.sendMessage(messageBody.getBytes(StandardCharsets.UTF_8), inlongGroupId,
                inlongStreamId, 0, String.valueOf(dt), 20, TimeUnit.SECONDS);
        logger.info("messageSender {}", result);
    }

The parameter description is as follows:

| Parameter | Type | Description |
| --- | --- | --- |
| sender | DefaultMessageSender | The sender created in the first step |
| inlongGroupId | String | inlongGroupId |
| inlongStreamId | String | inlongStreamId |
| messageBody | String | Content of the message to send |
| dt | long | Timestamp |

HTTP Example

Create MessageSender

    public HttpProxySender getMessageSender(String localIP, String inLongManagerAddr, String inLongManagerPort,
            String netTag, String dataProxyGroup, boolean isLocalVisit, boolean isReadProxyIPFromLocal,
            String configBasePath) {
        ProxyClientConfig proxyConfig = null;
        HttpProxySender sender = null;
        try {
            proxyConfig = new ProxyClientConfig(localIP, isLocalVisit, inLongManagerAddr,
                    Integer.valueOf(inLongManagerPort),
                    dataProxyGroup, netTag);
            proxyConfig.setGroupId(dataProxyGroup);
            proxyConfig.setConfStoreBasePath(configBasePath);
            proxyConfig.setReadProxyIPFromLocal(isReadProxyIPFromLocal);
            proxyConfig.setDiscardOldMessage(true);
            sender = new HttpProxySender(proxyConfig);
        } catch (ProxysdkException e) {
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        }
        // null is returned when the sender could not be created.
        return sender;
    }

The parameter description is as follows:

| Parameter | Type | Description |
| --- | --- | --- |
| inLongManagerAddr | String | InLong admin server address |
| inLongManagerPort | String | InLong admin server port |
| netTag | String | Network label; not used yet, an empty string can be passed |
| dataProxyGroup | String | DataProxy group name; also used as the name of the local configuration file when local configuration is enabled |
| isLocalVisit | boolean | Whether to use local configuration; true accesses the console over HTTPS, false over HTTP |
| isReadProxyIPFromLocal | boolean | Whether to read the DataProxy server addresses from the local configuration file; can be set to true for local self-tests when the management console cannot be accessed |
| configBasePath | String | Directory of the local configuration file, ./inlong by default; when isReadProxyIPFromLocal is true, the configuration file is looked up in this directory |

When isReadProxyIPFromLocal is true, the DataProxy configuration is read from the local configuration file.

The path to the local file is:

    ${configBasePath}

The file name is:

    ${dataProxyGroup}.local

For example:

    configBasePath = /data/inlong
    dataProxyGroup = inlong_test

Then the full path of the local file is:

    /data/inlong/inlong_test.local

The file content is in JSON format, where host is the address of the DataProxy server and port is the corresponding port. At least two address entries are required (the same entry may be listed twice):

    {"isInterVisit":1,"cluster_id":"1","size":1,"switch":1,"address":[{"host":"127.0.0.1","port":"46802"},{"host":"127.0.0.1","port":"46802"}]}

Send Message

    public void sendHttpMessage(HttpProxySender sender, String inlongGroupId,
            String inlongStreamId, String messageBody) throws Exception {
        List<String> bodyList = new ArrayList<>();
        bodyList.add(messageBody);
        // MyMessageCallBack is a user-defined callback class that is not shown on this page.
        sender.asyncSendMessage(bodyList, inlongGroupId, inlongStreamId, System.currentTimeMillis(),
                20, TimeUnit.SECONDS, new MyMessageCallBack());
    }
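
The snippet above passes a MyMessageCallBack instance that this page does not define. The sketch below shows one shape such a callback could take: it records the outcome and releases a latch so that the caller can wait for the asynchronous result. The `AckCallback` interface, its method names, and the String result type are stand-ins declared locally so that the example compiles on its own. They are assumptions, not the SDK's API; a real callback has to implement whatever callback interface the SDK version in use declares.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Stand-in for the SDK's callback type (assumption; declared here only for this sketch).
interface AckCallback {
    void onMessageAck(String result);

    void onException(Throwable error);
}

/** Records the first outcome and lets a caller wait for it. */
final class MyMessageCallBack implements AckCallback {
    private final CountDownLatch done = new CountDownLatch(1);
    private final AtomicReference<String> outcome = new AtomicReference<>();

    @Override
    public void onMessageAck(String result) {
        outcome.compareAndSet(null, "ack: " + result);
        done.countDown();
    }

    @Override
    public void onException(Throwable error) {
        outcome.compareAndSet(null, "error: " + error.getMessage());
        done.countDown();
    }

    /** Waits up to the timeout; returns null if no callback arrived in time. */
    String await(long timeout, TimeUnit unit) {
        try {
            return done.await(timeout, unit) ? outcome.get() : null;
        } catch (InterruptedException e) {
            // Restore the interrupt flag and report "no result".
            Thread.currentThread().interrupt();
            return null;
        }
    }
}
```

With a callback of this kind, the code that calls asyncSendMessage can keep a reference to the callback and call await with a timeout slightly longer than the send timeout.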

The parameter description is as follows:

| Parameter | Type | Description |
| --- | --- | --- |
| sender | HttpProxySender | The sender created in the first step |
| inlongGroupId | String | inlongGroupId |
| inlongStreamId | String | inlongStreamId |
| messageBody | String | Content of the message to send |

UDP Example

The inlong-dataproxy-sdk does not support sending messages over the UDP protocol. Users who need UDP must assemble the messages into binary arrays according to the message assembly method in the SDK and send them over UDP themselves. For specific examples, refer to the relevant example code in inlong-sdk/dataproxy-sdk.
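
The transport half of this can be sketched with the JDK alone. The example below sends one already assembled byte array as a single UDP datagram. It does not implement the SDK's message assembly: the payload is assumed to have been built beforehand according to the SDK's format. `UdpSendSketch` is a hypothetical name, and the host and port must point to a DataProxy UDP source.

```java
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;

/** Hypothetical helper (not part of the SDK): sends a prepared payload over UDP. */
final class UdpSendSketch {

    /**
     * Sends payload as one datagram to host:port.
     * The payload must already be assembled in the format DataProxy expects.
     */
    static void send(byte[] payload, String host, int port) throws IOException {
        // An unbound socket picks a free local port; it is closed after the send.
        try (DatagramSocket socket = new DatagramSocket()) {
            DatagramPacket packet = new DatagramPacket(
                    payload, payload.length, InetAddress.getByName(host), port);
            socket.send(packet);
        }
    }
}
```

UDP gives no delivery acknowledgement, so a successful return from send only means that the datagram was handed to the network stack.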