1 基本样例

在基本样例中我们提供如下的功能场景:

  • 使用RocketMQ发送三种类型的消息:同步消息、异步消息和单向消息。其中前两种消息是可靠的,因为会有发送是否成功的应答。
  • 使用RocketMQ来消费接收到的消息。

1.1 加入依赖:

maven:

  1. <dependency>
  2. <groupId>org.apache.rocketmq</groupId>
  3. <artifactId>rocketmq-client</artifactId>
  4. <version>4.3.0</version>
  5. </dependency>

gradle

  1. compile 'org.apache.rocketmq:rocketmq-client:4.3.0'

1.2 消息发送

1、Producer端发送同步消息

这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。

  1. public class SyncProducer {
  2. public static void main(String[] args) throws Exception {
  3. // 实例化消息生产者Producer
  4. DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
  5. // 设置NameServer的地址
  6. producer.setNamesrvAddr("localhost:9876");
  7. // 启动Producer实例
  8. producer.start();
  9. for (int i = 0; i < 100; i++) {
  10. // 创建消息,并指定Topic,Tag和消息体
  11. Message msg = new Message("TopicTest" /* Topic */,
  12. "TagA" /* Tag */,
  13. ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
  14. );
  15. // 发送消息到一个Broker
  16. SendResult sendResult = producer.send(msg);
  17. // 通过sendResult返回消息是否成功送达
  18. System.out.printf("%s%n", sendResult);
  19. }
  20. // 如果不再发送消息,关闭Producer实例。
  21. producer.shutdown();
  22. }
  23. }

2、发送异步消息

异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。

  1. public class AsyncProducer {
  2. public static void main(String[] args) throws Exception {
  3. // 实例化消息生产者Producer
  4. DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
  5. // 设置NameServer的地址
  6. producer.setNamesrvAddr("localhost:9876");
  7. // 启动Producer实例
  8. producer.start();
  9. producer.setRetryTimesWhenSendAsyncFailed(0);
  10. int messageCount = 100;
  11. // 根据消息数量实例化倒计时计算器
  12. final CountDownLatch2 countDownLatch = new CountDownLatch2(messageCount);
  13. for (int i = 0; i < messageCount; i++) {
  14. final int index = i;
  15. // 创建消息,并指定Topic,Tag和消息体
  16. Message msg = new Message("TopicTest",
  17. "TagA",
  18. "OrderID188",
  19. "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
  20. // SendCallback接收异步返回结果的回调
  21. producer.send(msg, new SendCallback() {
  22. @Override
  23. public void onSuccess(SendResult sendResult) {
  24. System.out.printf("%-10d OK %s %n", index,
  25. sendResult.getMsgId());
  26. }
  27. @Override
  28. public void onException(Throwable e) {
  29. System.out.printf("%-10d Exception %s %n", index, e);
  30. e.printStackTrace();
  31. }
  32. });
  33. }
  34. // 等待5s
  35. countDownLatch.await(5, TimeUnit.SECONDS);
  36. // 如果不再发送消息,关闭Producer实例。
  37. producer.shutdown();
  38. }
  39. }

3、单向发送消息

这种方式主要用在不特别关心发送结果的场景,例如日志发送。

  1. public class OnewayProducer {
  2. public static void main(String[] args) throws Exception{
  3. // 实例化消息生产者Producer
  4. DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
  5. // 设置NameServer的地址
  6. producer.setNamesrvAddr("localhost:9876");
  7. // 启动Producer实例
  8. producer.start();
  9. for (int i = 0; i < 100; i++) {
  10. // 创建消息,并指定Topic,Tag和消息体
  11. Message msg = new Message("TopicTest" /* Topic */,
  12. "TagA" /* Tag */,
  13. ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
  14. );
  15. // 发送单向消息,没有任何返回结果
  16. producer.sendOneway(msg);
  17. }
  18. // 如果不再发送消息,关闭Producer实例。
  19. producer.shutdown();
  20. }
  21. }

1.3 消费消息

  1. public class Consumer {
  2. public static void main(String[] args) throws InterruptedException, MQClientException {
  3. // 实例化消费者
  4. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
  5. // 设置NameServer的地址
  6. consumer.setNamesrvAddr("localhost:9876");
  7. // 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息
  8. consumer.subscribe("TopicTest", "*");
  9. // 注册回调实现类来处理从broker拉取回来的消息
  10. consumer.registerMessageListener(new MessageListenerConcurrently() {
  11. @Override
  12. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
  13. System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
  14. // 标记该消息已经被成功消费
  15. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  16. }
  17. });
  18. // 启动消费者实例
  19. consumer.start();
  20. System.out.printf("Consumer Started.%n");
  21. }
  22. }