Spring Boot集成消息队列

Apache RocketMQ是由开源的轻量级消息队列,于2017年正式成为Apache顶级项目。

在分布式消息队列中间件领域,最热门的项目是Kafka和RocketMQ:

  • Kafka是较早开源的”消息处理平台”,在写吞吐量上,有明显优势,更适合处理日志类消息。

  • RocketMQ借鉴了部分Kafka的设计思路,并对实时性、大分区数等方面进行了优化,较适合做为业务类的消息。

因此,本书选用RocketMQ做为业务类的消息队列。

安装并运行RocketMQ

RocketMQ的容器化比较落后,基本没有可用的镜像版本,我们采用手工单机部署的方式。

首先,下载最新版二进制文件,当前是4.9.1:

  1. wget https://dlcdn.apache.org/rocketmq/4.9.1/rocketmq-all-4.9.1-bin-release.zip

完成后,解压缩:

  1. unizp rocketmq-all-4.9.1-bin-release.zip

启动Name Server:

  1. nohup sh bin/mqnamesrv &
  2. tail -f ~/logs/rocketmqlogs/namesrv.log

最后启动Broker:

  1. nohup sh bin/mqbroker -n 127.0.0.1:9876 &
  2. tail -f ~/logs/rocketmqlogs/broker.log

如果启动成功,在上述两个日志中,会有如下的日志:

  1. 2021-10-12 4:30:02 INFO main - tls.client.keyPassword = null
  2. 2021-10-12 4:30:02 INFO main - tls.client.certPath = null
  3. 2021-10-12 4:30:02 INFO main - tls.client.authServer = false
  4. 2021-10-12 4:30:02 INFO main - tls.client.trustCertPath = null
  5. 2021-10-12 4:30:02 INFO main - Using JDK SSL provider
  6. 2021-10-12 4:30:03 INFO main - SSLContext created for server
  7. 2021-10-12 4:30:03 INFO main - Try to start service thread:FileWatchService started:false lastThread:null
  8. 2021-10-12 4:30:03 INFO NettyEventExecutor - NettyEventExecutor service started
  9. 2021-10-12 4:30:03 INFO FileWatchService - FileWatchService service started
  10. 2021-10-12 4:30:03 INFO main - The Name Server boot success. serializeType=JSON
  11. 2021-10-12 14:36:09 INFO brokerOutApi_thread_3 - register broker[0]to name server 127.0.0.1:9876 OK
  12. 2021-10-12 14:36:09 ERROR DiskCheckScheduledThread1 - Error when measuring disk space usage, file doesn't exist on this path: /Users/coder4/store/commitlog
  13. 2021-10-12 14:36:18 ERROR StoreScheduledThread1 - Error when measuring disk space usage, file doesn't exist on this path: /Users/coder4/store/commitlog
  14. 2021-10-12 14:36:19 ERROR DiskCheckScheduledThread1 - Error when measuring disk space usage, file doesn't exist on this path: /Users/coder4/store/commitlog

可以发现,NameServer是没有问题的,Broker报了一个”Error when measuring disk space usage”的错,这个是当前版本的Bug,不影响使用。

如果想退出服务,可以直接kill,或者执行:

  1. sh bin/mqshutdown broker
  2. sh bin/mqshutdown namesrv

RocketMQ架构简介

在集成RocketMQ之前,先介绍一下RocketMQ的基本架构:

  • NameServer:轻量级元信息服务,管理路由信息并提供对应的读写服务

  • Broker:支撑TOPIC和QUEUE的存储,支持Push和Pull两种协议,有容错、副本、故障恢复机制。

  • Producer:发布端服务,支持分布式部署,并向Broker集群发送

  • Consumer:消费端服务,同时支持Push和Pull协议。支持消费、广播、顺序消息等特性。

  • Topic:队列,用于区分不同消息。

  • Tag:同一个Topic下,可以设定不同Tag(例如前缀),通过Tag来过滤消息,只保留自己感兴趣的。

在使用Producer和Consumer时,需要指定消费组(Consumer Group),这是从Kafka中借鉴过来的机制。相同Consumer Group下的实例会共享同一个GroupId,会被认为是对等的、可负载均衡的。事件会随机分发给相同GroupId下的多个实例中。

在Spring Boot中集成RocketMQ

首先引入依赖:

  1. implementation 'org.apache.rocketmq:rocketmq-client:4.9.1'

接着,我们创建生产者的抽象基类:

  1. package com.coder4.homs.demo.server.mq;
  2. import com.fasterxml.jackson.databind.ObjectMapper;
  3. import org.apache.rocketmq.client.exception.MQClientException;
  4. import org.apache.rocketmq.client.producer.DefaultMQProducer;
  5. import org.apache.rocketmq.client.producer.SendCallback;
  6. import org.apache.rocketmq.client.producer.SendResult;
  7. import org.apache.rocketmq.common.message.Message;
  8. import org.slf4j.Logger;
  9. import org.slf4j.LoggerFactory;
  10. import org.springframework.beans.factory.DisposableBean;
  11. import javax.annotation.PostConstruct;
  12. import java.nio.charset.StandardCharsets;
  13. /**
  14. * @author coder4
  15. */
  16. public abstract class BaseProducer<T> implements DisposableBean {
  17. private final Logger LOG = LoggerFactory.getLogger(getClass());
  18. abstract String getNamesrvAddr();
  19. abstract String getProducerGroup();
  20. abstract String getTopic();
  21. abstract String getTag();
  22. protected DefaultMQProducer producer;
  23. private ObjectMapper objectMapper = new ObjectMapper();
  24. public BaseProducer() {
  25. producer = new
  26. DefaultMQProducer(getProducerGroup());
  27. }
  28. @PostConstruct
  29. public void postConstruct() {
  30. producer.setNamesrvAddr(getNamesrvAddr());
  31. try {
  32. producer.start();
  33. } catch (MQClientException e) {
  34. LOG.error("producer start exception", e);
  35. throw new RuntimeException(e);
  36. }
  37. }
  38. @Override
  39. public void destroy() throws Exception {
  40. producer.shutdown();
  41. }
  42. protected Message buildMessage(String payload) {
  43. return new Message(getTopic(),
  44. getTag(),
  45. payload.getBytes(StandardCharsets.UTF_8)
  46. );
  47. }
  48. public void publish(T payload) {
  49. try {
  50. String val = objectMapper.writeValueAsString(payload);
  51. producer.send(buildMessage(val));
  52. LOG.info("publish success, topic = {}, tag = {}, msg = {}", getTopic(), getTag(), val);
  53. } catch (Exception e) {
  54. LOG.error("publish exception", e);
  55. }
  56. }
  57. public void publishAsync(T payload) {
  58. try {
  59. String val = objectMapper.writeValueAsString(payload);
  60. producer.send(buildMessage(val), new SendCallback() {
  61. @Override
  62. public void onSuccess(SendResult sendResult) {
  63. LOG.info("publishAsync success, topic = {}, tag = {}, msg = {}", getTopic(), getTag(), val);
  64. }
  65. @Override
  66. public void onException(Throwable e) {
  67. LOG.error("publish async exception", e);
  68. }
  69. });
  70. } catch (Exception e) {
  71. LOG.error("publishAsync exception", e);
  72. }
  73. }
  74. }

如上所示:

  • nameServr、topic、tag由子类组成

  • 我们在构造函数中,创建了Producer对象

  • postConstruct中:设定了NameServer地址,并启动producer

  • publish / publishAsync:发送消息,先根据topic和tag构造消息,然后调用同步 / 异步的接口发送。

  • destroy时,停止producer

接下来我们看下Consumer的基类:

  1. /**
  2. * @(#)BaseConsumer.java, 10月 12, 2021.
  3. * <p>
  4. * Copyright 2021 coder4.com. All rights reserved.
  5. * CODER4.COM PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
  6. */
  7. package com.coder4.homs.demo.server.mq;
  8. import com.fasterxml.jackson.core.type.TypeReference;
  9. import com.fasterxml.jackson.databind.ObjectMapper;
  10. import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
  11. import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
  12. import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
  13. import org.apache.rocketmq.client.exception.MQClientException;
  14. import org.apache.rocketmq.common.message.MessageExt;
  15. import org.slf4j.Logger;
  16. import org.slf4j.LoggerFactory;
  17. import org.springframework.beans.factory.DisposableBean;
  18. import org.springframework.util.CollectionUtils;
  19. import javax.annotation.PostConstruct;
  20. /**
  21. * @author coder4
  22. */
  23. public abstract class BaseConsumer<T> implements DisposableBean {
  24. protected final Logger LOG = LoggerFactory.getLogger(getClass());
  25. private static final int DEFAULT_BATCH_SIZE = 1;
  26. private static final int MAX_RETRY = 1024;
  27. abstract String getNamesrvAddr();
  28. abstract String getConsumerGroup();
  29. abstract String getTopic();
  30. abstract String getTag();
  31. abstract Class<T> getClassT();
  32. abstract boolean process(T msg);
  33. private ObjectMapper objectMapper = new ObjectMapper();
  34. protected DefaultMQPushConsumer consumer;
  35. public BaseConsumer() {
  36. consumer = new
  37. DefaultMQPushConsumer(getConsumerGroup());
  38. }
  39. @PostConstruct
  40. public void postConstruct() {
  41. consumer.setNamesrvAddr(getNamesrvAddr());
  42. try {
  43. consumer.subscribe(getTopic(), getTag());
  44. } catch (MQClientException e) {
  45. LOG.error("consumer subscribe exception", e);
  46. throw new RuntimeException(e);
  47. }
  48. consumer.setConsumeMessageBatchMaxSize(DEFAULT_BATCH_SIZE);
  49. consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
  50. if (CollectionUtils.isEmpty(msgs)) {
  51. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  52. }
  53. if (msgs.size() != DEFAULT_BATCH_SIZE) {
  54. LOG.error("MessageListenerConcurrently callback msgs.size() != 1");
  55. }
  56. MessageExt msg = msgs.get(0);
  57. if (msg.getReconsumeTimes() >= MAX_RETRY) {
  58. LOG.error("reconsume exceed max retry times");
  59. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  60. }
  61. try {
  62. if (process(objectMapper.readValue(new String(msg.getBody()), getClassT()))) {
  63. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  64. } else {
  65. return ConsumeConcurrentlyStatus.RECONSUME_LATER;
  66. }
  67. } catch (Exception e) {
  68. LOG.error("process exception", e);
  69. return ConsumeConcurrentlyStatus.RECONSUME_LATER;
  70. }
  71. });
  72. try {
  73. consumer.start();
  74. } catch (MQClientException e) {
  75. LOG.error("consumer start exception", e);
  76. throw new RuntimeException(e);
  77. }
  78. }
  79. @Override
  80. public void destroy() throws Exception {
  81. consumer.shutdown();
  82. }
  83. }

与Producer类似,topic、tag、namesrv由子类指定。

  • postConstruct:订阅了对应topic和tag的消息,并设定回掉函数,这里设定每批次最多拉取1个消息,以最简化处理失败的情况,你可以根据实际情况做出调整。

  • 接受消息时,会调用子类的process进行处理,同时进行json的反序列化操作

接下来,我们来写一个Demo的生产者、消费者:

首先配置nameSrv:

  1. # rocketmq
  2. rocketmq.namesrv: 127.0.0.1:9876

接着,定义消息:

  1. @Data
  2. @Builder
  3. @NoArgsConstructor
  4. @AllArgsConstructor
  5. public class DemoMessage {
  6. private String msg;
  7. private long ts;
  8. }

然后是具体的Consumer和Producer:

  1. package com.coder4.homs.demo.server.mq;
  2. import org.springframework.beans.factory.annotation.Value;
  3. import org.springframework.stereotype.Service;
  4. /**
  5. * @author coder4
  6. */
  7. @Service
  8. public class DemoConsumer extends BaseConsumer<DemoMessage> {
  9. @Value("${rocketmq.namesrv}")
  10. private String namesrv;
  11. @Override
  12. String getNamesrvAddr() {
  13. return namesrv;
  14. }
  15. @Override
  16. String getConsumerGroup() {
  17. return "demo-consumer";
  18. }
  19. @Override
  20. String getTopic() {
  21. return "demo";
  22. }
  23. @Override
  24. String getTag() {
  25. return "*";
  26. }
  27. @Override
  28. Class<DemoMessage> getClassT() {
  29. return DemoMessage.class;
  30. }
  31. @Override
  32. boolean process(DemoMessage msg) {
  33. LOG.info("process msg = {}", msg);
  34. return true;
  35. }
  36. }
  1. package com.coder4.homs.demo.server.mq;
  2. import org.springframework.beans.factory.annotation.Value;
  3. import org.springframework.stereotype.Service;
  4. /**
  5. * @author coder4
  6. */
  7. @Service
  8. public class DemoProducer extends BaseProducer<DemoMessage> {
  9. @Value("${rocketmq.namesrv}")
  10. private String namesrv;
  11. @Override
  12. String getNamesrvAddr() {
  13. return namesrv;
  14. }
  15. @Override
  16. String getProducerGroup() {
  17. return "demo-producer";
  18. }
  19. @Override
  20. String getTopic() {
  21. return "demo";
  22. }
  23. @Override
  24. String getTag() {
  25. return "*";
  26. }
  27. }

我们可以调用Producer发送一个消息,然后会收到如下的日志,说明消息已经被成功处理!

  1. 2021-10-12 8:01:37.340 INFO 6270 --- [MessageThread_1] c.c.homs.demo.server.mq.DemoConsumer : process msg = DemoMessage(msg=123, ts=1634032897315)

由于篇幅所限,我们只实战了基础的消息收发,推荐你根据文档继续探索其他内容,包括:集群部署)、顺序消息)、广播消息)等内容。