控制协程的并发数量
最后更新于:2022-04-02 02:36:39
[TOC]
## 方法
### 利用 channel 的缓存区
可以控制协程最多创建三个
```
// main_chan.go
func main() {
var wg sync.WaitGroup
ch := make(chan struct{}, 3)
for i := 0; i < 10; i++ {
ch <- struct{}{}
wg.Add(1)
go func(i int) {
defer wg.Done()
log.Println(i)
time.Sleep(time.Second)
<-ch
}(i)
}
wg.Wait()
}
```
* `make(chan struct{}, 3)`创建缓冲区大小为 3 的 channel,在没有被接收的情况下,至多发送 3 个消息则被阻塞。
* 开启协程前,调用`ch <- struct{}{}`,若缓存区满,则阻塞。
* 协程任务结束,调用`<-ch`释放缓冲区。
* `sync.WaitGroup`并不是必须的,例如 http 服务,每个请求天然是并发的,此时使用 channel 控制并发处理的任务数量,就不需要`sync.WaitGroup`
### 利用第三方库
* [Jeffail/tunny](https://github.com/Jeffail/tunny)
* [panjf2000/ants](https://github.com/panjf2000/ants)
以 tunny 为例
```
package main
import (
"log"
"time"
"github.com/Jeffail/tunny"
)
func main() {
pool := tunny.NewFunc(3, func(i interface{}) interface{} {
log.Println(i)
time.Sleep(time.Second)
return nil
})
defer pool.Close()
for i := 0; i < 10; i++ {
go pool.Process(i)
}
time.Sleep(time.Second * 4)
}
```
* `tunny.NewFunc(3, f)`第一个参数是协程池的大小(poolSize),第二个参数是协程运行的函数(worker)。
* `pool.Process(i)`将参数 i 传递给协程池定义好的 worker 处理。
* `pool.Close()`关闭协程池。
';