gopush业务流程

最后更新于:2022-04-02 04:56:05

用户连接流程: 客户端: ![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/7b6831b06ba6a50edc74f63f9d692300_1078x858.png) ~~~ ***获取节点 getScript() http://localhost:8090/1/server/get?k=Terry-Mao&p=2 [先根据用户的key获取连接节点,然后再根据对应的节点创建长连接(参数p=1 websocket,p=2 tcp)] 创建websocket连接 获取离线消息 建立心跳任务 初始化完成(等待接收在线消息 or 心跳应答) ~~~ 服务端: ~~~ 建立连接: SubscribeHandle() 监听websocket请求 addr := ws.Request().RemoteAddr 获取用户ip地址 params := ws.Request().URL.Query() 获取url参数 key、heartbeat、token、version UserChannel.Get(key, true) 在ChannelList上添加一个Channel,并返回这个用户的 Channel c.AuthToken(key, token) token验证(默认不验证) c.AddConn(key, &Connection{Conn: ws, Proto: WebsocketProto, Version: version})创建一个用户连接, 1.判断是否超过最大连接数 2.用户链接应答 3.conn.HandleWrite(key) 开启 goroutine 从 Connection.Buf chan []byte 中读取消息,推送给用户 4.c.conn.PushFront(conn) root *Element 上添加一个 *Element 5.ConnStat.IncrAdd() 总连接数+1 for{} 阻塞等待心跳->心跳应答 获取离线消息: GetOfflineMsg() 获取离线消息 r.Method != "GET" 判断请求方式 params := r.URL.Query() 获取url参数 key、msgId、callback myrpc.MessageRPC.Get() 随机获取一个rpc连接 client.Call(myrpc.MessageServiceGetPrivate, args, reply) rpc 到message GetPrivate()方法 UseStorage.GetPrivate(m.Key, m.MsgId) 根据key和msgId从redis or mysql中获取消息 redis: 1.conn := s.getConn(key) 根据key从连接池获取一个redis连接(hash算法) 2.redis.Values(conn.Do("ZRANGEBYSCORE", key, fmt.Sprintf("(%d", mid), "+inf", "WITHSCORES")) 返回所有符合条件 mid < msgId <= +inf(最大值)) 的成员及成员的 message 3.redis.Scan(values, &b, &cmid) 遍历消息 4.json.Unmarshal(b, rm) 消息反序列化 5.rm.Expire < now 判断消息是否过期 6.s.delCH <- &RedisDelMessage{Key: key, MIds: delMsgs}: 删除unmarshal失败的消息和过期消息(clean() 方法) mysql: 1.s.getConn(key) 获取mysql连接 2.db.Query(getPrivateMsgSQL, key, mid) 查询skey=key and mid>msgId 的消息 "SELECT mid, ttl, msg FROM private_msg WHERE skey=? AND mid>? ORDER BY mid" 3.for rows.Next() 遍历消息 4.now > expire 判断消息是否过期 ~~~ 推送单个私信流程: ![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/c0318cdbc8d3deb6b45f24f1aec906f1_1046x1070.png) ~~~ PushPrivate() 推送单个私信 r.Method != "POST" 判断是否是post请求 ioutil.ReadAll(r.Body) 读取请求内容 params := r.URL.Query() 获取url参数 key、expire node := myrpc.GetComet(key) 根据key获取comet节点 ???[判断key连接的comet] client := node.Rpc.Get() 随机获取一个rpc连接 client.Call(myrpc.CometServicePushPrivate, args, &ret) rpc 调用 comet 推送私信 PushPrivate() UserChannel.New(args.Key) 获取用户channel ch.PushMsg(args.Key, m, args.Expire) 推送私信 1.client := myrpc.MessageRPC.Get() 随机获取一个RPC连接 2.m.MsgId = id.Get() 生成msgId (时间戳/100) 3.m.GroupId != myrpc.PublicGroupId && expire > 0 判断是是否需要message保存(私信+过期时间>0) 4.client.Call(myrpc.MessageServiceSavePrivate, args, &ret) rpc 调用 message 模块,保存消息 SavePrivate() 保存私信 UseStorage.SavePrivate(m.Key, m.Msg, m.MsgId, m.Expire) redis: 1.conn := s.getConn(key) 根据key 通过hash算法从连接池获取一个连接 2.conn.Send("ZADD", key, mid, m) 操作写入缓冲区 3.conn.Send("ZREMRANGEBYRANK", key, 0, -1*(Conf.RedisMaxStore+1)) // conn.Send("ZREMRANGEBYRANK", key, 0, -21) 有序集只剩下最后写入的20个成员 4.conn.Flush() 提交操作 5.conn.Receive() 接受redis应答 mysql: 1.db := s.getConn(key) 根据key 通过hash算法从连接池获取一个连接 2.b.Exec(savePrivateMsgSQL, key, mid, now.Unix()+int64(expire), []byte(msg), now, now) 存mysql 5. c.writeMsg(key, m) 推送在线消息 ~~~ 批量推送私信流程: ![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/f5a22da3945d948ef58e6c43fe3ba2df_1072x1040.png) ~~~ PushMultiPrivate() 批量推送私信 r.Method != "POST" 判断是否是post请求 ioutil.ReadAll(r.Body) 读取请求内容 parseMultiPrivate(bodyBytes) 获取 keys 、 message 和 ret(错误码) 根据key获取node,通过node获取rpc 链接 rpc -> comet PushPrivates() 向多个key推送私信 UserChannel.New(key) 根据key获取一个channel 和 ChannelBucket 并保存到 bucketMap 遍历 bucketMap 开启 goroutine 存储消息 和 推送消息 1.获取请求消息 2.根据请求消息解析出keys和message 3.根据key匹配节点,存储到 map[node]keys 4.rpc 调用用节点的 PushPrivates() 方法 5.判断 key 所在的bucketChannel 返回 map[*ChannelBucket]*batchChannel 6.遍历 ChannelBucket 给keys 发消息,并rpc 调用 Message模块 存储消息 ~~~ writeMsg: 1.调用每一个 用户 Element 元素 2.将消息写入Connection.Buf chan []byte 3.HandleWrite() 消费 Buf chan 将消息推给用户 MsgId: 通过当前时间的纳秒时间戳(除) / 100 获得 存储时随存储到redis中,用于在用户获取离线消息时,匹配 MsgId 大于 用户所传参的值
';