MQ库

MQ是利用Redis/mqtt/stomp协议的subscribe与publish模拟的一种订阅/发布的消息队列.

使用方式

在MQ库中有三种对象可供使用者自行导入. 具体的导入方法为:

  • 导入redis协议的消息队列local mq = require "MQ.redis"

  • 导入mqtt协议的消息队列local mq = require "MQ.mqtt"

  • 导入stomp协议的消息队列local mq = require "MQ.stomp"

API介绍

凡是导入了上述库之后, 即可传入指定参数进行初始化.

1. mq:new(opt)

opt是一个table类型的参数, 接受如下传参:

  • host - 字符串类型, 消息队列的域名或者IP地址.

  • port - int类型, 消息队列监听的端口.

  • auth/db - 字符串类型, 仅在redis协议下用作登录认证(没有可以不填写).

  • username/password - 字符串类型, 仅在stomp/mqtt协议下用作登录认证(没有可以不填写).

  • vhost - 字符串类型, 仅在使用某些特定消息队列server的时候填写(例如:rabbit).

  • keepalive - int类型, 仅在使用mqtt的时候用来出发客户端主动发出心跳包的时间.

初始化后不会立刻建立与server连接, 需要与emiton一同使用的时候才会实时建立.

2. mq:on(pattern, func)

订阅消息

pattern - 一个字符串类型的匹配模式. 不同协议mq的实现写法也不一样, 具体写法请参照消息队列的订阅方法.

func - 订阅回调函数. 当订阅了指定的pattern后, 接手到消息将会触发此回调. 当与server断开后将返回一个nil或者字符串类型的错误.

可以同时订阅多个pattern, 并为其注册回调处理函数. 但是不可重复注册相同的pattern.

3. mq:emit(pattern, data)

发布消息

pattern - 一个字符串类型的匹配模式. 不同协议mq的实现写法也不一样, 具体写法请参照消息队列的订阅方法.

data - 一个字符串类型的数据. 用于向指定pattern发送数据, 返回才意味着写入成功或写入失败.

4. mq:start()

启动消息队列

此方法一般用cfadmin作为单独使用的mq消费者时使用, 在websocket内无需使用start方法.

start方法运行后, 将会进行消息队列循环模式.

5. mq:close()

关闭消息队列

无论是仅作为发送还是仅作为订阅(或者两者), 在任何情况下使用完毕都需要调用此方法来释放资源.

使用示例

mq作为整个cf的独立消费者的使用示例:

  1. local MQ = require "MQ.stomp"
  2. -- local MQ = require "MQ.redis"
  3. -- local MQ = require "MQ.mqtt"
  4.  
  5. local cf = require "cf"
  6. require "utils"
  7.  
  8. local mq = MQ:new {
  9. -- host = 'localhost',
  10. -- port = 61613,
  11. -- port = 1883,
  12. -- port = 6379,
  13. -- vhost = '/exchange',
  14. -- auth = "admin",
  15. -- username = "guest",
  16. -- password = "guest",
  17. }
  18.  
  19. mq:on('/test', function (msg)
  20. print("收到来自/test的消息.")
  21. var_dump(msg)
  22. end)
  23.  
  24. mq:on('/admin', function (msg)
  25. print("收到来自/admin的消息.")
  26. var_dump(msg)
  27. end)
  28.  
  29. cf.at(0.1, function (args)
  30. print(mq:emit('/test', '{"code":'..math.random(1, 100)..',"from":"/test"}'))
  31. print(mq:emit('/admin', '{"code":'..math.random(1, 100)..',"from":"/admin"}'))
  32. end)
  33.  
  34. mq:start()

mq作为聊天服务器的订阅推送使用示例:

  1. local class = require "class"
  2. local mq = require "MQ.redis"
  3. local cf = require "cf"
  4. local json = require "json"
  5. local websocket = class("websocket")
  6.  
  7. function websocket:ctor(opt)
  8. self.ws = opt.ws -- websocket对象
  9. self.send_masked = false -- 掩码(默认为false, 不建议修改或者使用)
  10. self.max_payload_len = 65535 -- 最大有效载荷长度(默认为65535, 不建议修改或者使用)
  11. self.timeout = 15 -- 默认为一直等待, number类型会导致异常.
  12. self.count = 0
  13. self.mq = mq:new {
  14. host = "localhost",
  15. port = 6479,
  16. -- auth = 'admin',
  17. -- db = 0,
  18. }
  19. end
  20.  
  21. function websocket:on_open()
  22. print('on_open')
  23. self.timer = cf.at(0.01, function ( ... ) -- 定时器
  24. self.count = self.count + 1
  25. self.ws:send(tostring(self.count))
  26. end)
  27. self.mq:on('/chat', function(msg)
  28. if not msg then
  29. return
  30. end
  31. self.ws:send(msg.payload)
  32. end)
  33. end
  34.  
  35. function websocket:on_message(data, typ)
  36. print('on_message', self.ws, data)
  37. self.ws:send('welcome')
  38. -- self.ws:close(data)
  39. end
  40.  
  41. function websocket:on_error(error)
  42. print('on_error', self.ws, error)
  43. end
  44.  
  45. function websocket:on_close(data)
  46. print('on_close', self.ws, data)
  47. if self.timer then -- 清理定时器
  48. print("清理定时器")
  49. self.timer:stop()
  50. self.timer = nil
  51. end
  52. if self.mq then
  53. self.mq:close()
  54. end
  55. end
  56.  
  57. return websocket

关于订阅后的回调返回值

消费者回调函数将在消息队列推送数据过来时被触发, msg的类型会根据具体协议不同会分为:

  • id - 用户id

  • session - 连接

  • parttern - 数据来源

  • payload - 数据载荷

等信息, 这些信息一般仅用作做debug时使用. 如果用户有消息区分要求, 应该尽可能的判断数据载体(载荷)并自行区分消息.

协议兼容性测试

mqtt - 在emqx 3.x上测试通过.

redis - 在redis 3.x以上版本测试通过.

stomp - 在rabbitmq 3.x上测试通过.

建议根据实际情况选择对应的协议与server端, 混合使用兼容协议会造成意想不到的问题.

最后

  • MQ主要是作为消费者与生产者来解决多个cf App之间无法进行数据通信的问题, 一般可以用在Websocket/MQ消费者节点.

  • MQ的发布(emit)/订阅(on)是同步非阻塞的, 返回才意味着数据读写完毕. 异步的读写设计将会导致消息队列过多的消息积压.

  • MQ没有为用户做断线重连、数据重发的操作, 用户应该从架构设计角度来解决这个问题(冗余、故障转移).