解析 InLongMsg

总览

如果直接从消息队列(InLong TubeMQ 或Pulsar)消费数据,需要先对InLongMsg 进行解析。可通过以下方式可以解析出源数据。

解析

  • 增加maven 依赖

    1. <dependency>
    2. <groupId>org.apache.inlong</groupId>
    3. <artifactId>inlong-common</artifactId>
    4. <version>1.2.0-incubating</version>
    5. </dependency>
  • 增加解析逻辑

  1. public static List<byte[]> parserInLongMsg(byte[] bytes) {
  2. List<byte[]> originalContentByteList = new ArrayList<>();
  3. InLongMsg inLongMsg = InLongMsg.parseFrom(bytes);
  4. Set<String> attrs = inLongMsg.getAttrs();
  5. if (CollectionUtils.isEmpty(attrs)) {
  6. return originalContentByteList;
  7. }
  8. for (String attr : attrs) {
  9. if (attr == null) {
  10. continue;
  11. }
  12. Iterator<byte[]> iterator = inLongMsg.getIterator(attr);
  13. if (iterator == null) {
  14. continue;
  15. }
  16. while (iterator.hasNext()) {
  17. byte[] bodyBytes = iterator.next();
  18. if (bodyBytes == null || bodyBytes.length == 0) {
  19. continue;
  20. }
  21. // agent 发送的原始用户数据
  22. originalContentByteList.add(bodyBytes);
  23. }
  24. }
  25. return originalContentByteList;
  26. }