消息重传

消息重传 (Message Retransmission) 是属于 MQTT 协议标准规范的一部分。

协议中规定了作为通信的双方 服务端客户端 对于自己发送到对端的 PUBLISH 消息都应满足其 服务质量 (Quality of Service levels) 的要求。如:

  • QoS 1:表示 消息至少送达一次 (At least once delivery);即发送端会一直重发该消息,除非收到了对端对该消息的确认。意思是在 MQTT 协议的上层(即业务的应用层)相同的 QoS 1 消息可能会收到多次。

  • QoS 2:表示 消息只送达一次 (Exactly once delivery);即该消息在上层仅会接收到一次。

虽然,QoS 1 和 QoS 2 的 PUBLISH 报文在 MQTT 协议栈这一层都会发生重传,但请你谨记的是:

  • QoS 1 消息发生重传后,在 MQTT 协议栈上层,也会收到这些重发的 PUBLISH 消息。
  • QoS 2 消息无论如何重传,最终在 MQTT 协议栈上层,都只会收到一条 PUBLISH 消息

基础配置

有两种场景会导致消息重发:

  1. PUBLISH 报文发送给对端后,规定时间内未收到应答。则重发这个报文。
  2. 在保持会话的情况下,客户端重连后;EMQ X 会自动重发 未应答的消息,以确保 QoS 流程的正确。

etc/emqx.conf 中可配置:

配置项类型可取值默认值说明
retry_intervalduration-30s等待一个超时间隔,如果没收到应答则重传消息

一般来说,你只需要关心以上内容就足够了。

如需了解更多 EMQ X 在处理 MQTT 协议的重传的细节见以下内容。

协议规范与设计

重传的对象

首先,在了解 EMQ X 对于重传机制的设计前,我们需要先确保你已经了解协议中 QoS 1 和 QoS 2 的传输过程,否则请参见 MQTTv3.1.1 - QoS 1: At least once deliveryMQTTv3.1.1 - QoS 2: Exactly once delivery

此处,仅作一个简单的回顾,用来说明不同 QoS 下重传的对象有哪些。

QoS 1

QoS 1 要求消息至少送达一次;所以消息在 MQTT 协议层中,可能会不断的重传,直到发送端收到了该消息的确认报文。

其流程示意图如下:

  1. PUBLISH
  2. #1 Sender ---------------> Receiver (*)
  3. PUBACK
  4. #2 Sender <--------------- Receiver
  • 涉及到 2 个报文;共 2 次发送动作,发送端和接收端各 1 次;这 2 个报文都持有相同的 PacketId。
  • 行尾标记为 * 号的,表示发送方在等待确认报文超时后,可能会主动发起重传。

可见 QoS 1 消息只需要对 PUBLISH 报文进行重发

QoS 2

QoS 2 要求消息只送达一次;所以在实现它时,需要更复杂的流程。其流程示意图如下:

  1. PUBLISH
  2. #1 Sender ---------------> Receiver (*)
  3. PUBREC
  4. #2 Sender <--------------- Receiver
  5. PUBREL
  6. #3 Sender ---------------> Receiver (*)
  7. PUBCOM
  8. #4 Sender <--------------- Receiver
  • 涉及到 4 个报文;共 4 次发送动作,发送端和接收端各 2 次;这 4 个报文都持有相同的 PacketId。
  • 行尾标记为 * 号的,表示发送方在等待确认报文超时后,可能会主动发起重传。

可见 QoS 2 消息需要对 PUBLISH 和 PUBREL 报文进行重发

综上:

  • 重传动作 都是由于 发送端 报文发送后,在 规定时间 内未收到其期待的返回而触发的。

  • 重传对象 仅包含以下三种:

    • QoS 1 的 PUBLISH 报文
    • QoS 2 的 PUBLISH 报文
    • QoS 2 的 PUBREL 报文

当 EMQ X 作为 PUBLISH 消息的接收端时,它不需要重发操作

飞行窗口与最大接收值

其概念的定义和解释参见 飞行窗口与消息队列

引入这两个概念的作用是为了理解:

  1. EMQ X 作为发送端时,再次重发的消息,必然是已存储在飞行窗口中的消息
  2. EMQ X 作为接收端时,发送端重发的消息时:
    • 如 QoS 1,EMQ X 则直接回复 PUBACK 进行应答;
    • 如 QoS 2,EMQ X 则会释放,存储在 最大接收消息 队列中的 PUBLISH 或者 PUBREL 报文。

消息顺序

当然,以上的概念仅需要了解即可,你最需要关心的是,消息在被重复发送后,消息顺序出现的变化,尤其是 QoS 1 类的消息。例如:

假设,当前飞行窗口设置为 2 时,EMQ X 计划向客户端的某主题投递 4 条 QoS 1 的消息。并假设客户端程序、或网络在中间出现过问题,那么整个发送流程会变成:

  1. #1 [4,3,2,1 || ] -----> []
  2. #2 [4,3 || 2, 1] -----> [1, 2]
  3. #3 [4 || 3, 2] -----> [1, 2, 3]
  4. #4 [4 || 3, 2] -----> [1, 2, 3, 2, 3]
  5. #5 [ || 4] -----> [1, 2, 3, 2, 3, 4]
  6. #6 [ || ] -----> [1, 2, 3, 2, 3, 4]

流程共 6 个步骤;左边表示 EMQ X 的 消息队列 和 飞行窗口,以 || 分割;右侧表示客户端收到的消息顺序,其中每步表示:

  1. Broker 将 4 条消息放入消息队列中。
  2. Broker 依次发送 1 2,并将其放入 飞行窗口 中;客户端仅应答消息 1;且此时由于客户端发送流出现了问题,无法发送后续应答报文。
  3. Broker 收到消息 1 的应答;从飞行窗口移除消息 1;并将 3 发送出去;继续等待 2 3 的应答;
  4. Broker 等待应答超时,重发了报文 2 3;客户端收到重发的报文 2 3 并正常应答。
  5. Broker 从飞行窗口移除了消息 2 3,并发送报文 4;客户端收到了报文 4 并回复应答。
  6. 至此,所有报文处理完成。客户端收到的报文顺序为 [1, 2, 3, 2, 3, 4],并也依次上报给 MQTT 协议栈的上层。

虽然,存在重复的报文消息。但这是完全符合协议的规范的,每个报文第一次出现的位置都是有序的,并且重复收到的报文 2 3 的报文中,会携带一个标识位,表明其为重发报文。

MQTT 协议和 EMQ X 将这个主题认为是 有序的主题 (Ordered Topic) 参见: MQTTv3.1.1 - Message ordering

它确保 相同的主题和 QoS 下,消息是按顺序投递和应答的

此外,如果用户期望所有主题下的 QoS 1 与 QoS 2 消息都严格有序,那么需要设置飞行窗口的最大长度为 1,但代价是会降低该客户端的吞吐。

相关配置

此节列举了上述机制中,用到的所有配置。它们都包含在 etc/emqx.conf 中:

配置项类型可取值默认值说明
mqueue_store_qos0booltrue, falsetrue是否将 QoS 0 消息存入消息队列中
max_mqueue_leninteger>= 01000消息队列长度
max_inflightinteger>= 00飞行窗口大小;默认 0 即无限制
max_awaiting_relinteger>= 00最大接收;默认 0 即无限制
await_rel_timeoutdurtaion> 0300s最大接收 中消息等待释放的最大超时时间;超过则直接丢弃