MongoDB

通过 MongoDB 数据桥接可以将 MQTT 消息和客户端事件存储到 MongoDB 中。

提示

EMQX 企业版功能。EMQX 企业版可以为您带来更全面的关键业务场景覆盖、更丰富的数据集成支持,更高的生产级可靠性保证以及 24/7 的全球技术支持,欢迎免费试用 MongoDB - 图1 (opens new window)

前置准备

功能清单

快速开始

安装 MongoDB

通过 Docker 安装并启动 MongoDB:

  1. # 启动一个 MongoDB 容器并设置密码为 public
  2. docker run -d --name mongodb -p 27017:27017 mongo
  3. # 进入容器
  4. docker exec -it mongodb bash
  5. # 在容器中连接到 MongoDB 服务器,
  6. mongo
  7. # 创建用户
  8. use admin
  9. db.createUser({ user: "admin", pwd: "public", roles: [ { role: "root", db: "admin" } ] })
  10. # 创建名为 emqx_data 的数据库
  11. use emqx_data
  12. # 创建名为 emqx_messages 的集合
  13. db.createCollection('emqx_messages')

创建 MongoDB 数据桥接

创建 MongoDB 数据桥接完成对客户端发布消息的存储。

  1. 转到 Dashboard 数据集成 -> 数据桥接页面。
  2. 点击页面右上角的创建
  3. 数据桥接类型中选择 MongoDB,点击下一步
  4. 输入数据桥接名称,要求是大小写英文字母或数字组合。
  5. 部署模式SRV 记录根据情况选择,此处选择 single 与不启用。
  6. 输入 MongoDB 连接信息,数据库名字填写 emqx_data服务器地址填写 127.0.0.1:27017用户名填写 admin密码填写 public集合存储数据的集合,支持通过占位符 ${var_name} 动态设置,本例中填入 emqx_messages。其他配置项保持默认配置即可。
  7. 配置 有效载荷模板,将 clientidtopicqostimestamppayload 字段存储到 MongoDB 中,该模板将通过 MongoDB insert 命令执行,对应模板如下:
  1. {
  2. "clientid": "${clientid}",
  3. "topic": "${topic}",
  4. "qos": ${qos},
  5. "timestamp": ${timestamp},
  6. "payload": ${payload}
  7. }

提示

配置有效载荷模板时需注意以下几点:

  • 所有的需要使用双引号 " 包裹;

  • 不支持自动推导的数据类型:

    • 字符类型的字段需要使用 " 包裹,否则将报错;
    • 数值类型字段不需要包裹,否则将被识别为字符类型;
    • 时间戳、日期和时间类型,如不做特殊处理,则将被识别为数值或字符类型,如希望以日期和时间类型存储,需要在规则 SQL 中使用 mongo_date 函数对字段进行处理,参考 时间与日期函数
  • 允许嵌套对象,当 为 JSON 对象时:

    • 模板中禁止使用双引号嵌套该值,否则将导致执行错误;

    • 对象将按自身结构嵌套存储;

    • 如需将对象存储为 JSON 字符,可以在规则 SQL 中使用 json_encode 函数转换,模板中对应的仍然禁止使用双引号包裹。

  1. 高级配置(可选),根据情况配置同步/异步模式,队列与批量等参数,详细请参考配置参数
  2. 点击创建按钮完成数据桥接创建。此时 EMQX 将提示您数据桥接已成功创建,并询问是否创建相关规则,可点击创建规则按钮或通过点击左侧导航栏的数据集成 -> 规则进行创建。

创建数据转发规则

  1. 点击页面右上角的创建
  2. 输入规则 ID my_rule,在 SQL 编辑器中输入规则,此处选择将 t/# 主题的 MQTT 消息存储至 MongoDB,请确规则选择出来的字段(SELECT 部分)包含所有 SQL 模板中用到的变量。此处规则 SQL 如下:
  1. SELECT
  2. *
  3. FROM
  4. "t/#"

您也可以使用以下 SQL 将 timestamp 保存为日期类型数据、将 JSON 格式的 payload 保存为 JSON 字符串:

  1. SELECT
  2. *,
  3. mongo_date(timestamp) as timestamp,
  4. json_encode(payload) as payload
  5. FROM
  6. "t/#"
  1. 添加动作,在动作下拉框中选择 使用数据桥接转发 选项,选择先前创建好的 MongoDB 数据桥接。点击页面底部的添加
  2. 点击最下方创建按钮完成规则的创建。

至此您已经完成整个创建过程,可以前往 数据集成 -> Flows 页面查看拓扑图,此时应当看到 t/# 主题的消息经过名为 my_rule 的规则处理,处理结果交由 MongoDB 存储。

测试数据桥接与规则

使用 MQTTX 向 t/1 主题发布消息:

  1. mqttx pub -i emqx_c -t t/1 -m '{ "msg": "hello MongoDB" }'

查看数据桥接运行统计,命中、发送成功次数均 +1。

查看数据是否已经写入emqx_messages 集合中:

  1. > db.emqx_messages.find().pretty()
  2. {
  3. "_id" : ObjectId("63db7059df489d01ed000009"),
  4. "clientid" : "emqx_c",
  5. "payload" : {
  6. "msg" : "hello MongoDB"
  7. },
  8. "qos" : 0,
  9. "timestamp" : NumberLong("1675325529070"),
  10. "topic" : "t/1"
  11. }

使用第二种规则 SQL 时,对应数据内容如下:

  1. > db.emqx_messages.find().pretty()
  2. {
  3. "_id" : ObjectId("63db7535df489d01ed000013"),
  4. "clientid" : "emqx_c",
  5. "payload" : "{ \"msg\": \"hello MongoDB\" }",
  6. "qos" : 0,
  7. "timestamp" : ISODate("2023-02-02T08:33:36.715Z"),
  8. "topic" : "t/1"
  9. }