MongoDB 消息存储

提示

EMQX 3.1 版本后推出强大的规则引擎用于替换插件,建议您前往使用保存数据到 MongoDB规则引擎中创建 保存数据到 MongoDB

配置 MongoDB 消息存储

配置文件: emqx_backend_mongo.conf

配置 MongoDB 服务器

支持配置多台 MongoDB 服务器连接池:

  1. ## MongoDB 部署类型: single | unknown | sharded | rs
  2. backend.mongo.pool1.type = single
  3. ## 是否启用 SRV 和 TXT 记录解析
  4. backend.mongo.pool1.srv_record = false
  5. ## 如果您的 MongoDB 以副本集方式部署,则需要指定相应的副本集名称
  6. ##
  7. ## 如果启用了 srv_record,即 backend.mongo.<Pool>.srv_record 设置为 true,
  8. ## 且您的 MongoDB 服务器域名添加了包含 replicaSet 选项的 DNS TXT 记录,
  9. ## 那么可以忽略此配置项
  10. ## backend.mongo.pool1.rs_set_name = testrs
  11. ## MongoDB 服务器地址列表
  12. ##
  13. ## 如果你的 URI 具有以下格式:
  14. ## mongodb://[username:password@]host1[:port1][,...hostN[:portN]][/[defaultauthdb][?options]]
  15. ## 请将 backend.mongo.<Pool>.server 配置为 host1[:port1][,...hostN[:portN]]
  16. ##
  17. ## 如果你的 URI 具有以下格式:
  18. ## mongodb+srv://server.example.com
  19. ## 请将 backend.mongo.<Pool>.server 配置为 server.example.com,并将 srv_record
  20. ## 设置为 true,EMQX 将自动查询 SRV 和 TXT 记录以获取服务列表和 replicaSet 等选项
  21. ##
  22. ## 现已支持 IPv6 和域名
  23. backend.mongo.pool1.server = 127.0.0.1:27017
  24. ## MongoDB 连接池大小
  25. backend.mongo.pool1.c_pool_size = 8
  26. ## 连接的数据库名称
  27. backend.mongo.pool1.database = mqtt
  28. ## MongoDB 认证用户名密码
  29. ## backend.mongo.pool1.login = emqtt
  30. ## backend.mongo.pool1.password = emqtt
  31. ## 指定用于授权的数据库,没有指定时默认为 admin
  32. ##
  33. ## 如果启用了 srv_record,即 backend.mongo.<Pool>.srv_record 设置为 true,
  34. ## 且您的 MongoDB 服务器域名添加了包含 authSource 选项的 DNS TXT 记录,
  35. ## 那么可以忽略此配置项
  36. ## backend.mongo.pool1.auth_source = admin
  37. ## 是否开启 SSL
  38. ## backend.mongo.pool1.ssl = false
  39. ## SSL 密钥文件路径
  40. ## backend.mongo.pool1.keyfile =
  41. ## SSL 证书文件路径
  42. ## backend.mongo.pool1.certfile =
  43. ## SSL CA 证书文件路径
  44. ## backend.mongo.pool1.cacertfile =
  45. ## MongoDB 数据写入模式: unsafe | safe
  46. ## backend.mongo.pool1.w_mode = safe
  47. ## MongoDB 数据读取模式: master | slaver_ok
  48. ## backend.mongo.pool1.r_mode = slave_ok
  49. ## MongoDB 底层 driver 配置, 保持默认即可
  50. ## backend.mongo.topology.pool_size = 1
  51. ## backend.mongo.topology.max_overflow = 0
  52. ## backend.mongo.topology.overflow_ttl = 1000
  53. ## backend.mongo.topology.overflow_check_period = 1000
  54. ## backend.mongo.topology.local_threshold_ms = 1000
  55. ## backend.mongo.topology.connect_timeout_ms = 20000
  56. ## backend.mongo.topology.socket_timeout_ms = 100
  57. ## backend.mongo.topology.server_selection_timeout_ms = 30000
  58. ## backend.mongo.topology.wait_queue_timeout_ms = 1000
  59. ## backend.mongo.topology.heartbeat_frequency_ms = 10000
  60. ## backend.mongo.topology.min_heartbeat_frequency_ms = 1000
  61. ## MongoDB Backend Hooks
  62. backend.mongo.hook.client.connected.1 = {"action": {"function": "on_client_connected"}, "pool": "pool1"}
  63. backend.mongo.hook.session.created.1 = {"action": {"function": "on_subscribe_lookup"}, "pool": "pool1"}
  64. backend.mongo.hook.client.disconnected.1 = {"action": {"function": "on_client_disconnected"}, "pool": "pool1"}
  65. backend.mongo.hook.session.subscribed.1 = {"topic": "#", "action": {"function": "on_message_fetch"}, "pool": "pool1", "offline_opts": {"time_range": "2h", "max_returned_count": 500}}
  66. backend.mongo.hook.session.subscribed.2 = {"topic": "#", "action": {"function": "on_retain_lookup"}, "pool": "pool1"}
  67. backend.mongo.hook.session.unsubscribed.1= {"topic": "#", "action": {"function": "on_acked_delete"}, "pool": "pool1"}
  68. backend.mongo.hook.message.publish.1 = {"topic": "#", "action": {"function": "on_message_publish"}, "pool": "pool1"}
  69. backend.mongo.hook.message.publish.2 = {"topic": "#", "action": {"function": "on_message_retain"}, "pool": "pool1"}
  70. backend.mongo.hook.message.publish.3 = {"topic": "#", "action": {"function": "on_retain_delete"}, "pool": "pool1"}
  71. backend.mongo.hook.message.acked.1 = {"topic": "#", "action": {"function": "on_message_acked"}, "pool": "pool1"}
  72. ## 获取离线消息
  73. ### "offline_opts": 获取离线消息的配置
  74. #### - max_returned_count: 单次拉去的最大离线消息数目
  75. #### - time_range: 仅拉去在当前时间范围的消息
  76. ## backend.mongo.hook.session.subscribed.1 = {"topic": "#", "action": {"function": "on_message_fetch"}, "pool": "pool1", "offline_opts": {"time_range": "2h", "max_returned_count": 500}}
  77. ## 如果需要存储 Qos0 消息, 可开启以下配置
  78. ## 提示: 当开启以下配置时, 需关闭 'on_message_fetch', 否则 qos1, qos2 消息会被存储俩次
  79. ## backend.mongo.hook.message.publish.4 = {"topic": "#", "action": {"function": "on_message_store"}, "pool": "pool1", "payload_format": "mongo_json"}

backend 消息存储规则包括:

hooktopicaction说明
client.connectedon_client_connected存储客户端在线状态
session.createdon_subscribe_lookup订阅主题
client.disconnectedon_client_disconnected存储客户端离线状态
session.subscribed#on_message_fetch获取离线消息
session.subscribed#on_retain_lookup获取retain消息
session.unsubscribed#on_acked_delete删除 acked 消息
message.publish#on_message_publish存储发布消息
message.publish#on_message_retain存储retain消息
message.publish#on_retain_delete删除retain消息
message.acked#on_message_acked消息ACK处理

MongoDB 数据库初始化

  1. use mqtt
  2. db.createCollection("mqtt_client")
  3. db.createCollection("mqtt_sub")
  4. db.createCollection("mqtt_msg")
  5. db.createCollection("mqtt_retain")
  6. db.createCollection("mqtt_acked")
  7. db.mqtt_client.ensureIndex({clientid:1, node:2})
  8. db.mqtt_sub.ensureIndex({clientid:1})
  9. db.mqtt_msg.ensureIndex({sender:1, topic:2})
  10. db.mqtt_retain.ensureIndex({topic:1})

MongoDB 用户状态集合(Client Collection)

mqtt_client 存储设备在线状态:

  1. {
  2. clientid: string,
  3. state: 0,1, //0离线 1在线
  4. node: string,
  5. online_at: timestamp,
  6. offline_at: timestamp
  7. }

查询设备在线状态:

  1. db.mqtt_client.findOne({clientid: ${clientid}})

例如 ClientId 为 test 客户端上线:

  1. db.mqtt_client.findOne({clientid: "test"})
  2. {
  3. "_id" : ObjectId("58646c9bdde89a9fb9f7fb73"),
  4. "clientid" : "test",
  5. "state" : 1,
  6. "node" : "emqx@127.0.0.1",
  7. "online_at" : 1482976411,
  8. "offline_at" : null
  9. }

例如 ClientId 为 test 客户端下线:

  1. db.mqtt_client.findOne({clientid: "test"})
  2. {
  3. "_id" : ObjectId("58646c9bdde89a9fb9f7fb73"),
  4. "clientid" : "test",
  5. "state" : 0,
  6. "node" : "emqx@127.0.0.1",
  7. "online_at" : 1482976411,
  8. "offline_at" : 1482976501
  9. }

MongoDB 用户订阅主题集合(Subscription Collection)

mqtt_sub 存储订阅关系:

  1. {
  2. clientid: string,
  3. topic: string,
  4. qos: 0,1,2
  5. }

用户 test 分别订阅主题 test_topic0 test_topic1 test_topic2:

  1. db.mqtt_sub.insert({clientid: "test", topic: "test_topic1", qos: 1})
  2. db.mqtt_sub.insert({clientid: "test", topic: "test_topic2", qos: 2})

某个客户端订阅主题:

  1. db.mqtt_sub.find({clientid: ${clientid}})

查询 ClientId 为 “test” 的客户端已订阅主题:

  1. db.mqtt_sub.find({clientid: "test"})
  2. { "_id" : ObjectId("58646d90c65dff6ac9668ca1"), "clientid" : "test", "topic" : "test_topic1", "qos" : 1 }
  3. { "_id" : ObjectId("58646d96c65dff6ac9668ca2"), "clientid" : "test", "topic" : "test_topic2", "qos" : 2 }

MongoDB 发布消息集合(Message Collection)

mqtt_msg 存储 MQTT 消息:

  1. {
  2. _id: int,
  3. topic: string,
  4. msgid: string,
  5. sender: string,
  6. qos: 0,1,2,
  7. retain: boolean (true, false),
  8. payload: string,
  9. arrived: timestamp
  10. }

查询某个客户端发布的消息:

  1. db.mqtt_msg.find({sender: ${clientid}})

查询 ClientId 为 “test” 的客户端发布的消息:

  1. db.mqtt_msg.find({sender: "test"})
  2. {
  3. "_id" : 1,
  4. "topic" : "/World",
  5. "msgid" : "AAVEwm0la4RufgAABeIAAQ==",
  6. "sender" : "test",
  7. "qos" : 1,
  8. "retain" : 1,
  9. "payload" : "Hello world!",
  10. "arrived" : 1482976729
  11. }

MongoDB 保留消息集合(Retain Message Collection)

mqtt_retain 存储 Retain 消息:

  1. {
  2. topic: string,
  3. msgid: string,
  4. sender: string,
  5. qos: 0,1,2,
  6. payload: string,
  7. arrived: timestamp
  8. }

查询 retain 消息:

  1. db.mqtt_retain.findOne({topic: ${topic}})

查询topic为 “t/retain” 的 retain 消息:

  1. db.mqtt_retain.findOne({topic: "t/retain"})
  2. {
  3. "_id" : ObjectId("58646dd9dde89a9fb9f7fb75"),
  4. "topic" : "t/retain",
  5. "msgid" : "AAVEwm0la4RufgAABeIAAQ==",
  6. "sender" : "c1",
  7. "qos" : 1,
  8. "payload" : "Hello world!",
  9. "arrived" : 1482976729
  10. }

MongoDB 接收消息 ack 集合(Message Acked Collection)

mqtt_acked 存储客户端消息确认:

  1. {
  2. clientid: string,
  3. topic: string,
  4. mongo_id: int
  5. }

启用 MongoDB 数据存储插件

  1. ./bin/emqx_ctl plugins load emqx_backend_mongo