MQTT 协议

MQTTMQTT - 图1open in new window 是机器对机器(M2M)/“物联网”连接协议。

它被设计为一种非常轻量级的发布/订阅消息传递。

对于与需要较小代码占用和/或网络带宽非常宝贵的远程位置的连接很有用。

IoTDB 支持 MQTT v3.1(OASIS 标准)协议。 IoTDB 服务器包括内置的 MQTT 服务,该服务允许远程设备将消息直接发送到 IoTDB 服务器。

MQTT - 图2

内置 MQTT 服务

内置的 MQTT 服务提供了通过 MQTT 直接连接到 IoTDB 的能力。 它侦听来自 MQTT 客户端的发布消息,然后立即将数据写入存储。 MQTT 主题与 IoTDB 时间序列相对应。 消息有效载荷可以由 Java SPI 加载的PayloadFormatter格式化为事件,默认实现为JSONPayloadFormatter 默认的json格式化程序支持两种 json 格式以及由他们组成的json数组,以下是 MQTT 消息有效负载示例:

  1. {
  2. "device":"root.sg.d1",
  3. "timestamp":1586076045524,
  4. "measurements":["s1","s2"],
  5. "values":[0.530635,0.530635]
  6. }

或者

  1. {
  2. "device":"root.sg.d1",
  3. "timestamps":[1586076045524,1586076065526],
  4. "measurements":["s1","s2"],
  5. "values":[[0.530635,0.530635], [0.530655,0.530695]]
  6. }

或者以上两者的JSON数组形式。

MQTT - 图3

MQTT 配置

默认情况下,IoTDB MQTT 服务从${IOTDB_HOME}/${IOTDB_CONF}/iotdb-datanode.properties加载配置。

配置如下:

名称描述默认
enable_mqtt_service是否启用 mqtt 服务false
mqtt_hostmqtt 服务绑定主机127.0.0.1
mqtt_portmqtt 服务绑定端口1883
mqtt_handler_pool_size处理 mqtt 消息的处理程序池大小1
mqtt_payload_formattermqtt 消息有效负载格式化程序json
mqtt_max_message_sizemqtt 消息最大长度(字节)1048576

示例代码 以下是 mqtt 客户端将消息发送到 IoTDB 服务器的示例。

  1. MQTT mqtt = new MQTT();
  2. mqtt.setHost("127.0.0.1", 1883);
  3. mqtt.setUserName("root");
  4. mqtt.setPassword("root");
  5. BlockingConnection connection = mqtt.blockingConnection();
  6. connection.connect();
  7. Random random = new Random();
  8. for (int i = 0; i < 10; i++) {
  9. String payload = String.format("{\n" +
  10. "\"device\":\"root.sg.d1\",\n" +
  11. "\"timestamp\":%d,\n" +
  12. "\"measurements\":[\"s1\"],\n" +
  13. "\"values\":[%f]\n" +
  14. "}", System.currentTimeMillis(), random.nextDouble());
  15. connection.publish("root.sg.d1.s1", payload.getBytes(), QoS.AT_LEAST_ONCE, false);
  16. }
  17. connection.disconnect();

自定义 MQTT 消息格式

事实上可以通过简单编程来实现 MQTT 消息的格式自定义。 可以在源码的 example/mqtt-customize 项目中找到一个简单示例。

步骤:

  • 创建一个 Java 项目,增加如下依赖
  1. <dependency>
  2. <groupId>org.apache.iotdb</groupId>
  3. <artifactId>iotdb-server</artifactId>
  4. <version>${project.version}</version>
  5. </dependency>
  • 创建一个实现类,实现接口 org.apache.iotdb.db.protocol.mqtt.PayloadFormatter
  1. package org.apache.iotdb.mqtt.server;
  2. import io.netty.buffer.ByteBuf;
  3. import org.apache.iotdb.db.protocol.mqtt.Message;
  4. import org.apache.iotdb.db.protocol.mqtt.PayloadFormatter;
  5. import java.nio.charset.StandardCharsets;
  6. import java.util.ArrayList;
  7. import java.util.Arrays;
  8. import java.util.List;
  9. public class CustomizedJsonPayloadFormatter implements PayloadFormatter {
  10. @Override
  11. public List<Message> format(ByteBuf payload) {
  12. // Suppose the payload is a json format
  13. if (payload == null) {
  14. return null;
  15. }
  16. String json = payload.toString(StandardCharsets.UTF_8);
  17. // parse data from the json and generate Messages and put them into List<Meesage> ret
  18. List<Message> ret = new ArrayList<>();
  19. // this is just an example, so we just generate some Messages directly
  20. for (int i = 0; i < 2; i++) {
  21. long ts = i;
  22. Message message = new Message();
  23. message.setDevice("d" + i);
  24. message.setTimestamp(ts);
  25. message.setMeasurements(Arrays.asList("s1", "s2"));
  26. message.setValues(Arrays.asList("4.0" + i, "5.0" + i));
  27. ret.add(message);
  28. }
  29. return ret;
  30. }
  31. @Override
  32. public String getName() {
  33. // set the value of mqtt_payload_formatter in iotdb-datanode.properties as the following string:
  34. return "CustomizedJson";
  35. }
  36. }
  • 修改项目中的 src/main/resources/META-INF/services/org.apache.iotdb.db.protocol.mqtt.PayloadFormatter 文件: 将示例中的文件内容清除,并将刚才的实现类的全名(包名.类名)写入文件中。注意,这个文件中只有一行。 在本例中,文件内容为: org.apache.iotdb.mqtt.server.CustomizedJsonPayloadFormatter
  • 编译项目生成一个 jar 包: mvn package -DskipTests

在 IoTDB 服务端:

  • 创建 ${IOTDB_HOME}/ext/mqtt/ 文件夹, 将刚才的 jar 包放入此文件夹。
  • 打开 MQTT 服务参数. (enable_mqtt_service=true in conf/iotdb-datanode.properties)
  • 用刚才的实现类中的 getName() 方法的返回值 设置为 conf/iotdb-datanode.propertiesmqtt_payload_formatter 的值, , 在本例中,为 CustomizedJson
  • 启动 IoTDB
  • 搞定.

More: MQTT 协议的消息不限于 json,你还可以用任意二进制。通过如下函数获得: payload.forEachByte() or payload.array