订阅关系一致

前言

订阅关系:一个消费者组订阅一个 Topic 的某一个 Tag,这种记录被称为订阅关系。

订阅关系一致:同一个消费者组下所有消费者实例所订阅的Topic、Tag必须完全一致。如果订阅关系(消费者组名-Topic-Tag)不一致,会导致消费消息紊乱,甚至消息丢失。

1 正确订阅关系示例

1.1 订阅一个Topic且订阅一个Tag

如下图所示,同一Group ID下的三个Consumer实例C1、C2和C3分别都订阅了TopicA,且订阅TopicA的Tag也都是Tag1,符合订阅关系一致原则。

1658453577894-0e64b114-cb4a-4220-a09a-62bc1f2943c6

正确示例代码一

C1、C2、C3的订阅关系一致,即C1、C2、C3订阅消息的代码必须完全一致,代码示例如下:

  1. Properties properties = new Properties();
  2. properties.put(PropertyKeyConst.GROUP_ID, "GID_test_1");
  3. Consumer consumer = ONSFactory.createConsumer(properties);
  4. consumer.subscribe("TopicA", "Tag1", new MessageListener() {
  5. public Action consume(Message message, ConsumeContext context) {
  6. System.out.println(message.getMsgID());
  7. return Action.CommitMessage;
  8. }
  9. });

1.2 订阅一个Topic且订阅多个Tag

如下图所示,同一Group ID下的三个Consumer实例C1、C2和C3分别都订阅了TopicB,订阅TopicB的Tag也都是Tag2和Tag3,表示订阅TopicB中所有Tag为Tag2或Tag3的消息,且顺序一致都是Tag2||Tag3,符合订阅关系一致性原则。

1658453865541-118b0cd0-d597-4a76-9561-ae765540567c

正确示例代码二

C1、C2、C3的订阅关系一致,即C1、C2、C3订阅消息的代码必须完全一致,代码示例如下:

  1. Properties properties = new Properties();
  2. properties.put(PropertyKeyConst.GROUP_ID, "GID_test_2");
  3. Consumer consumer = ONSFactory.createConsumer(properties);
  4. consumer.subscribe("TopicB", "Tag2||Tag3", new MessageListener() {
  5. public Action consume(Message message, ConsumeContext context) {
  6. System.out.println(message.getMsgID());
  7. return Action.CommitMessage;
  8. }
  9. });

1.3 订阅多个Topic且订阅多个Tag

如下图所示,同一Group ID下的三个Consumer实例C1、C2和C3分别都订阅了TopicA和TopicB,且订阅的TopicA都未指定Tag,即订阅TopicA中的所有消息,订阅的TopicB的Tag都是Tag2和Tag3,表示订阅TopicB中所有Tag为Tag2或Tag3的消息,且顺序一致都是Tag2||Tag3,符合订阅关系一致原则。

1658454292557-c07fa0ac-81be-4aac-9c5b-342821c554a6

正确示例代码三

C1、C2、C3的订阅关系一致,即C1、C2、C3订阅消息的代码必须完全一致,代码示例如下:

  1. Properties properties = new Properties();
  2. properties.put(PropertyKeyConst.GROUP_ID, "GID_test_3");
  3. Consumer consumer = ONSFactory.createConsumer(properties);
  4. consumer.subscribe("TopicA", "*", new MessageListener() {
  5. public Action consume(Message message, ConsumeContext context) {
  6. System.out.println(message.getMsgID());
  7. return Action.CommitMessage;
  8. }
  9. });
  10. consumer.subscribe("TopicB", "Tag2||Tag3", new MessageListener() {
  11. public Action consume(Message message, ConsumeContext context) {
  12. System.out.println(message.getMsgID());
  13. return Action.CommitMessage;
  14. }
  15. });

2 订阅关系不一致的排查

问题描述

在使用消息队列RocketMQ版实例时,可能会出现订阅关系不一致的情况,具体的问题现象如下:

  • 消息队列RocketMQ版控制台中订阅关系是否一致显示为否。
  • 消费者(Consumer)实例未收到订阅的消息。

请参考以下步骤进行检查

您可在消息消息队列RocketMQ版控制台Group 详情页面查看指定Group的订阅关系是否一致。若查询结果不一致,请参见本文(3 常见订阅关系不一致问题)排查Consumer实例的消费代码。

  1. 检查您Consumer实例中与订阅相关的配置代码,确保配置同一个Group ID的所有Consumer实例均订阅相同的Topic及Tag。
  2. 重启客户端应用。
  3. 登录消息队列RocketMQ版控制台,在左侧导航栏中单击实例列表,选择您的目标实例,进入实例详情页面。
  4. 在左侧导航栏中单击Group管理,选择对应的协议后,单击消费者状态,确认订阅关系是否一致显示为是。
  5. 测试并确认消息能够被预期的Consumer实例所消费。

3 常见订阅关系不一致问题

3.1 同一Group ID下的Consumer实例订阅的Topic不同

如下图所示,同一Group ID下的三个Consumer实例C1、C2和C3分别订阅了TopicA、TopicB和TopicC,订阅的Topic不一致,不符合订阅关系一致性原则。

image-20220722102131073

错误示例代码一

  • Consumer实例1-1:

    1. Properties properties = new Properties();
    2. properties.put(PropertyKeyConst.GROUP_ID, "GID_test_1");
    3. Consumer consumer = ONSFactory.createConsumer(properties);
    4. consumer.subscribe("TopicA", "*", new MessageListener() {
    5. public Action consume(Message message, ConsumeContext context) {
    6. System.out.println(message.getMsgID());
    7. return Action.CommitMessage;
    8. }
    9. });
  • Consumer实例1-2:

    1. Properties properties = new Properties();
    2. properties.put(PropertyKeyConst.GROUP_ID, "GID_test_1");
    3. Consumer consumer = ONSFactory.createConsumer(properties);
    4. consumer.subscribe("TopicB", "*", new MessageListener() {
    5. public Action consume(Message message, ConsumeContext context) {
    6. System.out.println(message.getMsgID());
    7. return Action.CommitMessage;
    8. }
    9. });
  • Consumer实例1-3:

    1. Properties properties = new Properties();
    2. properties.put(PropertyKeyConst.GROUP_ID, "GID_test_1");
    3. Consumer consumer = ONSFactory.createConsumer(properties);
    4. consumer.subscribe("TopicC", "*", new MessageListener() {
    5. public Action consume(Message message, ConsumeContext context) {
    6. System.out.println(message.getMsgID());
    7. return Action.CommitMessage;
    8. }
    9. });

3.2 同一Group ID下的Consumer实例订阅的Topic相同,但订阅的Tag不一致

如下图所示,同一Group ID下的三个Consumer实例C1、C2和C3分别都订阅了TopicA,但是C1订阅TopicA的Tag为Tag1,C2和C3订阅的TopicA的Tag为Tag2,订阅同一Topic的Tag不一致,不符合订阅关系一致性原则。

image-20220722102926055

错误示例代码二

  • Consumer实例2-1:

    1. Properties properties = new Properties();
    2. properties.put(PropertyKeyConst.GROUP_ID, "GID_test_2");
    3. Consumer consumer = ONSFactory.createConsumer(properties);
    4. consumer.subscribe("TopicA", "Tag1", new MessageListener() {
    5. public Action consume(Message message, ConsumeContext context) {
    6. System.out.println(message.getMsgID());
    7. return Action.CommitMessage;
    8. }
    9. });
  • Consumer实例2-2:

    1. Properties properties = new Properties();
    2. properties.put(PropertyKeyConst.GROUP_ID, "GID_test_2");
    3. Consumer consumer = ONSFactory.createConsumer(properties);
    4. consumer.subscribe("TopicA", "Tag2", new MessageListener() {
    5. public Action consume(Message message, ConsumeContext context) {
    6. System.out.println(message.getMsgID());
    7. return Action.CommitMessage;
    8. }
    9. });
  • Consumer实例2-3:

    1. Properties properties = new Properties();
    2. properties.put(PropertyKeyConst.GROUP_ID, "GID_test_2");
    3. Consumer consumer = ONSFactory.createConsumer(properties);
    4. consumer.subscribe("TopicA", "Tag2", new MessageListener() {
    5. public Action consume(Message message, ConsumeContext context) {
    6. System.out.println(message.getMsgID());
    7. return Action.CommitMessage;
    8. }
    9. });