消费者示例
1 Consumer 示例
TubeMQ 提供了两种方式来消费消息: PullConsumer and PushConsumer。
1.1 PullConsumer
public class PullConsumerExample {
public static void main(String[] args) throws Throwable {
final String masterHostAndPort = "localhost:8000";
final String topic = "test";
final String group = "test-group";
final ConsumerConfig consumerConfig = new ConsumerConfig(masterHostAndPort, group);
consumerConfig.setConsumePosition(ConsumePosition.CONSUMER_FROM_LATEST_OFFSET);
final MessageSessionFactory messageSessionFactory = new TubeSingleSessionFactory(consumerConfig);
final PullMessageConsumer messagePullConsumer = messageSessionFactory.createPullConsumer(consumerConfig);
messagePullConsumer.subscribe(topic, null);
messagePullConsumer.completeSubscribe();
// wait for client to join the exact consumer queue that consumer group allocated
while (!messagePullConsumer.isPartitionsReady(1000)) {
ThreadUtils.sleep(1000);
}
while (true) {
ConsumerResult result = messagePullConsumer.getMessage();
if (result.isSuccess()) {
List<Message> messageList = result.getMessageList();
for (Message message : messageList) {
System.out.println("received message : " + message);
}
messagePullConsumer.confirmConsume(result.getConfirmContext(), true);
}
}
}
}
1.2 PushConsumer
public class PushConsumerExample {
public static void test(String[] args) throws Throwable {
final String masterHostAndPort = "localhost:8000";
final String topic = "test";
final String group = "test-group";
final ConsumerConfig consumerConfig = new ConsumerConfig(masterHostAndPort, group);
consumerConfig.setConsumePosition(ConsumePosition.CONSUMER_FROM_LATEST_OFFSET);
final MessageSessionFactory messageSessionFactory = new TubeSingleSessionFactory(consumerConfig);
final PushMessageConsumer pushConsumer = messageSessionFactory.createPushConsumer(consumerConfig);
pushConsumer.subscribe(topic, null, new MessageListener() {
@Override
public void receiveMessages(PeerInfo peerInfo, List<Message> messages) throws InterruptedException {
for (Message message : messages) {
System.out.println("received message : " + new String(message.getData()));
}
}
@Override
public Executor getExecutor() {
return null;
}
@Override
public void stop() {
//
}
});
pushConsumer.completeSubscribe();
CountDownLatch latch = new CountDownLatch(1);
latch.await(10, TimeUnit.MINUTES);
}
}