EMQ X 节点之间的桥接

多个 EMQ X 节点可以以桥接的方式互联,相互之间按照定义的规则转发和订阅消息,满足用户特定场景的需要。接下来我们以一个配置例说明如何建立 EMQ X 节点间的桥接。

场景描述

假设我们有两台 EMQ X 服务器,'emqx1'和'emqx2',我们需要在'emqx1'上创建一条桥接把所有"传感器(sensor)"主题消息转发至'emqx2',并订阅所有"控制(control)"主题。

节点节点名监听端口
emqx1emqx1@192.168.1.1001883
emqx2emqx2@192.168.1.1011883

配置 EMQ X 桥接

为了使 EMQ X 节点可以桥接其他节点,我们需要在配置文件emqx.conf做相应的配置。

在emqx1上,打开emqx.conf, 找到Bridges部分,在其中添加一个新的桥接配置。EMQ X 的桥接配置项的格式为bridge.bridge_name.directive1[.sub_directives],是一个以.分隔的字符串。桥接配置项总是以bridge开头,然后是自定义的桥接名,然后是配置项内容,如果配置项内容有子配置项,可以在后面继续以这种方式添加。

添加一个新桥接配置

通过以下配置我们可以添加一个新的 EMQ X 桥接

  1. ##--------------------------------------------------------------------
  2. ## Bridge example
  3. ##--------------------------------------------------------------------
  4. ## Start type of the bridge.
  5. ##
  6. ## Value: enum
  7. ## manual
  8. ## auto
  9. bridge.example.start_type = manual
  10. ## Bridge reconnect time.
  11. ##
  12. ## Value: Duration
  13. ## Default: 30 seconds
  14. bridge.example.reconnect_interval = 30s
  15. ## Bridge address: node name for local bridge, host:port for remote.
  16. ##
  17. ## Value: String
  18. ## Example: emqx2@192.168.1.101, or 192.168.1.101:1883
  19. bridge.example.address = 192.168.1.101:1883

在上面的配置指令中,我们定义了桥接的名字,启动方式,重连间隔,以及远端节点的地址。一个名字为example的桥接,这个桥接以手动的方式启动,在桥接断线时,以30秒的间隔重新连接远端的节点。bridge.example.address = 192.168.1.101:1883 为桥接指定了远端Broker的地址和监听端口。

配置桥接 MQTT 协议版本

配置 MQTT 协议版本的配置指令为:

  1. bridge.example.proto_ver = mqttv4

在这里配置值有三个可选项,为mqttv3, mqttv4mqttv5,分别对应 MQTT 协议的 3.1, 3.1.15.0版本。配置例以MQTT 3.1.1协议版本完成桥接节点间的通讯。

配置桥接时使用的客户端

在连接远端节点时,EMQ X 需要向对方提供自己的身份识别,通过对端的身份认证和鉴权。根据具体情况,一般需要配置clientid, usernamepassword等信息。

  1. bridge.example.client_id = bridge_example
  2. bridge.example.username = user
  3. bridge.example.password = passwd

配置桥接的连接属性

在配置中指定连接相关的属性。如clean_start,keepalive, mountpoint等。

  1. bridge.example.clean_start = false
  2. bridge.example.keepalive = 60s
  3. bridge.example.mountpoint = bridge/example/${node}/

在桥接断线重连时,我们一般会需要重用既有会话的特性来保证断线时缓存的消息不被丢弃,所以clean_start一般可以设置为falsekeepalive则可以按照网络情况和应用的具体需求设置。mountpoint是 topic 的安装点,由桥接转发的消息的主题将被加上mountpoint中指定的前缀然后发送给各个订阅该主题的节点。

配置桥接的转发和订阅

桥接以转发消息到远端节点,从远端节点订阅主题并发送到本地订阅的客户端上。所有我们需要指定用于转发和订阅的主题。

设置转发主题

  1. bridge.example.forwards = sensor/#

通过forwards配置项,可以为一个桥接设置一个或多个转发主题。如果有多个主题需要转发的话,主题之间用逗号(,)分隔。在主题中可以使用通配符。转发的消息以qos = 1的级别发送给远端节点。

设置订阅主题可以为一个桥接设置多个订阅,每个订阅都有自己独立的配置项。

  1. bridge.example.subscription.1.topic = control/#
  2. bridge.example.subscription.1.qos = 1

上面的配置例为example桥接以qos = 1的服务级别订阅了control/#主题。主题中可以使用通配符。如果需要订阅多个主题,可以继续添加bridge.example.subscription.2.topicbridge.example.subscription.2.qos 配置项。

配置桥接的消息队列

可以为桥接配置消息队列来缓存待转发的消息。

  1. bridge.example.mqueue_type = memory ##memory | disk
  2. bridge.example.max_pending_messages = 10000

以上配置例为example桥接设置了一个队列长度为10000的内存队列。EMQ X 也可以以磁盘文件的方式处理桥接的缓存消息队列。

配置安全连接

EMQ X 支持TLS/SSL方式的桥接以提高传输安全性。如果不需要开启这个特性,则不用配置下面的项。

  1. bridge.example.cacertfile = cacert.pem
  2. bridge.example.certfile = cert.pem
  3. bridge.example.keyfile = key.pem
  4. bridge.example.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384
  5. bridge.example.tls_versions = tlsv1.2,tlsv1.1,tlsv1

上面的配置例为example桥接指定了远程节点所持有的证书的CA证书,本地节点的证书文件和私钥,支持的安全套件选项和允许使用的TLS版本。这个配置在TLS开启了双向的证书认证,如果不需要客户端证书认证,则无需配置bridge.example.certfilebridge.example.keyfile这两项。

管理桥接

EMQ X 的emqx_ctl命令行工具提供了桥接管理的功能,它可以显示桥接状态,启停桥接,显示/增加/删除转发项,显示/增加/删除订阅项。

在完成上述配置后,我们执行以下命令来查看桥接状态, 启动桥接和转发/订阅的配置:

  1. $ ./emqx_ctl bridges list
  2. name: example status: Stopped
  3. $ ./emqx_ctl bridges start example
  4. start bridge successfully.
  5. $ ./emqx_ctl bridges list
  6. name: example status: Running
  7. $ ./emqx_ctl bridges forwards example
  8. topic: sensor/#
  9. $ ./emqx_ctl bridges subscriptions example
  10. topic: control/#, qos: 1

测试桥接

我们使用 mosquitto_pubmosquitto_sub 工具来测试上面的配置是否生效。

在 'emqx2' 上订阅'sensor/#'主题:

  1. $ mosquitto_sub -t sensor/# -p 1883 -d

在'emqx1'上对主题'sensor/1/temperature'发布消息:

  1. $ mosquitto_pub -t sensor/1/temperature -m "37.5" -d

发布之后,在'emqx2'上应能正常收到消息。

  1. $ mosquitto_sub -t "bridge/example/#" -p 1883 -d -h 192.168.1.101
  2. Client mosqsub|11612-Zeus- sending CONNECT
  3. Client mosqsub|11612-Zeus- received CONNACK
  4. Client mosqsub|11612-Zeus- sending SUBSCRIBE (Mid: 1, Topic: bridge/example/#, QoS: 0)
  5. Client mosqsub|11612-Zeus- received SUBACK
  6. Subscribed (mid: 1): 0
  7. Client mosqsub|11612-Zeus- received PUBLISH (d0, q0, r0, m0, 'bridge/example/emqx1@192.168.1.100/sensor/1/temperature', ... (4 bytes))
  8. 37.5

在 'emqx1' 上订阅'control/#'主题:

  1. $ mosquitto_sub -t control/# -p 1883 -d

在'emqx2'上对主题'control/device1/restart'发布消息:

  1. mosquitto_pub -t control/device1 -m "list_all" -d -h 192.168.1.101

发布之后,在'emqx1'上应能正常收到消息。

  1. $ mosquitto_sub -t "control/#" -p 1883 -d -h 192.168.1.100
  2. Client mosqsub|11625-Zeus- sending CONNECT
  3. Client mosqsub|11625-Zeus- received CONNACK
  4. Client mosqsub|11625-Zeus- sending SUBSCRIBE (Mid: 1, Topic: control/#, QoS: 0)
  5. Client mosqsub|11625-Zeus- received SUBACK
  6. Subscribed (mid: 1): 0
  7. Client mosqsub|11625-Zeus- received PUBLISH (d0, q0, r0, m0, 'control/device1', ... (8 bytes))
  8. list_all