RocketMQ

说明

Zebra 集成了 RocketMQ 用于消息发送和接收。

样例

请参考 zebra-sample/rocketmq/rocketmq-sample

依赖引入

  1. <dependency>
  2. <groupId>com.guosen</groupId>
  3. <artifactId>zebra-rocketmq</artifactId>
  4. <version>${zebra.version}</version>
  5. </dependency>

生产者

配置

配置样例

  1. zebra.rocketmq.namesrvAddr=nameserverIp:nameserverPort
  2. zebra.rocketmq.producerGroupName=com-guosen-zebra-sample-rocketmq-producer
  3. zebra.rocketmq.producerInstanceName=zebraRocketMQProducer

配置说明

配置项 类型 说明
zebra.rocketmq.namesrvAddr String namesrvAddr 地址
zebra.rocketmq.producerGroupName String 生产者 group 名称,填微服务全称 RocketMQ group 名称只支持 %\ a-zA-Z0-9_-,请将 . 手工替换为 -
zebra.rocketmq.producerInstanceName String 生产者实例名称
zebra.rocketmq.transactionProducerGroupName String 在使用事务生产者时配置 事务生产者 group 名称,填微服务全称 RocketMQ group 名称只支持 %\ a-zA-Z0-9_-,请将 . 手工替换为 -
zebra.rocketmq.producerTranInstanceName String 在使用事务生产者时配置 事务生产者实例名称

代码

使用Spring,将 DefaultMQProducer 依赖注入,然后使用 send 方法发送 Message 对象。

  1. private static final String TOPIC_NAME = "com-guosen-zebra-sample-rocketmq-producer";
  2. private static final String TAG_NAME = "tagA";
  3. @Autowired
  4. private DefaultMQProducer producer;
  5. public String produce(String key, String value) {
  6. byte[] body = value.getBytes(StandardCharsets.UTF_8);
  7. Message message = new Message(TOPIC_NAME, TAG_NAME, key, body);
  8. String returnInfo = null;
  9. try {
  10. SendResult sendResult = producer.send(message);
  11. String messageId = sendResult.getMsgId();
  12. returnInfo = "Message id is : " + messageId;
  13. } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
  14. LOGGER.error("Failed to send message", e);
  15. returnInfo = e.getMessage();
  16. }
  17. return returnInfo;
  18. }

消费者

配置

配置样例

  1. zebra.rocketmq.namesrvAddr=nameserverIp:nameserverPort
  2. zebra.rocketmq.consumerGroupName=com-guosen-zebra-sample-rocketmq-consumer
  3. zebra.rocketmq.consumerInstanceName=zebraRocketMQConsumer
  4. zebra.rocketmq.consumerBatchMaxSize=1
  5. zebra.rocketmq.consumerBroadcasting=false
  6. zebra.rocketmq.subscribe[0]=com-guosen-zebra-sample-rocketmq-producer:tagA
  7. zebra.rocketmq.enableHisConsumer=false
  8. zebra.rocketmq.enableOrderConsumer=false

配置说明

配置项 类型 说明
zebra.rocketmq.namesrvAddr String namesrvAddr 地址
zebra.rocketmq.consumerGroupName String 消费者 group 名称,填微服务全称 RocketMQ group 名称只支持 %\ a-zA-Z0-9_-,请将 . 手工替换为 -
zebra.rocketmq.consumerInstanceName String 消费者实例名称
zebra.rocketmq.consumerBatchMaxSize Integer 一次最大消费多少数量消息
zebra.rocketmq.consumerBroadcasting Boolean 广播消费 true : 广播 false : 集群
zebra.rocketmq.subscribe[0] String 消费的topic和tag配置,格式为topic:tag topic 必须为所消费的 producerGroupName,否则无法展示 MQ 依赖关系
zebra.rocketmq.enableHisConsumer Boolean 启动的时候是否消费历史记录 true :启动 false :不启动
zebra.rocketmq.enableOrderConsumer Boolean 启动顺序消费 true :启动 false :不启动

代码

Zebra 把 RocketMQ 获取到的消息封装为 Spring 的 ApplicationEvent,开发者实现 ApplicationListener 接口即可消费消息。

  1. @Component
  2. public class RocketMQListener implements ApplicationListener<RocketmqEvent> {
  3. private static final Logger LOGGER = LoggerFactory.getLogger(RocketMQListener.class);
  4. @Override
  5. public void onApplicationEvent(RocketmqEvent event) {
  6. List<MessageExt> msgs = event.getMsgs();
  7. for (MessageExt messageExt: msgs) {
  8. handle(messageExt);
  9. }
  10. }
  11. private void handle(MessageExt messageExt) {
  12. String key = messageExt.getKeys();
  13. byte[] body = messageExt.getBody();
  14. String value = new String(body, StandardCharsets.UTF_8);
  15. LOGGER.info("Received message, key : {}, value : {}", key, value);
  16. }
  17. }