EdgeX 消息总线目标

该目标用于将消息发送到 EdgeX 消息总线上。

**请注意,如果你使用的是 ZeorMQ 消息总线,那么该 sink 会创建一个新的 EdgeX 消息总线(绑定到 Kuiper 服务所运行的地址),而不是利用原来既有的消息总线(通常为 application 服务所暴露的地址和端口)。 **

另外,如果你需要在别的主机上对你的端口可以进行访问,你需要在开始运行 Kuiper 服务之前,把端口号映射到主机上。

名称可选Description
protocoltrue协议,如未指定,使用缺省值 tcp.
host消息总线主机地址,使用缺省值 .
port消息总线端口号。 如未指定,使用缺省值 5563.
topic发布的主题名称,如未指定,使用缺省值 events.
contentType发布消息的内容类型,如未指定,使用缺省值 application/json.
metadata该属性为一个字段名称,该字段是 SQL SELECT 子句的一个字段名称,这个字段应该类似于 meta() AS xxx ,用于选出消息中所有的 EdgeX 元数据.
deviceName允许用户指定设备名称,该名称将作为从 Kuiper 中发送出来的 Event 结构体的设备名称.
type消息总线类型,目前支持两种类型的消息总线, zero 或者 mqtt,其中 zero 为缺省类型。
optional如果指定了 mqtt 消息总线,那么还可以指定一下可选的值。请参考以下可选的支持的配置类型。

以下为支持的可选的配置列表,您可以参考 MQTT 协议规范来获取更详尽的信息。

  • optional
    • ClientId
    • Username
    • Password
    • Qos
    • KeepAlive
    • Retained
    • ConnectionPayload
    • CertFile
    • KeyFile
    • CertPEMBlock
    • KeyPEMBlock
    • SkipCertVerify

例子

发布结果到 EdgeX 消息总线,而不保留原有的元数据

在此情况下,原有的元数据 (例如 Events 结构体中的 id, pushed, created, modified, origin,以及Reading 结构体中的 id, created, modified, origin, pushed, device 不会被保留)。Kuiper 在此情况下作为 EdgeX 的一个单独微服务,它有自己的 device name。 提供了属性 deviceName, 该属性允许用户指定 Kuiper 的设备名称。如下所示,

  1. 从 EdgeX 消息总线上的 events 主题上收到的消息,
  1. {
  2. "Device": "demo", "Created": 000,
  3. "readings":
  4. [
  5. {"Name": "Temperature", value: "30", "Created":123 …},
  6. {"Name": "Humidity", value: "20", "Created":456 …}
  7. ]
  8. }
  1. 使用如下的规则,并且在 edgex action 中给属性 deviceName 指定 kuiper
  1. {
  2. "id": "rule1",
  3. "sql": "SELECT temperature * 3 AS t1, humidity FROM events",
  4. "actions": [
  5. {
  6. "edgex": {
  7. "protocol": "tcp",
  8. "host": "*",
  9. "port": 5571,
  10. "topic": "application",
  11. "deviceName": "kuiper",
  12. "contentType": "application/json"
  13. }
  14. }
  15. ]
  16. }
  1. 发送到 EdgeX 消息总线上的数据。
  1. {
  2. "Device": "kuiper", "Created": 0,
  3. "readings":
  4. [
  5. {"Name": "t1", value: "90" , "Created": 0 …},
  6. {"Name": "humidity", value: "20" , "Created": 0 …}
  7. ]
  8. }

请注意,

  • Event 结构体中的设备名称( ``Device)变成了kuiper`
  • Events and Readings 结构体中的数据被更新为新的值。 字段 Created 被 Kuiper 更新为新的值 (这里为 0).

发布结果到 EdgeX 消息总线,并保留原有的元数据

但是在某些场景中,你可能需要保留原来的元数据。比如保留发送到 Kuiper 的设备名称,在本例中为 demo, 还有 reading 数组中的其它元数据。在此情况下,Kuiper 更像是一个过滤器 - 将不关心的数据过滤掉,但是依然保留原有的数据。

参考以下的例子,

  1. 从 EdgeX 消息总线上的 events 主题上收到的消息,
  1. {
  2. "Device": "demo", "Created": 000,
  3. "readings":
  4. [
  5. {"Name": "Temperature", value: "30", "Created":123 …},
  6. {"Name": "Humidity", value: "20", "Created":456 …}
  7. ]
  8. }
  1. 使用如下规则,在edgex action 中,为 metadata 指定值 edgex_meta
  1. {
  2. "id": "rule1",
  3. "sql": "SELECT meta(*) AS edgex_meta, temperature * 3 AS t1, humidity FROM events WHERE temperature > 30",
  4. "actions": [
  5. {
  6. "edgex": {
  7. "protocol": "tcp",
  8. "host": "*",
  9. "port": 5571,
  10. "topic": "application",
  11. "metadata": "edgex_meta",
  12. "contentType": "application/json"
  13. }
  14. }
  15. ]
  16. }

请注意,

  • 用户需要在 SQL 子句中加 meta(*) AS edgex_meta ,函数 meta(*) 返回所有的元数据。
  • edgex action里, 属性 metadata 指定值 edgex_meta 。该属性指定哪个字段包含了元数据。
  1. 发送给 EdgeX 消息总线的数据
  1. {
  2. "Device": "demo", "Created": 000,
  3. "readings":
  4. [
  5. {"Name": "t1", value: "90" , "Created": 0 …},
  6. {"Name": "humidity", value: "20", "Created":456 …}
  7. ]
  8. }

请注意,

  • Events 结构体的元数据依然保留,例如 Device & Created.
  • 对于在原有消息中可以找到的 reading,元数据将继续保留。 比如 humidity 的元数据就是从 EdgeX 消息总线里接收到的原值 - 或者说是旧值
  • 对于在原有消息中无法找到的 reading,元数据将不会被设置。如例子中的 t1 的元数据被设置为 Kuiper 产生的缺省值。
  • 如果你的 SQL 包含了聚合函数,那保留原有的元数据就没有意义,但是 Kuiper 还是会使用时间窗口中的某一条记录的元数据。例如,在下面的 SQL 里, SELECT avg(temperature) AS temperature, meta(*) AS edgex_meta FROM ... GROUP BY TUMBLINGWINDOW(ss, 10). 这种情况下,在时间窗口中可能有几条数据,Kuiper 会使用窗口中的第一条数据的元数据来填充 temperature 的元数据。

结果发布到 MQTT 消息总线

以下是将分析结果发送到 MQTT 消息总线的规则,请注意在optional 中是如何指定 ClientId 的。

  1. {
  2. "id": "rule1",
  3. "sql": "SELECT meta(*) AS edgex_meta, temperature, humidity, humidity*2 as h1 FROM demo WHERE temperature = 20",
  4. "actions": [
  5. {
  6. "edgex": {
  7. "protocol": "tcp",
  8. "host": "127.0.0.1",
  9. "port": 1883,
  10. "topic": "result",
  11. "type": "mqtt",
  12. "metadata": "edgex_meta",
  13. "contentType": "application/json",
  14. "optional": {
  15. "ClientId": "edgex_message_bus_001"
  16. }
  17. }
  18. },
  19. {
  20. "log":{}
  21. }
  22. ]
  23. }