MQTT

MQTT 数据桥接是一种连接多个 EMQX 集群或其他 MQTT 服务的方式。本页介绍了 EMQX 中 MQTT 数据桥接的工作原理,并提供了在EMQX Dashboard 或使用配置文件创建 MQTT 数据桥接的快速入门教程。

桥接模式

EMQX 支持在两种主要模式下工作的 MQTT 数据桥接:入口和出口。本节将详细介绍每种模式的工作原理。同时,还介绍了在这两种模式中使用的连接池。

入口模式

在入口模式下,本地的 EMQX 从桥接的远程 MQTT 服务器订阅主题,并在当前集群内分发接收到的消息。下面是入口方向的消息服务流程:

MQTT 数据桥接 igress 示意图

MQTT 数据桥接可以单独使用,也可以与规则结合使用,以实现更强大、更灵活的数据处理能力。在 入口方向上,数据桥接可以作为规则的数据源。MQTT 数据桥接与规则配合工作的消息流程如下:

bridge_igress_rule_link

出口模式

在出口模式下,本地的 EMQX 根据规则设置,将当前集群中的消息转发给桥接的远程 MQTT 服务器。下面是出口方向上的消息服务流程:

MQTT 数据桥接 egress 示意图

出口方向下,可以将规则处理结果作为消息,转发到远程 MQTT 服务器的指定主题下:

bridge_egress_rule

连接池

EMQX 允许多个客户端同时连接到桥接的 MQTT 服务器。在创建桥接时您可以设置一个 MQTT 客户端连接池,并配置连接池大小以表明连接池中的客户端连接数。在 MQTT 数据桥接中启用连接池,可以充分利用服务器资源,以实现更大的消息吞吐和更好的并发性能。这对于处理高负载、高并发的场景非常重要。

由于 MQTT 协议要求连接到一个 MQTT 服务器的客户端必须具有唯一的客户端 ID,因此连接池中的每个客户端都被分配了一个唯一的客户端 ID。为了使客户端 ID 可预测,EMQX 根据以下模式自动生成客户端 ID:

  1. [${ClientIDPrefix}:]${BridgeName}:${Mode}:${NodeName}:${N}
片段描述
${ClientIDPrefix}配置的客户端 ID 前缀。如果未设置,则省略整个第一片段。
${BridgeName}桥接的名称,由用户提供。
${Mode}ingressegress
${NodeName}运行 MQTT 客户端的节点名称。
${N}1 到配置的 MQTT 客户端连接池的大小的数字。

在入口模式下使用连接池

尽管连接池适用于入口和出口模式,但在入口模式下使用连接池需要考虑一些注意事项。当您拥有多个节点的 EMQX 集群并配置了一个入口 MQTT 桥接以从远程 MQTT 服务器订阅非共享主题时,如果连接池中的所有客户端都订阅相同的主题,它们将从远程 MQTT 服务器接收到重复的消息,这将给服务器带来压力。在这种情况下,强烈建议使用共享订阅作为一种安全措施。例如,您可以将远程 MQTT 服务器的主题配置为 $share/name1/topic1 或者在使用主题过滤器时配置为 $share/name2/topic2/#。在非共享订阅情况下,MQTT 客户端连接池将缩减为一个客户端,这意味着只有一个客户端会启动。

快速开始

下面将以 EMQX 的在线 MQTT 服务器MQTT - 图5 (opens new window)作为桥接服务器,指导您如何配置连接与桥接。

前置准备

功能清单

通过 Dashboard 配置

  1. 转到 Dashboard 数据集成 -> 数据桥接页面。

  2. 点击页面右上角的创建

  3. 数据桥接类型中选择 MQTT,点击下一步

  4. 输入数据桥接名称,要求是大小写英文字母或数字组合,例如 my_mqtt_bridge

  5. 进行连接相关配置。MQTT 服务地址设为 broker.emqx.io:1883,由于该服务器不需要认证,因此用户名密码留空即可。该区域的其他字段可保留默认设置,也可根据实际场景设置。

  6. 通过入口配置出口配置设定桥接规则。

    提示

    入口配置与出口配置应至少配置一个,您可打开下方的开关进行相关配置。

    • 入口配置(可选):配置桥接规则,将远程 MQTT 服务上的消息转发到本地;我们希望订阅 remote/topic/ingress 下的消息,并将收到的信息转发至 local/topic/ingress 主题,因此将进行如下配置:

      • 远程 MQTT 服务:订阅主题以获取消息。

        • 主题:在集群工作模式下,须通过共享订阅来避免消息重复,因此填入 $share/g/remote/topic/ingress
        • QoS:选择 0
      • 本地 MQTT 服务:将订阅得到的消息发布到指定主题中,也可以留空,通过配置规则处理并使用消息重发布动作转发。

        • 主题:填入 local/topic/ingress
        • QoS:选择 0,或 ${qos} (跟随消息 QoS)。
        • Retain:通过勾选确认是否以保留消息方式发布消息。
        • 消息模版:转发的消息 Payload 模板,支持使用 ${field} 语法提取数据。
      • 连接池大小:指定本地 MQTT 服务的客户端连接池的大小。在这个例子中,您可以设置为8。只要远程 MQTT 服务的主题使用共享订阅,这样的设置不会影响性能。
    • 出口配置(可选):将本地指定 MQTT 主题下的消息发布到远程 MQTT 服务,可以理解为入口配置的反向数据流。我们希望将 local/topic/egress 主题下的消息转发到远程 MQTT 服务 remote/topic/egress 主题中,因此将进行如下配置:

      • 本地 MQTT 服务:指定待转发的消息主题。

        • 主题:填入 local/topic/egress
      • 远程 MQTT 服务:指定远程服务器的目标主题。

        • 主题:填入 remote/topic/egress
        • QoS:选择 0,或 ${qos} (跟随消息 QoS)。
        • Retain:通过勾选确认是否以保留消息方式发布消息。
        • 消息模版:转发的消息 Payload 模板,支持使用 ${field} 语法提取数据。
      • 连接池大小:指定本地 MQTT 服务器的客户端连接池的大小。在这个例子中,您可以设置为8
  7. 其他配置(可选),根据情况配置同步/异步模式,队列与批量等参数。

  8. 点击创建按钮完成数据桥接创建。

通过配置文件配置

EMQX 同时也支持使用配置文件创建 MQTT 数据桥接,对应的配置示例如下:

  1. bridges.mqtt.my_mqtt_bridge {
  2. enable = true
  3. server = "broker.emqx.io:1883"
  4. username = "emqx_u"
  5. password = "public"
  6. proto_ver = "v4"
  7. clean_start = true
  8. keepalive = "60s"
  9. reconnect_interval = "10s"
  10. egress {
  11. local {topic = "local/topic/egress"}
  12. remote {
  13. payload = "${payload}"
  14. qos = 1
  15. retain = true
  16. topic = "remote/topic/egress"
  17. }
  18. }
  19. ingress {
  20. local {
  21. topic = "$share/g/remote/topic/ingress"
  22. qos = 1
  23. payload = "${payload}"
  24. }
  25. remote {qos = 1, topic = "local/topic/ingress"}
  26. }
  27. }