示例

总览

Inlong-dataproxy-sdk 提供 TCP、HTTP两种协议的接入 api ,使用 TCP 或者 HTTP 接入时需要保证 Dataproxy 服务器端,有对应的协议的接入配置(即对应的 Source 服务配置)。如果,需要使用 UDP 方式接入, 需要自己按照 TCP 传输的 bytes 数组格式进行组包,采用 UDP 协议发送到 dataproxy 服务器,服务器端采用与TCP一样的方式对接。 此外,在 Inlong-dataproxy-sdk 中的 example 目录下提供了 TCP、HTTP、UDP 三种协议的接入演示代码,大家在接入时可以参考。

Api 详情,请查看总览

增加依赖

  1. <dependency>
  2. <groupId>org.apache.inlong</groupId>
  3. <artifactId>dataproxy-sdk</artifactId>
  4. <version>${inlong_version}</version>
  5. </dependency>

TCP 示例

创建 messageSender

  1. public DefaultMessageSender getMessageSender(String localIP, String inLongManagerAddr, String inLongManagerPort,
  2. String netTag, String dataProxyGroup, boolean isLocalVisit, boolean isReadProxyIPFromLocal,
  3. String configBasePath, int msgType) {
  4. ProxyClientConfig dataProxyConfig = null;
  5. DefaultMessageSender messageSender = null;
  6. try {
  7. dataProxyConfig = new ProxyClientConfig(localIP, isLocalVisit, inLongManagerAddr,
  8. Integer.valueOf(inLongManagerPort), dataProxyGroup, netTag);
  9. if (StringUtils.isNotEmpty(configBasePath)) {
  10. dataProxyConfig.setConfStoreBasePath(configBasePath);
  11. }
  12. dataProxyConfig.setReadProxyIPFromLocal(isReadProxyIPFromLocal);
  13. messageSender = DefaultMessageSender.generateSenderByClusterId(dataProxyConfig);
  14. messageSender.setMsgtype(msgType);
  15. } catch (Exception e) {
  16. logger.error("getMessageSender has exception e = {}", e);
  17. }
  18. return messageSender;
  19. }

参数说明如下:

参数名称类型说明
inLongManagerAddrStringinlong 管理台地址
inLongManagerPortStringinlong 管理台端口
netTagString网络标签,暂未使用,可以传空字符串
dataProxyGroupStringdataProxy 组名称,用户在启用本地配置的时候,用于本地配置的名称
isLocalVisitboolean是否使用本地配置, true 使用 https 访问管理台,false 使用 http 请求管理台
isReadProxyIPFromLocalboolean是否从本地配置文件中获取 Dataproxy 服务器地址信息,本地自测,不能访问管理台的情况下可以配置为 true
configBasePathString本地配置文件的路径 默认 ./inlong,isReadProxyIPFromLocal 为 true 时从这个目录查找配置文件/
msgtypeint消息类型,取值(3,5,7,8),建议使用7,每种消息类型代表一种传递过程中消息的拼装协议,具体请参照SDK的代码实现

当 isReadProxyIPFromLocal 为 true 的时候, 会从本地配置文件中获取 Dataproxy 的配置信息。

本地文件的路径为:

  1. ${configBasePath}

文件名称为:

  1. ${dataProxyGroup}.local

例如:

  1. configBasePath = /data/inlong
  2. dataProxyGroup = inlong_test

则本地文件的全路径名称为:

  1. /data/inlong/inlong_test.local

文件配置内容为( json 格式),其中 host 为 DataProxy 服务器地址,port为对应的端口,这需要至少配置两个(可以配置为相同的两项):

  1. {"isInterVisit":1,"cluster_id":"1","size":1,"switch":1,"address":[{"host":"127.0.0.1","port":"46802"},{"host":"127.0.0.1","port":"46802"}]}

发送消息

  1. public void sendTcpMessage(DefaultMessageSender sender, String inlongGroupId,
  2. String inlongStreamId, String messageBody, long dt) throws Exception {
  3. SendResult result = sender.sendMessage(messageBody.getBytes("utf8"),inlongGroupId, inlongStreamId,
  4. 0, String.valueOf(dt), 20,TimeUnit.SECONDS);
  5. logger.info("messageSender {} ", result);
  6. }

参数说明如下:

参数名称类型说明
senderHttpProxySender第一步创建的 sender
inlongGroupIdStringinglongGroupId
inlongStreamIdStringinlongStreamId
messageBodyString发送的消息内容
dtlong时间戳

HTTP 示例

创建 messageSender

  1. public HttpProxySender getMessageSender(String localIP, String inLongManagerAddr, String inLongManagerPort,
  2. String netTag, String dataProxyGroup, boolean isLocalVisit, boolean isReadProxyIPFromLocal,
  3. String configBasePath) {
  4. ProxyClientConfig proxyConfig = null;
  5. HttpProxySender sender = null;
  6. try {
  7. proxyConfig = new ProxyClientConfig(localIP, isLocalVisit, inLongManagerAddr,
  8. Integer.valueOf(inLongManagerPort),
  9. dataProxyGroup, netTag);
  10. proxyConfig.setGroupId(dataProxyGroup);
  11. proxyConfig.setConfStoreBasePath(configBasePath);
  12. proxyConfig.setReadProxyIPFromLocal(isReadProxyIPFromLocal);
  13. proxyConfig.setDiscardOldMessage(true);
  14. sender = new HttpProxySender(proxyConfig);
  15. } catch (ProxysdkException e) {
  16. e.printStackTrace();
  17. } catch (Exception e) {
  18. e.printStackTrace();
  19. }
  20. return sender;
  21. }

参数说明如下:

参数名称类型说明
inLongManagerAddrStringinlong 管理台地址
inLongManagerPortStringinlong 管理台端口
netTagString网络标签,暂未使用,可以传空字符串
dataProxyGroupStringdataProxy 组名称,用户在启用本地配置的时候,用于本地配置的名称
isLocalVisitboolean是否使用本地配置, true 使用 https 访问管理台,false 使用 http 请求管理台
isReadProxyIPFromLocalboolean是否从本地配置文件中获取 Dataproxy 服务器地址信息,本地自测,不能访问管理台的情况下可以配置为 true
configBasePathString本地配置文件的路径 默认 ./inlong,isReadProxyIPFromLocal 为 true 时从这个目录查找配置文件/

当 isReadProxyIPFromLocal 为 true 的时候, 会从本地配置文件中获取 Dataproxy 的配置信息。

本地文件的路径为

  1. ${configBasePath}

文件名称为:

  1. ${dataProxyGroup}.local

例如:

  1. configBasePath = /data/inlong
  2. dataProxyGroup = inlong_test

则本地文件的全路径名称为:

  1. /data/inlong/inlong_test.local

文件配置内容为( json 格式),其中 host 为 DataProxy 服务器地址,port 为对应的端口,这需要至少配置两个(可以配置为相同的两项):

  1. {"isInterVisit":1,"cluster_id":"1","size":1,"switch":1,"address":[{"host":"127.0.0.1","port":"46802"},{"host":"127.0.0.1","port":"46802"}]}

发送消息

  1. public void sendHttpMessage(HttpProxySender sender, String inlongGroupId,
  2. String inlongStreamId, String messageBody) throws Exception {
  3. List<String> bodyList = new ArrayList<>();
  4. bodyList.add(messageBody);
  5. sender.asyncSendMessage(bodyList, inlongGroupId, inlongStreamId, System.currentTimeMillis(),
  6. 20, TimeUnit.SECONDS, new MyMessageCallBack());
  7. }

参数说明如下:

参数名称类型说明
senderHttpProxySender第一步创建的 sender
inlongGroupIdStringinglongGroupId
inlongStreamIdStringinlongStreamId
messageBodyString发送的消息内容

UDP 示例

Inlong-dataproxy-sdk 不支持发送 UDP 协议的消息,如果用户需要,需要自己按照 SDK 中的消息拼装方式, 组织二进制数组,按照 UDP方式发送,具体示例参照 inlong-sdk/dataporxy-sdk 中的相关的 example 代码。