Redis 接口的二次封装(发布订阅)

其实这一小节完全可以放到上一个小节,只是这里用了完全不同的玩法,所以我还是决定单拿出来分享一下这方面的小细节。

上一小节有关订阅部分的代码,请看:

  1. function _M.subscribe( self, channel )
  2. local redis, err = redis_c:new()
  3. if not redis then
  4. return nil, err
  5. end
  6. local ok, err = self:connect_mod(redis)
  7. if not ok or err then
  8. return nil, err
  9. end
  10. local res, err = redis:subscribe(channel)
  11. if not res then
  12. return nil, err
  13. end
  14. res, err = redis:read_reply()
  15. if not res then
  16. return nil, err
  17. end
  18. redis:unsubscribe(channel)
  19. self.set_keepalive_mod(redis)
  20. return res, err
  21. end

其实这里的实现是有问题的,各位看官,你能发现这段代码的问题么?给个提示,在高并发订阅场景下,极有可能存在漏掉部分订阅信息。原因在于每次订阅到内容后,都会把 Redis 对象进行释放,处理完订阅信息后再次去连接 Redis,在这个时间差里面,很可能有消息已经漏掉了。

通过下面的代码可以解决这个问题:

  1. function _M.subscribe( self, channel )
  2. local redis, err = redis_c:new()
  3. if not redis then
  4. return nil, err
  5. end
  6. local ok, err = self:connect_mod(redis)
  7. if not ok or err then
  8. return nil, err
  9. end
  10. local res, err = redis:subscribe(channel)
  11. if not res then
  12. return nil, err
  13. end
  14. -- 封装成一个函数,开始
  15. local function do_read_func ( do_read )
  16. if do_read == nil or do_read == true then
  17. res, err = redis:read_reply()
  18. if not res then
  19. return nil, err
  20. end
  21. return res
  22. end
  23. redis:unsubscribe(channel)
  24. self.set_keepalive_mod(redis)
  25. return
  26. end
  27. -- 结束
  28. return do_read_func -- 返回上面封装的函数
  29. end

调用示例代码:

  1. local red = redis:new({timeout=1000})
  2. local func = red:subscribe( "channel" )
  3. if not func then
  4. return nil
  5. end
  6. while true do
  7. local res, err = func()
  8. if err then
  9. func(false)
  10. end
  11. ... ...
  12. end
  13. return cbfunc

另一个潜在的问题是,调用了 unsubscribe 之后,Redis 对象里面有可能还遗留没被读取的数据。在这种情况下,无法直接通过 set_keepalive_mod 复用连接。什么时候会发生这样的情况呢?

当 Redis 对象处于 subscribe 状态时,Redis 会给它推送订阅的消息,然后我们通过 read_reply 把消息读出来。调用 unsubscribe 的时候,只是退订了对应的频道,并不会把当前接收到的数据清空。如果要想复用该连接,我们就需要保证清空当前读取到的数据,保证它是干净的。就像这样:

  1. local res, err = red:unsubscribe("ch")
  2. if not res then
  3. ngx.log(ngx.ERR, err)
  4. return
  5. else
  6. -- redis 推送的消息格式,可能是
  7. -- {"message", ...}
  8. -- {"unsubscribe", $channel_name, $remain_channel_num}
  9. -- 如果返回的是前者,说明我们还在读取 Redis 推送过的数据
  10. if res[1] ~= "unsubscribe" then
  11. repeat
  12. -- 需要抽空已经接收到的消息
  13. res, err = red:read_reply()
  14. if not res then
  15. ngx.log(ngx.ERR, err)
  16. return
  17. end
  18. until res[1] == "unsubscribe"
  19. end
  20. -- 现在再复用连接,就足够安全了
  21. self.set_keepalive_mod(redis)
  22. end