MQTT 示例程序

示例代码讲解

下面讲解 RT-Thread 提供的 MQTT 示例代码,测试服务器使用 Eclipse 的测试服务器,地址 iot.eclipse.org ,端口 1883,MQTT 功能示例代码如下:

  1. #include <stdlib.h>
  2. #include <string.h>
  3. #include <stdint.h>
  4.  
  5. #include <rtthread.h>
  6.  
  7. #define DBG_ENABLE
  8. #define DBG_SECTION_NAME "[MQTT] "
  9. #define DBG_LEVEL DBG_LOG
  10. #define DBG_COLOR
  11. #include <rtdbg.h>
  12.  
  13. #include "paho_mqtt.h"
  14. #define MQTT_URI "tcp://iot.eclipse.org:1883" // 配置测试服务器地址
  15. #define MQTT_USERNAME "admin"
  16. #define MQTT_PASSWORD "admin"
  17. #define MQTT_SUBTOPIC "/mqtt/test" // 设置订阅主题
  18. #define MQTT_PUBTOPIC "/mqtt/test" // 设置推送主题
  19. #define MQTT_WILLMSG "Goodbye!" // 设置遗言消息
  20.  
  21. /* 定义 MQTT 客户端环境结构体 */
  22. static MQTTClient client;
  23.  
  24. /* MQTT 订阅事件自定义回调函数 */
  25. static void mqtt_sub_callback(MQTTClient *c, MessageData *msg_data)
  26. {
  27. *((char *)msg_data->message->payload + msg_data->message->payloadlen) = '\0';
  28. LOG_D("mqtt sub callback: %.*s %.*s",
  29. msg_data->topicName->lenstring.len,
  30. msg_data->topicName->lenstring.data,
  31. msg_data->message->payloadlen,
  32. (char *)msg_data->message->payload);
  33.  
  34. return;
  35. }
  36.  
  37. /* MQTT 订阅事件默认回调函数 */
  38. static void mqtt_sub_default_callback(MQTTClient *c, MessageData *msg_data)
  39. {
  40. *((char *)msg_data->message->payload + msg_data->message->payloadlen) = '\0';
  41. LOG_D("mqtt sub default callback: %.*s %.*s",
  42. msg_data->topicName->lenstring.len,
  43. msg_data->topicName->lenstring.data,
  44. msg_data->message->payloadlen,
  45. (char *)msg_data->message->payload);
  46. return;
  47. }
  48.  
  49. /* MQTT 连接事件回调函数 */
  50. static void mqtt_connect_callback(MQTTClient *c)
  51. {
  52. LOG_D("inter mqtt_connect_callback!");
  53. }
  54.  
  55. /* MQTT 上线事件回调函数 */
  56. static void mqtt_online_callback(MQTTClient *c)
  57. {
  58. LOG_D("inter mqtt_online_callback!");
  59. }
  60.  
  61. /* MQTT 下线事件回调函数 */
  62. static void mqtt_offline_callback(MQTTClient *c)
  63. {
  64. LOG_D("inter mqtt_offline_callback!");
  65. }
  66.  
  67. /**
  68. * 这个函数创建并配置 MQTT 客户端。
  69. *
  70. * @param void
  71. *
  72. * @return none
  73. */
  74. static void mq_start(void)
  75. {
  76. /* 使用 MQTTPacket_connectData_initializer 初始化 condata 参数 */
  77. MQTTPacket_connectData condata = MQTTPacket_connectData_initializer;
  78. static char cid[20] = { 0 };
  79.  
  80. static int is_started = 0;
  81. if (is_started)
  82. {
  83. return;
  84. }
  85. /* 配置 MQTT 结构体内容参数 */
  86. {
  87. client.uri = MQTT_URI;
  88.  
  89. /* 产生随机的客户端 ID */
  90. rt_snprintf(cid, sizeof(cid), "rtthread%d", rt_tick_get());
  91.  
  92. /* 配置连接参数 */
  93. memcpy(&client.condata, &condata, sizeof(condata));
  94. client.condata.clientID.cstring = cid;
  95. client.condata.keepAliveInterval = 60;
  96. client.condata.cleansession = 1;
  97. client.condata.username.cstring = MQTT_USERNAME;
  98. client.condata.password.cstring = MQTT_PASSWORD;
  99.  
  100. /* 配置 MQTT 遗言参数 */
  101. client.condata.willFlag = 1;
  102. client.condata.will.qos = 1;
  103. client.condata.will.retained = 0;
  104. client.condata.will.topicName.cstring = MQTT_PUBTOPIC;
  105. client.condata.will.message.cstring = MQTT_WILLMSG;
  106.  
  107. /* 分配缓冲区 */
  108. client.buf_size = client.readbuf_size = 1024;
  109. client.buf = malloc(client.buf_size);
  110. client.readbuf = malloc(client.readbuf_size);
  111. if (!(client.buf && client.readbuf))
  112. {
  113. LOG_E("no memory for MQTT client buffer!");
  114. goto _exit;
  115. }
  116.  
  117. /* 设置事件回调函数 */
  118. client.connect_callback = mqtt_connect_callback;
  119. client.online_callback = mqtt_online_callback;
  120. client.offline_callback = mqtt_offline_callback;
  121.  
  122. /* 设置订阅表和事件回调函数*/
  123. client.messageHandlers[0].topicFilter = MQTT_SUBTOPIC;
  124. client.messageHandlers[0].callback = mqtt_sub_callback;
  125. client.messageHandlers[0].qos = QOS1;
  126.  
  127. /* 设置默认的订阅主题*/
  128. client.defaultMessageHandler = mqtt_sub_default_callback;
  129. }
  130.  
  131. /* 运行 MQTT 客户端 */
  132. paho_mqtt_start(&client);
  133. is_started = 1;
  134.  
  135. _exit:
  136. return;
  137. }
  138.  
  139. /**
  140. * 这个函数推送消息给特定的 MQTT 主题。
  141. *
  142. * @param send_str publish message
  143. *
  144. * @return none
  145. */
  146. static void mq_publish(const char *send_str)
  147. {
  148. MQTTMessage message;
  149. const char *msg_str = send_str;
  150. const char *topic = MQTT_PUBTOPIC;
  151. message.qos = QOS1; //消息等级
  152. message.retained = 0;
  153. message.payload = (void *)msg_str;
  154. message.payloadlen = strlen(message.payload);
  155.  
  156. MQTTPublish(&client, topic, &message);
  157.  
  158. return;
  159. }
  160.  
  161. #ifdef RT_USING_FINSH
  162. #include <finsh.h>
  163. FINSH_FUNCTION_EXPORT(mq_start, startup mqtt client);
  164. FINSH_FUNCTION_EXPORT(mq_publish, publish mqtt msg);
  165. #ifdef FINSH_USING_MSH
  166. MSH_CMD_EXPORT(mq_start, startup mqtt client);
  167.  
  168. int mq_pub(int argc, char **argv)
  169. {
  170. if (argc != 2)
  171. {
  172. rt_kprintf("More than two input parameters err!!\n");
  173. return 0;
  174. }
  175. mq_publish(argv[1]);
  176.  
  177. return 0;
  178. }
  179. MSH_CMD_EXPORT(mq_pub, publish mqtt msg);
  180. #endif /* FINSH_USING_MSH */
  181. #endif /* RT_USING_FINSH */

运行示例

在 msh 中运行上述功能示例代码,可以实现向服务器订阅主题和向特定主题推送消息的功能,功能示例代码运行效果如下:

  • 启动 MQTT 客户端,连接代理服务器并订阅主题:
  1. msh />mq_start /* 启动 MQTT 客户端连接 Eclipse 服务器 */
  2. inter mqtt_connect_callback! /* 服务器连接成功,调用连接回调函数打印服务器信息 */
  3. ipv4 address port: 1883
  4. [MQTT] HOST = 'iot.eclipse.org'
  5. msh />[MQTT] Subscribe #0 /mqtt/test OK! /* 订阅主题 /mqtt/test 成功 */
  6. inter mqtt_online_callback! /* MQTT 上线成功,调用上线回调函数 */
  7. msh />
  • 作为发布者向指定主题发布消息:
  1. msh />mq_pub hello-rtthread /* 向指定主题发送 hello-rtthread 消息 */
  2. msh />mqtt sub callback: /mqtt/test hello-rtthread /* 收到消息,执行回调函数 */
  3. msh />

原文: https://www.rt-thread.org/document/site/submodules/paho-mqtt/docs/samples/