Parse InLongMsg

Overview

If you consume data directly from a message queue (InLong TubeMQ or Pulsar), you need to parse InLongMsg first. Origin data can be parsed in the following ways.

Dependency

  • Add Maven Dependency

    1. <dependency>
    2. <groupId>org.apache.inlong</groupId>
    3. <artifactId>inlong-common</artifactId>
    4. <version>1.7.0</version>
    5. </dependency>
  • Add Parse Method

  1. public static List<byte[]> parserInLongMsg(byte[] bytes) {
  2. List<byte[]> originalContentByteList = new ArrayList<>();
  3. InLongMsg inLongMsg = InLongMsg.parseFrom(bytes);
  4. Set<String> attrs = inLongMsg.getAttrs();
  5. if (CollectionUtils.isEmpty(attrs)) {
  6. return originalContentByteList;
  7. }
  8. for (String attr : attrs) {
  9. if (attr == null) {
  10. continue;
  11. }
  12. Iterator<byte[]> iterator = inLongMsg.getIterator(attr);
  13. if (iterator == null) {
  14. continue;
  15. }
  16. while (iterator.hasNext()) {
  17. byte[] bodyBytes = iterator.next();
  18. if (bodyBytes == null || bodyBytes.length == 0) {
  19. continue;
  20. }
  21. // Origin data sended by InLong agent
  22. originalContentByteList.add(bodyBytes);
  23. }
  24. }
  25. return originalContentByteList;
  26. }