RocketMQ Streams 快速开始

RocketMQ Streams工程中运行

参考RocketMQ Streams工程rocketmq-streams-examples模块下程序可以直接运行;运行example步骤:

  • 本地启动RocketMQ 5.0及以上版本;
  • 使用mqAdmin创建example中数据源topic;
  • 启动example中例子;
  • 向RocketMQ的源topic中写入合适数据(依据示例而定);

RocketMQ Streams以SDK方式被应用依赖

环境准备

  • 64bit JDK 1.8及以上
  • Maven 3.2及以上
  • 本地启动RocketMQ,启动文档

构建RocketMQ Streams

添加pom依赖

  1. <dependencies>
  2. <dependency>
  3. <groupId>org.apache.rocketmq</groupId>
  4. <artifactId>rocketmq-streams</artifactId>
  5. <!-- 根据需要修改 -->
  6. <version>1.1.0</version>
  7. </dependency>
  8. </dependencies>

编写流计算程序

  1. public class WordCount {
  2. public static void main(String[] args) {
  3. StreamBuilder builder = new StreamBuilder("wordCount");
  4. builder.source("sourceTopic", total -> {
  5. String value = new String(total, StandardCharsets.UTF_8);
  6. return new Pair<>(null, value);
  7. })
  8. .flatMap((ValueMapperAction<String, List<String>>) value -> {
  9. String[] splits = value.toLowerCase().split("\\W+");
  10. return Arrays.asList(splits);
  11. })
  12. .keyBy(value -> value)
  13. .count()
  14. .toRStream()
  15. .print();
  16. TopologyBuilder topologyBuilder = builder.build();
  17. Properties properties = new Properties();
  18. properties.put(MixAll.NAMESRV_ADDR_PROPERTY, "127.0.0.1:9876");
  19. RocketMQStream rocketMQStream = new RocketMQStream(topologyBuilder, properties);
  20. final CountDownLatch latch = new CountDownLatch(1);
  21. Runtime.getRuntime().addShutdownHook(new Thread("wordcount-shutdown-hook") {
  22. @Override
  23. public void run() {
  24. rocketMQStream.stop();
  25. latch.countDown();
  26. }
  27. });
  28. try {
  29. rocketMQStream.start();
  30. latch.await();
  31. } catch (final Throwable e) {
  32. System.exit(1);
  33. }
  34. System.exit(0);
  35. }
  36. }

向RocketMQ sourceTopic中写入数据并观察结果

如果向sourceTopic中写入的数据如下:每行数据作为一个消息发送;

  1. "To be, or not to be,--that is the question:--",
  2. "Whether 'tis nobler in the mind to suffer",
  3. "The slings and arrows of outrageous fortune",
  4. "Or to take arms against a sea of troubles,",
  5. "And by opposing end them?--To die,--to sleep,--",
  6. "No more; and by a sleep to say we end",
  7. "The heartache, and the thousand natural shocks",
  8. "That flesh is heir to,--'tis a consummation",

统计单词出现频率,计算结果如下:

  1. (key=to, value=1)
  2. (key=be, value=1)
  3. (key=or, value=1)
  4. (key=not, value=1)
  5. (key=to, value=2)
  6. (key=be, value=2)
  7. (key=that, value=1)
  8. (key=is, value=1)
  9. (key=the, value=1)
  10. (key=whether, value=1)
  11. (key=tis, value=1)
  12. (key=nobler, value=1)
  13. (key=mind, value=1)
  14. (key=against, value=1)
  15. (key=troubles, value=1)
  16. (key=slings, value=1)
  17. (key=die, value=1)
  18. (key=natural, value=1)
  19. (key=flesh, value=1)
  20. (key=sea, value=1)
  21. (key=fortune, value=1)
  22. (key=shocks, value=1)
  23. (key=consummation, value=1)
  24. (key=to, value=3)
  25. (key=to, value=4)
  26. (key=to, value=5)
  27. (key=say, value=1)
  28. (key=end, value=1)
  29. (key=end, value=2)
  30. (key=to, value=6)
  31. (key=to, value=7)
  32. (key=to, value=8)
  33. (key=or, value=2)
  34. (key=them, value=1)
  35. (key=take, value=1)
  36. (key=arms, value=1)
  37. (key=of, value=1)
  38. (key=and, value=1)
  39. (key=of, value=2)
  40. (key=and, value=2)
  41. (key=by, value=1)
  42. (key=sleep, value=1)
  43. (key=and, value=3)
  44. (key=by, value=2)
  45. (key=sleep, value=2)
  46. (key=and, value=4)
  47. (key=that, value=2)
  48. (key=arrows, value=1)
  49. (key=heir, value=1)
  50. (key=question, value=1)
  51. (key=is, value=2)
  52. (key=the, value=2)
  53. (key=suffer, value=1)
  54. (key=a, value=1)
  55. (key=the, value=3)
  56. (key=no, value=1)
  57. (key=a, value=2)
  58. (key=opposing, value=1)
  59. (key=the, value=4)
  60. (key=the, value=5)
  61. (key=a, value=3)
  62. (key=in, value=1)
  63. (key=more, value=1)
  64. (key=heartache, value=1)
  65. (key=outrageous, value=1)
  66. (key=we, value=1)
  67. (key=thousand, value=1)
  68. (key=tis, value=2)