Integrate with EMQX

This page introduces how to use the Neuron southbound driver to collect data, then report the data to EMQX through the northbound Sparkplug B plug-in, and the correct and complete data results are obtained after decoding through the codec function. The process is shown in the figure:

Sparkplug B

Configure Neuron

Southbound Device

Collect the Modbus TCP simulator point value through the southbound drive to simulate the actual device point value, the configuration is as follows:

Add device

image-20230421141232191

Device Configuration

image-20230421141356669

image-20230421141441471

Create group

image-20230421141538315

Create Tag

image-20230421141639201

Northbound application

Add application

image-20230421141821812

Application configuration

image-20230421141912355

image-20230421142020855

Add subscription

image-20230421142109283

Configure EMQX

You can use MQTTX to view the forwarded messages to EMQX from Neuron. However, if you directly subscribe to the EMQX data reported by Neuron northbound to SparkPlugB, there will be garbled strings, as shown in the figure:

image-20230421151918685

Therefore, through the encoding and decoding capabilities of the EMQX rule engine, a corresponding proto file can be written and combined with the rule engine to decode the reported data and obtain correct and complete data results.

Create Schema Registry

image-20230421142801332

image-20230421142920299

  1. // Complete proto file
  2. syntax = "proto2";
  3. //
  4. // To compile:
  5. // cd client_libraries/java
  6. // protoc --proto_path=../../ --java_out=src/main/java ../../sparkplug_b.proto
  7. //
  8. message Payload {
  9. /*
  10. // Indexes of Data Types
  11. // Unknown placeholder for future expansion.
  12. Unknown = 0;
  13. // Basic Types
  14. Int8 = 1;
  15. Int16 = 2;
  16. Int32 = 3;
  17. Int64 = 4;
  18. UInt8 = 5;
  19. UInt16 = 6;
  20. UInt32 = 7;
  21. UInt64 = 8;
  22. Float = 9;
  23. Double = 10;
  24. Boolean = 11;
  25. String = 12;
  26. DateTime = 13;
  27. Text = 14;
  28. // Additional Metric Types
  29. UUID = 15;
  30. DataSet = 16;
  31. Bytes = 17;
  32. File = 18;
  33. Template = 19;
  34. // Additional PropertyValue Types
  35. PropertySet = 20;
  36. PropertySetList = 21;
  37. */
  38. message Template {
  39. message Parameter {
  40. optional string name = 1;
  41. optional uint32 type = 2;
  42. oneof value {
  43. uint32 int_value = 3;
  44. uint64 long_value = 4;
  45. float float_value = 5;
  46. double double_value = 6;
  47. bool boolean_value = 7;
  48. string string_value = 8;
  49. ParameterValueExtension extension_value = 9;
  50. }
  51. message ParameterValueExtension {
  52. extensions 1 to max;
  53. }
  54. }
  55. optional string version = 1; // The version of the Template to prevent mismatches
  56. repeated Metric metrics = 2; // Each metric is the name of the metric and the datatype of the member but does not contain a value
  57. repeated Parameter parameters = 3;
  58. optional string template_ref = 4; // Reference to a template if this is extending a Template or an instance - must exist if an instance
  59. optional bool is_definition = 5;
  60. extensions 6 to max;
  61. }
  62. message DataSet {
  63. message DataSetValue {
  64. oneof value {
  65. uint32 int_value = 1;
  66. uint64 long_value = 2;
  67. float float_value = 3;
  68. double double_value = 4;
  69. bool boolean_value = 5;
  70. string string_value = 6;
  71. DataSetValueExtension extension_value = 7;
  72. }
  73. message DataSetValueExtension {
  74. extensions 1 to max;
  75. }
  76. }
  77. message Row {
  78. repeated DataSetValue elements = 1;
  79. extensions 2 to max; // For third party extensions
  80. }
  81. optional uint64 num_of_columns = 1;
  82. repeated string columns = 2;
  83. repeated uint32 types = 3;
  84. repeated Row rows = 4;
  85. extensions 5 to max; // For third party extensions
  86. }
  87. message PropertyValue {
  88. optional uint32 type = 1;
  89. optional bool is_null = 2;
  90. oneof value {
  91. uint32 int_value = 3;
  92. uint64 long_value = 4;
  93. float float_value = 5;
  94. double double_value = 6;
  95. bool boolean_value = 7;
  96. string string_value = 8;
  97. PropertySet propertyset_value = 9;
  98. PropertySetList propertysets_value = 10; // List of Property Values
  99. PropertyValueExtension extension_value = 11;
  100. }
  101. message PropertyValueExtension {
  102. extensions 1 to max;
  103. }
  104. }
  105. message PropertySet {
  106. repeated string keys = 1; // Names of the properties
  107. repeated PropertyValue values = 2;
  108. extensions 3 to max;
  109. }
  110. message PropertySetList {
  111. repeated PropertySet propertyset = 1;
  112. extensions 2 to max;
  113. }
  114. message MetaData {
  115. // Bytes specific metadata
  116. optional bool is_multi_part = 1;
  117. // General metadata
  118. optional string content_type = 2; // Content/Media type
  119. optional uint64 size = 3; // File size, String size, Multi-part size, etc
  120. optional uint64 seq = 4; // Sequence number for multi-part messages
  121. // File metadata
  122. optional string file_name = 5; // File name
  123. optional string file_type = 6; // File type (i.e. xml, json, txt, cpp, etc)
  124. optional string md5 = 7; // md5 of data
  125. // Catchalls and future expansion
  126. optional string description = 8; // Could be anything such as json or xml of custom properties
  127. extensions 9 to max;
  128. }
  129. message Metric {
  130. optional string name = 1; // Metric name - should only be included on birth
  131. optional uint64 alias = 2; // Metric alias - tied to name on birth and included in all later DATA messages
  132. optional uint64 timestamp = 3; // Timestamp associated with data acquisition time
  133. optional uint32 datatype = 4; // DataType of the metric/tag value
  134. optional bool is_historical = 5; // If this is historical data and should not update real time tag
  135. optional bool is_transient = 6; // Tells consuming clients such as MQTT Engine to not store this as a tag
  136. optional bool is_null = 7; // If this is null - explicitly say so rather than using -1, false, etc for some datatypes.
  137. optional MetaData metadata = 8; // Metadata for the payload
  138. optional PropertySet properties = 9;
  139. oneof value {
  140. uint32 int_value = 10;
  141. uint64 long_value = 11;
  142. float float_value = 12;
  143. double double_value = 13;
  144. bool boolean_value = 14;
  145. string string_value = 15;
  146. bytes bytes_value = 16; // Bytes, File
  147. DataSet dataset_value = 17;
  148. Template template_value = 18;
  149. MetricValueExtension extension_value = 19;
  150. }
  151. message MetricValueExtension {
  152. extensions 1 to max;
  153. }
  154. }
  155. optional uint64 timestamp = 1; // Timestamp at message sending time
  156. repeated Metric metrics = 2; // Repeated forever - no limit in Google Protobufs
  157. optional uint64 seq = 3; // Sequence number
  158. optional string uuid = 4; // UUID to track message type in terms of schema definitions
  159. optional bytes body = 5; // To optionally bypass the whole definition above
  160. extensions 6 to max; // For third party extensions
  161. }

Create Rule

SQL statement

  1. SELECT
  2. schema_decode('neuron', payload, 'Payload') as SparkPlugB
  3. FROM
  4. "spBv1.0/group1/DDATA/node1/modbus"

The key point here is schema_decode('neuron', payload, 'Payload'):

  • schema_decode The function decodes the content of the payload field according to the Schema ‘protobuf_person’;
  • as SparkPlugB Store the decoded value in the variable “SparkPlugB”;
  • The last parameter Payload indicates that the message type in the payload is the ‘Payload’ type defined in the protobuf schema.

image-20230421143952695

Then add the action with the following parameters:

  • Action Type: Message republish
  • Purpose topic: SparkPlugB/test

This action sends the decoded “Payload” to the SparkPlugB/test topic in JSON format.

image-20230421143453011

Verify with MQTTX

Here, the MQTTX tool is used to subscribe to the data decoded by the codec function of the EMQX rule engine, as shown in the figure:

image-20230421151817047

As shown in the figure above, it can be seen that the original data before decoding is garbled, and the complete and correct data result is obtained after decoding; so far, the point value of the device is collected in the south direction of Neuron, and reported to EMQX in the north direction of SparkPlugB, and the complete data is obtained by decoding through the codec function Data results are complete.

Further Reading

The topic that Neuron reports data to EMQX is namespace/group_id/DDATA/edge_node_id/device_id defined according to the Sparkplug B protocol specification, as shown in the figure:

image-20230419143059088

As for how to define more Neuron northbound Sparkplug B plug-in related standards, you can refer to the Sparkplug B protocol specification🔗