MQ库
MQ是利用Redis/mqtt/stomp协议的subscribe与publish模拟的一种订阅/发布的消息队列.
使用方式
在MQ库中有三种对象可供使用者自行导入. 具体的导入方法为:
导入redis协议的消息队列
local mq = require "MQ.redis"
导入mqtt协议的消息队列
local mq = require "MQ.mqtt"
导入stomp协议的消息队列
local mq = require "MQ.stomp"
API介绍
凡是导入了上述库之后, 即可传入指定参数进行初始化.
1. mq:new(opt)
opt是一个table类型的参数, 接受如下传参:
host - 字符串类型, 消息队列的域名或者IP地址.
port - int类型, 消息队列监听的端口.
auth/db - 字符串类型, 仅在redis协议下用作登录认证(没有可以不填写).
username/password - 字符串类型, 仅在stomp/mqtt协议下用作登录认证(没有可以不填写).
vhost - 字符串类型, 仅在使用某些特定消息队列server的时候填写(例如:rabbit).
keepalive - int类型, 仅在使用mqtt的时候用来出发客户端主动发出心跳包的时间.
初始化后不会立刻建立与server连接, 需要与emit
与on
一同使用的时候才会实时建立.
2. mq:on(pattern, func)
订阅消息
pattern - 一个字符串类型的匹配模式. 不同协议mq的实现写法也不一样, 具体写法请参照消息队列的订阅方法.
func - 订阅回调函数. 当订阅了指定的pattern后, 接手到消息将会触发此回调. 当与server断开后将返回一个nil或者字符串类型的错误.
可以同时订阅多个pattern, 并为其注册回调处理函数. 但是不可重复注册相同的pattern.
3. mq:emit(pattern, data)
发布消息
pattern - 一个字符串类型的匹配模式. 不同协议mq的实现写法也不一样, 具体写法请参照消息队列的订阅方法.
data - 一个字符串类型的数据. 用于向指定pattern发送数据, 返回才意味着写入成功或写入失败.
4. mq:start()
启动消息队列
此方法一般用cfadmin作为单独使用的mq消费者时使用, 在websocket内无需使用start方法.
start方法运行后, 将会进行消息队列循环模式.
5. mq:close()
关闭消息队列
无论是仅作为发送还是仅作为订阅(或者两者), 在任何情况下使用完毕都需要调用此方法来释放资源.
使用示例
mq作为整个cf的独立消费者的使用示例:
- local MQ = require "MQ.stomp"
- -- local MQ = require "MQ.redis"
- -- local MQ = require "MQ.mqtt"
- local cf = require "cf"
- require "utils"
- local mq = MQ:new {
- -- host = 'localhost',
- -- port = 61613,
- -- port = 1883,
- -- port = 6379,
- -- vhost = '/exchange',
- -- auth = "admin",
- -- username = "guest",
- -- password = "guest",
- }
- mq:on('/test', function (msg)
- print("收到来自/test的消息.")
- var_dump(msg)
- end)
- mq:on('/admin', function (msg)
- print("收到来自/admin的消息.")
- var_dump(msg)
- end)
- cf.at(0.1, function (args)
- print(mq:emit('/test', '{"code":'..math.random(1, 100)..',"from":"/test"}'))
- print(mq:emit('/admin', '{"code":'..math.random(1, 100)..',"from":"/admin"}'))
- end)
- mq:start()
mq作为聊天服务器的订阅推送使用示例:
- local class = require "class"
- local mq = require "MQ.redis"
- local cf = require "cf"
- local json = require "json"
- local websocket = class("websocket")
- function websocket:ctor(opt)
- self.ws = opt.ws -- websocket对象
- self.send_masked = false -- 掩码(默认为false, 不建议修改或者使用)
- self.max_payload_len = 65535 -- 最大有效载荷长度(默认为65535, 不建议修改或者使用)
- self.timeout = 15 -- 默认为一直等待, 非number类型会导致异常.
- self.count = 0
- self.mq = mq:new {
- host = "localhost",
- port = 6479,
- -- auth = 'admin',
- -- db = 0,
- }
- end
- function websocket:on_open()
- print('on_open')
- self.timer = cf.at(0.01, function ( ... ) -- 定时器
- self.count = self.count + 1
- self.ws:send(tostring(self.count))
- end)
- self.mq:on('/chat', function(msg)
- if not msg then
- return
- end
- self.ws:send(msg.payload)
- end)
- end
- function websocket:on_message(data, typ)
- print('on_message', self.ws, data)
- self.ws:send('welcome')
- -- self.ws:close(data)
- end
- function websocket:on_error(error)
- print('on_error', self.ws, error)
- end
- function websocket:on_close(data)
- print('on_close', self.ws, data)
- if self.timer then -- 清理定时器
- print("清理定时器")
- self.timer:stop()
- self.timer = nil
- end
- if self.mq then
- self.mq:close()
- end
- end
- return websocket
关于订阅后的回调返回值
消费者回调函数将在消息队列推送数据过来时被触发, msg的类型会根据具体协议不同会分为:
id - 用户id
session - 连接
parttern - 数据来源
payload - 数据载荷
等信息, 这些信息一般仅用作做debug时使用. 如果用户有消息区分要求, 应该尽可能的判断数据载体(载荷)并自行区分消息.
协议兼容性测试
mqtt - 在emqx 3.x上测试通过.
redis - 在redis 3.x以上版本测试通过.
stomp - 在rabbitmq 3.x上测试通过.
建议根据实际情况选择对应的协议与server端, 混合使用兼容协议会造成意想不到的问题.
最后
MQ主要是作为消费者与生产者来解决多个cf App之间无法进行数据通信的问题, 一般可以用在Websocket/MQ消费者节点.
MQ的发布(emit)/订阅(on)是同步非阻塞的, 返回才意味着数据读写完毕. 异步的读写设计将会导致消息队列过多的消息积压.
MQ没有为用户做断线重连、数据重发的操作, 用户应该从架构设计角度来解决这个问题(冗余、故障转移).