3.Chan
最后更新于:2022-04-02 04:44:55
channel 是 CSP 模式的具体实现,用于多个 goroutine 通讯。
其内部实现了同步,确保并发安全。多个goroutine同时访问,不需要加锁。
由于管道容量是5,开启go写入10个数据,再写入5个数据,会阻塞,然而read每秒会读取一个,然后在会写入一个数据。
~~~
package main
import (
"fmt"
"time"
)
func write(ch chan int) {
for i := 0; i < 10; i++ {
ch <- i
fmt.Println("put data:", i)
}
}
func read(ch chan int) {
for {
var b int
b = <-ch
fmt.Println(b)
time.Sleep(time.Second)
}
}
func main() {
intChan := make(chan int, 5)
go write(intChan)
go read(intChan)
time.Sleep(10 * time.Second)
}
~~~
输出结果:
~~~
put data: 0
put data: 1
put data: 2
put data: 3
put data: 4
put data: 5
0
1
put data: 6
2
put data: 7
3
put data: 8
4
put data: 9
5
6
7
8
9
~~~
默认为同步模式,需要发送和接收配对。否则会被阻塞,直到另一方准备好后被唤醒。
~~~
package main
import "fmt"
func main() {
data := make(chan int) // 数据交换队列
exit := make(chan bool) // 退出通知
go func() {
for d := range data { // 从队列迭代接收数据,直到 close 。
fmt.Println(d)
}
fmt.Println("recv over.")
exit <- true // 发出退出通知。
}()
data <- 1 // 发送数据。
data <- 2
data <- 3
close(data) // 关闭队列。
fmt.Println("send over.")
<-exit // 等待退出通知。
}
~~~
输出结果:
~~~
1
2
send over.
3
recv over.
~~~
异步方式通过判断缓冲区来决定是否阻塞。如果缓冲区已满,发送被阻塞;缓冲区为空,接收被阻塞。
通常情况下,异步 channel 可减少排队阻塞,具备更高的效率。但应该考虑使用指针规避大对象拷贝,将多个元素打包,减小缓冲区大小等。
~~~
package main
import (
"fmt"
)
func main() {
data := make(chan int, 3) // 缓冲区可以存储 3 个元素
exit := make(chan bool)
data <- 1 // 在缓冲区未满前,不会阻塞。
data <- 2
data <- 3
go func() {
for d := range data { // 在缓冲区未空前,不会阻塞。
fmt.Println(d)
}
exit <- true
}()
data <- 4 // 如果缓冲区已满,阻塞。
data <- 5
close(data)
<-exit
}
~~~
输出结果:
~~~
1
2
3
4
5
~~~
channel选择 :
如果需要同时处理多个 channel,可使用 select 语句。它随机选择一个可用 channel 做收发操作,或执行 default case。
~~~
package main
import (
"fmt"
"os"
)
func main() {
a, b := make(chan int, 3), make(chan int)
go func() {
v, ok, s := 0, false, ""
for {
select {
case v, ok = <-a:
s = "a"
case v, ok = <-b:
s = "b"
}
if ok {
fmt.Println(s, v)
} else {
os.Exit(0)
}
}
}()
for i := 0; i < 5; i++ {
select { // 随机选择可 channel,接收数据。
case a <- i:
case b <- i:
}
}
close(a)
select {} // 没有可用 channel,阻塞 main goroutine。
}
~~~
输出结果:
~~~
// 每次运行输出结果都不同
b 3
a 0
a 1
a 2
b 4
~~~
在循环中使用 select default case 需要小心,避免形成洪水。
模式 :用简单工厂模式打包并发任务和 channel。
~~~
package main
import (
"math/rand"
"time"
)
func NewTest() chan int {
c := make(chan int)
rand.Seed(time.Now().UnixNano())
go func() {
time.Sleep(time.Second)
c <- rand.Int()
}()
return c
}
func main() {
t := NewTest()
println(<-t) // 等待 goroutine 结束返回。
}
~~~
用 channel 实现信号量 (semaphore)。
~~~
package main
import (
"fmt"
"runtime"
"sync"
)
func main() {
runtime.GOMAXPROCS(2)
wg := sync.WaitGroup{}
wg.Add(3)
sem := make(chan int, 1)
for i := 0; i < 3; i++ {
go func(id int) {
defer wg.Done()
sem <- 1 // 向 sem 发送数据,阻塞或者成功。
for x := 0; x < 3; x++ {
fmt.Println(id, x)
}
<-sem // 接收数据,使得其他阻塞 goroutine 可以发送数据。
}(i)
}
wg.Wait()
}
~~~
输出结果:
~~~
// 每次运行输出结果都不同
0 0
0 1
0 2
2 0
2 1
2 2
1 0
1 1
1 2
~~~
用 closed channel 发出退出通知。
~~~
package main
import (
"sync"
"time"
)
func main() {
var wg sync.WaitGroup
quit := make(chan bool)
for i := 0; i < 2; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
task := func() {
println(id, time.Now().Nanosecond())
time.Sleep(time.Second)
}
for {
select {
case <-quit: // closed channel 不会阻塞,因此可用作退出通知。
return
default: // 执行正常任务。
task()
}
}
}(i)
}
time.Sleep(time.Second * 5) // 让测试 goroutine 运行一会。
close(quit) // 发出退出通知。
wg.Wait()
}
~~~
用 select 实现超时 (timeout)。
~~~
package main
import (
"fmt"
"time"
)
func main() {
w := make(chan bool)
c := make(chan int, 2)
go func() {
select {
case v := <-c:
fmt.Println(v)
case <-time.After(time.Second * 3):
fmt.Println("timeout.")
}
w <- true
}()
// c <- 1 // 注释掉,引发 timeout。
<-w
}
~~~
';