Timescale 消息存储

提示

EMQX 3.1 版本后推出强大的规则引擎用于替换插件,建议您前往使用保存数据到 Timescale规则引擎中创建 保存数据到 Timescale

配置 Timescale 消息存储

etc/plugins/emqx_backend_timescale.conf:

  1. ## Timescale Server
  2. backend.timescale.pool1.server = 127.0.0.1:5432
  3. ## Timescale Pool Size
  4. backend.timescale.pool1.pool_size = 8
  5. ## Timescale Username
  6. backend.timescale.pool1.username = postgres
  7. ## Timescale Password
  8. backend.timescale.pool1.password = password
  9. ## Timescale Database
  10. backend.timescale.pool1.database = tutorial
  11. ## Timescale SSL
  12. backend.timescale.pool1.ssl = false
  13. ## SSL keyfile.
  14. ##
  15. ## Value: File
  16. ## backend.timescale.pool1.keyfile =
  17. ## SSL certfile.
  18. ##
  19. ## Value: File
  20. ## backend.timescale.pool1.certfile =
  21. ## SSL cacertfile.
  22. ##
  23. ## Value: File
  24. ## backend.timescale.pool1.cacertfile =
  25. ## Store Publish Message
  26. backend.timescale.hook.message.publish.1 = {"topic": "#", "action": {"function": "on_message_publish"}, "pool": "pool1"}

Timescale Backend 消息存储规则参数:

OptionDescription
topic配置哪些主题下的消息需要执行 hook
action配置 hook 具体动作, function 为 Backend 提供的内置函数, 实现通用功能
poolPool Name, 实现连接多个 Timescale Server 功能

Example:

  1. ## Store Publish message with "sensor/#" topic
  2. backend.timescale.hook.message.publish.1 = {"topic": "sensor/#", "action": {"function": "on_message_publish"}, "pool": "pool1"}
  3. ## Store Publish message with "stat/#" topic
  4. backend.timescale.hook.message.publish.2 = {"topic": "stat/#", "action": {"function": "on_message_publish"}, "pool": "pool1"}

Timescale Backend 支持 Hook 与 相应内置函数列表:

HookFunction list
message.publishon_message_publish

Timescale Backend 提供 emqx_backend_timescale.tmpl 模板文件,用于从不同主题的 MQTT Message 中提取数据以写入 Timescale。

模板文件采用 Json 格式, 组成部分:

  • key - MQTT Topic, 字符串, 支持通配符主题
  • value - Template, Json 对象, 用于将 MQTT Message 转换成 measurement,tag_key=tag_value,... field_key=field_value,... timestamp 的形式以写入 InfluxDB。

你可以为不同 Topic 定义不同的 Template, 也可以为同一个 Topic 定义多个 Template, 类似:

  1. {
  2. <Topic 1>: <Template 1>,
  3. <Topic 2>: <Template 2>
  4. }

Template 格式如下:

  1. {
  2. "name": <Name of template>,
  3. "sql": <SQL INSERT INTO>,
  4. "param_keys": <Param Keys>
  5. }

name, sqlparam_keys 都是必选项。

name 可以是任意的字符串,确保没有重复即可。

sql 为 Timescale 可用的 SQL INSERT INTO 语句,例如:insert into sensor_data(time, location, temperature, humidity) values (NOW(), $1, $2, $3)

param_keys 是一个数组,它的第一个元素对应 sql 中出现的 $1,并以此类推。

数组中任意元素都可以是一个固定值, 它支持的数据类型依赖于你定义的数据表。当然更符合实际情况的是,你可以通过我们提供的占位符来获取 MQTT 消息中的数据。

目前我们支持的占位符如下:

PlaceholderDescription
$idMQTT 消息 UUID, 由 EMQX 分配
$clientid客户端使用的 Client ID
$username客户端使用的 Username
$peerhost客户端 IP
$qosMQTT 消息的 QoS
$topicMQTT 消息主题
$payloadMQTT 消息载荷, 必须为合法的 Json
$<Number>必须配合 $paylaod 使用, 用于从 Json Array 中获取数据
$timestampEMQX 准备转发消息时设置的时间戳, 精度: 毫秒

$payload 与 $<Number>:

你可以直接使用 $payload 取得完整的消息载荷, 也可以通过 ["$payload", <Key>, ...] 取得消息载荷内部的数据。

例如 payload{"data": {"temperature": 23.9}}, 你可以通过占位符 ["$payload", "data", "temperature"] 来获取其中的 23.9

考虑到 Json 还有数组这一数据类型的情况, 我们引入了 $0$<pos_integer>, $0 表示获取数组内所有元素, $<pos_integer> 表示获取数组内第 <pos_integer> 个元素。

一个简单例子, ["$payload", "$0", "temp"] 将从 [{"temp": 20}, {"temp": 21}] 中取得 [20, 21], 而 ["$payload", "$1", "temp"] 将只取得 20

值得注意的是, 当你使用 $0 时,我们希望你取得的数据个数都是相等的。因为我们需要将这些数组转换为多条记录写入 Timescale, 而当你一个字段取得了 3 份数据, 另一个字段却取得了 2 份数据, 我们将无从判断应当怎样为你组合这些数据。

Example

data/templates 目录下提供了一个示例模板 (emqx_backend_timescale_example.tmpl, 正式使用时请去掉文件名中的 “_example” 后缀) 供用户参考:

  1. {
  2. "sensor_data": {
  3. "name": "insert_sensor_data",
  4. "sql": "insert into sensor_data(time, location, temperature, humidity) values (NOW(), $1, $2, $3)",
  5. "param_keys": [
  6. ["$payload", "data", "$0", "location"],
  7. ["$payload", "data", "$0", "temperature"],
  8. ["$payload", "data", "$0", "humidity"]
  9. ]
  10. },
  11. "sensor_data2/#": {
  12. "name": "insert_sensor_data2",
  13. "sql": "insert into sensor_data(time, location, temperature, humidity) values (NOW(), $1, $2, $3)",
  14. "param_keys": [
  15. ["$payload", "location"],
  16. ["$payload", "temperature"],
  17. ["$payload", "humidity"]
  18. ]
  19. },
  20. "easy_data": {
  21. "name": "insert_easy_data",
  22. "sql": "insert into easy_data(time, data) values (NOW(), $1)",
  23. "param_keys": [
  24. "$payload"
  25. ]
  26. }
  27. }

当 Topic 为 “sensor_data” 的 MQTT Message 拥有以下 Payload 时:

  1. {
  2. "data":[
  3. {
  4. "location":"bedroom",
  5. "temperature":21.3,
  6. "humidity":40.3
  7. },
  8. {
  9. "location":"bathroom",
  10. "temperature":22.3,
  11. "humidity":61.8
  12. },
  13. {
  14. "location":"kitchen",
  15. "temperature":29.5,
  16. "humidity":58.7
  17. }
  18. ]
  19. }

[“$payload”, “data”, “$0”, “location”] 会先获取 MQTT Message 的 Payload,如果 Payload 为 json 格式,则继续尝试读取 data。data 的值是数组,这里我们用到了 “$0” 表示获取数组中所有的元素,因此 [“$payload”, “data”, “$0”, “location”] 将帮我们获得 [“bedroom”, “bathroom”, “kitchen”]。相应的,如果将 “$0” 替换为 “$1”,将只获得 [“bedroom”]。相应的,如果将

那么在这个场景中,我们将得到以下 SQL 语句:

  1. insert into sensor_data(time, location, temperature, humidity) values (NOW(), 'bedroom', 21.3, 40.3)
  2. insert into sensor_data(time, location, temperature, humidity) values (NOW(), 'bathroom', 22.3, 61.8)
  3. insert into sensor_data(time, location, temperature, humidity) values (NOW(), 'kitchen', 29.5, 58.7)

最终 Timescale Backend 执行这些 SQL 语句将数据写入 Timescale。