RabbitMQ 桥接

EMQ X 桥接转发 MQTT 消息到 RabbitMQ 集群:

image

RabbitMQ 桥接插件配置文件: etc/plugins/emqx_bridge_rabbit.conf。

配置 RabbitMQ 桥接地址

  1. ## RabbitMQ 的服务器地址
  2. bridge.rabbit.1.server = 127.0.0.1:5672
  3. ## RabbitMQ 的连接池大小
  4. bridge.rabbit.1.pool_size = 4
  5. ## RabbitMQ 的用户名
  6. bridge.rabbit.1.username = guest
  7. ## RabbitMQ 的密码
  8. bridge.rabbit.1.password = guest
  9. ## RabbitMQ 的虚拟 Host
  10. bridge.rabbit.1.virtual_host = /
  11. ## RabbitMQ 的心跳间隔
  12. bridge.rabbit.1.heartbeat = 0
  13. # bridge.rabbit.2.server = 127.0.0.1:5672
  14. # bridge.rabbit.2.pool_size = 8
  15. # bridge.rabbit.2.username = guest
  16. # bridge.rabbit.2.password = guest
  17. # bridge.rabbit.2.virtual_host = /
  18. # bridge.rabbit.2.heartbeat = 0

配置 RabbitMQ 桥接规则

  1. ## Bridge Hooks
  2. bridge.rabbit.hook.client.subscribe.1 = {"action": "on_client_subscribe", "rabbit": 1, "exchange": "direct:emq.subscription"}
  3. bridge.rabbit.hook.client.unsubscribe.1 = {"action": "on_client_unsubscribe", "rabbit": 1, "exchange": "direct:emq.unsubscription"}
  4. bridge.rabbit.hook.message.publish.1 = {"topic": "$SYS/#", "action": "on_message_publish", "rabbit": 1, "exchange": "topic:emq.$sys"}
  5. bridge.rabbit.hook.message.publish.2 = {"topic": "#", "action": "on_message_publish", "rabbit": 1, "exchange": "topic:emq.pub"}
  6. bridge.rabbit.hook.message.acked.1 = {"topic": "#", "action": "on_message_acked", "rabbit": 1, "exchange": "topic:emq.acked"}

客户端订阅主题事件转发 RabbitMQ

  1. routing_key = subscribe
  2. exchange = emq.subscription
  3. headers = [{<<"x-emq-client-id">>, binary, ClientId}]
  4. payload = jsx:encode([{Topic, proplists:get_value(qos, Opts)} || {Topic, Opts} <- TopicTable])

客户端取消订阅事件转发 RabbitMQ

  1. routing_key = unsubscribe
  2. exchange = emq.unsubscription
  3. headers = [{<<"x-emq-client-id">>, binary, ClientId}]
  4. payload = jsx:encode([Topic || {Topic, _Opts} <- TopicTable]),

MQTT 消息转发 RabbitMQ

  1. routing_key = binary:replace(binary:replace(Topic, <<"/">>, <<".">>, [global]),<<"+">>, <<"*">>, [global])
  2. exchange = emq.$sys | emq.pub
  3. headers = [{<<"x-emq-publish-qos">>, byte, Qos},
  4. {<<"x-emq-client-id">>, binary, pub_from(From)},
  5. {<<"x-emq-publish-msgid">>, binary, emqx_base62:encode(Id)},
  6. {<<"x-emqx-topic">>, binary, Topic}]
  7. payload = Payload

MQTT 消息确认 (Ack) 事件转发 RabbitMQ

  1. routing_key = puback
  2. exchange = emq.acked
  3. headers = [{<<"x-emq-msg-acked">>, binary, ClientId}],
  4. payload = emqx_base62:encode(Id)

RabbitMQ 订阅消费 MQTT 消息示例

Python RabbitMQ消费者代码示例:

  1. #!/usr/bin/env python
  2. import pika
  3. import sys
  4. connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
  5. channel = connection.channel()
  6. channel.exchange_declare(exchange='direct:emq.subscription', exchange_type='direct')
  7. result = channel.queue_declare(exclusive=True)
  8. queue_name = result.method.queue
  9. channel.queue_bind(exchange='direct:emq.subscription', queue=queue_name, routing_key= 'subscribe')
  10. def callback(ch, method, properties, body):
  11. print(" [x] %r:%r" % (method.routing_key, body))
  12. channel.basic_consume(callback, queue=queue_name, no_ack=True)
  13. channel.start_consuming()

其他语言 RabbitMQ 客户端代码示例:

https://github.com/rabbitmq/rabbitmq-tutorialsRabbitMQ 桥接 - 图2 (opens new window)

启用 RabbitMQ 桥接插件

  1. ./bin/emqx_ctl plugins load emqx_bridge_rabbit