共享订阅

共享订阅是在多个订阅者之间实现负载均衡的订阅方式,EMQX 在 MQTT v3.1.1 中已经实现共享订阅,MQTT v5.0 协议中这一特性成为标准的一部分。

共享订阅能够解决以下问题:

  1. 集群模式下,如果订阅者所在的节点发生故障,则发布者的消息会丢失(QoS 0)或者堆积在节点中(QoS 1, 2)。可以通过增加订阅节点的方式解决这一问题,但这样又产生了大量的重复消息浪费了性能,并增加了业务的复杂度。

  2. 当发布者的生产能力较强时,可能会出现订阅者的消费能力无法及时跟上的情况,此时只能由订阅者自行实现负载均衡来解决,又一次增加了用户的开发成本。

机制介绍

在原有主题的基础上,添加 $share 前缀即可为一组订阅端启用共享订阅。

shared_subscription

上图中,共享 3 个 subscriber 用共享订阅的方式订阅了同一个主题 $share/g/topic,其中topic 是它们订阅的真实主题名,而 $share/g/ 是共享订阅前缀。EMQX 支持两种格式的共享订阅前缀:

示例前缀真实主题名
$share/abc/t/1$share/abc/t/1

带群组的共享订阅

$share/<group-name> 为前缀的共享订阅是带群组的共享订阅:

group-name 可以为任意字符串,属于同一个群组内部的订阅者将以负载均衡接收消息,但 EMQX 会向不同群组广播消息。

例如,假设订阅者 s1,s2,s3 属于群组 g1,订阅者 s4,s5 属于群组 g2。那么当 EMQX 向这个主题发布消息 msg1 的时候:

  • EMQX 会向两个群组 g1 和 g2 同时发送 msg1

  • s1,s2,s3 中只有一个会收到 msg1

  • s4,s5 中只有一个会收到 msg1

shared_subscription_group

负载均衡策略与派发 ACK 配置

MQTT 协议规范中,没有明确的规范平衡策略。EMQX 在配置的帮助下支持一些不同的平衡策略。 关于调度策略的更多信息,请参考配置文件。

平衡策略可以在全局或每组中指定。

  • 全局策略可以在 broker.shared_subscription_strategy 配置中设置。
  • 配置 broker.shared_subscription_group.$group_name.strategy 为每组策略。
  1. # etc/emqx.conf
  2. # 均衡策略
  3. broker.shared_subscription_strategy = random
  4. # 当设备离线,或者消息等级为 QoS1、QoS2,因各种各样原因设备没有回复 ACK 确认,消息会被重新派发至群组内其他的设备。
  5. broker.shared_dispatch_ack_enabled = false
均衡策略描述
random在所有订阅者中随机选择
round_robin按照订阅顺序选择
round_robin_per_group在每个共享订阅组中按照订阅顺序进行选择
local随机在本地订阅中进行选择,如无法找到,则在集群范围内随机选择
sticky选定订阅者后,始终向其进行发送,直到该订阅者断开连接
hash_clientid通过对发送者的客户端 ID 进行 Hash 处理来选择订阅者
hash_topic通过对源主题进行 Hash 处理来选择订阅者

提示

无论是单客户端订阅还是共享订阅都要注意客户端性能与消息接收速率,否则会引发消息堆积、客户端崩溃等错误。

关于消息丢失的讨论

EMQX 向订阅者的会话发送消息。

当开启会话持久化功能(clean_session=false)时,订阅者可以在重新连接后立即恢复数据流而不丢失消息。

这与 “负载平衡” 有点矛盾,因为通常当共享订阅使用时,如果一个设备离线,组内的其他设备就会接手接管数据流。但是,如果设备离线足够长的时间,会话消息缓冲区最终会溢出,并导致消息丢失。

由于上述原因,持久化会话对于共享订阅业务来说并不常见,但是用户仍然可以使用这种方式。

配置 broker.shared_dispatch_ack_enabled 的引入是为了在持久化会话的情况下改善负载共享的逻辑。当设置为 true 时,EMQX 派发消息时会检查设备的在线状态,如果设备是离线的,将会把消息分配给组内的其他成员。

  • 一旦消息被派发到一个订阅者的会话,消息将留在会话中缓冲区,但不会立即重新派发。
  • 当会话终止时(Session 的生命周期结束),会话中的待发信息会被重新分配给组内的其他成员。会话终止时,会话中的待发信息会被重新分配给组内的其他成员。
  • 当所有成员都离线时,消息会按照配置的策略来派发。
  • 当共享订阅组中所有的会话都已经超期被销毁,消息就会被丢弃。

提示

更多有关 MQTT 共享订阅的内容请参考: