Pulsar 桥接

EMQ X 桥接转发 MQTT 消息到 Pulsar 集群:

image

Pulsar 桥接插件配置文件: etc/plugins/emqx_bridge_pulsar.conf。

配置 Pulsar 集群地址

  1. ## Pulsar 服务器集群配置
  2. ## bridge.pulsar.servers = 127.0.0.1:6650,127.0.0.2:6650,127.0.0.3:6650
  3. bridge.pulsar.servers = 127.0.0.1:6650
  4. ## 分区生产者是同步/异步模式选择
  5. bridge.pulsar.produce = sync
  6. ## 生产者同步模式下的超时时间
  7. ## bridge.pulsar.produce.sync_timeout = 3s
  8. ## 生产者 batch 的消息数量
  9. ## bridge.pulsar.producer.batch_size = 1000
  10. ## 默认情况下不为生产者启用压缩选项
  11. ## bridge.pulsar.producer.compression = no_compression
  12. ## 采用 base64 编码或不编码
  13. ## bridge.pulsar.encode_payload_type = base64
  14. ## bridge.pulsar.sock.buffer = 32KB
  15. ## bridge.pulsar.sock.recbuf = 32KB
  16. bridge.pulsar.sock.sndbuf = 1MB
  17. ## bridge.pulsar.sock.read_packets = 20

配置 Pulsar 桥接规则

  1. ## Bridge Pulsar Hooks
  2. ## ${topic}: the pulsar topics to which the messages will be published.
  3. ## ${filter}: the mqtt topic (may contain wildcard) on which the action will be performed .
  4. ## Client Connected Record Hook
  5. bridge.pulsar.hook.client.connected.1 = {"topic": "client_connected"}
  6. ## Client Disconnected Record Hook
  7. bridge.pulsar.hook.client.disconnected.1 = {"topic": "client_disconnected"}
  8. ## Session Subscribed Record Hook
  9. bridge.pulsar.hook.session.subscribed.1 = {"filter": "#", "topic": "session_subscribed"}
  10. ## Session Unsubscribed Record Hook
  11. bridge.pulsar.hook.session.unsubscribed.1 = {"filter": "#", "topic": "session_unsubscribed"}
  12. ## Message Publish Record Hook
  13. bridge.pulsar.hook.message.publish.1 = {"filter": "#", "topic": "message_publish"}
  14. ## Message Delivered Record Hook
  15. bridge.pulsar.hook.message.delivered.1 = {"filter": "#", "topic": "message_delivered"}
  16. ## Message Acked Record Hook
  17. bridge.pulsar.hook.message.acked.1 = {"filter": "#", "topic": "message_acked"}
  18. ## More Configures
  19. ## partitioner strategy:
  20. ## Option: random | roundrobin | first_key_dispatch
  21. ## Example: bridge.pulsar.hook.message.publish.1 = {"filter":"#", "topic":"message_publish", "strategy":"random"}
  22. ## key:
  23. ## Option: ${clientid} | ${username}
  24. ## Example: bridge.pulsar.hook.message.publish.1 = {"filter":"#", "topic":"message_publish", "key":"${clientid}"}
  25. ## format:
  26. ## Option: json | json
  27. ## Example: bridge.pulsar.hook.message.publish.1 = {"filter":"#", "topic":"message_publish", "format":"json"}

Pulsar 桥接规则说明

事件说明
bridge.pulsar.hook.client.connected.1
客户端登录
bridge.pulsar.hook.client.disconnected.1
客户端退出
bridge.pulsar.hook.session.subscribed.1
订阅主题
bridge.pulsar.hook.session.unsubscribed.1
取消订阅主题    
bridge.pulsar.hook.message.publish.1
发布消息
bridge.pulsar.hook.message.delivered.1
delivered 消息
bridge.pulsar.hook.message.acked.1
ACK 消息

客户端上下线事件转发 Pulsar

设备上线 EMQ X 转发上线事件消息到 Pulsar:

  1. topic = "client_connected",
  2. value = {
  3. "client_id": ${clientid},
  4. "username": ${username},
  5. "node": ${node},
  6. "ts": ${ts}
  7. }

设备下线 EMQ X 转发下线事件消息到 Pulsar:

  1. topic = "client_disconnected",
  2. value = {
  3. "client_id": ${clientid},
  4. "username": ${username},
  5. "reason": ${reason},
  6. "node": ${node},
  7. "ts": ${ts}
  8. }

客户端订阅主题事件转发 Pulsar

  1. topic = session_subscribed
  2. value = {
  3. "client_id": ${clientid},
  4. "topic": ${topic},
  5. "qos": ${qos},
  6. "node": ${node},
  7. "ts": ${timestamp}
  8. }

客户端取消订阅主题事件转发 Pulsar

  1. topic = session_unsubscribed
  2. value = {
  3. "client_id": ${clientid},
  4. "topic": ${topic},
  5. "qos": ${qos},
  6. "node": ${node},
  7. "ts": ${timestamp}
  8. }

MQTT 消息转发到 Pulsar

  1. topic = message_publish
  2. value = {
  3. "client_id": ${clientid},
  4. "username": ${username},
  5. "topic": ${topic},
  6. "payload": ${payload},
  7. "qos": ${qos},
  8. "node": ${node},
  9. "ts": ${timestamp}
  10. }

MQTT 消息派发 (Deliver) 事件转发 Pulsar

  1. topic = message_delivered
  2. value = {
  3. "client_id": ${clientid},
  4. "username": ${username},
  5. "from": ${fromClientId},
  6. "topic": ${topic},
  7. "payload": ${payload},
  8. "qos": ${qos},
  9. "node": ${node},
  10. "ts": ${timestamp}
  11. }

MQTT 消息确认 (Ack) 事件转发 Pulsar

  1. topic = message_acked
  2. value = {
  3. "client_id": ${clientid},
  4. "username": ${username},
  5. "from": ${fromClientId},
  6. "topic": ${topic},
  7. "payload": ${payload},
  8. "qos": ${qos},
  9. "node": ${node},
  10. "ts": ${timestamp}
  11. }

Pulsar 消费示例

Pulsar 读取 MQTT 客户端上下线事件消息:

  1. pulsar-client consume client_connected -s "client_connected" -n 1000
  2. pulsar-client consume client_disconnected -s "client_disconnected" -n 1000

Pulsar 读取 MQTT 主题订阅事件消息:

  1. pulsar-client consume session_subscribed -s "session_subscribed" -n 1000
  2. pulsar-client consume session_unsubscribed -s "session_unsubscribed" -n 1000

Pulsar 读取 MQTT 发布消息:

  1. pulsar-client consume message_publish -s "message_publish" -n 1000

Pulsar 读取 MQTT 消息发布 (Deliver)、确认 (Ack)事件:

  1. pulsar-client consume message_delivered -s "message_delivered" -n 1000
  2. pulsar-client consume message_acked -s "message_acked" -n 1000

TIP

默认 payload 被 base64 编码,可通过修改配置 bridge.pulsar.encode_payload_type 指定 payload 数据格式。

启用 Pulsar 桥接插件

  1. ./bin/emqx_ctl plugins load emqx_bridge_pulsar