Pull消费

在RocketMQ中有两种Pull方式,一种是比较原始Pull Consumer,它不提供相关的订阅方法,需要调用pull方法时指定队列进行拉取,并需要自己更新位点。另一种是Lite Pull Consumer,它提供了Subscribe和Assign两种方式,使用起来更加方便。

Pull Consumer

Pull Consumer示例如下

  1. public class PullConsumerTest {
  2. public static void main(String[] args) throws MQClientException {
  3. DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5");
  4. consumer.setNamesrvAddr("127.0.0.1:9876");
  5. consumer.start();
  6. try {
  7. MessageQueue mq = new MessageQueue();
  8. mq.setQueueId(0);
  9. mq.setTopic("TopicTest");
  10. mq.setBrokerName("jinrongtong-MacBook-Pro.local");
  11. long offset = 26;
  12. PullResult pullResult = consumer.pull(mq, "*", offset, 32);
  13. if (pullResult.getPullStatus().equals(PullStatus.FOUND)) {
  14. System.out.printf("%s%n", pullResult.getMsgFoundList());
  15. consumer.updateConsumeOffset(mq, pullResult.getNextBeginOffset());
  16. }
  17. } catch (Exception e) {
  18. e.printStackTrace();
  19. }
  20. consumer.shutdown();
  21. }
  22. }

首先需要初始化DefaultMQPullConsumer并启动,然后构造需要拉取的队列MessageQueue,除了构造外也可以如下所示调用fetchSubscribeMessageQueues方法获取某个Topic的所有队列,然后挑选队列进行拉取。

  1. Set<MessageQueue> queueSet = consumer.fetchSubscribeMessageQueues("TopicTest");

找到或者构造完队列之后,调用pull方法就可以进行拉取,需要传入拉取的队列,过滤表达式,拉取的位点,最大拉取消息条数等参数。拉取完成后会返回拉取结果PullResult,PullResult中的PullStatus表示结果状态,如下所示

  1. public enum PullStatus {
  2. /**
  3. * Founded
  4. */
  5. FOUND,
  6. /**
  7. * No new message can be pull
  8. */
  9. NO_NEW_MSG,
  10. /**
  11. * Filtering results can not match
  12. */
  13. NO_MATCHED_MSG,
  14. /**
  15. * Illegal offset,may be too big or too small
  16. */
  17. OFFSET_ILLEGAL
  18. }

FOUND表示拉取到消息,NO_NEW_MSG表示没有发现新消息,NO_MATCHED_MSG表示没有匹配的消息,OFFSET_ILLEGAL表示传入的拉取位点是非法的,有可能偏大或偏小。如果拉取状态是FOUND,我们可以通过pullResultgetMsgFoundList方法获取拉取到的消息列表。最后,如果消费完成,通过updateConsumeOffset方法更新消费位点。

Lite Pull Consumer

Lite Pull Consumer是RocketMQ 4.6.0推出的Pull Consumer,相比于原始的Pull Consumer更加简单易用,它提供了Subscribe和Assign两种模式,Subscribe模式示例如下

  1. public class LitePullConsumerSubscribe {
  2. public static volatile boolean running = true;
  3. public static void main(String[] args) throws Exception {
  4. DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("lite_pull_consumer_test");
  5. litePullConsumer.subscribe("TopicTest", "*");
  6. litePullConsumer.setPullBatchSize(20);
  7. litePullConsumer.start();
  8. try {
  9. while (running) {
  10. List<MessageExt> messageExts = litePullConsumer.poll();
  11. System.out.printf("%s%n", messageExts);
  12. }
  13. } finally {
  14. litePullConsumer.shutdown();
  15. }
  16. }
  17. }

首先还是初始化DefaultLitePullConsumer并设置ConsumerGroupName,调用subscribe方法订阅topic并启动。与Push Consumer不同的是,LitePullConsumer拉取消息调用的是轮询poll接口,如果能拉取到消息则返回对应的消息列表,否则返回null。通过setPullBatchSize可以设置每一次拉取的最大消息数量,此外如果不额外设置,LitePullConsumer默认是自动提交位点。在subscribe模式下,同一个消费组下的多个LitePullConsumer会负载均衡消费,与PushConsumer一致。

如下是Assign模式的示例

  1. public class LitePullConsumerAssign {
  2. public static volatile boolean running = true;
  3. public static void main(String[] args) throws Exception {
  4. DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("please_rename_unique_group_name");
  5. litePullConsumer.setAutoCommit(false);
  6. litePullConsumer.start();
  7. Collection<MessageQueue> mqSet = litePullConsumer.fetchMessageQueues("TopicTest");
  8. List<MessageQueue> list = new ArrayList<>(mqSet);
  9. List<MessageQueue> assignList = new ArrayList<>();
  10. for (int i = 0; i < list.size() / 2; i++) {
  11. assignList.add(list.get(i));
  12. }
  13. litePullConsumer.assign(assignList);
  14. litePullConsumer.seek(assignList.get(0), 10);
  15. try {
  16. while (running) {
  17. List<MessageExt> messageExts = litePullConsumer.poll();
  18. System.out.printf("%s %n", messageExts);
  19. litePullConsumer.commitSync();
  20. }
  21. } finally {
  22. litePullConsumer.shutdown();
  23. }
  24. }
  25. }

Assign模式一开始仍然是初始化DefaultLitePullConsumer,这里我们采用手动提交位点的方式,因此设置AutoCommit为false,然后启动consumer。与Subscribe模式不同的是,Assign模式下没有自动的负载均衡机制,需要用户自行指定需要拉取的队列,因此在例子中,先用fetchMessageQueues获取了Topic下的队列,再取前面的一半队列进行拉取,示例中还调用了seek方法,将第一个队列拉取的位点设置从10开始。紧接着进入循环不停地调用poll方法拉取消息,拉取到消息后调用commitSync方法手动提交位点。