EMQX Broker writing

MQTT is a popular IoT data transfer protocol, and EMQX is an open-source MQTT broker. You can write MQTT data directly to TDengine without writing any code: you only need to set up “rules” in the EMQX Dashboard. EMQX supports saving data to TDengine by sending it to a web service, and the Enterprise Edition also provides a native TDengine driver for direct saving. Please refer to the EMQX official documentation for details on how to use it.

Prerequisites

The following preparations are required for EMQX to add TDengine data sources correctly.

  • The TDengine cluster is deployed and working properly.
  • taosAdapter is installed and running properly. Please refer to the taosAdapter manual for details.
  • If you use the mock data program described later, you need to install a suitable version of Node.js. v12 is recommended.

Install and start EMQX

Download the installation package for your operating system from the EMQX official website and install it. After installation, start the EMQX service with sudo emqx start or sudo systemctl start emqx.

Note: this chapter is based on EMQX v4.4.5. Other versions of EMQX may differ in user interface, configuration methods, or functions.

Create Database and Table

In this step we create the database and table schema in TDengine that will receive the MQTT data. Open the TDengine CLI and execute the SQL below:

  CREATE DATABASE test;
  USE test;
  CREATE TABLE sensor_data (ts TIMESTAMP, temperature FLOAT, humidity FLOAT, volume FLOAT, pm10 FLOAT, pm25 FLOAT, so2 FLOAT, no2 FLOAT, co FLOAT, sensor_id NCHAR(255), area TINYINT, coll_time TIMESTAMP);

Configuring EMQX Rules

The EMQX configuration interface differs from version to version; v4.4.5 is used here as an example. For other versions, please refer to the corresponding official documentation.

Log in to EMQX Dashboard

Open http://IP:18083 in your browser and log in to the EMQX Dashboard. After a fresh installation, the username is admin and the password is public.

TDengine Database EMQX login dashboard

Creating Rule

Select “Rule” in the “Rule Engine” on the left and click the “Create” button:

TDengine Database EMQX rule engine

Edit SQL fields

Copy the SQL below and paste it into the SQL edit area:

  SELECT
    payload
  FROM
    "sensor/data"

TDengine Database EMQX create rule

Add “action handler”

TDengine Database EMQX add action handler

Add “Resource”

TDengine Database EMQX create resource

Select “Data to Web Service” and click the “New Resource” button.

Edit “Resource”

Select “WebHook” and fill in the request URL as the address and port of the server running taosAdapter (default is 6041). Leave the other properties at their default values.

TDengine Database EMQX edit resource

Edit “action”

Edit the resource configuration to add a key/value pair for Authorization. If you use the default TDengine username and password, the value of the Authorization key is:

  Basic cm9vdDp0YW9zZGF0YQ==

Please refer to the TDengine REST API documentation for details on authorization.
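The Authorization value shown above follows the standard HTTP Basic scheme: the string username:password encoded in Base64. As a minimal sketch, assuming the default credentials are root and taosdata (which is what the value above decodes to), the value for other credentials could be computed in Node.js as follows. The helper name basicAuthHeader is hypothetical and used only for illustration.

```javascript
// Build an HTTP Basic Authorization value from a username and password.
// basicAuthHeader is a hypothetical helper name, not part of any library.
function basicAuthHeader(username, password) {
  const token = Buffer.from(`${username}:${password}`, 'utf8').toString('base64')
  return `Basic ${token}`
}

// Assumed default TDengine credentials: root / taosdata
console.log(basicAuthHeader('root', 'taosdata'))
// prints: Basic cm9vdDp0YW9zZGF0YQ==
```

If you change the TDengine password, recompute the value and update the Authorization header in the EMQX resource accordingly.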

Enter the rule engine replacement template in the message body:

  INSERT INTO test.sensor_data VALUES(
    now,
    ${payload.temperature},
    ${payload.humidity},
    ${payload.volume},
    ${payload.PM10},
    ${payload.pm25},
    ${payload.SO2},
    ${payload.NO2},
    ${payload.CO},
    '${payload.id}',
    ${payload.area},
    ${payload.ts}
  )
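To make the substitution concrete, the sketch below imitates how the ${payload.xxx} placeholders could be filled in from one MQTT message to produce the SQL statement sent to taosAdapter. It is an illustration only and not EMQX's actual implementation: renderTemplate and the sample payload values are hypothetical, and how EMQX itself treats a missing field is not modelled here.

```javascript
// Illustration only: fills ${a.b.c} placeholders from a context object.
// renderTemplate is a hypothetical helper, not EMQX's implementation.
function renderTemplate(template, context) {
  return template.replace(/\$\{([\w.]+)\}/g, (match, path) => {
    // Walk the dotted path (e.g. "payload.temperature") through the context
    const value = path.split('.').reduce(
      (obj, key) => (obj == null ? undefined : obj[key]), context)
    return String(value)
  })
}

// Same template as above, written on one line
const template =
  'INSERT INTO test.sensor_data VALUES(now, ${payload.temperature}, ' +
  '${payload.humidity}, ${payload.volume}, ${payload.PM10}, ${payload.pm25}, ' +
  "${payload.SO2}, ${payload.NO2}, ${payload.CO}, '${payload.id}', " +
  '${payload.area}, ${payload.ts})'

// Hypothetical sample message payload
const payload = {
  temperature: 25.5, humidity: 40.1, volume: 60, PM10: 12.3, pm25: 8.8,
  SO2: 1.2, NO2: 3.4, CO: 0.5, id: 'mock_client_0', area: 0, ts: 1596157444170,
}

const sql = renderTemplate(template, { payload })
console.log(sql)
```

Note that the placeholder names are case-sensitive and must match the keys in the published JSON (for example PM10 and SO2 in upper case, pm25 in lower case), while the values are inserted in the column order of the sensor_data table.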

TDengine Database EMQX edit action

Finally, click the “Create” button in the bottom left corner to save the rule.

Write a program to generate mock data

  // mock.js
  const mqtt = require('mqtt')
  const Mock = require('mockjs')

  const EMQX_SERVER = 'mqtt://localhost:1883'
  const CLIENT_NUM = 10
  const STEP = 5000 // Data interval in ms
  const AWAIT = 5000 // Pause in ms after each batch so data is not written too fast
  const CLIENT_POOL = []

  startMock()

  function sleep(timer = 100) {
    return new Promise(resolve => {
      setTimeout(resolve, timer)
    })
  }

  async function startMock() {
    const now = Date.now()
    for (let i = 0; i < CLIENT_NUM; i++) {
      const client = await createClient(`mock_client_${i}`)
      CLIENT_POOL.push(client)
    }
    // last 24h every 5s
    const last = 24 * 3600 * 1000
    for (let ts = now - last; ts <= now; ts += STEP) {
      for (const client of CLIENT_POOL) {
        const mockData = generateMockData()
        const data = {
          ...mockData,
          // The client ID is stored in the connection options
          id: client.options.clientId,
          area: 0,
          ts,
        }
        client.publish('sensor/data', JSON.stringify(data))
      }
      const dateStr = new Date(ts).toLocaleTimeString()
      console.log(`${dateStr} send success.`)
      await sleep(AWAIT)
    }
    console.log(`Done, use ${(Date.now() - now) / 1000}s`)
  }

  /**
   * Init a virtual mqtt client
   * @param {string} clientId ClientID
   */
  function createClient(clientId) {
    return new Promise((resolve, reject) => {
      const client = mqtt.connect(EMQX_SERVER, {
        clientId,
      })
      client.on('connect', () => {
        console.log(`client ${clientId} connected`)
        resolve(client)
      })
      client.on('reconnect', () => {
        console.log('reconnect')
      })
      client.on('error', (e) => {
        console.error(e)
        reject(e)
      })
    })
  }

  /**
   * Generate mock data
   * (area and ts are overridden by the caller in startMock)
   */
  function generateMockData() {
    return {
      "temperature": parseFloat(Mock.Random.float(22, 100).toFixed(2)),
      "humidity": parseFloat(Mock.Random.float(12, 86).toFixed(2)),
      "volume": parseFloat(Mock.Random.float(20, 200).toFixed(2)),
      "PM10": parseFloat(Mock.Random.float(0, 300).toFixed(2)),
      "pm25": parseFloat(Mock.Random.float(0, 300).toFixed(2)),
      "SO2": parseFloat(Mock.Random.float(0, 50).toFixed(2)),
      "NO2": parseFloat(Mock.Random.float(0, 50).toFixed(2)),
      "CO": parseFloat(Mock.Random.float(0, 50).toFixed(2)),
      "area": Mock.Random.integer(0, 20),
      "ts": 1596157444170,
    }
  }


Note: CLIENT_NUM in the code can be set to a smaller value at the beginning of the test, in case the hardware cannot handle a large number of concurrent clients.

TDengine Database EMQX client num
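To get a feel for the load before running the program, the arithmetic implied by the constants in mock.js can be worked out as below. This is a rough sketch that only counts loop iterations and sleep time; it ignores connection and publish overhead. WINDOW, steps, totalMessages, and sleepHours are names introduced here for illustration.

```javascript
// Same constants as in mock.js
const CLIENT_NUM = 10
const STEP = 5000   // ms between simulated data points
const AWAIT = 5000  // ms slept after each batch
const WINDOW = 24 * 3600 * 1000 // simulated time span: last 24 hours

// The loop runs from (now - WINDOW) to now inclusive, hence the extra step
const steps = Math.floor(WINDOW / STEP) + 1
const totalMessages = steps * CLIENT_NUM
const sleepHours = (steps * AWAIT) / 1000 / 3600

console.log(steps)                 // 17281
console.log(totalMessages)         // 172810
console.log(sleepHours.toFixed(1)) // 24.0
```

Under these assumptions, lowering CLIENT_NUM reduces the number of messages proportionally, while the total run time is governed by AWAIT and the number of steps.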

Run the program to simulate sending MQTT data

  npm install mqtt mockjs --save --registry=https://registry.npm.taobao.org
  node mock.js

TDengine Database EMQX run mock

Verify that EMQX is receiving data

Refresh the EMQX Dashboard rules engine interface to see how many records were received correctly:

TDengine Database EMQX rule matched

Verify that data is written to TDengine

Use the TDengine CLI program to log in and query the appropriate databases and tables to verify that the data is being written to TDengine correctly:

TDengine Database EMQX result in taos
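As an alternative to the CLI, the same check could be scripted against the REST interface that taosAdapter exposes. The sketch below is a hedged example, not part of the original procedure: it assumes taosAdapter is reachable on localhost:6041, that the REST endpoint is /rest/sql, and that the default credentials root and taosdata are in use. buildQueryRequest and runQuery are hypothetical helper names.

```javascript
const http = require('http')

// Build the request options for a POST to taosAdapter's REST endpoint.
// Host, port, and credentials are assumptions; adjust them to your setup.
function buildQueryRequest(host, port, username, password) {
  const token = Buffer.from(`${username}:${password}`).toString('base64')
  return {
    host,
    port,
    path: '/rest/sql',
    method: 'POST',
    headers: {
      Authorization: `Basic ${token}`,
      'Content-Type': 'text/plain',
    },
  }
}

// Send a SQL statement as the request body and resolve with the raw response text.
function runQuery(options, sql) {
  return new Promise((resolve, reject) => {
    const req = http.request(options, (res) => {
      let body = ''
      res.setEncoding('utf8')
      res.on('data', (chunk) => { body += chunk })
      res.on('end', () => resolve(body))
    })
    req.on('error', reject)
    req.end(sql)
  })
}

// Example (requires a running taosAdapter):
// runQuery(buildQueryRequest('localhost', 6041, 'root', 'taosdata'),
//   'SELECT COUNT(*) FROM test.sensor_data').then(console.log, console.error)
```

A row count that grows while mock.js is running suggests that the rule, the WebHook resource, and taosAdapter are all working together.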

Please refer to the TDengine official documentation for more details on how to use TDengine, and to the EMQX official documentation for details on how to use EMQX.