Sparkplug B 是一种建立在 MQTT 3.1.1 基础上的工业物联网数据传输规范。Sparkplug B 在保证灵活性和效率的前提下,使 MQTT 网络具备状态感知和互操作性,为设备制造商和软件提供商提供了统一的数据共享方式。

Neuron 从设备采集到的数据可以通过 Sparkplug B 协议从边缘端传输到 Sparkplug B 应用中,用户也可以从应用程序向 Neuron 发送数据修改指令。Sparkplug B 是运行在 MQTT 之上的应用型协议,所以在 Neuron 中的设置与 MQTT 驱动相似。


这里以如下场景为例:Neuron 通过南向驱动采集设备的实际点位数据,再通过北向 Sparkplug B 插件将数据上报到 EMQX,最后通过 EMQX 的编解码功能解码,得到正确完整的数据结果。流程如图:

通过南向驱动采集Modbus TCP模拟器点位值去模拟实际设备点位值,配置如下:

























  1. ## 完整proto文件
  2. syntax = "proto2";
  3. //
  4. // To compile:
  5. // cd client_libraries/java
  6. // protoc --proto_path=../../ --java_out=src/main/java ../../sparkplug_b.proto
  7. //
  8. message Payload {
  9. /*
  10. // Indexes of Data Types
  11. // Unknown placeholder for future expansion.
  12. Unknown = 0;
  13. // Basic Types
  14. Int8 = 1;
  15. Int16 = 2;
  16. Int32 = 3;
  17. Int64 = 4;
  18. UInt8 = 5;
  19. UInt16 = 6;
  20. UInt32 = 7;
  21. UInt64 = 8;
  22. Float = 9;
  23. Double = 10;
  24. Boolean = 11;
  25. String = 12;
  26. DateTime = 13;
  27. Text = 14;
  28. // Additional Metric Types
  29. UUID = 15;
  30. DataSet = 16;
  31. Bytes = 17;
  32. File = 18;
  33. Template = 19;
  34. // Additional PropertyValue Types
  35. PropertySet = 20;
  36. PropertySetList = 21;
  37. */
  38. message Template {
  39. message Parameter {
  40. optional string name = 1;
  41. optional uint32 type = 2;
  42. oneof value {
  43. uint32 int_value = 3;
  44. uint64 long_value = 4;
  45. float float_value = 5;
  46. double double_value = 6;
  47. bool boolean_value = 7;
  48. string string_value = 8;
  49. ParameterValueExtension extension_value = 9;
  50. }
  51. message ParameterValueExtension {
  52. extensions 1 to max;
  53. }
  54. }
  55. optional string version = 1; // The version of the Template to prevent mismatches
  56. repeated Metric metrics = 2; // Each metric is the name of the metric and the datatype of the member but does not contain a value
  57. repeated Parameter parameters = 3;
  58. optional string template_ref = 4; // Reference to a template if this is extending a Template or an instance - must exist if an instance
  59. optional bool is_definition = 5;
  60. extensions 6 to max;
  61. }
  62. message DataSet {
  63. message DataSetValue {
  64. oneof value {
  65. uint32 int_value = 1;
  66. uint64 long_value = 2;
  67. float float_value = 3;
  68. double double_value = 4;
  69. bool boolean_value = 5;
  70. string string_value = 6;
  71. DataSetValueExtension extension_value = 7;
  72. }
  73. message DataSetValueExtension {
  74. extensions 1 to max;
  75. }
  76. }
  77. message Row {
  78. repeated DataSetValue elements = 1;
  79. extensions 2 to max; // For third party extensions
  80. }
  81. optional uint64 num_of_columns = 1;
  82. repeated string columns = 2;
  83. repeated uint32 types = 3;
  84. repeated Row rows = 4;
  85. extensions 5 to max; // For third party extensions
  86. }
  87. message PropertyValue {
  88. optional uint32 type = 1;
  89. optional bool is_null = 2;
  90. oneof value {
  91. uint32 int_value = 3;
  92. uint64 long_value = 4;
  93. float float_value = 5;
  94. double double_value = 6;
  95. bool boolean_value = 7;
  96. string string_value = 8;
  97. PropertySet propertyset_value = 9;
  98. PropertySetList propertysets_value = 10; // List of Property Values
  99. PropertyValueExtension extension_value = 11;
  100. }
  101. message PropertyValueExtension {
  102. extensions 1 to max;
  103. }
  104. }
  105. message PropertySet {
  106. repeated string keys = 1; // Names of the properties
  107. repeated PropertyValue values = 2;
  108. extensions 3 to max;
  109. }
  110. message PropertySetList {
  111. repeated PropertySet propertyset = 1;
  112. extensions 2 to max;
  113. }
  114. message MetaData {
  115. // Bytes specific metadata
  116. optional bool is_multi_part = 1;
  117. // General metadata
  118. optional string content_type = 2; // Content/Media type
  119. optional uint64 size = 3; // File size, String size, Multi-part size, etc
  120. optional uint64 seq = 4; // Sequence number for multi-part messages
  121. // File metadata
  122. optional string file_name = 5; // File name
  123. optional string file_type = 6; // File type (i.e. xml, json, txt, cpp, etc)
  124. optional string md5 = 7; // md5 of data
  125. // Catchalls and future expansion
  126. optional string description = 8; // Could be anything such as json or xml of custom properties
  127. extensions 9 to max;
  128. }
  129. message Metric {
  130. optional string name = 1; // Metric name - should only be included on birth
  131. optional uint64 alias = 2; // Metric alias - tied to name on birth and included in all later DATA messages
  132. optional uint64 timestamp = 3; // Timestamp associated with data acquisition time
  133. optional uint32 datatype = 4; // DataType of the metric/tag value
  134. optional bool is_historical = 5; // If this is historical data and should not update real time tag
  135. optional bool is_transient = 6; // Tells consuming clients such as MQTT Engine to not store this as a tag
  136. optional bool is_null = 7; // If this is null - explicitly say so rather than using -1, false, etc for some datatypes.
  137. optional MetaData metadata = 8; // Metadata for the payload
  138. optional PropertySet properties = 9;
  139. oneof value {
  140. uint32 int_value = 10;
  141. uint64 long_value = 11;
  142. float float_value = 12;
  143. double double_value = 13;
  144. bool boolean_value = 14;
  145. string string_value = 15;
  146. bytes bytes_value = 16; // Bytes, File
  147. DataSet dataset_value = 17;
  148. Template template_value = 18;
  149. MetricValueExtension extension_value = 19;
  150. }
  151. message MetricValueExtension {
  152. extensions 1 to max;
  153. }
  154. }
  155. optional uint64 timestamp = 1; // Timestamp at message sending time
  156. repeated Metric metrics = 2; // Repeated forever - no limit in Google Protobufs
  157. optional uint64 seq = 3; // Sequence number
  158. optional string uuid = 4; // UUID to track message type in terms of schema definitions
  159. optional bytes body = 5; // To optionally bypass the whole definition above
  160. extensions 6 to max; // For third party extensions
  161. }



  1. SELECT
  2. schema_decode('neuron', payload, 'Payload') as SparkPlugB
  3. FROM
  4. "spBv1.0/group1/DDATA/node1/modbus"

这里的关键点在于 schema_decode('neuron', payload, 'Payload'):

  • schema_decode 函数将 payload 字段的内容按照 ‘neuron’ 这个 Schema 来做解码;
  • as SparkPlugB 将解码后的值保存到变量 “SparkPlugB” 里;
  • 最后一个参数 Payload 指明了 payload 中的消息的类型是 protobuf schema 里定义的 ‘Payload’ 类型。



  • 动作类型:消息重新发布
  • 目的主题:SparkPlugB/test

这个动作将解码之后的 “Payload” 以 JSON 的格式发送到 SparkPlugB/test 这个主题。







Neuron上报数据到EMQX的Topic是根据Sparkplug B协议规范定义的namespace/group_id/DDATA/edge_node_id/device_id,如图:


