MQTT 数据上送应用示例

代码:

  1. local class = require 'middleclass'
  2. local mosq = require 'mosquitto'
  3. local cjson = require 'cjson.safe'
  -- Topic filters subscribed after each successful connect; connect_proc
  -- prefixes every entry with "/" before subscribing.
  local sub_topics = { "app/#", "sys/#", "output/#", "command/#" }

  -- Delay handed to sys:timeout() for the next reconnect attempt. It is doubled
  -- by app:start_reconnect() and set back to 100 on a successful connect.
  local mqtt_reconnect_timeout = 100

  --- Register the application class (use an identifier that is as unique as possible).
  local app = class("BAIDU_IOT_CLOUD")

  --- Minimum runtime API version required by this application
  -- (currently 1; kept for future interface compatibility).
  app.API_VER = 1
  ---
  -- Application object initializer.
  -- Stores the framework handles and resolves the MQTT connection settings
  -- from `conf`, falling back to built-in defaults for anything missing.
  -- @param name Local installation name of the application, e.g. modbus_com_1
  -- @param sys System `sys` interface object (see the sys interface in the API docs)
  -- @param conf Application configuration converted from the installation JSON
  --   data; a missing (nil) configuration is treated as an empty table
  function app:initialize(name, sys, conf)
    -- Tolerate an application installed without any configuration.
    conf = conf or {}

    self._name = name
    self._sys = sys
    self._conf = conf
    --- Data interface
    self._api = sys:data_api()
    --- Logger interface
    self._log = sys:logger()
    self._nodes = {}

    -- "mqtt_it" is the historical, misspelled key; it is still honoured so
    -- existing installations keep working. Falls back to the system IoT id.
    self._mqtt_id = conf.mqtt_id or conf.mqtt_it or sys:id()

    -- NOTE(review): demo credentials are baked in as defaults; production
    -- installations should always supply username/password via conf.
    self._username = conf.username or "symlinkdemo/demo"
    self._password = conf.password or "q1Z/lxXqz2W33NZir6MW13RpCPAFELSiirVvGDfaaQw="
    self._mqtt_host = conf.server or "symlinkdemo.mqtt.iot.bj.baidubce.com"
    -- Normalise the port to a number whether the JSON carried 1883 or "1883".
    self._mqtt_port = tonumber(conf.port) or 1883
    self._enable_tls = conf.enable_tls or false
    self._close_connection = false
  end
  --- Build the callback table that app:start() registers with the data API.
  -- Every callback forwards to the matching method of the application
  -- instance; device add/delete/modify all trigger a device-list publish.
  -- @param app Application instance object
  -- @return table holding the on_* callback functions
  local function create_handler(app)
    local self = app
    return {
      --- Device object added
      on_add_device = function(src_app, sn, props)
        return self:fire_devices(1000)
      end,
      --- Device object deleted
      on_del_device = function(src_app, sn)
        return self:fire_devices(1000)
      end,
      --- Device object modified (uses fire_devices' default delay)
      on_mod_device = function(src_app, sn, props)
        return self:fire_devices()
      end,
      --- Device input value changed
      on_input = function(src_app, sn, input, prop, value, timestamp, quality)
        return self:handle_input(src_app, sn, input, prop, value, timestamp, quality)
      end,
      --- Device event raised
      on_event = function(src_app, sn, level, data, timestamp)
        return self:handle_event(src_app, sn, level, data, timestamp)
      end,
      --- Device statistics value changed
      on_stat = function(src_app, sn, stat, prop, value, timestamp)
        return self:handle_stat(src_app, sn, stat, prop, value, timestamp)
      end,
    }
  end
  --- Drop the current client and schedule a new connection attempt.
  -- The delay passed to sys:timeout() is the current module-level reconnect
  -- timeout; afterwards that timeout is doubled, and it wraps back to 100
  -- once the doubled value exceeds 10 * 60 * 100.
  function app:start_reconnect()
    local max_delay = 10 * 60 * 100
    local delay = mqtt_reconnect_timeout

    self._mqtt_client = nil
    self._sys:timeout(delay, function()
      self:connect_proc()
    end)

    local doubled = delay * 2
    if doubled > max_delay then
      doubled = 100
    end
    mqtt_reconnect_timeout = doubled
  end
  --- Publish a device input value change to the "/data" topic (QoS 1, not retained).
  -- Does nothing while no MQTT client is available. A message that cannot be
  -- JSON-encoded is logged and dropped instead of being published as an
  -- empty payload.
  -- @param app Value forwarded from the data-API callback (presumably the source application name - confirm)
  -- @param sn Device serial number
  -- @param input Input name
  -- @param prop Input property
  -- @param value New value
  -- @param timestamp Timestamp of the value
  -- @param quality Quality of the value
  function app:handle_input(app, sn, input, prop, value, timestamp, quality)
    local client = self._mqtt_client
    if not client then
      return
    end

    -- cjson.safe returns nil plus an error message instead of raising.
    local payload, err = cjson.encode({
      app = app,
      sn = sn,
      input = input,
      prop = prop,
      value = value,
      timestamp = timestamp,
      quality = quality,
    })
    if not payload then
      self._log:warning("Failed to encode input message", sn, input, prop, err)
      return
    end

    client:publish("/data", payload, 1, false)
  end
  --- Publish a device event to the "/event" topic (QoS 1, not retained).
  -- Does nothing while no MQTT client is available. A message that cannot be
  -- JSON-encoded is logged and dropped instead of being published as an
  -- empty payload.
  -- @param app Value forwarded from the data-API callback (presumably the source application name - confirm)
  -- @param sn Device serial number
  -- @param level Event level
  -- @param data Event data
  -- @param timestamp Timestamp of the event
  function app:handle_event(app, sn, level, data, timestamp)
    local client = self._mqtt_client
    if not client then
      return
    end

    -- cjson.safe returns nil plus an error message instead of raising.
    local payload, err = cjson.encode({
      app = app,
      sn = sn,
      level = level,
      data = data,
      timestamp = timestamp,
    })
    if not payload then
      self._log:warning("Failed to encode event message", sn, level, err)
      return
    end

    client:publish("/event", payload, 1, false)
  end
  --- Publish a device statistics value to the "/statistics" topic (QoS 1, not retained).
  -- Does nothing while no MQTT client is available. A message that cannot be
  -- JSON-encoded is logged and dropped instead of being published as an
  -- empty payload.
  -- @param app Value forwarded from the data-API callback (presumably the source application name - confirm)
  -- @param sn Device serial number
  -- @param stat Statistics item name
  -- @param prop Statistics property
  -- @param value New value
  -- @param timestamp Timestamp of the value
  function app:handle_stat(app, sn, stat, prop, value, timestamp)
    local client = self._mqtt_client
    if not client then
      return
    end

    -- cjson.safe returns nil plus an error message instead of raising.
    local payload, err = cjson.encode({
      app = app,
      sn = sn,
      stat = stat,
      prop = prop,
      value = value,
      timestamp = timestamp,
    })
    if not payload then
      self._log:warning("Failed to encode statistics message", sn, stat, prop, err)
      return
    end

    client:publish("/statistics", payload, 1, false)
  end
  --- Schedule a (retained, QoS 1) publish of the device list to "/devices".
  -- Calls made while a publish is already pending are coalesced into it.
  -- @param timeout Delay passed to sys:timeout() before publishing; defaults to 100
  function app:fire_devices(timeout)
    if self._fire_device_timer then
      -- A publish is already scheduled; it will pick up the latest device list.
      return
    end
    local delay = timeout or 100

    self._fire_device_timer = function()
      local devs = self._api:list_devices() or {}
      local client = self._mqtt_client
      if not client then
        return
      end
      -- cjson.safe returns nil plus an error message instead of raising.
      local payload, err = cjson.encode(devs)
      if not payload then
        self._log:warning("Failed to encode device list", err)
        return
      end
      client:publish("/devices", payload, 1, true)
    end

    self._sys:timeout(delay, function()
      local fire = self._fire_device_timer
      -- Clear the pending marker BEFORE running the job: if the job raises,
      -- later fire_devices() calls must still be able to schedule a publish.
      self._fire_device_timer = nil
      if fire then
        fire()
      end
    end)
  end
  --- Connection worker.
  -- Creates a fresh MQTT client, connects to the broker (retrying with a
  -- growing sleep), then pumps the client's network loop until either the
  -- client is dropped/replaced (reconnect) or app:disconnect() requests a
  -- close. Intended to run inside its own forked task (see app:start).
  function app:connect_proc()
    local log = self._log
    local sys = self._sys

    local mqtt_id = self._mqtt_id
    local mqtt_host = self._mqtt_host
    local mqtt_port = self._mqtt_port
    local username = self._username
    local password = self._password

    -- `self._clean_session or true` would always be true; only fall back to
    -- the default when the setting is really absent.
    local clean_session = self._clean_session
    if clean_session == nil then
      clean_session = true
    end

    -- Optional keepalive from the configuration. When it is not configured
    -- this stays nil, exactly what the previous (undefined global) lookup
    -- passed to client:connect().
    local mqtt_keepalive = tonumber((self._conf or {}).keepalive)

    -- Create the MQTT client instance. The password is deliberately not logged.
    log:debug("Baidu Cloud MQTT", mqtt_id, mqtt_host, mqtt_port, username)
    local client = assert(mosq.new(mqtt_id, clean_session))
    client:version_set(mosq.PROTOCOL_V311)
    client:login_set(username, password)
    if self._enable_tls then
      client:tls_set(sys:app_dir().."/root_cert.pem")
    end

    -- Register callbacks
    client.ON_CONNECT = function(success, rc, msg)
      if success then
        log:notice("ON_CONNECT", success, rc, msg)
        client:publish("/status", cjson.encode({device=mqtt_id, status="ONLINE"}), 1, true)
        self._mqtt_client = client
        self._mqtt_client_last = sys:time()
        for _, v in ipairs(sub_topics) do
          client:subscribe("/"..v, 1)
        end
        --client:subscribe("+/#", 1)
        -- Connected: restart the reconnect back-off and announce the devices.
        mqtt_reconnect_timeout = 100
        self:fire_devices(1000)
      else
        log:warning("ON_CONNECT", success, rc, msg)
        self:start_reconnect()
      end
    end
    client.ON_DISCONNECT = function(success, rc, msg)
      log:warning("ON_DISCONNECT", success, rc, msg)
      -- Only react when this client is still the active one.
      if self._mqtt_client == client then
        self:start_reconnect()
      end
    end
    client.ON_LOG = function(...)
      --print(...)
    end
    client.ON_MESSAGE = function(...)
      -- Route incoming messages to the application logger instead of stdout.
      log:debug("ON_MESSAGE", ...)
    end

    client:will_set("/status", cjson.encode({device=mqtt_id, status="OFFLINE"}), 1, true)

    self._close_connection = false

    local r, err
    local ts = 1
    while not r do
      r, err = client:connect(mqtt_host, mqtt_port, mqtt_keepalive)
      if not r then
        -- %s + tostring: the port may be a number or a string from the config.
        log:error(string.format("Connect to broker %s:%s failed!", tostring(mqtt_host), tostring(mqtt_port)), err)
        sys:sleep(ts * 500)
        ts = ts * 2
        if ts >= 64 then
          -- We meet bug that if client reconnect to broker with lots of
          -- failures, it's socket will be broken. So we re-create the client.
          client:destroy()
          sys:timeout(100, function() self:connect_proc() end)
          return
        end
      end
    end

    self._mqtt_client = client

    --- Worker loop. Comparing against `client` (not just truthiness) makes
    -- sure this worker stops once start_reconnect() dropped the client, even
    -- if a newer connect_proc() has already installed its own client.
    while self._mqtt_client == client and not self._close_connection do
      sys:sleep(0)
      -- Re-check: the client may have been dropped while we were yielded.
      if self._mqtt_client == client then
        client:loop(50, 1)
      end
    end

    -- Still the active client here means a close was requested.
    if self._mqtt_client == client then
      client:disconnect()
      self._mqtt_client = nil
      log:notice("Cloud Connection Closed!")
    end
  end
  --- Ask the connection worker to close the MQTT connection and wait for it.
  -- Sets the close flag and then polls (sleeping 10 per round) until the
  -- worker has cleared `self._mqtt_client`.
  -- @return true once the client is gone; nothing when there was no client
  function app:disconnect()
    if self._mqtt_client then
      self._log:debug("Cloud Connection Closing!")
      self._close_connection = true

      local sys = self._sys
      while self._mqtt_client do
        sys:sleep(10)
      end
      return true
    end
  end
  --- Application start function.
  -- Registers the data-API callback handler and forks the MQTT connection
  -- worker.
  -- @return true
  function app:start()
    --- Install the callback handler object
    local handler = create_handler(self)
    self._handler = handler
    self._api:set_handler(handler, true)

    -- Run the connection worker in its own forked task.
    local function worker()
      self:connect_proc()
    end
    self._sys:fork(worker)

    self._log:debug("Baidu Cloud connector started!")
    return true
  end
  --- Application exit function.
  -- Stops the connection worker and closes the MQTT connection (if one is
  -- active) before releasing the mosquitto library; previously the library
  -- was cleaned up while the worker was still pumping the client.
  -- @param reason Close reason supplied by the caller (unused)
  function app:close(reason)
    -- Returns immediately when there is no active client.
    self:disconnect()
    mosq.cleanup()
  end
  --- Application run entry.
  -- @param tms Unused by this implementation
  -- @return 10000 (10 seconds; presumably the interval until the next run
  --   call, in milliseconds - confirm against the framework docs)
  function app:run(tms)
    local ten_seconds = 10 * 1000
    return ten_seconds
  end
  246. --- 返回应用对象
  247. return app