Message Trace

1. Key attributes of message trace data

ProducerConsumerBroker
Production instance informationConsumer instance information Message Topic
Time of message sendingDelivery time and delivery round Message store location
Success of message sendingSuccess of message consumptionMessage key value
Time taken to sendTime taken to consume Message tag value

2. Cluster Deployment for Supporting Message Trace

2.1 Broker configuration file

Here is the content of the properties configuration file for enabling the message trace feature on the Broker :

  1. brokerClusterName=DefaultCluster
  2. brokerName=broker-a
  3. brokerId=0
  4. deleteWhen=04
  5. fileReservedTime=48
  6. brokerRole=ASYNC_MASTER
  7. flushDiskType=ASYNC_FLUSH
  8. storePathRootDir=/data/rocketmq/rootdir-a-m
  9. storePathCommitLog=/data/rocketmq/commitlog-a-m
  10. autoCreateSubscriptionGroup=true
  11. ## if msg tracing is open,the flag will be true
  12. traceTopicEnable=true
  13. listenPort=10911
  14. brokerIP1=XX.XX.XX.XX1
  15. namesrvAddr=XX.XX.XX.XX:9876

2.2 Normal mode

In RocketMQ cluster, each Broker node is used to store message trace data collected and sent by the Client. Therefore, there is no requirement or limit on the number of Broker nodes in the RocketMQ cluster.

2.3 Physical IO isolation mode

For scenarios with a large amount of message trace data, you can choose one of the Broker nodes in the RocketMQ cluster to be dedicated to storing message traces, so that the physical IO of the user’s ordinary message data is completely isolated from the message trace data and does not affect each other. In this mode, there are at least two Broker nodes in the RocketMQ cluster, and one of them is defined as the server that stores the message trace data.

2.4 Start the Broker with message trace enabled

nohup sh mqbroker -c ../conf/2m-noslave/broker-a.properties &

3. Save the topic definition for message trace

RocketMQ’s message trace feature supports two ways of storing trace data:

3.1 System-level TraceTopic

By default, the message trace data is stored in the system-level TraceTopic (whose name is: RMQ_SYS_TRACE_TOPIC). This Topic is automatically created when the Broker node is started (as mentioned above, the traceTopicEnable switch variable must be set to true in the Broker side configuration file).

3.2 User-defined TraceTopic

If the user does not want to store the message trace data in the default system-level TraceTopic, they can also define and create a user-level Topic to save the trace (that is, create a regular Topic to store the message trace data). The next section will introduce how the Client interface supports user-defined TraceTopic.

4. The Client with Message Trace Practice

In order to minimize the transformation work of the user’s business system using the RocketMQ message trace feature, the author designed to add a switch parameter (enableMsgTrace) to the original interface to enable or disable message trace, and added a custom parameter (customizedTraceTopic) to allow the user to store the message trace data in their own user-level Topic created.

4.1 Enable message trace when sending messages

  1. DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName",true);
  2. producer.setNamesrvAddr("XX.XX.XX.XX1");
  3. producer.start();
  4. try {
  5. {
  6. Message msg = new Message("TopicTest",
  7. "TagA",
  8. "OrderID188",
  9. "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
  10. SendResult sendResult = producer.send(msg);
  11. System.out.printf("%s%n", sendResult);
  12. }
  13. } catch (Exception e) {
  14. e.printStackTrace();
  15. }

4.2 Enable message trace when subscribing to messages

  1. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1",true);
  2. consumer.subscribe("TopicTest", "*");
  3. consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
  4. consumer.setConsumeTimestamp("20181109221800");
  5. consumer.registerMessageListener(new MessageListenerConcurrently() {
  6. @Override
  7. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
  8. System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
  9. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  10. }
  11. });
  12. consumer.start();
  13. System.out.printf("Consumer Started.%n");

4.3 Support custom storage message trace Topic

To support custom storage message trace Topic, modify the initialization of the DefaultMQProducer and DefaultMQPushConsumer instances as follows when sending and subscribing to messages.

  1. ##Topic_test11111 needs to be created by the user in advance to store message traces:
  2. DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName",true,"Topic_test11111");
  3. ......
  4. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1",true,"Topic_test11111");
  5. ......

4.4 Use the mqadmin command to send and view traces

  • Send message

    1. ./mqadmin sendMessage -m true --topic some-topic-name -n 127.0.0.1:9876 -p "your meesgae content"
  • Query trace

    1. ./mqadmin QueryMsgTraceById -n 127.0.0.1:9876 -i "some-message-id"
  • Query trace result

    1. RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0).
    2. RocketMQLog:WARN Please initialize the logger system properly.
    3. #Type #ProducerGroup #ClientHost #SendTime #CostTimes #Status
    4. Pub 1623305799667 xxx.xxx.xxx.xxx 2021-06-10 14:16:40 131ms success