RocketMQ 桥接

EMQ X 桥接转发 MQTT 消息到 RocketMQ 集群:

image

RocketMQ 桥接插件配置文件: etc/plugins/emqx_bridge_rocket.conf。

配置 RocketMQ 集群地址

  1. ## RocketMQ 服务器集群配置
  2. ## bridge.rocket.servers = 127.0.0.1:9876,127.0.0.2:9876,127.0.0.3:9876
  3. bridge.rocket.servers = 127.0.0.1:9876
  4. bridge.rocket.refresh_topic_route_interval = 5S
  5. ## 分区生产者是同步/异步模式选择
  6. bridge.rocket.produce = sync
  7. ## 生产者同步模式下的超时时间
  8. ## bridge.rocket.produce.sync_timeout = 3s
  9. ## 生产者 batch 的消息数量
  10. ## bridge.rocket.producer.batch_size = 100
  11. ## 采用 base64 编码或不编码
  12. ## bridge.rocket.encode_payload_type = base64
  13. ## bridge.rocket.sock.buffer = 32KB
  14. ## bridge.rocket.sock.recbuf = 32KB
  15. bridge.rocket.sock.sndbuf = 1MB
  16. ## bridge.rocket.sock.read_packets = 20

配置 RocketMQ 桥接规则

  1. ## Bridge RocketMQ Hooks
  2. ## ${topic}: the RocketMQ topics to which the messages will be published.
  3. ## ${filter}: the mqtt topic (may contain wildcard) on which the action will be performed .
  4. ## Client Connected Record Hook
  5. bridge.rocket.hook.client.connected.1 = {"topic": "ClientConnected"}
  6. ## Client Disconnected Record Hook
  7. bridge.rocket.hook.client.disconnected.1 = {"topic": "ClientDisconnected"}
  8. ## Session Subscribed Record Hook
  9. bridge.rocket.hook.session.subscribed.1 = {"filter": "#", "topic": "SessionSubscribed"}
  10. ## Session Unsubscribed Record Hook
  11. bridge.rocket.hook.session.unsubscribed.1 = {"filter": "#", "topic": "SessionUnsubscribed"}
  12. ## Message Publish Record Hook
  13. bridge.rocket.hook.message.publish.1 = {"filter": "#", "topic": "MessagePublish"}
  14. ## Message Delivered Record Hook
  15. bridge.rocket.hook.message.delivered.1 = {"filter": "#", "topic": "MessageDeliver"}
  16. ## Message Acked Record Hook
  17. bridge.rocket.hook.message.acked.1 = {"filter": "#", "topic": "MessageAcked"}

RocketMQ 桥接规则说明

事件说明
bridge.rocket.hook.client.connected.1
客户端登录
bridge.rocket.hook.client.disconnected.1
客户端退出
bridge.rocket.hook.session.subscribed.1
订阅主题
bridge.rocket.hook.session.unsubscribed.1
取消订阅主题
bridge.rocket.hook.message.publish.1
发布消息
bridge.rocket.hook.message.delivered.1
delivered 消息
bridge.rocket.hook.message.acked.1
ACK 消息

客户端上下线事件转发 RocketMQ

设备上线 EMQ X 转发上线事件消息到 RocketMQ:

  1. topic = "ClientConnected",
  2. value = {
  3. "client_id": ${clientid},
  4. "username": ${username},
  5. "node": ${node},
  6. "ts": ${ts}
  7. }

设备下线 EMQ X 转发下线事件消息到 RocketMQ:

  1. topic = "ClientDisconnected",
  2. value = {
  3. "client_id": ${clientid},
  4. "username": ${username},
  5. "reason": ${reason},
  6. "node": ${node},
  7. "ts": ${ts}
  8. }

客户端订阅主题事件转发 RocketMQ

  1. topic = "SessionSubscribed"
  2. value = {
  3. "client_id": ${clientid},
  4. "topic": ${topic},
  5. "qos": ${qos},
  6. "node": ${node},
  7. "ts": ${timestamp}
  8. }

客户端取消订阅主题事件转发 RocketMQ

  1. topic = "SessionUnsubscribed"
  2. value = {
  3. "client_id": ${clientid},
  4. "topic": ${topic},
  5. "qos": ${qos},
  6. "node": ${node},
  7. "ts": ${timestamp}
  8. }

MQTT 消息转发到 RocketMQ

  1. topic = "MessagePublish"
  2. value = {
  3. "client_id": ${clientid},
  4. "username": ${username},
  5. "topic": ${topic},
  6. "payload": ${payload},
  7. "qos": ${qos},
  8. "node": ${node},
  9. "ts": ${timestamp}
  10. }

MQTT 消息派发 (Deliver) 事件转发 RocketMQ

  1. topic = "MessageDeliver"
  2. value = {
  3. "client_id": ${clientid},
  4. "username": ${username},
  5. "from": ${fromClientId},
  6. "topic": ${topic},
  7. "payload": ${payload},
  8. "qos": ${qos},
  9. "node": ${node},
  10. "ts": ${timestamp}
  11. }

MQTT 消息确认 (Ack) 事件转发 RocketMQ

  1. topic = "MessageAcked"
  2. value = {
  3. "client_id": ${clientid},
  4. "username": ${username},
  5. "from": ${fromClientId},
  6. "topic": ${topic},
  7. "payload": ${payload},
  8. "qos": ${qos},
  9. "node": ${node},
  10. "ts": ${timestamp}
  11. }

RocketMQ 消费示例

RocketMQ 读取 MQTT 客户端上下线事件消息:

  1. bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer ClientConnected
  2. bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer ClientDisconnected

RocketMQ 读取 MQTT 主题订阅事件消息:

  1. bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer SessionSubscribed
  2. bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer SessionUnsubscribed

RocketMQ 读取 MQTT 发布消息:

  1. bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer MessagePublish

RocketMQ 读取 MQTT 消息发布 (Deliver)、确认 (Ack) 事件:

  1. bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer MessageDeliver
  2. bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer MessageAcked

默认 payload 被 base64 编码,可通过修改配置 bridge.rocket.encode_payload_type 指定 payload 数据格式。

启用 RocketMQ 桥接插件

  1. ./bin/emqx_ctl plugins load emqx_bridge_rocket