MQTT协议

MQTTMQTT - 图1是机器对机器(M2M)/“物联网”连接协议。

它被设计为一种非常轻量级的发布/订阅消息传递。

对于与需要较小代码占用和/或网络带宽非常宝贵的远程位置的连接很有用。

IoTDB支持MQTT v3.1(OASIS标准)协议。 IoTDB服务器包括内置的MQTT服务,该服务允许远程设备将消息直接发送到IoTDB服务器。

MQTT - 图2

内置MQTT服务

内置的MQTT服务提供了通过MQTT直接连接到IoTDB的能力。 它侦听来自MQTT客户端的发布消息,然后立即将数据写入存储。 MQTT主题与IoTDB时间序列相对应。 消息有效载荷可以由Java SPI加载的PayloadFormatter格式化为事件,默认实现为JSONPayloadFormatter 默认的json格式化程序支持两种json格式,以下是MQTT消息有效负载示例:

  1. {
  2. "device":"root.sg.d1",
  3. "timestamp":1586076045524,
  4. "measurements":["s1","s2"],
  5. "values":[0.530635,0.530635]
  6. }

或者

  1. {
  2. "device":"root.sg.d1",
  3. "timestamps":[1586076045524,1586076065526],
  4. "measurements":["s1","s2"],
  5. "values":[[0.530635,0.530635], [0.530655,0.530695]]
  6. }

MQTT - 图3

MQTT配置

默认情况下,IoTDB MQTT服务从${IOTDB_HOME}/${IOTDB_CONF}/iotdbengine.properties加载配置。

配置如下:

名称描述默认
enable_mqtt_service是否启用mqtt服务false
mqtt_hostmqtt服务绑定主机0.0.0.0
mqtt_portmqtt服务绑定端口1883
mqtt_handler_pool_size处理mqtt消息的处理程序池大小1
mqtt_payload_formattermqtt消息有效负载格式化程序json
mqtt_max_message_sizemqtt消息最大长度(字节)1048576

例子

以下是mqtt客户端将消息发送到IoTDB服务器的示例。

  1. MQTT mqtt = new MQTT();
  2. mqtt.setHost("127.0.0.1", 1883);
  3. mqtt.setUserName("root");
  4. mqtt.setPassword("root");
  5. BlockingConnection connection = mqtt.blockingConnection();
  6. connection.connect();
  7. Random random = new Random();
  8. for (int i = 0; i < 10; i++) {
  9. String payload = String.format("{\n" +
  10. "\"device\":\"root.sg.d1\",\n" +
  11. "\"timestamp\":%d,\n" +
  12. "\"measurements\":[\"s1\"],\n" +
  13. "\"values\":[%f]\n" +
  14. "}", System.currentTimeMillis(), random.nextDouble());
  15. connection.publish("root.sg.d1.s1", payload.getBytes(), QoS.AT_LEAST_ONCE, false);
  16. }
  17. connection.disconnect();
  18. }