RocketMQ
说明
Zebra 集成了 RocketMQ 用于消息发送和接收。
样例
请参考 zebra-sample/rocketmq/rocketmq-sample
依赖引入
<dependency>
<groupId>com.guosen</groupId>
<artifactId>zebra-rocketmq</artifactId>
<version>${zebra.version}</version>
</dependency>
生产者
配置
配置样例
zebra.rocketmq.namesrvAddr=nameserverIp:nameserverPort
zebra.rocketmq.producerGroupName=com-guosen-zebra-sample-rocketmq-producer
zebra.rocketmq.producerInstanceName=zebraRocketMQProducer
配置说明
配置项 | 类型 | 说明 | |
---|---|---|---|
zebra.rocketmq.namesrvAddr | String | namesrvAddr 地址 | |
zebra.rocketmq.producerGroupName | String | 生产者 group 名称,填微服务全称 RocketMQ group 名称只支持 %\ | a-zA-Z0-9_-,请将 . 手工替换为 - |
zebra.rocketmq.producerInstanceName | String | 生产者实例名称 | |
zebra.rocketmq.transactionProducerGroupName | String | 在使用事务生产者时配置 事务生产者 group 名称,填微服务全称 RocketMQ group 名称只支持 %\ | a-zA-Z0-9_-,请将 . 手工替换为 - |
zebra.rocketmq.producerTranInstanceName | String | 在使用事务生产者时配置 事务生产者实例名称 |
代码
使用Spring,将 DefaultMQProducer 依赖注入,然后使用 send 方法发送 Message 对象。
private static final String TOPIC_NAME = "com-guosen-zebra-sample-rocketmq-producer";
private static final String TAG_NAME = "tagA";
@Autowired
private DefaultMQProducer producer;
public String produce(String key, String value) {
byte[] body = value.getBytes(StandardCharsets.UTF_8);
Message message = new Message(TOPIC_NAME, TAG_NAME, key, body);
String returnInfo = null;
try {
SendResult sendResult = producer.send(message);
String messageId = sendResult.getMsgId();
returnInfo = "Message id is : " + messageId;
} catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
LOGGER.error("Failed to send message", e);
returnInfo = e.getMessage();
}
return returnInfo;
}
消费者
配置
配置样例
zebra.rocketmq.namesrvAddr=nameserverIp:nameserverPort
zebra.rocketmq.consumerGroupName=com-guosen-zebra-sample-rocketmq-consumer
zebra.rocketmq.consumerInstanceName=zebraRocketMQConsumer
zebra.rocketmq.consumerBatchMaxSize=1
zebra.rocketmq.consumerBroadcasting=false
zebra.rocketmq.subscribe[0]=com-guosen-zebra-sample-rocketmq-producer:tagA
zebra.rocketmq.enableHisConsumer=false
zebra.rocketmq.enableOrderConsumer=false
配置说明
配置项 | 类型 | 说明 | |
---|---|---|---|
zebra.rocketmq.namesrvAddr | String | namesrvAddr 地址 | |
zebra.rocketmq.consumerGroupName | String | 消费者 group 名称,填微服务全称 RocketMQ group 名称只支持 %\ | a-zA-Z0-9_-,请将 . 手工替换为 - |
zebra.rocketmq.consumerInstanceName | String | 消费者实例名称 | |
zebra.rocketmq.consumerBatchMaxSize | Integer | 一次最大消费多少数量消息 | |
zebra.rocketmq.consumerBroadcasting | Boolean | 广播消费 true : 广播 false : 集群 | |
zebra.rocketmq.subscribe[0] | String | 消费的topic和tag配置,格式为topic:tag topic 必须为所消费的 producerGroupName,否则无法展示 MQ 依赖关系 | |
zebra.rocketmq.enableHisConsumer | Boolean | 启动的时候是否消费历史记录 true :启动 false :不启动 | |
zebra.rocketmq.enableOrderConsumer | Boolean | 启动顺序消费 true :启动 false :不启动 |
代码
Zebra 把 RocketMQ 获取到的消息封装为 Spring 的 ApplicationEvent,开发者实现 ApplicationListener 接口即可消费消息。
@Component
public class RocketMQListener implements ApplicationListener<RocketmqEvent> {
private static final Logger LOGGER = LoggerFactory.getLogger(RocketMQListener.class);
@Override
public void onApplicationEvent(RocketmqEvent event) {
List<MessageExt> msgs = event.getMsgs();
for (MessageExt messageExt: msgs) {
handle(messageExt);
}
}
private void handle(MessageExt messageExt) {
String key = messageExt.getKeys();
byte[] body = messageExt.getBody();
String value = new String(body, StandardCharsets.UTF_8);
LOGGER.info("Received message, key : {}, value : {}", key, value);
}
}