Consumer Example

1 Consumer Example

TubeMQ provides two ways to consume messages: PullConsumer and PushConsumer.

1.1 PullConsumer

  1. ```java
  2. public class PullConsumerExample {
  3. public static void main(String[] args) throws Throwable {
  4. final String masterHostAndPort = "localhost:8000";
  5. final String topic = "test";
  6. final String group = "test-group";
  7. final ConsumerConfig consumerConfig = new ConsumerConfig(masterHostAndPort, group);
  8. consumerConfig.setConsumePosition(ConsumePosition.CONSUMER_FROM_LATEST_OFFSET);
  9. final MessageSessionFactory messageSessionFactory = new TubeSingleSessionFactory(consumerConfig);
  10. final PullMessageConsumer messagePullConsumer = messageSessionFactory.createPullConsumer(consumerConfig);
  11. messagePullConsumer.subscribe(topic, null);
  12. messagePullConsumer.completeSubscribe();
  13. // wait for client to join the exact consumer queue that consumer group allocated
  14. while (!messagePullConsumer.isPartitionsReady(1000)) {
  15. ThreadUtils.sleep(1000);
  16. }
  17. while (true) {
  18. ConsumerResult result = messagePullConsumer.getMessage();
  19. if (result.isSuccess()) {
  20. List<Message> messageList = result.getMessageList();
  21. for (Message message : messageList) {
  22. System.out.println("received message : " + message);
  23. }
  24. messagePullConsumer.confirmConsume(result.getConfirmContext(), true);
  25. }
  26. }
  27. }
  28. }
  29. ```

1.2 PushConsumer

  1. ```java
  2. public class PushConsumerExample {
  3. public static void test(String[] args) throws Throwable {
  4. final String masterHostAndPort = "localhost:8000";
  5. final String topic = "test";
  6. final String group = "test-group";
  7. final ConsumerConfig consumerConfig = new ConsumerConfig(masterHostAndPort, group);
  8. consumerConfig.setConsumePosition(ConsumePosition.CONSUMER_FROM_LATEST_OFFSET);
  9. final MessageSessionFactory messageSessionFactory = new TubeSingleSessionFactory(consumerConfig);
  10. final PushMessageConsumer pushConsumer = messageSessionFactory.createPushConsumer(consumerConfig);
  11. pushConsumer.subscribe(topic, null, new MessageListener() {
  12. @Override
  13. public void receiveMessages(PeerInfo peerInfo, List<Message> messages) throws InterruptedException {
  14. for (Message message : messages) {
  15. System.out.println("received message : " + new String(message.getData()));
  16. }
  17. }
  18. @Override
  19. public Executor getExecutor() {
  20. return null;
  21. }
  22. @Override
  23. public void stop() {
  24. //
  25. }
  26. });
  27. pushConsumer.completeSubscribe();
  28. CountDownLatch latch = new CountDownLatch(1);
  29. latch.await(10, TimeUnit.MINUTES);
  30. }
  31. }
  32. ```