Push Consumer

The simple code of RocketMQ Push Consumer is as follows:

  1. public class Consumer {
  2. public static void main(String[] args) throws InterruptedException, MQClientException {
  3. // Initialize Consumer and set Consumer Goup Name
  4. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
  5. // Set the address of NameServer
  6. consumer.setNamesrvAddr("localhost:9876");
  7. // Subscribe One or more of topics,and specify the tag filtering conditions, here specify * means receive all tag messages
  8. consumer.subscribe("TopicTest", "*");
  9. // Register a callback interface to handle messages received from the Broker
  10. consumer.registerMessageListener(new MessageListenerConcurrently() {
  11. @Override
  12. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
  13. System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
  14. // Return to the message consumption status, ConsumeConcurrentlyStatus.CONSUME_SUCCESS for successful consumption
  15. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  16. }
  17. });
  18. // Start Consumer
  19. consumer.start();
  20. System.out.printf("Consumer Started.%n");
  21. }
  22. }

First, initialize the consumer. When initializing the consumer, the consumer must with the ConsumerGroupName, the ConsumerGroupName of the same consumer group is the same, which is an important attribute to determine whether the consumer belongs to the same consumer group. Then, set the NameServer address, which is not introduced here as like Producer. Then, call the subscribe method to subscribe to Topic. The subscribe method needs to specify the Topic name needed to subscribe to, it can also add the message filtering conditions, such as TagA, etc. The above code to specify * means to receive all tag messages. In addition to subscribing, it also needs to register the callback interface to write the consumption logic to handle the messages received from the Broker. If call the registerMessageListener method, it needs to pass in the MessageListener implementation. The above code is concurrent consumption, so it is MessageListenerConcurrently implementation, its interface is as follows:

Push Consumer - 图1MessageListenerConcurrently Interface

  1. /**
  2. * A MessageListenerConcurrently object is used to receive asynchronously delivered messages concurrently
  3. */
  4. public interface MessageListenerConcurrently extends MessageListener {
  5. /**
  6. * It is not recommend to throw exception,rather than returning ConsumeConcurrentlyStatus.RECONSUME_LATER if
  7. * consumption failure
  8. *
  9. * @param msgs msgs.size() >= 1<br> DefaultMQPushConsumer.consumeMessageBatchMaxSize=1,you can modify here
  10. * @return The consume status
  11. */
  12. ConsumeConcurrentlyStatus consumeMessage(final List<MessageExt> msgs,
  13. final ConsumeConcurrentlyContext context);
  14. }

where msgs is the list of messages to be consumed obtained from the Broker, and the user implements the interface and writes the consumption logic for the messages in the consumeMessage method, and then returns the consumption status, ConsumeConcurrentlyStatus.CONSUME_SUCCESS indicates successful consumption, or CONSUME_LATER means that the consumption has failed and will be re-consumed after a period of time.

The RocketMQ provides a very simple consumer API, users don’t need to focus on rebalancing or pulling logic, they just need to write their own consumption logic.

Cluster and Broadcast Mode

We can set it to use cluster mode by the following code. RocketMQ Push Consumer uses cluster mode by default, where consumers in the same consumer group consume together.

  1. consumer.setMessageModel(MessageModel.CLUSTERING);

Set up the use of broadcast mode with the following code. In broadcast mode, each consumer within the consumer group consumes the full messages.

  1. consumer.setMessageModel(MessageModel.BROADCASTING);

Concurrent Consumption and Order Consumption

Setting up Push Consumer concurrent consumption has been described above and is accomplished by passing in the implementation of the MessageListenerConcurrently interface when registering the consumption callback interface. In concurrent consumption, there may be multiple threads consuming messages from a queue at the same time, so even if the sender ensures that messages are in the same queue in FIFO order by sending order messages, there is no guarantee that the messages are actually consumed orderly.

RocketMQ therefore provides a order consumption approach. The only difference between order consumption setup and concurrent consumption at the API level is that the implementation of the MessageListenerOrderly interface is passed in when registering the consumption callback interface.

  1. consumer.registerMessageListener(new MessageListenerOrderly() {
  2. AtomicLong consumeTimes = new AtomicLong(0);
  3. @Override
  4. public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
  5. System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
  6. this.consumeTimes.incrementAndGet();
  7. if ((this.consumeTimes.get() % 2) == 0) {
  8. return ConsumeOrderlyStatus.SUCCESS;
  9. } else if ((this.consumeTimes.get() % 5) == 0) {
  10. context.setSuspendCurrentQueueTimeMillis(3000);
  11. return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
  12. }
  13. return ConsumeOrderlyStatus.SUCCESS;
  14. }
  15. });

There are also two return results for order consumption, ConsumeOrderlyStatus.SUCCESS for successful consumption and ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT for failed consumption.

Message Filtering

Message filtering means that message producers set message attributes to classify messages when sending messages to a Topic, and consumers set filtering conditions according to the message attributes when subscribing to a Topic, so that only messages that meet the filtering conditions are delivered to the consumer side for consumption.

If the consumer subscribes to a Topic without setting filter conditions, all messages in the Topic will be delivered to the consumer for consumption, regardless of whether the filter attributes are set when the message is sent.

There are two types of message filtering supported by RocketMQ, Tag filtering and SQL92 filtering.

Message FilteringInstructScenario
Tag filteringIf the Tag subscribed by the consumer and the message Tag set by the sender match each other, the message is cast to the consumer for consumption.Simple filtering Scenario: a message supports setting one Tag, which can be used when only one level of classification and filtering of messages in Topic is required.
SQL92 filteringThe sender sets the Tag or message attribute, and the consumer subscribes to the message that satisfies the SQL92 filter expression is cast to the consumer for consumption.Complex filtering Scenarios: a message supports setting multiple attributes and can be customized to combine multiple types of expressions according to SQL syntax to classify messages at multiple levels and achieve multi-dimensional filtering.

Tag Filtering

Tag has been introduced in the Producers chapter and is used to classify messages under a certain Topic. When sending a message, the producer specifies the Tag of the message, and the consumer has to subscribe according to the Tag already specified.

Take the following e-commerce transaction scenario as an example, the process from the customer’s order to the receipt of goods will produce a series of messages, as follows:

  • Order News
  • Payment News
  • Logistics News

These messages are sent to a Topic with the name Trade_Topic and are subscribed to by various systems, as exemplified by the following:

  • Payment system: subscribe to payment messages only.
  • Logistics system: subscribe to logistics messages only.
  • Real-time calculation system: subscribe to all transaction-related messages.
  • Transaction success rate analysis system: subscribe to order and payment messages.

The filtering schematic is shown below

Tag过滤

For logistics systems and payment systems, they both subscribe to a single Tag, at which point it is sufficient to mark the Tag when calling the subcribe interface.

  1. consumer.subscribe("TagFilterTest", "TagA");

For a real-time computing system, it subscribes to all messages under the transaction Topic, and the Tag is simply indicated by an asterisk (*).

  1. consumer.subscribe("TagFilterTest", "*");

For the transaction success rate analysis system, it subscribes to messages for both Order and Payment Tags, and it is fine to separate multiple Tags with two vertical lines (||).

  1. consumer.subscribe("TagFilterTest", "TagA||TagB");

It should be noted here that if the same consumer subscribes to a Tag under a Topic multiple times, the last subscription will prevail.

  1. //In the following error code, the Consumer can only subscribe to the message of TagB under TagFilterTest, but not the message of TagA.
  2. consumer.subscribe("TagFilterTest", "TagA");
  3. consumer.subscribe("TagFilterTest", "TagB");

SQL92 Filtering

SQL92 filtering is to set the Tag or custom attribute of the message when the message is sent, and the consumer subscribes to set the filter expression using SQL syntax to filter the message based on the custom attribute or Tag.

Tag belongs to a special kind of message property, and the property value of Tag is TAGS in the SQL syntax. Enable property filtering first set the configuration enablePropertyFilter=true on the Broker side, the value is false by default.

Take the following e-commerce transaction scenario as an example, the process from the customer’s order to the receipt of goods will produce a series of messages, according to the type of messages into order messages and logistics messages, which define the geographical attributes of logistics messages, according to the region into Hangzhou and Shanghai:

  • Order News
  • Logistics News
    • Logistics information and the region is Hangzhou
    • Logistics information and the region is Shanghai

These messages are sent to the Topic with the name Trade_Topic and are subscribed by various systems, as an example, the following system:

  • Logistics system 1: only need to subscribe to the logistics message and the message area is Hangzhou.
  • Logistics system 2: only need to subscribe to the logistics news and the news area is Hangzhou or Shanghai.
  • Order tracking system: only need to subscribe to order information.

The SQL92 filtering schematic is shown below:

SQL92过滤

The locale will be set as a custom property in the message.

  • Message sender. Set the custom properties of the message.
  1. Message msg = new Message("topic", "tagA", "Hello MQ".getBytes());
  2. // Set custom property A with property value 1.
  3. msg.putUserProperties("a", "1");
  • Message consumer. Set filter expressions using SQL syntax and filter messages based on custom properties.
  1. consumer.subscribe("SqlFilterTest",
  2. MessageSelector.bySql("(TAGS is not null and TAGS in ('TagA', 'TagB'))" +
  3. "and (a is not null and a between 0 and 3)"));

Message Retry and Dead-Letter Queue

Message Retry

If the Consumer fails to consume a message, RocketMQ will re-pitch the message to the Consumer after the retry interval, and if the message is not successfully consumed after the maximum number of retries, the message will be pitched to the dead message queue.

Message retry is only effective for cluster mode; broadcast mode does not provide the message retry feature. In the broadcast mode, after a failed consumption, the failed message will not be retry and continue to consume new messages.

  • Maximum number of retries: the maximum number of times a message can be repeatedly delivered after a failed consumption.
  1. consumer.setMaxReconsumeTimes(10);

Retry interval: the interval after the message consumption fails to be cast to the Consumer again for consumption, which only works in sequential consumption.

  1. consumer.setSuspendCurrentQueueTimeMillis(5000);

The retry mechanism of order consumption and concurrent consumption is not the same. After the order consumption fails to consume, it will first retry locally on the client side until the maximum number of retries, so as to avoid the failed messages being skipped and consuming the next message and disrupting the order of order consumption, while the concurrent consumption will re-cast the failed messages back to the server after the failed consumption, and then wait for the server to re-cast them back, during which it will normally consume the messages behind the queue.

When concurrent consumption fails, it is not cast back to the original Topic, but to a special Topic named %RETRY%ConsumerGroupName, and each ConsumerGroup in cluster mode will correspond to a special Topic and will subscribe to that Topic. The difference between the two parameters is as follows

Consumption typeRetry intervalMaximum number of retries
Order consumptionThe retry interval time is configured with the custom parameter SuspendCurrentQueueTimeMillisThe maximum number of retries can be configured with the custom parameter MaxReconsumeTimes. There is no maximum limit to the value of this parameter. If the parameter is not set, the default maximum number of retries is Integer.MAX .
Concurrent consumptionThe retry interval time changes in steps according to the number of retries, the value range: 1 second ~ 2 hours. Custom configuration is not supportedThe maximum number of retries can be configured by the custom parameter MaxReconsumeTimes. The default value is 16 times. There is no maximum limit for this parameter, and it is recommended to use the default value.

The retry interval for concurrent consumption is as follows, which can be seen to be exactly the same as the time when the third level of delayed messages starts.

Retry number of timesThe time between the last retryRetry number of timesThe time between the last retry
110s97min
230s108min
31min119min
42min1210min
53min1320min
64min1430min
75min151h
86min162h

Dead-Letter Queue

When a message fails to be consumed for the first time, RocketMQ will automatically retry the message. After reaching the maximum number of retries, if the consumption still fails, it means that the consumer cannot consume the message correctly under normal circumstances. At this point, the message is not immediately discarded, but sent to a special queue corresponding to that consumer, which is called a Dead-Letter Message, and the special queue storing the dead message is called a Dead-Letter Queue, which is a separate queue with a unique number of partitions under the Dead-Letter Topic. If a Dead-Letter Message is generated, the corresponding ConsumerGroup’s Dead-Letter Topic name is %DLQ%ConsumerGroupName, and the messages in the Dead-Letter Queue will not be consumed again. You can use RocketMQ Admin tool or RocketMQ Dashboard to find out the information of the corresponding dead message.