OpenTSDB 数据存储

OpenTSDB是可扩展的分布式时序数据库,底层依赖 HBase

面对大规模快速增长的物联网传感器采集、交易记录等数据,时间序列数据累计速度非常快,时序数据库通过提高效率来处理这种大规模数据,并带来性能的提升,包括:更高的容纳率(Ingest Rates)、更快的大规模查询(尽管有一些比其他数据库支持更多的查询)以及更好的数据压缩。

本章节以在 CentOS 7.2 中的实际例子来说明如何通过 OpenTSDB 来存储相关的信息。

安装与验证 OpenTSDB 服务器

读者可以参考 OpenTSDB 官方文档) 或 Docker 来下载安装 OpenTSDB 服务器,本文章使用 OpenTSDB 2.4.0 版本。

配置 EMQ X 服务器

通过 RPM 方式安装的 EMQ X,OpenTSDB 相关的配置文件位于 /etc/emqx/plugins/emqx_backend_opentsdb.conf,考虑到功能定位,OpenTSDB 插件仅支持消息存储功能。

配置连接地址与连接池大小、batch 策略:

  1. ## OpenTSDB Server 接入地址
  2. backend.opentsdb.pool1.server = 127.0.0.1:4242
  3. ## OpenTSDB Pool Size
  4. backend.opentsdb.pool1.pool_size = 8
  5. ## Whether or not to return summary information
  6. ##
  7. ## Value: true | false
  8. backend.opentsdb.pool1.summary = true
  9. ## Whether or not to return detailed information
  10. ##
  11. ## Value: true | false
  12. backend.opentsdb.pool1.details = false
  13. ## Whether or not to wait for the data to be flushed to storage before returning the results.
  14. ##
  15. ## Value: true | false
  16. backend.opentsdb.pool1.sync = false
  17. ## A timeout, in milliseconds, to wait for the data to be flushed to
  18. ## storage before returning with an error.
  19. ##
  20. ## Value: Duration
  21. ##
  22. ## Default: 0
  23. backend.opentsdb.pool1.sync_timeout = 0
  24. ## Max batch size of put 最大批量写条数
  25. ##
  26. ## Value: Number >= 0
  27. ## Default: 20
  28. backend.opentsdb.pool1.max_batch_size = 20
  29. ## Store Publish Message QOS > 0
  30. backend.opentsdb.hook.message.publish.1 = {"topic": "#", "action": {"function": "on_message_publish"}, "pool": "pool1"}

OpenTSDB Backend 消息存储规则参数:

通过 topic 过滤器,设置需要存储消息的主题,pool 参数区别多个数据源:

  1. ## Store Publish Message
  2. backend.opentsdb.hook.message.publish.1 = {"topic": "#", "action": {"function": "on_message_publish"}, "pool": "pool1"}

启动该插件,启动插件的方式有 命令行控制台两种方式,读者可以任选其一。

消息模板

由于 MQTT Message 无法直接写入 OpenTSDB, OpenTSDB Backend 提供了 emqx_backend_opentsdb.tmpl 模板文件将 MQTT Message 转换为可写入 OpenTSDB 的 DataPoint。

tmpl 文件位于 data/templates/emqx_backend_opentsdb_example.tmpl,使用 json 格式, 用户可以为不同 Topic 定义不同的 Template, 类似:

  1. {
  2. "sample": {
  3. "measurement": "$topic",
  4. "tags": {
  5. "host": ["$payload", "data", "$0", "host"],
  6. "region": ["$payload", "data", "$0", "region"],
  7. "qos": "$qos",
  8. "from": "$from"
  9. },
  10. "value": ["$payload", "data", "$0", "temp"],
  11. "timestamp": "$timestamp"
  12. }
  13. }

其中, measurement 与 fields 为必选项, tags 与 timestamp 为可选项。 支持通过占位符如 $key 提取变量名为 key 的变量,支持的变量如下:

  • qos: 消息 QoS
  • form: 发布者信息
  • topic: 发布主题
  • timestamp: 时间戳
  • payload.*: JSON 消息体内任意变量,如 { "data": [{ "temp": 1 }] } 使用 ["$payload", "data", "temp"] 可以提取出 1

本示例设定模板如下:

  1. {
  2. "sample": {
  3. "measurement": "$topic",
  4. "tags": {
  5. "host": ["$payload", "data", "$0", "host"],
  6. "region": ["$payload", "data", "$0", "region"],
  7. "qos": "$qos",
  8. "from": "$from"
  9. },
  10. "value": ["$payload", "data", "$0", "temp"],
  11. "timestamp": "$timestamp"
  12. }
  13. }

当 Topic 为”sample” 的 MQTT Message 拥有以下 Payload 时:

  1. {
  2. "data": [
  3. {
  4. "temp": 1,
  5. "host": "serverA",
  6. "region": "hangzhou"
  7. },
  8. {
  9. "temp": 2,
  10. "host": "serverB",
  11. "region": "ningbo"
  12. }
  13. ]
  14. }

Backend 会将 MQTT Message 转换为:

  1. [
  2. {
  3. "measurement": "sample",
  4. "tags": {
  5. "from": "mqttjs_ebcc36079a",
  6. "host": "serverA",
  7. "qos": "0",
  8. "region": "hangzhou"
  9. },
  10. "value": "1",
  11. "timestamp": "1560743513626681000"
  12. },
  13. {
  14. "measurement": "sample",
  15. "tags": {
  16. "from": "mqttjs_ebcc36079a",
  17. "host": "serverB",
  18. "qos": "0",
  19. "region": "ningbo"
  20. },
  21. "value": "2",
  22. "timestamp": "1560743513626681000"
  23. }
  24. ]

使用示例

EMQ X 管理控制台 WebSocket 页面中,向 sample 主题发布如上格式消息消息,消息将解析存储到 OpenTSDB udp 数据库对应的 measurement 中。

总结

读者在理解了 OpenTSDB 中所存储的数据结构,可以结合 OpenTSDB 拓展相关应用。