OpenTSDB 消息存储

配置 OpenTSDB 消息存储

配置文件:etc/plugins/emqx_backend_opentsdb.conf:

  1. ## OpenTSDB 服务地址
  2. backend.opentsdb.pool1.server = 127.0.0.1:4242
  3. ## OpenTSDB 连接池大小
  4. backend.opentsdb.pool1.pool_size = 8
  5. ## 是否返回 summary info
  6. ##
  7. ## Value: true | false
  8. backend.opentsdb.pool1.summary = true
  9. ## 是否返回 detailed info
  10. ##
  11. ## Value: true | false
  12. backend.opentsdb.pool1.details = false
  13. ## 是否同步写入
  14. ##
  15. ## Value: true | false
  16. backend.opentsdb.pool1.sync = false
  17. ## 同步写入超时时间,单位毫秒
  18. ##
  19. ## Value: Duration
  20. ##
  21. ## Default: 0
  22. backend.opentsdb.pool1.sync_timeout = 0
  23. ## 最大批量写条数
  24. ##
  25. ## Value: Number >= 0
  26. ## Default: 20
  27. backend.opentsdb.pool1.max_batch_size = 20
  28. ## 存储 PUBLISH 消息
  29. backend.opentsdb.hook.message.publish.1 = {"topic": "#", "action": {"function": "on_message_publish"}, "pool": "pool1"}

OpenTSDB Backend 消息存储规则参数:

OptionDescription
topic配置哪些主题下的消息需要执行 hook
action配置 hook 具体动作, function 为 Backend 提供的内置函数, 实现通用功能
poolPool Name, 实现连接多个 OpenTSDB Server 功能

示例:

  1. ## 存储主题为 "sensor/#" 的 PUBLISH 消息
  2. backend.influxdb.hook.message.publish.1 = {"topic": "sensor/#", "action": {"function": "on_message_publish"}, "pool": "pool1"}
  3. ## 存储主题为 "stat/#" 的 PUBLISH 消息
  4. backend.influxdb.hook.message.publish.2 = {"topic": "stat/#", "action": {"function": "on_message_publish"}, "pool": "pool1"}

OpenTSDB Backend 支持 Hook 与 相应内置函数列表:

HookFunction list
message.publishon_message_publish

由于 MQTT Message 无法直接写入 OpenTSDB, OpenTSDB Backend 提供了 emqx_backend_opentsdb.tmpl 模板文件将 MQTT Message 转换为可写入 OpenTSDB 的 DataPoint。

模板文件采用 Json 格式, 组成部分:

  • key - MQTT Topic, 字符串, 支持通配符主题
  • value - Template, Json 对象, 用于将 MQTT Message 转换成 OpenTSDB 的 DataPoint。

你可以为不同 Topic 定义不同的 Template, 也可以为同一个 Topic 定义多个 Template, 类似:

  1. {
  2. \<Topic 1>: \<Template 1>,
  3. \<Topic 2>: \<Template 2>
  4. }

Template 格式如下:

  1. {
  2. "measurement": \<Measurement>,
  3. "tags": {
  4. \<Tag Key>: \<Tag Value>
  5. },
  6. "value": \<Value>,
  7. "timestamp": \<Timestamp>
  8. }

measurementvalue 为必选项, tagstimestamp 为可选项。

所有的值 (例如 \<Measurement> ) 你都可以直接在 Template 中配置为一个固定值, 它支持的数据类型依赖于你定义的数据表。当然更符合实际情况的是,你可以通过我们提供的占位符来获取 MQTT 消息中的数据。

目前我们支持的占位符如下:

PlaceholderDescription
$idMQTT 消息 UUID, 由 EMQ X 分配
$clientid客户端使用的 Client ID
$username客户端使用的 Username
$peerhost客户端 IP
$qosMQTT 消息的 QoS
$topicMQTT 消息主题
$payloadMQTT 消息载荷, 必须为合法的 Json
$<Number>必须配合 $paylaod 使用, 用于从 Json Array 中获取数据
$timestampEMQ X 准备转发消息时设置的时间戳, 精度: 毫秒

$payload 与 $ <Number>:

你可以直接使用 $payload 取得完整的消息载荷, 也可以通过 ["$payload", \<Key>, ...] 取得消息载荷内部的数据。

例如 payload{"data": {"temperature": 23.9}} , 你可以通过占位符 ["$payload", "data", "temperature"] 来获取其中的 23.9

考虑到 Json 还有数组这一数据类型的情况, 我们引入了 $0$\<pos_integer> , $0 表示获取数组内所有元素, $\<pos_integer> 表示获取数组内第 \<pos_integer> 个元素。

一个简单例子, ["$payload", "$0", "temp"] 将从 [{"temp": 20}, {"temp": 21}] 中取得 [20, 21] , 而 ["$payload", "$1", "temp"] 将只取得 20

值得注意的是, 当你使用 $0 时,我们希望你取得的数据个数都是相等的。因为我们需要将这些数组转换为多条记录写入 OpenTSDB, 而当你一个字段取得了 3 份数据, 另一个字段却取得了 2 份数据, 我们将无从判断应当怎样为你组合这些数据。

Example

data/templates 目录下提供了一个示例模板 (emqx_backend_opentsdb_example.tmpl, 正式使用时请去掉文件名中的 “_example” 后缀) 供用户参考:

  1. {
  2. "sample": {
  3. "measurement": "$topic",
  4. "tags": {
  5. "host": ["$payload", "data", "$0", "host"],
  6. "region": ["$payload", "data", "$0", "region"],
  7. "qos": "$qos",
  8. "clientid": "$clientid"
  9. },
  10. "value": ["$payload", "data", "$0", "temp"],
  11. "timestamp": "$timestamp"
  12. }
  13. }

当 Topic 为 “sample” 的 MQTT Message 拥有以下 Payload 时:

  1. {
  2. "data": [
  3. {
  4. "temp": 1,
  5. "host": "serverA",
  6. "region": "hangzhou"
  7. },
  8. {
  9. "temp": 2,
  10. "host": "serverB",
  11. "region": "ningbo"
  12. }
  13. ]
  14. }

Backend 将 MQTT Message 转换为以下数据写入 OpenTSDB:

  1. [
  2. {
  3. "measurement": "sample",
  4. "tags": {
  5. "clientid": "mqttjs_ebcc36079a",
  6. "host": "serverA",
  7. "qos": "0",
  8. "region": "hangzhou",
  9. },
  10. "value": "1",
  11. "timestamp": "1560743513626681000"
  12. },
  13. {
  14. "measurement": "sample",
  15. "tags": {
  16. "clientid": "mqttjs_ebcc36079a",
  17. "host": "serverB",
  18. "qos": "0",
  19. "region": "ningbo",
  20. },
  21. "value": "2",
  22. "timestamp": "1560743513626681000"
  23. }
  24. ]

启用 OpenTSDB 消息存储:

  1. ./bin/emqx_ctl plugins load emqx_backend_opentsdb