redis接口的二次封装(发布订阅)

最后更新于:2022-04-01 02:16:19

# redis接口的二次封装(发布订阅) 其实这一小节完全可以放到上一个小结,只是这里用了完全不同的玩法,所以我还是决定单拿出来分享一下这个小细节。 上一小结有关订阅部分的代码,请看: ~~~ function _M.subscribe( self, channel ) local redis, err = redis_c:new() if not redis then return nil, err end local ok, err = self:connect_mod(redis) if not ok or err then return nil, err end local res, err = redis:subscribe(channel) if not res then return nil, err end res, err = redis:read_reply() if not res then return nil, err end redis:unsubscribe(channel) self.set_keepalive_mod(redis) return res, err end ~~~ 其实这里的实现是有问题的,各位看官,你能发现这段代码的问题么?给个提示,在高并发订阅场景下,极有可能存在漏掉部分订阅信息。原因在与每次订阅到内容后,都会把redis对象进行释放,处理完订阅信息后再次去连接redis,在这个时间差里面,很可能有消息已经漏掉了。 正确的代码应该是这样的: ~~~ function _M.subscribe( self, channel ) local redis, err = redis_c:new() if not redis then return nil, err end local ok, err = self:connect_mod(redis) if not ok or err then return nil, err end local res, err = redis:subscribe(channel) if not res then return nil, err end local function do_read_func ( do_read ) if do_read == nil or do_read == true then res, err = redis:read_reply() if not res then return nil, err end return res end redis:unsubscribe(channel) self.set_keepalive_mod(redis) return end return do_read_func end ~~~ 调用示例代码: ~~~ local red = redis:new({timeout=1000}) local func = red:subscribe( "channel" ) if not func then return nil end while true do local res, err = func() if err then func(false) end ... ... end return cbfunc ~~~
';