Sparkplug B

概览

Sparkplug B 是一种建立在 MQTT 3.1.1 基础上的工业物联网数据传输规范。Sparkplug B 在保证灵活性和效率的前提下,使 MQTT 网络具备状态感知和互操作性,为设备制造商和软件提供商提供了统一的数据共享方式。

Neuron 从设备采集到的数据可以通过 Sparkplug B 协议从边缘端传输到 Sparkplug B 应用中,用户也可以从应用程序向 Neuron 发送数据修改指令。Sparkplug B 是运行在 MQTT 之上的应用型协议,所以在 Neuron 中的设置与 MQTT 驱动相似。

示例

这里以通过Neuron南向采集设备实际点位数据,通过北向Sparkplug B插件将数据上报到EMQX在通过编解码功能解码后得到正确完整的数据结果,流程如图:

Sparkplug B

Neuron

南向

通过南向驱动采集Modbus TCP模拟器点位值去模拟实际设备点位值,配置如下:

添加设备

image-20230419133807028

设备配置

image-20230419134424414

image-20230419134500446

Group创建

image-20230419134630777

点位创建

image-20230419134741736

北向

添加应用

image-20230419134941116

应用配置

image-20230419135022571

image-20230419135247025

添加订阅

image-20230419135416517

EMQX

如果直接订阅Neuron北向SparkPlugB上报到的EMQX数据,则会出现字符串乱码的情况,如图:

image-20230419140026304

所以可以通过EMQX规则引擎编解码的能力,编写一个相应的proto文件结合规则引擎将上报的数据进行解码后得到正确完整的数据结果。

创建编解码

image-20230419140540482

image-20230419140724126

  1. ## 完整proto文件
  2. syntax = "proto2";
  3. //
  4. // To compile:
  5. // cd client_libraries/java
  6. // protoc --proto_path=../../ --java_out=src/main/java ../../sparkplug_b.proto
  7. //
  8. message Payload {
  9. /*
  10. // Indexes of Data Types
  11. // Unknown placeholder for future expansion.
  12. Unknown = 0;
  13. // Basic Types
  14. Int8 = 1;
  15. Int16 = 2;
  16. Int32 = 3;
  17. Int64 = 4;
  18. UInt8 = 5;
  19. UInt16 = 6;
  20. UInt32 = 7;
  21. UInt64 = 8;
  22. Float = 9;
  23. Double = 10;
  24. Boolean = 11;
  25. String = 12;
  26. DateTime = 13;
  27. Text = 14;
  28. // Additional Metric Types
  29. UUID = 15;
  30. DataSet = 16;
  31. Bytes = 17;
  32. File = 18;
  33. Template = 19;
  34. // Additional PropertyValue Types
  35. PropertySet = 20;
  36. PropertySetList = 21;
  37. */
  38. message Template {
  39. message Parameter {
  40. optional string name = 1;
  41. optional uint32 type = 2;
  42. oneof value {
  43. uint32 int_value = 3;
  44. uint64 long_value = 4;
  45. float float_value = 5;
  46. double double_value = 6;
  47. bool boolean_value = 7;
  48. string string_value = 8;
  49. ParameterValueExtension extension_value = 9;
  50. }
  51. message ParameterValueExtension {
  52. extensions 1 to max;
  53. }
  54. }
  55. optional string version = 1; // The version of the Template to prevent mismatches
  56. repeated Metric metrics = 2; // Each metric is the name of the metric and the datatype of the member but does not contain a value
  57. repeated Parameter parameters = 3;
  58. optional string template_ref = 4; // Reference to a template if this is extending a Template or an instance - must exist if an instance
  59. optional bool is_definition = 5;
  60. extensions 6 to max;
  61. }
  62. message DataSet {
  63. message DataSetValue {
  64. oneof value {
  65. uint32 int_value = 1;
  66. uint64 long_value = 2;
  67. float float_value = 3;
  68. double double_value = 4;
  69. bool boolean_value = 5;
  70. string string_value = 6;
  71. DataSetValueExtension extension_value = 7;
  72. }
  73. message DataSetValueExtension {
  74. extensions 1 to max;
  75. }
  76. }
  77. message Row {
  78. repeated DataSetValue elements = 1;
  79. extensions 2 to max; // For third party extensions
  80. }
  81. optional uint64 num_of_columns = 1;
  82. repeated string columns = 2;
  83. repeated uint32 types = 3;
  84. repeated Row rows = 4;
  85. extensions 5 to max; // For third party extensions
  86. }
  87. message PropertyValue {
  88. optional uint32 type = 1;
  89. optional bool is_null = 2;
  90. oneof value {
  91. uint32 int_value = 3;
  92. uint64 long_value = 4;
  93. float float_value = 5;
  94. double double_value = 6;
  95. bool boolean_value = 7;
  96. string string_value = 8;
  97. PropertySet propertyset_value = 9;
  98. PropertySetList propertysets_value = 10; // List of Property Values
  99. PropertyValueExtension extension_value = 11;
  100. }
  101. message PropertyValueExtension {
  102. extensions 1 to max;
  103. }
  104. }
  105. message PropertySet {
  106. repeated string keys = 1; // Names of the properties
  107. repeated PropertyValue values = 2;
  108. extensions 3 to max;
  109. }
  110. message PropertySetList {
  111. repeated PropertySet propertyset = 1;
  112. extensions 2 to max;
  113. }
  114. message MetaData {
  115. // Bytes specific metadata
  116. optional bool is_multi_part = 1;
  117. // General metadata
  118. optional string content_type = 2; // Content/Media type
  119. optional uint64 size = 3; // File size, String size, Multi-part size, etc
  120. optional uint64 seq = 4; // Sequence number for multi-part messages
  121. // File metadata
  122. optional string file_name = 5; // File name
  123. optional string file_type = 6; // File type (i.e. xml, json, txt, cpp, etc)
  124. optional string md5 = 7; // md5 of data
  125. // Catchalls and future expansion
  126. optional string description = 8; // Could be anything such as json or xml of custom properties
  127. extensions 9 to max;
  128. }
  129. message Metric {
  130. optional string name = 1; // Metric name - should only be included on birth
  131. optional uint64 alias = 2; // Metric alias - tied to name on birth and included in all later DATA messages
  132. optional uint64 timestamp = 3; // Timestamp associated with data acquisition time
  133. optional uint32 datatype = 4; // DataType of the metric/tag value
  134. optional bool is_historical = 5; // If this is historical data and should not update real time tag
  135. optional bool is_transient = 6; // Tells consuming clients such as MQTT Engine to not store this as a tag
  136. optional bool is_null = 7; // If this is null - explicitly say so rather than using -1, false, etc for some datatypes.
  137. optional MetaData metadata = 8; // Metadata for the payload
  138. optional PropertySet properties = 9;
  139. oneof value {
  140. uint32 int_value = 10;
  141. uint64 long_value = 11;
  142. float float_value = 12;
  143. double double_value = 13;
  144. bool boolean_value = 14;
  145. string string_value = 15;
  146. bytes bytes_value = 16; // Bytes, File
  147. DataSet dataset_value = 17;
  148. Template template_value = 18;
  149. MetricValueExtension extension_value = 19;
  150. }
  151. message MetricValueExtension {
  152. extensions 1 to max;
  153. }
  154. }
  155. optional uint64 timestamp = 1; // Timestamp at message sending time
  156. repeated Metric metrics = 2; // Repeated forever - no limit in Google Protobufs
  157. optional uint64 seq = 3; // Sequence number
  158. optional string uuid = 4; // UUID to track message type in terms of schema definitions
  159. optional bytes body = 5; // To optionally bypass the whole definition above
  160. extensions 6 to max; // For third party extensions
  161. }

创建规则

SQL语句

  1. SELECT
  2. schema_decode('neuron', payload, 'Payload') as SparkPlugB
  3. FROM
  4. "spBv1.0/group1/DDATA/node1/modbus"

这里的关键点在于 schema_decode('neuron', payload, 'Payload'):

  • schema_decode 函数将 payload 字段的内容按照 ‘protobuf_person’ 这个 Schema 来做解码;
  • as SparkPlugB 将解码后的值保存到变量 “SparkPlugB” 里;
  • 最后一个参数 Payload 指明了 payload 中的消息的类型是 protobuf schema 里定义的 ‘Payload’ 类型。

image-20230419141423179

然后使用以下参数添加动作:

  • 动作类型:消息重新发布
  • 目的主题:SparkPlugB/test

这个动作将解码之后的 “Payload” 以 JSON 的格式发送到 SparkPlugB/test 这个主题。

image-20230419141513303

验证

这里通过MQTTX工具去订阅通过EMQX规则引擎编解码功能解码后的数据,如图:

image-20230419141831882

如上图,可以看到解码前的原数据是乱码的,解码后得到完整正确的数据结果;至此,通过Neuron南向采集设备点位值,北向SparkPlugB上报到EMQX,通过编解码功能解码得到完整的数据结果已完成。

附件

Neuron上报数据到EMQX的Topic是根据Sparkplug B协议规范定义的namespace/group_id/DDATA/edge_node_id/device_id,如图:

image-20230419143059088

至于更多Neuron北向Sparkplug B插件相关标准如何定义的,可以参考Sparkplug B协议规范🔗Sparkplug 应用 - 图18 (opens new window)