完善的tcp 服务端/客户端管理

最后更新于:2022-04-02 02:50:29

[TOC]

> [参考 github/godis](https://github.com/HDT3213/godis)
>
> [说明](https://www.cnblogs.com/Finley/p/12590718.html#ttl)

## 实例
tcp/atomic.go

```go
package tcp

import (
	"sync/atomic"
)

// AtomicBool is a boolean flag that is read and written with atomic
// operations. It is stored as a uint32: 0 means false, any other value true.
type AtomicBool uint32

// Get atomically loads the flag.
func (b *AtomicBool) Get() bool {
	return atomic.LoadUint32((*uint32)(b)) != 0
}

// Set atomically stores v into the flag.
func (b *AtomicBool) Set(v bool) {
	var n uint32
	if v {
		n = 1
	}
	atomic.StoreUint32((*uint32)(b), n)
}

// AtomicInt32 is an int32 counter whose methods all go through sync/atomic.
// The zero value is a counter holding 0.
type AtomicInt32 struct {
	v int32
}

// Load atomically returns the current value.
func (i *AtomicInt32) Load() int32 {
	return atomic.LoadInt32(&i.v)
}

// Add atomically adds n and returns the new value.
func (i *AtomicInt32) Add(n int32) int32 {
	return atomic.AddInt32(&i.v, n)
}

// Sub atomically subtracts n and returns the new value.
func (i *AtomicInt32) Sub(n int32) int32 {
	return atomic.AddInt32(&i.v, -n)
}

// Inc atomically adds 1 and returns the new value.
func (i *AtomicInt32) Inc() int32 {
	return i.Add(1)
}

// Dec atomically subtracts 1 and returns the new value.
func (i *AtomicInt32) Dec() int32 {
	return i.Sub(1)
}

// CAS atomically replaces the value with newVal if it currently equals old,
// and reports whether the swap happened.
func (i *AtomicInt32) CAS(old, newVal int32) bool {
	return atomic.CompareAndSwapInt32(&i.v, old, newVal)
}

// Store atomically sets the value to n.
func (i *AtomicInt32) Store(n int32) {
	atomic.StoreInt32(&i.v, n)
}

// Swap atomically sets the value to n and returns the previous value.
func (i *AtomicInt32) Swap(n int32) int32 {
	return atomic.SwapInt32(&i.v, n)
}
```

tcp/client.go

```go
package tcp

import (
	"bufio"
	"context"
	"fmt"
	"io"
	logger "log"
	"net"
	"sync"
	"time"
)

// Client is the server-side abstraction of one accepted connection.
type Client struct {
	// Conn is the underlying TCP connection.
	Conn net.Conn

	// Waiting is incremented while the server is writing to the client, so
	// that Close can hold off closing the connection until the write is done.
	// Wait is a WaitGroup wrapper with a maximum wait time, modelled on
	// https://github.com/HDT3213/godis/blob/master/src/lib/sync/wait/wait.go
	Waiting Wait
}

// Close waits up to 10 seconds for in-flight writes to finish and then closes
// the connection. It returns the error reported by Conn.Close.
func (c *Client) Close() error {
	c.Waiting.WaitWithTimeout(10 * time.Second)
	return c.Conn.Close()
}

// ClientMap tracks every active client and echoes their messages back.
type ClientMap struct {
	// activeConn is the set of active clients (the map is used as a set).
	// It must be a concurrency-safe container.
	activeConn sync.Map

	// closing is the shutdown flag; once set, Handle refuses new connections.
	closing AtomicBool

	// count is the number of currently registered clients.
	count AtomicInt32
}

// NewHandlerMap returns an empty ClientMap ready for use.
func NewHandlerMap() *ClientMap {
	return &ClientMap{}
}

// Handle serves one connection until it fails or is closed: every
// newline-terminated message read from conn is written back unchanged.
// If the ClientMap is already closing, conn is closed and Handle returns
// immediately without registering it. ctx is currently unused.
func (h *ClientMap) Handle(ctx context.Context, conn net.Conn) {
	logger.Println("creat new conn", conn.RemoteAddr())
	if h.closing.Get() {
		// A closing handler refuses new connections. Return here so the
		// closed connection is never registered or read from.
		fmt.Println("closing handler refuse new connection")
		_ = conn.Close()
		return
	}

	client := &Client{Conn: conn}
	h.activeConn.Store(client, struct{}{})
	h.count.Inc()
	fmt.Printf("count client is %+v\n", h.count.Load())

	reader := bufio.NewReader(conn)
	for {
		msg, err := reader.ReadBytes('\n')
		if err != nil {
			if err == io.EOF {
				logger.Println("connection close")
			} else {
				logger.Println("close connection", err)
			}
			h.delete(client)
			return
		}

		// Enter the waiting state before sending so Close does not cut the
		// write short.
		client.Waiting.Add(1)

		// To simulate a shutdown while a send is still in progress:
		//logger.Println("sleeping")
		//time.Sleep(10 * time.Second)
		_, err = conn.Write(msg)

		// Sending finished; leave the waiting state.
		client.Waiting.Done()

		if err != nil {
			// The peer can no longer be written to; drop the client instead
			// of looping on a broken connection.
			logger.Println("write error", err)
			h.delete(client)
			return
		}
	}
}

// delete unregisters client, decrements the client count and closes it.
func (h *ClientMap) delete(client *Client) {
	h.activeConn.Delete(client)
	h.count.Dec()
	// Best effort: the connection may already have been closed by Close.
	_ = client.Close()
}

// Close marks the ClientMap as closing and closes every registered client.
// It always returns nil.
//
// NOTE: the original source labels this method "deprecated", yet
// ListenAndServe still calls it during shutdown.
func (h *ClientMap) Close() error {
	logger.Println("handler shuting down...")
	h.closing.Set(true)
	// TODO: concurrent wait
	h.activeConn.Range(func(key, _ interface{}) bool {
		if client, ok := key.(*Client); ok {
			// Best effort: keep closing the remaining clients on error.
			_ = client.Close()
		}
		return true
	})
	return nil
}
```

tcp/server.go

```go
package tcp

import (
	"context"
	logger "log"
	"net"
	"os"
	"os/signal"
	"sync"
	"syscall"
	"time"
)

// Config holds the TCP server settings. ListenAndServe only reads Address;
// MaxConnect and Timeout are not used by the code in this file.
type Config struct {
	Address    string        `yaml:"address"`
	MaxConnect uint32        `yaml:"max-connect"`
	Timeout    time.Duration `yaml:"timeout"`
}

// ListenAndServe listens on cfg.Address and hands every accepted connection
// to clientMap.Handle in its own goroutine. It blocks until SIGHUP, SIGQUIT,
// SIGTERM or SIGINT is received, then stops accepting, closes all clients and
// returns once every handler goroutine has finished. A listen failure is
// fatal (the process exits via log.Fatalf).
func ListenAndServe(cfg *Config, clientMap *ClientMap) {
	listener, err := net.Listen("tcp", cfg.Address)
	if err != nil {
		logger.Fatalf("listen err: %v", err)
	}

	// closing is the shutdown flag; an atomic is used so the accept loop
	// sees the value written by the signal goroutine.
	// AtomicBool is modelled on
	// https://github.com/HDT3213/godis/blob/master/src/lib/sync/atomic/bool.go
	var closing AtomicBool

	// shutdown runs at most once, whether it is triggered by a signal or by
	// the deferred call below, so nothing is closed twice.
	var shutdownOnce sync.Once
	shutdown := func() {
		shutdownOnce.Do(func() {
			// Close the listener first to keep new connections out; a
			// blocked listener.Accept() then returns an error immediately.
			_ = listener.Close()
			// Then close the established connections one by one.
			_ = clientMap.Close()
		})
	}

	// Watch for termination signals.
	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT)
	go func() {
		sig := <-sigCh
		switch sig {
		case syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT:
			// Start the shutdown sequence. The flag must be set before the
			// listener is closed so the accept loop recognises the error.
			logger.Println("shuting down...")
			closing.Set(true)
			shutdown()
		}
	}()

	logger.Printf("bind: %s, start listening...", cfg.Address)

	// Guarantee cleanup after an unexpected error or panic as well.
	defer shutdown()

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// waitDone counts the connections that are still being handled. During
	// shutdown the main goroutine waits for all of them before returning.
	var waitDone sync.WaitGroup
	for {
		conn, err := listener.Accept()
		if err != nil {
			if closing.Get() {
				// The listener was closed by the signal goroutine. Wait for
				// the handlers to finish their work and disconnect.
				logger.Println("waiting disconnect...")
				waitDone.Wait()
				return
			}
			logger.Printf("accept err: %v", err)
			continue
		}

		// Handle the connection in a new goroutine. Add must happen before
		// the goroutine starts; otherwise Wait could return while a handler
		// is about to run.
		logger.Println("accept link")
		waitDone.Add(1)
		go func() {
			defer waitDone.Done()
			clientMap.Handle(ctx, conn)
		}()
	}
}
```

tcp/wait.go

```go
package tcp

import (
	"sync"
	"time"
)

// Wait wraps sync.WaitGroup and adds a wait with a maximum duration.
// The zero value is ready to use.
type Wait struct {
	wg sync.WaitGroup
}

// Add adds delta to the underlying WaitGroup counter.
func (w *Wait) Add(delta int) {
	w.wg.Add(delta)
}

// Done decrements the underlying WaitGroup counter by one.
func (w *Wait) Done() {
	w.wg.Done()
}

// Wait blocks until the underlying WaitGroup counter is zero.
func (w *Wait) Wait() {
	w.wg.Wait()
}

// WaitWithTimeout blocks until the counter reaches zero or timeout elapses.
// It returns true if the wait timed out and false if the counter reached
// zero in time.
func (w *Wait) WaitWithTimeout(timeout time.Duration) bool {
	// Completion is signalled by closing the channel rather than sending on
	// it, so the helper goroutine can always exit once the counter reaches
	// zero, even when the caller has already given up.
	done := make(chan struct{})
	go func() {
		defer close(done)
		w.wg.Wait()
	}()

	timer := time.NewTimer(timeout)
	defer timer.Stop()

	select {
	case <-done:
		// Finished normally.
		return false
	case <-timer.C:
		// Timed out.
		return true
	}
}
```

server.go

```go
package main

import (
	"idcpj/tcp"
)

// main starts the TCP echo server on 127.0.0.1:7000.
func main() {
	conf := &tcp.Config{
		Address:    "127.0.0.1:7000",
		MaxConnect: 10,
		Timeout:    0,
	}
	tcp.ListenAndServe(conf, tcp.NewHandlerMap())
}
```

client.go

```go
package main

import (
	"io"
	"log"
	"net"
	"strconv"
	"sync"
	"time"
)

// main starts 50 concurrent test clients and waits until all of them have
// finished.
func main() {
	var wg sync.WaitGroup
	for i := 0; i < 50; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			client(id)
		}(i)
		time.Sleep(1 * time.Microsecond)
	}
	wg.Wait()
}

// client connects to 127.0.0.1:7000, sends "hello word <i>" once every 100
// seconds from a background goroutine, and logs everything the server sends
// back. It returns when the connection cannot be established, is closed by
// the peer, or fails.
func client(i int) {
	conn, err := net.Dial("tcp", "127.0.0.1:7000")
	if err != nil {
		// conn is nil when Dial fails, so check the error before deferring
		// Close.
		log.Printf("client %d: dial error: %v", i, err)
		return
	}
	defer conn.Close()

	go func() {
		msg := []byte("hello word " + strconv.Itoa(i) + "\n")
		for {
			if _, err := conn.Write(msg); err != nil {
				// The connection is gone; stop instead of writing forever.
				return
			}
			time.Sleep(100 * time.Second)
		}
	}()

	buf := make([]byte, 2048)
	for {
		n, err := conn.Read(buf)
		if n > 0 {
			log.Println(string(buf[:n]))
		}
		if err != nil {
			if err != io.EOF {
				log.Printf("conn read error: %v", err)
			}
			// Any read error ends the session; retrying a failed connection
			// would only spin.
			return
		}
	}
}
```

执行

```
> go run server.go
> go run client.go
```