MQTT Protocol
MQTT (opens new window) is a machine-to-machine (M2M)/“Internet of Things” connectivity protocol. It was designed as an extremely lightweight publish/subscribe messaging transport. It is useful for connections with remote locations where a small code footprint is required and/or network bandwidth is at a premium.
IoTDB supports the MQTT v3.1(an OASIS Standard) protocol. IoTDB server includes a built-in MQTT service that allows remote devices send messages into IoTDB server directly.
Built-in MQTT Service
The Built-in MQTT Service provide the ability of direct connection to IoTDB through MQTT. It listen the publish messages from MQTT clients and then write the data into storage immediately. The MQTT topic corresponds to IoTDB timeseries. The messages payload can be format to events by PayloadFormatter
which loaded by java SPI, and the default implementation is JSONPayloadFormatter
. The default json
formatter support two json format, and the following is an MQTT message payload example:
{
"device":"root.sg.d1",
"timestamp":1586076045524,
"measurements":["s1","s2"],
"values":[0.530635,0.530635]
}
or
{
"device":"root.sg.d1",
"timestamps":[1586076045524,1586076065526],
"measurements":["s1","s2"],
"values":[[0.530635,0.530635], [0.530655,0.530695]]
}
MQTT Configurations
The IoTDB MQTT service load configurations from ${IOTDB_HOME}/${IOTDB_CONF}/iotdb-engine.properties
by default.
Configurations are as follows:
NAME | DESCRIPTION | DEFAULT |
---|---|---|
enable_mqtt_service | whether to enable the mqtt service | false |
mqtt_host | the mqtt service binding host | 0.0.0.0 |
mqtt_port | the mqtt service binding port | 1883 |
mqtt_handler_pool_size | the handler pool size for handing the mqtt messages | 1 |
mqtt_payload_formatter | the mqtt message payload formatter | json |
mqtt_max_message_size | the max mqtt message size in byte | 1048576 |
Examples
The following is an example which a mqtt client send messages to IoTDB server.
MQTT mqtt = new MQTT();
mqtt.setHost("127.0.0.1", 1883);
mqtt.setUserName("root");
mqtt.setPassword("root");
BlockingConnection connection = mqtt.blockingConnection();
connection.connect();
Random random = new Random();
for (int i = 0; i < 10; i++) {
String payload = String.format("{\n" +
"\"device\":\"root.sg.d1\",\n" +
"\"timestamp\":%d,\n" +
"\"measurements\":[\"s1\"],\n" +
"\"values\":[%f]\n" +
"}", System.currentTimeMillis(), random.nextDouble());
connection.publish("root.sg.d1.s1", payload.getBytes(), QoS.AT_LEAST_ONCE, false);
}
connection.disconnect();
}