任务调度

最后更新于:2022-04-02 06:51:52

就像我们在"Goroutines"章节中提到了,Go语言会帮你处理goroutine到系统线程上。它使用的算法被称为工作窃取策略(work stealing strategy)。这是什么意思? 首先,让我们看看在许多处理器之间共享工作的天然策略,有时称为公平调度。为了确保所有处理器的平均利用率,我们可以在所有可用处理器之间平均分配负载。想象一下,有n个处理器和x个任务需要执行。在公平调度策略中,每个处理器都会得到x/n个任务: ``` ``` :-: ![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/cd732c09b6cde622206c15d68017e78a_332x189.png) 不幸的是,这种方法存在问题。 如果你还记得“Goroutines”章节中我们提到Go使用fork-join模型来并发建模。在fork-join范例中,任务可能依赖于另一个,并且事实证明,在处理器之间分裂它们可能会导致其中一个处理器未充分利用。不仅如此,它还可能导致局部性缓存较差,因为在其他处理器上调度需要相同的数据任务。我们来看一个例子。 考虑一个程序,可以产生前面所述的工作分配。 如果第2项任务比第1项和第3项结合需要更长的时间,会发生什么? :-: ![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/4a87f72fa6430c6f7ab2cced47f61b37_584x119.jpg) 无论a和b之间的时间有多久,处理器一会闲置。 如果任务之间存在相互依存关系,如果分配给一个处理器的任务需要分配给另一个处理器的任务的结果,会发生什么情况? 例如,如果任务一依赖任务4呢? ![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/988a6ddff4447aed35c9dec0193a1dd6_583x182.jpg) 在这种情况下,处理器1完全空闲,而任务2和4正在计算中。 虽然处理器1在任务1中被阻塞,处理器2在任务2中被占用,但处理器1可能已经在处理任务4以解除其自身阻塞。 * * * * * 学识浅薄,错误在所难免。我是长风,欢迎来Golang中国的群(211938256)就本书提出修改意见。
';

第六章 Goroutines和Go运行时

最后更新于:2022-04-02 06:51:50

在使用Go进行开发时,直接利用并发是很有趣的,因为这种语言让它变得如此简单。我很少需要了解运行时如何将所有内容联系在一起。尽管如此,有些时候了解这些信息是有益处的。第2章中讨论的所有内容都是由运行时实现的,所以值得花点时间来看看运行时的工作方式。 Go运行时为你做的所有事情中,生成和管理goroutines可能是对你和你的软件最有利的一件事。谷歌有将计算机科学理论和白皮书运用于工作的历史,所以Go包含来自学术界的一些想法并不令人惊讶。令人惊讶的是每个goroutine背后的复杂程度。Go使用了一些强大的算法,使你的程序更加高效,同时Go抽象出了这些细节,为开发人员提供了非常简单的接口。 * * * * * 学识浅薄,错误在所难免。我是长风,欢迎来Golang中国的群(211938256)就本书提出修改意见。
';

本章小结

最后更新于:2022-04-02 06:51:47

在本章中,我们介绍了一些在日常开发中保持系统稳定性和可读性的方法。本章还演示了Go的并发基元如何创建更高阶的抽象。凡事皆有利弊,这些模式也可能会更麻烦,或降低系统健壮性,需要使用者仔细权衡。 在最后一章中,我们将探索Go的一些运行时的内部结构,以帮助你深入理解事情的工作方式。 我们还将探索一些有用的工具,使开发和调试变得更容易一些。 * * * * * 学识浅薄,错误在所难免。我是长风,欢迎来Golang中国的群(211938256)就本书提出修改意见。
';

Goroutines异常行为修复

最后更新于:2022-04-02 06:51:45

在诸如守护进程这样的长期进程中,拥有一组长生命周期的goroutines非常普遍。这些goroutines通常被阻塞,等待被某种方式唤醒以继续工作。有时候,这些例程依赖于你没有很好控制的资源。也许一个goroutine会接收到Web服务中希望获取数据的请求,或者它正在监视一个临时文件。 如果程序处理不够健壮,goroutine会很容易陷入一个糟糕的状态。在长期运行的过程中,如果能创建一种机制来确保goroutine的健康状况良好,并在健康状况不佳时重新启动,那么我们的项目想必能活得久一点。 我们将在本节讨论对goroutines异常行为进行修复的话题。 我们将使用心跳来检查正在监测的goroutine的活跃程度。心跳的类型将取决于你想要监控的内容,但是如果你的goroutine可能会产生活锁,请确保心跳包含某种信息,以表明该goroutine不仅没死掉,而且还可以正常执行任务。在本节中,为了简单起见,我们只会考虑goroutines是活的还是死的。 下面这段代码建立一个管理者监视一个goroutine的健康状况,以及它的子例程。如果例程变得不健康,管理者将重新启动子例程。为此,它需要引用一个可以启动goroutine的函数。让我们看看管理程序是什么样子的: ``` type startGoroutineFn func(done <-chan interface{}, pulseInterval time.Duration) (heartbeat <-chan interface{}) //1 newSteward := func(timeout time.Duration, startGoroutine startGoroutineFn) startGoroutineFn { //2 return func(done <-chan interface{}, pulseInterval time.Duration) <-chan interface{} { heartbeat := make(chan interface{}) go func() { defer close(heartbeat) var wardDone chan interface{} var wardHeartbeat <-chan interface{} startWard := func() { //3 wardDone = make(chan interface{}) //4 wardHeartbeat = startGoroutine(or(wardDone, done), timeout/2) //5 } startWard() pulse := time.Tick(pulseInterval) monitorLoop: for { //6 timeoutSignal := time.After(timeout) for { select { case <-pulse: select { case heartbeat <- struct{}{}: default: } case <-wardHeartbeat: //7 continue monitorLoop case <-timeoutSignal: //8 log.Println("steward: ward unhealthy; restarting") close(wardDone) startWard() continue monitorLoop case <-done: return } } } }() return heartbeat } } ``` 1. 这里我们定义一个可以监控和重新启动的goroutine的函数签名。 我们看到熟悉的done通道,以及熟悉的心跳模式写法。 2. 在这里我们设置了超时时间,并使用函数startGoroutine来启动它正在监控的goroutine。有趣的是,监控器本身返回一个startGoroutineFn,表示监控器自身也是可监控的。 3. 在这里我们定义一个闭包,它以同样的的方式来启动我们正在监视的goroutine。 4. 这是我们创建一个新通道,我们会将其传递给监控通道,以响应发出的停止信号。 5. 在这里,我们开启对目标goroutine的监控。如果监控器停止工作,或者监控器想要停止被监控区域,我们希望监控者也停止,因此我们将两个done通道都包含在逻辑中。我们传入的心跳间隔是超时时间的一半,但正如我们在“心跳”中讨论的那样,这可以调整。 6. 这是我们的内部循环,它确保监控者可以发出自己的心跳。 7. 在这里我们如果接收到监控者的心跳,就会知道它还处于正常工作状态,程序会继续监测循环。 8. 这里如果我们发现监控者超时,我们要求监控者停下来,并开始一个新的goroutine。然后开始新的监测。 我们的for循环有点杂乱,但如果你阅读过前面的章节,熟悉其中的模式,那么理解起来会相对简单。 接下来让我们试试看如果监控一个行为异常的goroutine,会发生什么: ``` log.SetOutput(os.Stdout) log.SetFlags(log.Ltime | log.LUTC) doWork := func(done <-chan interface{}, _ time.Duration) <-chan interface{} { log.Println("ward: Hello, I'm irresponsible!") go func() { <-done // 1 log.Println("ward: I am halting.") }() return nil } doWorkWithSteward := newSteward(4*time.Second, doWork) // 2 done := make(chan interface{}) time.AfterFunc(9*time.Second, func() { // 3 log.Println("main: halting steward and ward.") close(done) }) for range doWorkWithSteward(done, 4*time.Second) { // 4 } log.Println("Done") ``` 1. 可以看到这个goroutine什么都没干,持续阻塞等待被取消,它同样不会发出任何表明自己正常信号。 2. 这里开始建立被监控的例程,其4秒后会超时。 3. 这里我们9秒后向done通道发出信号停止整个程序。 4. 最后,我们启动监控器并在其心跳范围内防止示例停止。 这会输出: ``` 18:28:07 ward: Hello, I'm irresponsible! 18:28:11 steward: ward unhealthy; restarting 18:28:11 ward: Hello, I'm irresponsible! 18:28:11 ward: I am halting. 18:28:15 steward: ward unhealthy; restarting 18:28:15 ward: Hello, I'm irresponsible! 18:28:15 ward: I am halting. 18:28:16 main: halting steward and ward. 18:28:16 ward: I am halting. 18:28:16 Done ``` 看起来工作正常。我们的监控器比较简单,除了取消操作和心跳所需信息之外不接收也不返回任何参数。我们可以用闭包强化一下: ``` doWorkFn := func(done <-chan interface{}, intList ...int) (startGoroutineFn, <-chan interface{}) {//1 intChanStream := make(chan (<-chan interface{}))//2 intStream := bridge(done, intChanStream) doWork := func(done <-chan interface{}, pulseInterval time.Duration) <-chan interface{} {//3 intStream := make(chan interface{})//4 heartbeat := make(chan interface{}) go func() { defer close(intStream) select { case intChanStream <- intStream://5 case <-done: return } pulse := time.Tick(pulseInterval) for { valueLoop: for _, intVal := range intList { if intVal < 0 { log.Printf("negative value: %v\n", intVal)//6 return } for { select { case <-pulse: select { case heartbeat <- struct{}{}: default: } case intStream <- intVal: continue valueLoop case <-done: return } } } } }() return heartbeat } return doWork, intStream } ``` 1. 我们将监控器关闭的内容放入返回值,并返回所有监控器用来交流数据的通道。 2. 我们建立通道的通道,这是我们在前面章节中"bridge"模式的应用。 3. 这里我们建立闭包控制监控器的启动和关闭。 4. 这是各通道与监控器交互数据的实例。 5. 这里我们向起数据交互作用的通道传入数据。 6. 这里我们返回负数并从goroutine返回以模拟不正常的工作状态。 由于我们可能会启动监控器的多个副本,因此我们使用"bridge"模式来帮助向doWorkFn的调用者呈现单个不间断的通道。通过这样的方式,我们的监控器可以简单地通过组成模式而变得任意复杂。让我们看看如何调用: ``` log.SetFlags(log.Ltime | log.LUTC) log.SetOutput(os.Stdout) done := make(chan interface{}) defer close(done) doWork, intStream := doWorkFn(done, 1, 2, -1, 3, 4, 5) //1 doWorkWithSteward := newSteward(1*time.Millisecond, doWork) //2 doWorkWithSteward(done, 1*time.Hour) //3 for intVal := range take(done, intStream, 6) { //4 fmt.Printf("Received: %v\n", intVal) } ``` 1. 这里我们调用该函数,它会将传入的不定长整数参数转换为可通信的流。 2. 在这里,我们创建了一个检查doWork关闭的监视器。我们预计这里会极快的进入失败流程,所以将监控时间设置为一毫秒。 3. 我们通知 steward 开启监测。 4. 最后,我们使用该管道,并从intStream中取出前六个值。 这会输出: ``` Received: 1 23:25:33 negative value: -1 Received: 2 23:25:33 steward: ward unhealthy; restarting Received: 1 23:25:33 negative value: -1 Received: 2 23:25:33 steward: ward unhealthy; restarting Received: 1 23:25:33 negative value: -1 Received: 2 ``` 我们可以看到监控器发现错误并重启。你可能还会注意到我们只接收到了1和2,这证明了重启功能正常。如果你的系统对重复值很敏感,一定要考虑对其进行处理。你也可以考虑在一定次数的失败后退出。比如在这样的位置: ``` valueLoop: for _, intVal := range intList { // ... } ``` 稍作修改: ``` valueLoop: for { intVal := intList[0] intList = intList[1:] // ... } ``` 尽管我们依然停留在返回的无效负数上,尽管我们的监控器将继续失败,但这会记录在重新启动前的位置,你可以在这个思路上扩展。 使用这样的方式可以确保你的系统保持健康,此外,相信系统崩溃的减少也能大幅度降低开发过程中猝死的几率。 愿诸君健康工作,准点下班。 * * * * * 学识浅薄,错误在所难免。我是长风,欢迎来Golang中国的群(211938256)就本书提出修改意见。
';

速率限制

最后更新于:2022-04-02 06:51:43

如果你曾经使用过API来获取服务,那么你可能经受过与速率限制相抗衡。速率限制使得某种资源每次访问的次数受限。资源可以是任何东西:API连接,磁盘I/O,网络包,错误。 你有没有想过为什么会需要制定速率限制?为什么不允许无限制地访问系统?最明显的答案是,通过对系统进行速率限制,可以防止整个系统遭受攻击。如果恶意用户可以在他们的资源允许的情况下极快地访问你的系统,那么他们可以做各种事情。 例如,他们可以用日志消息或有效的请求填满服务器的磁盘。如果你的日志配置错误,他们甚至可能会执行恶意的操作,然后提交足够的请求,将任何活动记录从日志中移出并放入/dev/null中导致日志系统完全崩溃。他们可能试图暴力访问资源,或者执行分布式拒绝服务攻击。如果你没有对系统进行请求限制,你的系统就成了一个走在街上不穿衣服的大美女。 更糟糕的是,非恶意请求有时也会造成上述结果。恶意使用并不是唯一的原因。 在分布式系统中,如果合法用户正在以足够高的速度执行操作,或者他们正在运行的代码有问题,则合法用户可能会降低系统的性能。这甚至可能导致我们之前讨论的死亡螺旋。 从产品的角度来看,这太糟糕了!通常情况下,你希望向用户提供某种类型的性能保证,以确保他们可以在一致的基础上达到不错的性能。如果一个用户可以影响该协议,那么你的日子就不好过了。用户对系统的访问通常被沙盒化,既不会影响其他用户的活动也不会受其他用户影响。如果你打破了这种思维模式,你的系统会表现出糟糕的设计使用感,甚至会导致用户生气或离开。 >我曾经在一个分布式系统上工作,通过启动新的进程来并行扩展(这允许它水平扩展到多台机器)。每个进程都会打开数据库连接,读取一些数据并进行一些计算。一段时间以来,我们在以这种方式扩展系统以满足客户需求方面取得了巨大成功。 但是,一段时间后,我们发现大量的数据库中读取数据超时。 >我们的数据库管理员仔细查看了日志,试图找出问题所在。最后他们发现,由于系统中没有设定任何速率限制,所以进程彼此重叠。由于不同的进程试图从磁盘的不同部分读取数据,因此磁盘竞争将达到100%并一直保持在这个水平。这反过来导致了一系列的循环——超时——重试——循环。工作永远不会完成。 >我们设计了一个系统来限制数据库上可能的连接数量,并且速率限制被放在连接可以读取的秒级单位,问题消失了。虽然客户不得不等待更长的时间,但毕竟他们的工作完成了,我们能够在接下来的时间里进行适当的容量规划,以系统化的方式扩展容量。 速率限制使得你可以通过某个界限来推断系统的性能和稳定性。如果你需要扩展这些边界,可以在大量测试后以可控的方式进行。 在密集用户操作的情况下,速率限制可以使你的系统与用户之间保持良好的关系。你可以允许他们在严格限制的速率下尝试系统的特性。Google的云产品证明了这一点,这种限制在某种意义上是可以保护用户的。 速率限制通常由资源的构建者角度来考虑,但对于用户来说,能够减少速率限制的影响会让其感到非常欣慰。 那么我们怎样用Go来实现呢? 大多数速率限制是通过使用称为令牌桶的算法完成的。这很容易理解,而且相对容易实施。 我们来看看它背后的理论。 假设要使用资源,你必须拥有资源的访问令牌。没有令牌,请求会被拒绝。想象这些令牌存储在等待被检索以供使用的桶中。该桶的深度为d,表示它一次可以容纳d个访问令牌。 现在,每次你需要访问资源时,你都会进入存储桶并删除令牌。如果你的存储桶包含五个令牌,那么您可以访问五次资源。在第六次访问时,没有访问令牌可用,那么必须将请求加入队列,直到令牌变为可用,或拒绝请求。 这里有一个时间表来帮助观察这个概念。time表示以秒为单位的时间增量,bucket表示桶中请求令牌的数量,并且请求列中的tok表示成功的请求。(在这个和未来的时间表中,我们假设这些请求是即时的。) :-: ![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/6345e8d9af8548890551b7633a774811_677x295.jpg) 可以看到,在第一秒之前,我们能够完成五个请求,然后我们被阻塞,因为没有更多的令牌可供使用。 到目前为止,这些都很容易理解。那么如何补充令牌呢?在令牌桶算法中,我们将r定义为将令牌添加回桶的速率。 它可以是一纳秒或一分钟。它就是我们通常认为的速率限制:因为我们必须等待新的令牌可用,我们将操作限制为令牌的刷新率。 以下是深度为1的令牌桶示例,速率为1令牌/秒: :-: ![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/6947416d0a4c667389ba741b182ae838_678x294.jpg) 可以看到我们立即可以提出请求,但是只能隔一秒钟将请求一次。速率限制执行的非常好! 所以我们现在有两个设置可以摆弄:有多少令牌可用于立即使用,即桶的深度d,以及它们补充的速率r。另外我们还可以考虑下当存储桶已满时可以进行多少次请求。 以下是深度为5的令牌桶示例,速率为0.5令牌/秒: :-: ![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/76ba3052e0be5e33bb6a2ea89363a9ea_680x430.jpg) 在这里,我们能够立即提出五个请求,在这之后,每两秒限制一次请求。请求的爆发是在桶最初装满的时候。 请注意,用户可能不会消耗一个长数据流中的整个令牌桶。桶的深度只控制桶的容量。这里有一个用户爆发两次请求的例子,然后四秒钟后爆发了五次: :-: ![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/1ffecd7885323d1caad92b79869b1137_683x291.jpg) :-: ![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2c0311f69d82a680c36853d3a56f021d_679x169.jpg) 虽然用户有可用的令牌,但这种爆发性仅允许根据调用者的能力限制访问系统。对于只能间歇性访问系统但希望尽快往返的用户来说,这种爆发性的存在是很好的。你只需要确保系统能够同时处理所有用户的爆发操作,或者确保操作上不可能有足够的用户同时爆发操作以影响你的系统。无论哪种方式,速率限制都可以让你规避风险。 让我们使用这个算法来看看一个Go程序在执行令牌桶算法的时如何表现。 假设我们可以访问API,并且已经提供了客户端来使用它。该API有两个操作:一个用于读取文件,另一个用于将域名解析为IP地址。为了简单起见,我将忽略任何参数并返回实际访问服务所需的值。 所以这是我们的客户端: ``` func Open() *APIConnection { return &APIConnection{} } type APIConnection struct{} func (a *APIConnection) ReadFile(ctx context.Context) error { // Pretend we do work here return nil } func (a *APIConnection) ResolveAddress(ctx context.Context) error { // Pretend we do work here return nil } ``` 由于理论上这个请求正在通过网络传递,所以我们将context.Context作为第一个参数,以防我们需要取消请求或将值传递给服务器。 通过前面的章节讨论,想必大家已经熟悉了这样的用法。 现在我们将创建一个简单的程序来访问这个API。程序需要读取10个文件并解析10个地址,但这些文件和地址彼此没有关系,因此程序可以并发调用。稍后这将有助于添加利率限制。 ``` func main() { defer log.Printf("Done.") log.SetOutput(os.Stdout) log.SetFlags(log.Ltime | log.LUTC) apiConnection := Open() var wg sync.WaitGroup wg.Add(20) for i := 0; i < 10; i++ { go func() { defer wg.Done() err := apiConnection.ReadFile(context.Background()) if err != nil { log.Printf("cannot ReadFile: %v", err) } }() } log.Printf("ReadFile") for i := 0; i < 10; i++ { go func() { defer wg.Done() err := apiConnection.ResolveAddress(context.Background()) if err != nil { log.Printf("cannot ResolveAddress: %v", err) } }() } log.Printf("ResolveAddress") wg.Wait() } ``` 这会输出: ``` 20:13:13 ResolveAddress 20:13:13 ReadFile 20:13:13 ResolveAddress 20:13:13 ReadFile 20:13:13 ReadFile 20:13:13 ReadFile 20:13:13 ReadFile 20:13:13 ResolveAddress 20:13:13 ResolveAddress 20:13:13 ReadFile 20:13:13 ResolveAddress 20:13:13 ResolveAddress 20:13:13 ResolveAddress 20:13:13 ResolveAddress 20:13:13 ResolveAddress 20:13:13 ResolveAddress 20:13:13 ReadFile 20:13:13 ReadFile 20:13:13 ReadFile 20:13:13 ReadFile 20:13:13 Done. ``` 我们可以看到所有的API请求几乎同时进行。由于没有设定速率限制,所以用户可以随意随意访问系统。现在我需要提醒你,当前个可能导致无限循环的错误。 好的,让我们来介绍一个限速器。我打算在APIConnection中这样做,但通常会在服务器上运行速率限制器,这样用户就无法绕过它。生产系统还可能包括一个客户端速率限制器,以防止用户因不必要的调用而被拒绝,但这是一种优化,并不是必需的。就我们的目的而言,限速器使事情变得简单。 我们将看看golang.org/x/time/rate 中的令牌桶速率限制器实现的示例。我选择了这个包,因为这跟我所能得到的标准库较为相近。当然还有其他的软件包可以做更多的花里胡哨的工作。 golang.org/x/time/rate 非常简单,而且它适用于我们的目的。 我们将与这个包交互的前两种方法是Limit类型和NewLimiter函数,在这里定义: ``` // Limit 定义了事件的最大频率。 // Limit 被表示为每秒事件的数量。 // 值为0的Limit不允许任何事件。 type Limit float64 // ewLimiter返回一个新的Limiter实例, // 件发生率为r,并允许至多b个令牌爆发。 func NewLimiter(r Limit, b int) *Limiter ``` 在NewLimiter中,我们看到了两个熟悉的参数:r ,以及 b.r。b是我们之前讨论过的桶的深度。 rate 包还定义了一个有用的函数,Every,帮助将time.Duration转换为Limit: ``` // Every将事件之间的最小时间间隔转换为Limit。 func Every(interval time.Duration) Limit ``` Every函数是有意义的,但我想讨论每次rate限制了操作次数,而不是请求之间的时间间隔。 我们可以将其表述如下: ``` rate.Limit(events/timePeriod.Seconds()) ``` 但是我不想每次都输入这个值,Every函数有一些特殊的逻辑会返回rate.Inf——表示没有限制——如果提供的时间间隔为零。 正因为如此,我们将用这个词来表达我们的帮助函数: ``` func Per(eventCount int, duration time.Duration) rate.Limit { return rate.Every(duration/time.Duration(eventCount)) } ``` 在我们建立rate.Limiter后,我们希望使用它来阻塞我们的请求,直到获得访问令牌。 我们可以用Wait方法来做到这一点,它只是用参数1来调用WaitN: ``` // Wait 是 WaitN(ctx, 1)的简写。 func (lim *Limiter) Wait(ctx context.Context) // WaitN 会发生阻塞直到 lim 允许的 n 个事件执行。 // 它返回一个 error 如果 n 超过了 Limiter的桶大小, // Context会被取消, 或等待的时间超过了 Context 的 Deadline。 func (lim *Limiter) WaitN(ctx context.Context, n int) (err error) ``` 我们已经集齐了限制API请求的所有要素。让我们修改APIConnection类型并尝试一下: ``` func Open() *APIConnection { return &APIConnection{ rateLimiter: rate.NewLimiter(rate.Limit(1), 1) //1 } } type APIConnection struct { rateLimiter *rate.Limiter } func (a *APIConnection) ReadFile(ctx context.Context) error { if err := a.rateLimiter.Wait(ctx); err != nil { //2 return err } // Pretend we do work here return nil } func (a *APIConnection) ResolveAddress(ctx context.Context) error { if err := a.rateLimiter.Wait(ctx); err != nil { //2 return err } // Pretend we do work here return nil } ``` 1. 我们将所有API连接的速率限制设置为每秒一个事件。 2. 我们等待速率限制器有足够的权限来完成我们的请求。 这会输出: ``` 22:08:30 ResolveAddress 22:08:31 ReadFile 22:08:32 ReadFile 22:08:33 ReadFile 22:08:34 ResolveAddress 22:08:35 ResolveAddress 22:08:36 ResolveAddress 22:08:37 ResolveAddress 22:08:38 ResolveAddress 22:08:39 ReadFile 22:08:40 ResolveAddress 22:08:41 ResolveAddress 22:08:42 ResolveAddress 22:08:43 ResolveAddress 22:08:44 ReadFile 22:08:45 ReadFile 22:08:46 ReadFile 22:08:47 ReadFile 22:08:48 ReadFile 22:08:49 ReadFile 22:08:49 Done. ``` 在生产中,我们可能会想要一些更复杂的东西。我们可能希望建立多层限制:细粒度控制以限制每秒请求数,粗粒度控制限制每分钟,每小时或每天的请求数。 在某些情况下,可以通过速率限制器来实现; 然而,在所有情况下都这么干是不可能的,并且通过尝试将单位时间的限制语义转换为单一层,你会因速率限制器而丢失大量信息。出于这些原因,我发现将限制器分开并将它们组合成一个限速器来管理交互更容易。 为此,我创建了一个简单的聚合速率限制器multiLimiter。 这是定义: ``` type RateLimiter interface { //1 Wait(context.Context) error Limit() rate.Limit } func MultiLimiter(limiters ...RateLimiter) *multiLimiter { byLimit := func(i, j int) bool { return limiters[i].Limit() < limiters[j].Limit() } sort.Slice(limiters, byLimit) //2 return &multiLimiter{limiters: limiters} } type multiLimiter struct { limiters []RateLimiter } func (l *multiLimiter) Wait(ctx context.Context) error { for _, l := range l.limiters { if err := l.Wait(ctx); err != nil { return err } } return nil } func (l *multiLimiter) Limit() rate.Limit { return l.limiters[0].Limit() //3 } ``` 1. 这里我们定义一个RateLimiter接口,以便MultiLimiter可以递归地定义其他MultiLimiter实例。 2. 这里我们实现一个优化,并按照每个RateLimiter的Limit()行排序。 3. 因为我们在multiLimiter实例化时对子RateLimiter实例进行排序,所以我们可以简单地返回限制性最高的limit,这将是切片中的第一个元素。 Wait方法遍历所有子限制器,并在每一个子限制器上调用Wait。这些调用可能会也可能不会阻塞,但我们需要通知每个子限制器,以便减少令牌桶内的令牌数量。通过等待每个限制器,我们保证最长的等待时间。这是因为如果我们执行时间较小的等待,那么最长的等待时间将被重新计算为剩余时间。在较早的等待被阻塞时,后者会等待令牌桶的填充。 经过思考,让我们重新定义APIConnection,对每秒和每分钟都进行限制: ``` func Open() *APIConnection { secondLimit := rate.NewLimiter(Per(2, time.Second), 1) //1 minuteLimit := rate.NewLimiter(Per(10, time.Minute), 10) //2 return &APIConnection{ rateLimiter: MultiLimiter(secondLimit, minuteLimit) //3 } } type APIConnection struct { rateLimiter RateLimiter } func (a *APIConnection) ReadFile(ctx context.Context) error { if err := a.rateLimiter.Wait(ctx); err != nil { return err } // Pretend we do work here return nil } func (a *APIConnection) ResolveAddress(ctx context.Context) error { if err := a.rateLimiter.Wait(ctx); err != nil { return err } // Pretend we do work here return nil } ``` 1. 我们定义了每秒的极限。 2. 我们定义每分钟的突发极限为10,为用户提供初始池。每秒的限制将确保我们不会因请求而使系统过载。 3. 我们结合这两个限制,并将其设置为APIConnection的主限制器。 这会输出: ``` 22:46:10 ResolveAddress 22:46:10 ReadFile 22:46:11 ReadFile 22:46:11 ReadFile 22:46:12 ReadFile 22:46:12 ReadFile 22:46:13 ReadFile 22:46:13 ReadFile 22:46:14 ReadFile 22:46:14 ReadFile 22:46:16 ResolveAddress 22:46:22 ResolveAddress 22:46:28 ReadFile 22:46:34 ResolveAddress 22:46:40 ResolveAddress 22:46:46 ResolveAddress 22:46:52 ResolveAddress 22:46:58 ResolveAddress 22:47:04 ResolveAddress 22:47:10 ResolveAddress 22:47:10 Done. ``` 正如您所看到的,我们每秒发出两个请求,直到请求#11,此时我们开始每隔六秒发出一次请求。 这是因为我们耗尽了我们可用的每分钟请求令牌池,并受此限制。 为什么请求#11在两秒内发生而不是像其他请求那样发生,这可能有点违反直觉。 请记住,尽管我们将API请求限制为每分钟10个,但一分钟是一个动态的时间窗口。 当我们达到第十一个要求时,我们的每分钟限制器已经累积了另一个令牌。 通过定义这样的限制,我们可以清楚地表达了粗粒度限制,同时仍然以更详细的细节水平限制请求数量。 这种技术也使我们能够开始思考除时间之外的其他维度。 当你对系统进行限制时,你可能会限制不止一件事。 也可能对API请求的数量有一些限制,也会对其他资源(如磁盘访问,网络访问等)有限制。让我们稍微充实一下示例并设置磁盘和网络限制 ``` func Open() *APIConnection { return &APIConnection{ apiLimit: MultiLimiter( //1 rate.NewLimiter(Per(2, time.Second), 2), rate.NewLimiter(Per(10, time.Minute), 10)), diskLimit: MultiLimiter(rate.NewLimiter(rate.Limit(1), 1)), //2 networkLimit: MultiLimiter(rate.NewLimiter(Per(3, time.Second), 3)) //3 } } type APIConnection struct { networkLimit, diskLimit, apiLimit RateLimiter } func (a *APIConnection) ReadFile(ctx context.Context) error { err := MultiLimiter(a.apiLimit, a.diskLimit).Wait(ctx) //4 if err != nil { return err } // Pretend we do work here return nil } func (a *APIConnection) ResolveAddress(ctx context.Context) error { err := MultiLimiter(a.apiLimit, a.networkLimit).Wait(ctx) //5 if err != nil { return err } // Pretend we do work here return nil } ``` 1. 我们为API调用设置了一个限制器。 每秒请求数和每分钟请求数都有限制 2. 我们为磁盘读取设置一个限制器。将其限制为每秒一次读取。 3. 对于网络,我们将设置每秒三个请求的限制。 4. 当我们读取文件时,将结合来自API限制器和磁盘限制器的限制。 5. 当我们需要网络访问时,将结合来自API限制器和网络限制器的限制。 这会输出: ``` 01:40:15 ResolveAddress 01:40:15 ReadFile 01:40:16 ReadFile 01:40:17 ResolveAddress 01:40:17 ResolveAddress 01:40:17 ReadFile 01:40:18 ResolveAddress 01:40:18 ResolveAddress 01:40:19 ResolveAddress 01:40:19 ResolveAddress 01:40:21 ResolveAddress 01:40:27 ResolveAddress 01:40:33 ResolveAddress 01:40:39 ReadFile 01:40:45 ReadFile 01:40:51 ReadFile 01:40:57 ReadFile 01:41:03 ReadFile 01:41:09 ReadFile 01:41:15 ReadFile 01:41:15 Done. ``` 让我们关注一下这样的事实,即我们可以将限制器组合成对每个调用都有意义的组,并且让APIClient执行正确的操作。如果我们想随便观察一下它是如何工作的,会注意到涉及网络访问的API调用似乎以更加规律的方式发生,并在前三分之二的调用中完成。这可能与goroutine调度有关,也是我们的限速器正在执行各自的工作。 我还应该提到的是,rate.Limiter类型有一些其他的技巧来优化不同的用例。这里只讨论了它等待令牌桶接收另一个令牌的能力,但如果你有兴趣使用它,可以查阅文档。 在本节中,我们考察了使用速率限制的理由,构建了令牌桶算法的Go实现,以及如何将令牌桶限制器组合为更大,更复杂的速率限制器。这应该能让你很好地了解速率限制,并帮助你开始使用它们。 * * * * * 学识浅薄,错误在所难免。我是长风,欢迎来Golang中国的群(211938256)就本书提出修改意见。
';

请求并发复制处理

最后更新于:2022-04-02 06:51:41

对于大部分应用,尽可能快地响应请求是首要任务。例如,应用程序可能正在服务用户的HTTP请求,或者检索复制的数据块。 在这些情况下,你需要做出权衡:是将请求复制到多个处理程序(无论是goutoutine,进程还是服务器),并且其中一个将比其他处理程序返回更快呢,还是立即返回结果——缺点是必须考虑如何高效利用资源来保持处理程序的多个副本同时运行。 如果这种复制是在内存中完成的,它耗费的资源可能并不是那么昂贵,但如果处理程序需要复制进程,服务器甚至数据中心,这就完全不一样了。你必须考虑这样做成本是否值得。 我们来看看如何在单个进程中复制请求。我们将使用多个goroutines作为请求处理程序,并且goroutine将在1到6纳秒之间随机休眠一段时间以模拟负载。 这将使处理程序在不同时间返回结果,并让我们看到这样做能否更高效。 下面这个例子通过10个处理程序复制模拟请求: ``` doWork := func(done <-chan interface{}, id int, wg *sync.WaitGroup, result chan<- int) { started := time.Now() defer wg.Done() // 模拟随机加载 simulatedLoadTime := time.Duration(1+rand.Intn(5)) * time.Second select { case <-done: case <-time.After(simulatedLoadTime): } select { case <-done: case result <- id: } took := time.Since(started) // 显示处理程序将花费多长时间 if took < simulatedLoadTime { took = simulatedLoadTime } fmt.Printf("%v took %v\n", id, took) } done := make(chan interface{}) result := make(chan int) var wg sync.WaitGroup wg.Add(10) for i := 0; i < 10; i++ { //1 go doWork(done, i, &wg, result) } firstReturned := <-result //2 close(done) //3 wg.Wait() fmt.Printf("Received an answer from #%v\n", firstReturned) ``` 1. 我们开启10个处理程序以处理请求。 2. 抓取处理程序第一个返回的值。 3. 取消所有剩余的处理程序。这确保他们不会继续做不必要的工作。 这会输出: ``` 4 took 1s 3 took 1s 6 took 2s 8 took 1s 2 took 2s 0 took 2s 7 took 4s 5 took 5s 1 took 3s 9 took 3s Received an answer from #3 ``` 在输出中,我们显示每个处理程序花费了多久,以便你了解此技术可以节省多少时间。 唯一需要注意的是,所有的处理程序都需要有相同且平等的请求。换句话说,不会出现从无法处理请求的处理程序接收响应时间。正如我所提到的那样,所有处理者用来完成工作的资源都需要复制。 如果你的处理程序太相似了,那么任何一个出现异常的几率都会更小。你只应该将这样的请求复制到具有不同运行时条件的处理程序:不同的进程,计算机,数据存储路径或完全访问不同的数据存储区。 这样做的代价可能是昂贵且难以维护。如果速度是你的目标,那这个技术就很有价值了。当然你还需要考虑容错和可扩展性。 * * * * * 学识浅薄,错误在所难免。我是长风,欢迎来Golang中国的群(211938256)就本书提出修改意见。
';

心跳

最后更新于:2022-04-02 06:51:38

心跳是并发进程向外界发出信号的一种方式。命名者从人体解剖学中受到启发,使用心跳一词表示被观察者的生命体征。心跳在Go语言出现前就已被广泛使用。 在并发中使用心跳是有原因的。心跳能够让我们更加深入的了解系统,并且在系统存在不确定性的时候对其测试。 我们将在本节中讨论两种不同类型的心跳: * 以固定时间间隔产生的心跳。 * 在工作单元开始时产生的心跳。 固定时间间隔产生的心跳对于并发来说很有用,它可能在等待处理某个工作单元执行某个任务时发生。由于你不知道这项工作什么时候会进行,所以你的goroutine可能会持续等待。心跳是一种向监听者发出信号的方式,即一切都很好,当前静默是正常的。 以下代码演示了会产生心跳的goroutine: ``` doWork := func(done <-chan interface{}, pulseInterval time.Duration) (<-chan interface{}, <-chan time.Time) { heartbeat := make(chan interface{}) //1 results := make(chan time.Time) go func() { defer close(heartbeat) defer close(results) pulse := time.Tick(pulseInterval) //2 workGen := time.Tick(2 * pulseInterval) //3 sendPulse := func() { select { case heartbeat <- struct{}{}: default: //4 } } sendResult := func(r time.Time) { for { select { case <-done: return case <-pulse: //5 sendPulse() case results <- r: return } } } for { select { case <-done: return case <-pulse: //5 sendPulse() case r := <-workGen: sendResult(r) } } }() return heartbeat, results } ``` 1. 在这里,我们设置了一个发送心跳信号的通道。doWork会返回该通道。 2. 我们按传入的pulseInterval值定时发送心跳,每次心跳都意味着可以从该通道上读取到内容。 3. 这只是用来模拟进入的工作的另一处代码。我们选择一个比pulseInterval更长的持续时间,以便我们可以看到来自goroutine的心跳。 4. 请注意,我们包含一个default子句。我们必须考虑如果没有人接受到心跳的情况。从goroutine发出的结果是至关重要的,但心跳不是。 5. 就像done通道,无论何时执行发送或接收,你都需要考虑心跳发送的情况。 请注意,由于我们可能在等待输入时发送多个pulse,或者在等待发送结果时发送多个pulse,所有select语句都需要在for循环内。 目前看起来不错; 我们如何利用这个函数并消费它发出的事件? 让我们来看看: ``` done := make(chan interface{}) time.AfterFunc(10*time.Second, func() { close(done) }) //1 const timeout = 2 * time.Second //2 heartbeat, results := doWork(done, timeout/2) //3 for { select { case _, ok := <-heartbeat: //4 if ok == false { return } fmt.Println("pulse") case r, ok := <-results: //5 if ok == false { return } fmt.Printf("results %v\n", r.Second()) case <-time.After(timeout): //6 return } } ``` 1. 我们设置done通道并在10秒后关闭它。 2. 我们在这里设定超时时间 我们将用它将心跳间隔与超时时间相耦合。 3. 我们向dowork传入超时时间的一半。 4. 我们将hearbeat的读取放入select语句中。每间隔 timeout/2 获取一次来自心跳通道的消息。如果我们没有收到消息,那就说明该goroutine存在问题。 5. 我们从result通道获取数据,没有什么特别的。 6. 如果我们没有收到心跳或result,程序就会超时结束。 这会输出: ``` pulse pulse results 52 pulse pulse results 54 pulse pulse results 56 pulse pulse results 58 pulse ``` 和预期的一样,每次从result中接收到信息,都会收到两次心跳。 我们可能会使用这样的功能来收集系统的统计参数,当你的goroutine没有像预期那样运行,那么基于固定时间的心跳信号的作用会非常明显。 考虑下一个例子。 我们将在两次迭代后停止goroutine来模拟循环中断,然后不关闭任何一个通道; ``` doWork := func(done <-chan interface{}, pulseInterval time.Duration) (<-chan interface{}, <-chan time.Time) { heartbeat := make(chan interface{}) results := make(chan time.Time) go func() { pulse := time.Tick(pulseInterval) workGen := time.Tick(2 * pulseInterval) sendPulse := func() { select { case heartbeat <- struct{}{}: default: } } sendResult := func(r time.Time) { for { select { case <-pulse: sendPulse() case results <- r: return } } } for i := 0; i < 2; i++ { //1 select { case <-done: return case <-pulse: sendPulse() case r := <-workGen: sendResult(r) } } }() return heartbeat, results } done := make(chan interface{}) time.AfterFunc(10*time.Second, func() { close(done) }) const timeout = 2 * time.Second heartbeat, results := doWork(done, timeout/2) for { select { case _, ok := <-heartbeat: if ok == false { return } fmt.Println("pulse") case r, ok := <-results: if ok == false { return } fmt.Printf("results %v\n", r) case <-time.After(timeout): fmt.Println("worker goroutine is not healthy!") return } } ``` 1. 这里我们简单模拟循环中断。前面的例子中,未收到通知会无限循环。这里我们只循环两次。 这会输出: ``` pulse pulse worker goroutine is not healthy! ``` 效果很不错。在两秒钟之内,我们的系统意识到goroutine未能正确读取,并且打破了for-select循环。通过使用心跳,我们已经成功地避免了死锁,并且不必通过依赖较长的超时而保持稳定性。 我们将在“Goroutines异常行为修复”中进一步理解这个概念。 另外请注意,心跳会帮助处理相反的情况:它让我们知道长时间运行的goroutine依然存在,但花了一段时间才产生一个值并发送至通道。 接下来让我们看看另一个场景:在工作单元开始时产生的心跳。这对测试非常有用。下面是个例子: ``` doWork := func(done <-chan interface{}) (<-chan interface{}, <-chan int) { heartbeatStream := make(chan interface{}, 1) //1 workStream := make(chan int) go func() { defer close(heartbeatStream) defer close(workStream) for i := 0; i < 10; i++ { select { //2 case heartbeatStream <- struct{}{}: default: //3 } select { case <-done: return case workStream <- rand.Intn(10): } } }() return heartbeatStream, workStream } done := make(chan interface{}) defer close(done) heartbeat, results := doWork(done) for { select { case _, ok := <-heartbeat: if ok { fmt.Println("pulse") } else { return } case r, ok := <-results: if ok { fmt.Printf("results %v\n", r) } else { return } } } ``` 1. 这里我们用一个缓冲区创建心跳通道。这确保即使没有人及时监听发送,也总会发送至少一个pulse。 2. 在这里,我们为心跳设置了一个单独的select块。我们不希望将它与发送结果一起包含在同一个select块中,因为如果接收器未准备好,它们将接收到一个pulse,而result的当前值将会丢失。我们也没有为done通道提供case语句,因为我们有一个default可以处理这种情况。 3. 我们再次处理如果没有人监听到心头。因为我们的心跳通道是用缓冲区创建的,如果有人在监听,但没有及时处理第一个心跳,仍会被通知。 这会输出: ``` pulse results 1 pulse results 7 pulse results 7 pulse results 9 pulse results 1 pulse results 8 pulse results 5 pulse results 0 pulse results 6 pulse results 0 ``` 如预期一致,每个结果都会有一个心跳。 至于测试的编写。考虑下面的代码: ``` func DoWork( done <-chan interface {}, nums ...int ) (<-chan interface{}, <-chan int) { heartbeat := make(chan interface{}, 1) intStream := make(chan int) go func () { defer close(heartbeat) defer close(intStream) time.Sleep(2*time.Second) // 1 for _, n := range nums { select { case heartbeat <- struct{}{}: default: } select { case <-done: return case intStream <- n: } } }() return heartbeat, intStream } ``` 1. 我们在goroutine开始工作之前模拟延迟。在实践中,延迟可以由各种各样的原因导致,例如CPU负载,磁盘争用,网络延迟和bug。 DoWork函数是一个相当简单的生成器,它将传入的数字转换为它返回通道上的数据流。我们来试试这个函数。下面提供了一个测试的反例: ``` func TestDoWork_GeneratesAllNumbers(t *testing.T) { done := make(chan interface{}) defer close(done) intSlice := []int{0, 1, 2, 3, 5} _, results := DoWork(done, intSlice...) for i, expected := range intSlice { select { case r := <-results: if r != expected { t.Errorf( "index %v: expected %v, but received %v,", i, expected, r, ) } case <-time.After(1 * time.Second): // 1 t.Fatal("test timed out") } } } ``` 1. 在这里,我们设置超时,以防止goroutine出现问题导致死锁。 运行结果为: ``` go test ./bad_concurrent_test.go --- FAIL: TestDoWork_GeneratesAllNumbers (1.00s) bad_concurrent_test.go:46: test timed out FAIL FAIL command-line-arguments 1.002s ``` 这个测试之所以不好,是因为它的不确定性。如果移除time.Sleep情况会变得更糟:这个测试会有时通过,有时失败。 我们之前提到过程中的外部因素可能会导致goroutine花费更长的时间才能完成第一次迭代。关键在于我们不能保证在超时之前第一次迭代会完成,所以我们开始考虑:这时候超时会有多大意义?我们可以增加超时时间,但这意味着测试时失败也需要很长时间,从而减慢我们的测试效率。 这种情况很可怕,项目组甚至会对测试的正确性及必要性产生怀疑。 幸运的是这种情况并非无解。这是一个正确的测试: ``` func TestDoWork_GeneratesAllNumbers(t *testing.T) { done := make(chan interface{}) defer close(done) intSlice := []int{0, 1, 2, 3, 5} heartbeat, results := DoWork(done, intSlice...) <-heartbeat //1 i := 0 for r := range results { if expected := intSlice[i]; r != expected { t.Errorf("index %v: expected %v, but received %v,", i, expected, r) } i++ } } ``` 1. 在这里,我们等待goroutine发出信号表示它正在开始处理迭代。 运行此测试会产生以下输出 ``` ok command-line-arguments 2.002s ``` 使用心跳我们可以安全地编写该测试,而不会超时。运行的唯一风险是我们的一次迭代花费了过多的时间。 如果这对我们很重要,我们可以利用更安全的、基于间隔的心跳。 以下是使用基于间隔的心跳的测试示例: ``` func DoWork(done <-chan interface{}, pulseInterval time.Duration, nums ...int) (<-chan interface{}, <-chan int) { heartbeat := make(chan interface{}, 1) intStream := make(chan int) go func() { defer close(heartbeat) defer close(intStream) time.Sleep(2 * time.Second) pulse := time.Tick(pulseInterval) numLoop: //2 for _, n := range nums { for { //1 select { case <-done: return case <-pulse: select { case heartbeat <- struct{}{}: default: } case intStream <- n: continue numLoop //3 } } } }() return heartbeat, intStream } func TestDoWork_GeneratesAllNumbers(t *testing.T) { done := make(chan interface{}) defer close(done) intSlice := []int{0, 1, 2, 3, 5} const timeout = 2 * time.Second heartbeat, results := DoWork(done, timeout/2, intSlice...) <-heartbeat //4 i := 0 for { select { case r, ok := <-results: if ok == false { return } else if expected := intSlice[i]; r != expected { t.Errorf( "index %v: expected %v, but received %v,", i, expected, r, ) } i++ case <-heartbeat: //5 case <-time.After(timeout): t.Fatal("test timed out") } } } ``` 1. 我们需要两个循环:一个用来覆盖我们的数字列表,并且这个内部循环会运行直到intStream上的数字成功发送。 2. 我们在这里使用一个标签来使内部循环继续更简单一些。 3. 这里我们继续执行外部循环。 4. 我们仍然等待第一次心跳出现,表明我们已经进入了goroutine的循环。 5. 我们在这里获取心跳以实现超时。 运行此测试会输出: ``` ok command-line-arguments 3.002s ``` 你可能已经注意到这个版本的逻辑有点混乱。如果你确信goroutine的循环在启动后不会停止执行,我建议只阻塞第一次心跳,然后进入循环语句。你可以编写单独的测试,专门来测试如未能关闭通道,循环迭代耗时过长以及其他与时间相关的情况。 在编写并发代码时,心跳不是绝对必要的,但本节将展示其的实用性。对于任何需要测试的长期运行的goroutines,我强烈推荐这种模式。 * * * * * 学识浅薄,错误在所难免。我是长风,欢迎来Golang中国的群(211938256)就本书提出修改意见。
';

超时和取消

最后更新于:2022-04-02 06:51:36

在编写并发代码时,超时和取消会频繁出现。我们将在本节中看到,超时对于创建一个可以健壮易读的程序至关重要。取消是对超时的回应。我们还将探讨在并发进程中引发取消的其他原因。 那么,我们为什么需要并发程序支持超时呢? ***系统饱和*** 正如我们在“队列”部分所讨论的那样,如果系统已经达到最大负荷(即,它的处理请求的能力达到了极限),我们可能希望系统的请求超时而不是花很长时间等待。你选择哪条路线取决于你的实际业务,但这里有一些关于何时触发超时的一般性指导: * 如果请求在超时情况下不太可能重复发送。 * 如果没有资源来存储请求(例如,临时队列的内存,持久队列的磁盘空间)。 * •如果请求或其发送的数据的过期(我们将在下面讨论)。 如果一个请求可能会重复发生,那么系统将会接受并超时请求。 如果开销超过我们系统的容量,这可能导致死亡螺旋。 但是,如果我们缺乏将请求存储在队列中所需的系统资源,这是一个有争议的问题。 即使我们符合这两条准则,将该请求加入队列也没有什么意义,只要我们可以处理请求,请求就会过期。 这给我们带来了超时的下一个理由。 ***数据过期*** 有时数据有一个窗口,在这个窗口中必须优先处理部分相关数据,或者处理数据的需求已过期。如果一个并发进程比这个窗口花费更长的时间来处理数据,我们希望超时并取消该进程。例如,如果某个并发进程在长时间等待后发起请求,则在队列中请求或其数据可能已过期。 如果这个窗口已经事先知晓,那么将context.WithDeadline或context.WithTimeout创建的context.Context传递给我们的并发进程是有意义的。 如果不是,我们希望并发进程的父节点能够在需求不再需要时取消并发进程。 context.WithCancel完美适用于此目的。 ***防止死锁*** 在大型系统中,尤其是分布式系统中,有时难以理解数据流动的方式或系统边界可能出现的情况。这并非毫无道理,甚至有人建议将超时放置在所有并发操作上,以确保系统不会发生死锁。 超时时间不一定要接近执行并发操作所需的实际时间。 设置超时时间的目的仅仅是为了防止死锁,所以它只需要足够短以满足死锁系统会在合理的时间内解锁即可。 我们在“死锁,活锁和锁的饥饿问题”章节中提到过,通过设置超时来避免死锁可能会将问题从死锁变为活锁。在大型系统中,由于存在更多的移动部件,因此与死锁相比,系统遇到不同的时序配置文件的可能性更大。因此,最好有机会锁定并修复进程,而非直接让系统死锁最终不得不重启。 请注意,这不是关于如何正确构建系统的建议。而是建议你考思考在开发和测试期间的时间、时序问题。我建议你使用超时,但是目标应该集中在一个没有死锁的系统上,在这种系统中,超时基本不会触发。 现在我们了解了何时使用超时,让我们将注意力转向取消,以及如何构建并发进程以优雅地处理取消。并发进程可能被取消的原因有很多: ***超时*** 超时是隐式的取消操作。 ***用户干预*** 为了获得良好的用户体验,通常建议如果启动长时间运行的进程时,向服务器做轮询将状态报告给用户,或允许用户查看他们的状态。当面向用户的并发操作时,有时需要允许用户取消他们已经开始的操作。 ***父节点取消*** 如果作为子节点的任何父节点停止,我们应当执行取消。 ***重复请求*** 我们可能希望将数据发送到多个并发进程,以尝试从其中一个进程获得更快的响应。 当收到响应时,需要取消其余的处理。 我们将在“重复请求”一节中详细讨论。 此外,也可能有其他的原因。然而,“为什么”这个问题并不像“如何”这样的问题那么困难或有趣。在第4章中,我们探讨了两种取消并发进程的方法:使用done通道和context.Context类型。 但这里我们要探索更复杂的问题:当一个并发进程被取消时,这对正在执行的算法及其下游消费者意味着什么?在编写可随时终止的并发代码时,需要考虑哪些事项? 为了回答这些问题,我们需要探索的第一件事是并发进程的可抢占性。下面是一个简单的例子: ``` var value interface{} select { case <-done: return case value = <-valueStream: } result := reallyLongCalculation(value) select { case <-done: return case resultStream <- result: } ``` 我们已经将valueStream的读取和resultStream的写入耦合起来,并检查done通道,看看goroutine是否已被取消,但这里存在问题。reallyLongCalculation看起来并不会执行抢占操作,而且根据名字,它看起来可能需要很长时间。这意味着,如果在reallyLongCalculation正在执行时某些事件试图取消这个goroutine,则可能需要很长时间才能确认取消并停止。让我们试着让reallyLongCalculation抢占进程,看看会发生什么: ``` reallyLongCalculation := func(done <-chan interface{}, value interface{}) interface{} { intermediateResult := longCalculation(value) select { case <-done: return nil default: } return longCaluclation(intermediateResult) } ``` 我们已经取得了一些进展:reallyLongCalculation现在可以抢占进程。但问题依然存在:我们只能对调用该函数的地方进行抢占。为了解决这个问题,我们需要继续调整 ``` reallyLongCalculation := func(done <-chan interface{}, value interface{}) interface{} { intermediateResult := longCalculation(done, value) return longCaluclation(done, intermediateResult) } ``` 如果将这一推理结果归纳一下,我们会看到当前必须做两件事:定义并发进程可抢占的时间段,并确保任何花费比此时间段更多时间的函数本身是可抢占的。一个简单的方法就是将你的goroutine分解成更小的部分。 你应该瞄准所有不可抢占的原子操作,以便在更短的时间内完成。 这里还存在另一个问题:如果goroutine恰好修改共享状态(例如,数据库,文件,内存数据结构),那么当goroutine被取消时会发生什么?goroutine是否尝试回滚? 这项工作需要多长时间?既然goroutine已经开始运行,它应该停下来,所以这个时间不应该花太长的时间来执行回滚,对吧? 如何处理这个问题很难给出一般性的建议,因为算法的性质决定了你如何处理这种情况; 如果在较小的范围内保留对任何共享状态的修改,无论是否需要确保这些修改回滚,通常都可以很好地处理。如果可能的话,在内存中建立临时存储,然后尽可能快地修改状态。作为一个例子,这是错误的做法: ``` result := add(1, 2, 3) writeTallyToState(result) result = add(result, 4, 5, 6) writeTallyToState(result) result = add(result, 7, 8, 9) writeTallyToState(result) ``` 我们在这里向state写如三次。如果运行此代码的goroutine在最终写入之前被取消,我们需要以某种方式回滚之前的写入。对比这种方法: ``` result := add(1, 2, 3, 4, 5, 6, 7, 8, 9) writeTallyToState(result) ``` 这里需要担心的回滚范围要小得多。 如果在我们调用writeToState之后取消,仍然需要一种方法来退出更改,但发生这种情况的可能性会很小,因为我们只修改一次状态。 你需要关心的另一个问题是消息重复。假设你的管道有三个阶段:generator阶段,A阶段和B阶段。generator阶段监控A阶段,跟踪自上次从其通道读取以来的时间长度,如果当前实例不正常,就创建一个新实例 A2。如果发生这种情况,阶段B可能会收到重复的消息(图5-1)。 :-: ![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/799e0e498079aebc27fa94cb3aa2dc92_482x331.png) 可以在此看到,如果在阶段A已经将阶段B的结果发送到阶段B后取消消息进入,则阶段B可能会收到重复的消息。 有几种方法可以避免这样的情况发生。最简单的方法(以及我推荐的方法)是,在子例程已经报告结果后,父例程不再发送取消信号。这需要各个阶段之间的双向通信,我们将在“心跳”一节中详细介绍。其他方法是: ***接受返回的第一个或最后一个结果*** 如果你的算法允许,或者你的并发进程是幂等的,那么可以简单地在下游进程中允许重复消息,并选择是否接受你收到的第一条或最后一条消息。 ***检查goroutine许可*** 可以使用与父节点的双向通信来明确请求发送消息的权限。这种方法类似于心跳。它看起来像这样。 :-: ![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/4e79aebb5d0ca77fe1c8f91bf7bcdcf8_458x241.png) 因为我们明确要求许可执行写入B的通道,所以这是比心跳更安全的方法。然而在实践中很少这样做,因为它比心跳更复杂,所以我建议你只是使用心跳。 在设计并发进程时,一定要考虑超时和取消。像软件工程中的许多其他技术问题一样,如果在项目初期忽略超时和取消,然后尝试将它们放在项目后期加入,有点像试图在蛋糕烘烤后再将蛋添加到蛋糕中。 * * * * * 学识浅薄,错误在所难免。我是长风,欢迎来Golang中国的群(211938256)就本书提出修改意见。
';

错误传递

最后更新于:2022-04-02 06:51:34

使用并发代码,特别是分布式系统,在系统中很容易出现问题,而且很难确认发生这种问题的原因。仔细考虑问题是如何通过系统传播的,以及如何最终呈现给用户,你会为自己,团队和用户减少很多痛苦。 在“错误处理”一节中,我们讨论了如何从goroutine处理错误,但我们没有花时间讨论这些错误应该是什么样子,或者错误应该如何流经一个庞大而复杂的系统。让我们花点时间来讨论错误传递的哲学。 许多开发人员认为错误传递是不值得关注的,或者,至少不是首先需要关注的。 Go试图通过强制开发者在调用堆栈中的每一帧处理错误来纠正这种不良做法。 首先让我们看看错误的定义。错误何时发生,以及错误会提供什么。 错误表明您的系统已进入无法完成用户明确或隐含请求的操作的状态。 因此,它需要传递一些关键信息: ***发生了什么*** 这是错误的一部分,其中包含有关所发生事件的信息,例如“磁盘已满”,“套接字已关闭”或“凭证过期”。尽管生成错误的内容可能会隐式生成此信息,你可以用一些能够帮助用户的上下文来完善它。 ***何时何处发生*** 错误应始终包含一个完整的堆栈跟踪,从调用的启动方式开始,直到实例化错误。 此外,错误应该包含有关它正在运行的上下文的信息。 例如,在分布式系统中,它应该有一些方法来识别发生错误的机器。当试图了解系统中发生的情况时,这些信息将具有无法估量的价值。 另外,错误应该包含错误实例化的机器上的时间,以UTC表示。 ***有效的信息说明*** 显示给用户的消息应该进行自定义以适合你的系统及其用户。它只应包含前两点的简短和相关信息。 一个友好的信息是以人为中心的,给出一些关于这个问题的指示,并且应该是关于一行文本。 ***如何获取更详细的错误信息*** 在某个时刻,有人可能想详细了解发生错误时的系统状态。提供给用户的错误信息应该包含一个ID,该ID可以与相应的日志交叉引用,该日志显示错误的完整信息:发生错误的时间(不是错误记录的时间),堆栈跟踪——包括你在代码中自定义的信息。包含堆栈跟踪的哈希也是有帮助的,以帮助在bug跟踪器中汇总类似的问题。 默认情况下,没有人工干预,错误所能提供的信息少得可怜。 因此,我们可以认为:在没有详细信息的情况下传播给用户任何错误的行为都是错误的。因为我们可以使用搭建框架的思路来对待错误处理。可以将所有错误归纳为两个类别: * Bug。 * 已知业务及系统意外(例如,网络连接断开,磁盘写入失败等)。 Bug是你没有为系统定制的错误,或者是“原始”错误。有时这是故意的,如果在系统多次迭代时出现的错误,尽快不可避免的传递给了用户,但接受到用户反馈后对提高系统健壮性并不是坏处。有时这是偶然的。在确定如何传播错误,系统随着时间的推移如何增长以及最终向用户展示什么时,这种区别将证明是有用的。 想象下一个巨大的系统,包含了很多模块: :-: ![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/c512c21a4020dfac548d57a5479315a0_1044x144.png) 假设在“Low Level Component”中发生错误,并且我们已经制作了一个格式良好的错误,并传递给堆栈。 在“Low Level Component”的背景下,这个错误可能被认为是合理的,但在我们的系统中,它可能不是。 让我们看看在每个组件的边界处,所有传入的错误都必须包含在我们代码所在组件的格式错误中。 例如,如果我们处于“Intermediary Component”,并且我们从“Low Level Component”调用代码,这可能会出错,我们可以使用: ``` func PostReport(id string) error { result, err := lowlevel.DoWork() if err != nil { if _, ok := err.(lowlevel.Error); ok { //1 err = WrapErr(err, "cannot post report with id %q", id) //2 } // ... } } ``` 1. 我们在这里断言以确定是我们自定义的错误。如果不是,我们会简单的把err传递给堆栈表明这里发生的错误是个bug。 2. 在这里,我们使用函数将传入的错误与我们模块的相关信息进行封装,并给它一个新类型。请注意,包装错误可能隐藏一些底层细节。 在错误最初被实例化时,错误发生时的底层细节是存在于错误信息中的。在我们的示例中,模块的边界处我们将错误包装起来,不属于我们定义的错误类型的错误都被视为格式错误。请注意,实际工作中建议只以你自己的模块边界(公共函数/方法)或代码添加有价值的上下文时以这种方式包装错误。 采取这种立场可以让我们的系统有机地发展。 我们可以确定传入的错误是正确的,反过来可以确保考虑错误如何离开模块。错误正确性成为我们系统的一个新特性。通过这样做,我们给出了一个思想上的框架,通过呈现给用户的内容明确划分了错误的类型。 所有的错误都应该记录下尽可能多的信息。 但是,当向用户显示错误时,就需要尽可能的清晰明了。 当我们的代码发现到一个格式良好的错误时,我们可以确信,在代码中的所有级别上,都意识到了该错误的存在,而且已经将其记录下来并打印出来供用户查看。 当错误传播给用户时,我们记录错误,同时向用户显示一条友好的消息,指出发生了意外事件。如果我们的系统中支持自动错误报告,那是最好不过的事情。如果不支持,应当建议用户提交一个错误报告。请注意,任何微小的错误都会包含有用的信息,即使我们无法保证面面俱到。 请记住,在任何一种情况下,如果出现错误或格式错误,我们将在邮件中包含一个日志ID,以便在需要更多信息时可以参考。 我们来看一个完整的例子。 这个例子不会非常健壮(例如,错误类型可能是简单化的),并且调用堆栈是线性的,但不妨碍大家来理清思路: ``` type MyError struct { Inner error Message string StackTrace string Misc map[string]interface{} } func wrapError(err error, messagef string, msgArgs ...interface{}) MyError { return MyError{ Inner: err, //1 Message: fmt.Sprintf(messagef, msgArgs...), StackTrace: string(debug.Stack()), //2 Misc: make(map[string]interface{}), //3 } } func (err MyError) Error() string { return err.Message } ``` 1. 在这里存储我们正在包装的错误。 如果需要调查发生的事情,我们总是希望能够查看到最低级别的错误。 2. 这行代码记录了创建错误时的堆栈跟踪。 3. 这里我们创建一个杂项信息存储字段。可以存储并发ID,堆栈跟踪的hash或可能有助于诊断错误的其他上下文信息。 接下来,我们建立一个名为 lowlevel 的模块: ``` // "lowlevel" module type LowLevelErr struct { error } func isGloballyExec(path string) (bool, error) { info, err := os.Stat(path) if err != nil { return false, LowLevelErr{wrapError(err, err.Error())} // 1 } return info.Mode().Perm()&0100 == 0100, nil } ``` 1. 在这里,我们用自定义错误来封装os.Stat中的原始错误。在这种情况下,我们不会掩盖这个错误产生的信息。 然后我们建立另一个名为 intermediate 的模块,它会调用 lowlevel 所在的包: ``` // "intermediate" module type IntermediateErr struct { error } func runJob(id string) error { const jobBinPath = "/bad/job/binary" isExecutable, err := isGloballyExec(jobBinPath) if err != nil { return err //1 } else if isExecutable == false { return wrapError(nil, "job binary is not executable") } return exec.Command(jobBinPath, "--id="+id).Run() //1 } ``` 1. 我们传递来自 lowlevel 模块的错误,由于我们接收从其他模块传递的错误而没有将它们包装在我们自己的错误类型中,这将会产生问题。 最后,让我们创建一个调用intermediate包函数的顶级main函数: ``` func handleError(key int, err error, message string) { log.SetPrefix(fmt.Sprintf("[logID: %v]: ", key)) log.Printf("%#v", err) //3 fmt.Printf("[%v] %v", key, message) } func main() { log.SetOutput(os.Stdout) log.SetFlags(log.Ltime | log.LUTC) err := runJob("1") if err != nil { msg := "There was an unexpected issue; please report this as a bug." if _, ok := err.(IntermediateErr); ok { //1 msg = err.Error() } handleError(1, err, msg) //2 } } ``` 1. 在这里我们检查是否错误是预期的类型。 如果是,可以简单地将其消息传递给用户。 2. 在这一行中,将日志和错误消息与ID绑定在一起。我们可以很容易增加这个增量,或者使用一个GUID来确保一个唯一的ID。 3. 在这里我们记录完整的错误,以备需要深入了解发生了什么。 我们在运行后会在日志中发现: ``` [logID: 1]: 21:46:07 main.LowLevelErr{error:main.MyError{Inner: (*os.PathError)(0xc4200123f0), Message:"stat /bad/job/binary: no such file or directory", StackTrace:"goroutine 1 [running]: runtime/debug.Stack(0xc420012420, 0x2f, 0xc420045d80) /home/kate/.guix-profile/src/runtime/debug/stack.go:24 +0x79 main.wrapError(0x530200, 0xc4200123f0, 0xc420012420, 0x2f, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, ...) /tmp/babel-79540aE/go-src-7954NTK.go:22 +0x62 main.isGloballyExec(0x4d1313, 0xf, 0xc420045eb8, 0x487649, 0xc420056050) /tmp/babel-79540aE/go-src-7954NTK.go:37 +0xaa main.runJob(0x4cfada, 0x1, 0x4d4c35, 0x22) /tmp/babel-79540aE/go-src-7954NTK.go:47 +0x48 main.main() /tmp/babel-79540aE/go-src-7954NTK.go:67 +0x63 ", Misc:map[string]interface {}{}}} ``` 并且标准输出会打印: ``` [1]There was an unexpected issue; please report this as a bug. ``` 我们可以看到,在这个错误路径的某处,它没有正确处理,并且因为我们无法确定错误信息是否适合用户自行处理,所以我们输出一个简单的错误信息,指出意外事件发生了。如果回顾 lowlevel 模块,我们会发现错误发生的原因:我们没有包装来自 lowlevel 模块的错误。让我们纠正它: ``` // "intermediate" module type IntermediateErr struct { error } func runJob(id string) error { const jobBinPath = "/bad/job/binary" isExecutable, err := isGloballyExec(jobBinPath) if err != nil { return IntermediateErr{wrapError(err, "cannot run job %q: requisite binaries not available", id)} //1 } else if isExecutable == false { return wrapError( nil, "cannot run job %q: requisite binaries are not executable", id, ) } return exec.Command(jobBinPath, "--id="+id).Run() } ``` 1. 在这里,我们现在使用自定义错误。我们想隐藏工作未运行原因的底层细节,因为这对于用户并不重要。 ``` func handleError(key int, err error, message string) { log.SetPrefix(fmt.Sprintf("[logID: %v]: ", key)) log.Printf("%#v", err) fmt.Printf("[%v] %v", key, message) } func main() { log.SetOutput(os.Stdout) log.SetFlags(log.Ltime | log.LUTC) err := runJob("1") if err != nil { msg := "There was an unexpected issue; please report this as a bug." if _, ok := err.(IntermediateErr); ok { msg = err.Error() } handleError(1, err, msg) } } ``` 现在,当我们运行更新后的代码,会得到类似的日志: ``` [logID: 1]: 22:11:04 main.IntermediateErr{error:main.MyError {Inner:main.LowLevelErr{error:main.MyError{Inner:(*os.PathError) (0xc4200123f0), Message:"stat /bad/job/binary: no such file or directory", StackTrace:"goroutine 1 [running]: runtime/debug.Stack(0xc420012420, 0x2f, 0x0) /home/kate/.guix-profile/src/runtime/debug/stack.go:24 +0x79 main.wrapError(0x530200, 0xc4200123f0, 0xc420012420, 0x2f, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, ...) /tmp/babel-79540aE/go -src-7954DTN.go:22 +0xbb main.isGloballyExec(0x4d1313, 0xf, 0x4daecc, 0x30, 0x4c5800) /tmp/babel-79540aE/go -src-7954DTN.go:39 +0xc5 main.runJob(0x4cfada, 0x1, 0x4d4c19, 0x22) /tmp/babel-79540aE/go-src-7954DTN.go:51 +0x4b main.main() /tmp/babel-79540aE/go -src-7954DTN.go:71 +0x63 ", Misc:map[string]interface {}{}}}, Message:"cannot run job \"1\": requisite binaries not available", StackTrace:"goroutine 1 [running]: runtime/debug.Stack(0x4d63f0, 0x33, 0xc420045e40) /home/kate/.guix-profile/src/runtime/debug/stack.go:24 +0x79 main.wrapError(0x530380, 0xc42000a370, 0x4d63f0, 0x33, 0xc420045e40, 0x1, 0x1, 0x0, 0x0, 0x0, ...) /tmp/babel-79540aE/go -src-7954DTN.go:22 +0xbb main.runJob(0x4cfada, 0x1, 0x4d4c19, 0x22) /tmp/babel-79540aE/go -src-7954DTN.go:53 +0x356 main.main() /tmp/babel-79540aE/go -src-7954DTN.go:71 +0x63 ", Misc:map[string]interface {}{}}} ``` 错误信息变得十分明白: ``` [1]cannot run job "1": requisite binaries not available ``` 这种实现方法与标准库的错误包兼容,此外你可以用你喜欢的任何方式来进行包装,并且自由度非常大。 * * * * * 学识浅薄,错误在所难免。我是长风,欢迎来Golang中国的群(211938256)就本书提出修改意见。
';

第五章 可伸缩并发设计

最后更新于:2022-04-02 06:51:32

你已经学习了在Go中使用并发的一些常见模式,现在让我们将注意力集中在将这些模式组合成一系列实践,这些实践将使你能够编写可扩展的大型可组合系统。 在本章中,我们将讨论在单个进程中扩展并发操作的方法,并开始研究处理多个进程时如何发挥并发性。 * * * * * 学识浅薄,错误在所难免。我是长风,欢迎来Golang中国的群(211938256)就本书提出修改意见。
';

小结

最后更新于:2022-04-02 06:51:29

本章介绍了很多内容。 我们结合了Go的并发原语来介绍各种模式,以帮助编写可维护的并发代码。如果你熟悉了这些模式,接下来我们就来讨论如何将这些模式合并到其他模式中,以帮助你编写大型系统。下一章将会对这样的技术进行一个概述。 * * * * * 学识浅薄,错误在所难免。我是长风,欢迎来Golang中国的群(211938256)就本书提出修改意见。
';

context包

最后更新于:2022-04-02 06:51:27

在并发程序中,由于连接超时,用户取消或系统故障,往往需要执行抢占操作。我们之前使用done通道来在程序中取消所有阻塞的并发操作,虽然取得了不错的效果,但同样也存在局限。 如果我们可以给取消通知添加额外的信息:例如取消原因,操作是否正常完成等,这对我们进一步处理会起到非常大的作用。 在社区的不断推动下,Go开发组决定创建一个标准模式,以应对这种需求。在Go 1.7中,context包被引入标准库。 如果我们浏览一下context包,会发现其包含的内容非常少: ``` var Canceled = errors.New("context canceled") var Canceled = errors.New("context canceled") type CancelFunc type Context func Background() Context func TODO() Context func WithCancel(parent Context) (ctx Context, cancel CancelFunc) func WithDeadline(parent Context, deadline time.Time) (Context, CancelFunc) func WithDeadline(parent Context, deadline time.Time) (Context, CancelFunc) func WithValue(parent Context, key, val interface{}) Context ``` 我们稍后会讨论这些类型和函数,现在让我们把关注点放到Context类型上。这个类型会贯穿你的整个系统,就跟done通道一样。如果你使用context包,从上游衍生出来的每个下游函数都可以使用Context作为参数。其类型定义是这样的: ``` type Context interface { // Deadline 返回任务完成时(该 context 被取消)的时间。 // 如果deadline 未设置,则返回的ok值为false。 // 连续调用该函数将返回相同的结果。 Deadline() (deadline time.Time, ok bool) // Done 返回任务完成时(该 context 被取消)一个已关闭的通道。 // 如果该context无法被取消,Done 将返回nil。 // 连续调用该函数将返回相同的结果。 // // 当cancel被调用时,WithCancel 遍历 Done以执行关闭; // 当deadline即将到期时,WithDeadline 遍历 Done以执行关闭; // 当timeout时,WithTimeout 遍历 Done以执行关闭。 // // Done 主要被用于 select 语句: // // // Stream 使用DoSomething生成值,并将值发送出去 // // 直到 DoSomething 返回错误或 ctx.Done 被关闭 // func Stream(ctx context.Context, out chan<- Value) error { // for { // v, err := DoSomething(ctx) // if err != nil { // return err // } // select { // case <-ctx.Done(): // return ctx.Err() // case out <- v: // } // } // } // // 查看 https://blog.golang.org/pipelines更多示例以了解如何使用 // Done通道执行取消操作。 Done() <-chan struct{} // 如果 Done 尚未关闭, Err 返回 nil. // 如果 Done 已关闭, Err 返回值不为nil的error以解释为何关闭: // 因 context 的关闭导致 // 或 context 的 deadline 执行导致。 // 在 Err 返回值不为nil的error之后, 连续调用该函数将返回相同的结果。 Err() error // Value 根据 key 返回与 context 相关的结果, // 如果没有与key对应的结果,则返回nil。 // 连续调用该函数将返回相同的结果。 // // 该方法仅用于传输进程和API边界的请求数据, // 不可用于将可选参数传递给函数。 // // 键标识着上Context中的特定值。 // 在Context中存储值的函数通常在全局变量中分配一个键, // 然后使用该键作为context.WithValue和Context.Value的参数。 // 键可以是系统支持的任何类型; // 程序中各包应将键定义为未导出类型以避免冲突。 // // 定义Context键的程序包应该为使用该键存储的值提供类型安全的访问器: // // // user包 定义了一个User类型,该类型存储在Context中。 // package user // // import "context" // // // User 类型的值会存储在 Context中。 // type User struct {...} // // // key是位于包内的非导出类型。 // // 这可以防止与其他包中定义的键的冲突。 // type key int // // // userKey 是user.User类型的值存储在Contexts中的键。 // // 它是非导出的; clients use user.NewContext and user.FromContext // // 使用 user.NewContext 和 user.FromContext来替代直接使用键。 // var userKey key // // // NewContext 返回一个新的含有值 u 的 Context。 // func NewContext(ctx context.Context, u *User) context.Context { // return context.WithValue(ctx, userKey, u) // } // // // FromContext 返回存储在 ctx中的 User类型的值(如果存在的话)。 // func FromContext(ctx context.Context) (*User, bool) { // u, ok := ctx.Value(userKey).(*User) // return u, ok // } Value(key interface{}) interface{} ``` 这看起来挺简单。有一个Done方法返回当我们的函数被抢占时关闭的通道。还有一些新鲜的但不难理解的方法:一个Deadline函数,用于指示在一定时间之后goroutine是否会被取消,以及一个Err方法,如果goroutine被取消,将返回非零值。 但Value方法看起来有点奇怪。它是干嘛用的? goroutines的主要用途之一是为请求提供服务。通常在这些程序中,除了抢占信息之外,还需要传递特定于请求的信息。这是Value函数的意义。我们会稍微谈一谈这个问题,但现在我们只需要知道context包有两个主要目的: * 提供取消操作。 * 提供用于通过调用传输请求附加数据的数据包。 让我们看看第一个目的:取消操作。 正如我们在“防止Goroutine泄漏”中所学到的,函数中的取消有三个方面: * goroutine的生成者可能想要取消它。 * goroutine可能需要取消其衍生出来的goroutine。 * goroutine中的任何阻塞操作都必须是可抢占的,以便将其取消。 Context包可以帮助我们处理这三个方面的需求。 前面提到,Context类型将是函数的第一个参数。如果你查看了Context接口的方法,会发现没有任何东西可以改变底层结果的状态。更进一步的说,没有任何东西被系统允许把Context本身干掉。这保护了Context调用堆栈的功能。因此,结合接口中的Done方法,Context类型可以安全的管理取消操作。 这就产生了一个问题:如果Context是一成不变的,那我们如何影响调用堆栈中当前函数的子函数中的取消行为? context包提供的一些函数回答了这个问题: ``` func WithCancel(parent Context) (ctx Context, cancel CancelFunc) func WithDeadline(parent Context, deadline time.Time) (Context, CancelFunc) func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc) ``` 你会发现,这些函数都传入了Context类型的值,同时返回了Context类型的值,它们都使用与这些函数相关的选项生成Context的新实例。 WithCancel返回一个新的Context,它在调用返回的cancel函数时关闭done通道。 WithDeadline返回一个新的Context,当机器的时钟超过给定的最后期限时,它关闭done通道。 WithTimeout返回一个新的Context,它在给定的超时时间后关闭done通道。 如果你的函数需要以某种方式在调用中取消它的子函数,可以调用这三个函数中的一个并传递给它的上下文,然后将返回的上下文传递给它的子函数。 如果你的函数不需要修改取消行为,那么函数只传递给定的上下文。 通过这种方式,调用者可以创建符合其需求的上下文,而不会影响其创建者。这为管理调用分支提供了一个可组合的优雅的解决方案。 通过这样的方式,Context的实例可以贯穿你的整个程序。在面向对象的范例中,通常将对经常使用的数据的引用存储为成员变量,但重要的是不要使用context.Context的实例来执行此操作。context.Context的实例可能与外部看起来相同,但在内部它们可能会在每个堆栈帧处发生变化。出于这个原因,总是将Context的实例传递给你的函数是很重要的。通过这种方式,函数具有用于它的上下文,而不是把堆栈里的上下文随意取出来用。 在异步调用链的顶部,你的代码可能不会传递Context。要启动链,context包提供了两个函数来创建Context的空实例。 我们来看一个使用done通道模式的例子,并比较下切换到使用context文包获得什么好处。 这是一个同时打印问候和告别的程序: ``` func main() { var wg sync.WaitGroup done := make(chan interface{}) defer close(done) wg.Add(1) go func() { defer wg.Done() if err := printGreeting(done); err != nil { fmt.Printf("%v", err) return } }() wg.Add(1) go func() { defer wg.Done() if err := printFarewell(done); err != nil { fmt.Printf("%v", err) return } }() wg.Wait() } func printGreeting(done <-chan interface{}) error { greeting, err := genGreeting(done) if err != nil { return err } fmt.Printf("%s world!\n", greeting) return nil } func printFarewell(done <-chan interface{}) error { farewell, err := genFarewell(done) if err != nil { return err } fmt.Printf("%s world!\n", farewell) return nil } func genGreeting(done <-chan interface{}) (string, error) { switch locale, err := locale(done); { case err != nil: return "", err case locale == "EN/US": return "hello", nil } return "", fmt.Errorf("unsupported locale") } func genFarewell(done <-chan interface{}) (string, error) { switch locale, err := locale(done); { case err != nil: return "", err case locale == "EN/US": return "goodbye", nil } return "", fmt.Errorf("unsupported locale") } func locale(done <-chan interface{}) (string, error) { select { case <-done: return "", fmt.Errorf("canceled") case <-time.After(5 * time.Second): } return "EN/US", nil } ``` 这会输出: ``` hello world! goodbye world! ``` 忽略竞争条件,我们可以看到程序有两个分支同时运行。通过创建done通道并将其传递给我们的调用链来设置标准抢占方法。如果我们在main的任何一点关闭done频道,那么两个分支都将被取消。 我们可以尝试几种不同且有趣的方式来控制该程序。也许我们希望genGreeting如果花费太长时间就会超时。也许我们不希望genFarewell调用locale——在其父进程很快就会被取消的情况下。在每个堆栈框架中,一个函数可以影响其下的整个调用堆栈。 使用done通道模式,我们可以通过将传入的done通道包装到其他done通道中,然后在其中任何一个通道启动时返回,但我们不会获得上下文给的deadline和错误的额外信息。 为了将done通道模式与使用context包进行比较,我们将该程序表示为树状图。 树中的每个节点代表一个函数的调用。 :-: ![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/bbdad8419e533c12d5e6488d1a3d4364_544x724.png) 让我们使用context包来修改该程序。由于现在可以使用context.Context的灵活性,所以我们引入一个有趣的场景。 假设genGreeting在放弃调用locale之前等待一秒——超时时间为1秒。如果printGreeting不成功,我们想取消对printFare的调用。 毕竟,如果我们不打声招呼,说再见就没有意义了: ``` func main() { var wg sync.WaitGroup ctx, cancel := context.WithCancel(context.Background()) //1 defer cancel() wg.Add(1) go func() { defer wg.Done() if err := printGreeting(ctx); err != nil { fmt.Printf("cannot print greeting: %v\n", err) cancel() //2 } }() wg.Add(1) go func() { defer wg.Done() if err := printFarewell(ctx); err != nil { fmt.Printf("cannot print farewell: %v\n", err) } }() wg.Wait() } func printGreeting(ctx context.Context) error { greeting, err := genGreeting(ctx) if err != nil { return err } fmt.Printf("%s world!\n", greeting) return nil } func printFarewell(ctx context.Context) error { farewell, err := genFarewell(ctx) if err != nil { return err } fmt.Printf("%s world!\n", farewell) return nil } func genGreeting(ctx context.Context) (string, error) { ctx, cancel := context.WithTimeout(ctx, 1*time.Second) //3 defer cancel() switch locale, err := locale(ctx); { case err != nil: return "", err case locale == "EN/US": return "hello", nil } return "", fmt.Errorf("unsupported locale") } func genFarewell(ctx context.Context) (string, error) { switch locale, err := locale(ctx); { case err != nil: return "", err case locale == "EN/US": return "goodbye", nil } return "", fmt.Errorf("unsupported locale") } func locale(ctx context.Context) (string, error) { select { case <-ctx.Done(): return "", ctx.Err() //4 case <-time.After(1 * time.Minute): } return "EN/US", nil } ``` 1. 在main函数中使用context.Background()建立个新的Context,并使用context.WithCancel将其包裹以便对其执行取消操作。 2. 在这一行上,如果从 printGreeting返回错误,main将取消context。 3. 这里genGreeting用context.WithTimeout包装Context。这将在1秒后自动取消返回的context,从而取消它传递context的子进程,即语言环境。 4. 这一行返回为什么Context被取消的原因。 这个错误会一直冒泡到main,这会导致注释2处的取消操作被调用。 这会输出: ``` cannot print greeting: context deadline exceeded cannot print farewell: context canceled ``` 下面的图中数字对应例子中的代码标注。 :-: ![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/7bc061feabfbe82b725e2ac92b27eead_447x592.png) 我们可以看到系统输出工作正常。由于local设置至少需要运行一分钟,因此genGreeting将始终超时,这意味着main会始终取消printFarewell下面的调用链。 请注意,genGreeting如何构建自定义的Context.Context以满足其需求,而不必影响父级的Context。如果genGreeting成功返回,并且printGreeting需要再次调用,则可以在不泄漏genGreeting相关操作信息的情况下进行。这种可组合性使你能够编写大型系统,而无需在整个调用链中费劲心思解决这样的问题。 我们可以在这个程序上进一步改进:因为我们知道locale需要大约一分钟的时间才能运行,所以可以在locale中检查是否给出了deadline。下面这个例子演示了如何使用context.Context的Deadline方法: ``` func main() { var wg sync.WaitGroup ctx, cancel := context.WithCancel(context.Background()) defer cancel() wg.Add(1) go func() { defer wg.Done() if err := printGreeting(ctx); err != nil { fmt.Printf("cannot print greeting: %v\n", err) cancel() } }() wg.Add(1) go func() { defer wg.Done() if err := printFarewell(ctx); err != nil { fmt.Printf("cannot print farewell: %v\n", err) } }() wg.Wait() } func printGreeting(ctx context.Context) error { greeting, err := genGreeting(ctx) if err != nil { return err } fmt.Printf("%s world!\n", greeting) return nil } func printFarewell(ctx context.Context) error { farewell, err := genFarewell(ctx) if err != nil { return err } fmt.Printf("%s world!\n", farewell) return nil } func genGreeting(ctx context.Context) (string, error) { ctx, cancel := context.WithTimeout(ctx, 1*time.Second) defer cancel() switch locale, err := locale(ctx); { case err != nil: return "", err case locale == "EN/US": return "hello", nil } return "", fmt.Errorf("unsupported locale") } func genFarewell(ctx context.Context) (string, error) { switch locale, err := locale(ctx); { case err != nil: return "", err case locale == "EN/US": return "goodbye", nil } return "", fmt.Errorf("unsupported locale") } func locale(ctx context.Context) (string, error) { if deadline, ok := ctx.Deadline(); ok { //1 if deadline.Sub(time.Now().Add(1*time.Minute)) <= 0 { return "", context.DeadlineExceeded } } select { case <-ctx.Done(): return "", ctx.Err() case <-time.After(1 * time.Minute): } return "EN/US", nil } ``` 1. 我们在这里检查是否Context提供了deadline。如果提供了,而且我们的程序时间已经越过这个时间线,这时简单的返回一个context包预设的错误——DeadlineExceeded。 虽然修改的部分很小,但它允许locale函数快速失败,而不必像之前那样等待一分钟。在调用资源耗费较高的程序中,这样做会节省大量时间。唯一的问题是,你得考虑deadline设置多久合适——这需要不断的尝试。 接下来我们讨论context包的另一个用处:存储和检索附加于请求的数据包。请记住,当一个函数创建一个goroutine和Context时,它通常会启动一个为请求提供服务的进程,并且子函数可能需要相关的请求信息。下面是一个示例: ``` func main() { ProcessRequest("jane", "abc123") } func ProcessRequest(userID, authToken string) { ctx := context.WithValue(context.Background(), "userID", userID) ctx = context.WithValue(ctx, "authToken", authToken) HandleResponse(ctx) } func HandleResponse(ctx context.Context) { fmt.Printf("handling response for %v (%v)", ctx.Value("userID"), ctx.Value("authToken"), ) } ``` 这会输出: ``` handling response for jane (abc123) ``` 很简单的用法。不过也是有限制的: * 你使用的key必须在Go中是可比较的,也就是说,== 和 != 必须能返回正确的结果。 * 返回值必须是并发安全的,这样才能从多个goroutine访问。 由于Context的键和值都被定义为interface{},所以当试图检索值时,我们会失去其类型安全性。基于此,Go建议在context中存储和检索值时遵循一些规则。 首先,推荐你在包中自行定义key的类型,这样无论是否其他包执行相同的操作都可以防止context中的冲突。看下面这个例子: ``` type foo int type bar int m := make(map[interface{}]int) m[foo(1)] = 1 m[bar(1)] = 2 fmt.Printf("%v", m) ``` 这会输出: ``` map[1:2 1:1] ``` 可以看到,虽然基础值是相同的,但不同类型的信息会在map中区分它们。由于你为包定义的key类型未导出,因此其他包不会与你在包中生成的key冲突。 由于用于存储数据的key是非导出的,因此我们必须导出执行检索数据的函数。这很容易做到,因为它允许这些数据的使用者使用静态的,类型安全的函数。 当你把所有这些放在一起时,你会得到类似下面的例子: ``` func main() { ProcessRequest("jane", "abc123") } type ctxKey int const ( ctxUserID ctxKey = iota ctxAuthToken ) func UserID(c context.Context) string { return c.Value(ctxUserID).(string) } func AuthToken(c context.Context) string { return c.Value(ctxAuthToken).(string) } func ProcessRequest(userID, authToken string) { ctx := context.WithValue(context.Background(), ctxUserID, userID) ctx = context.WithValue(ctx, ctxAuthToken, authToken) HandleResponse(ctx) } func HandleResponse(ctx context.Context) { fmt.Printf( "handling response for %v (auth: %v)", UserID(ctx), AuthToken(ctx), ) } ``` 这会输出: ``` handling response for jane (auth: abc123) ``` 在本例中,我们使用类型安全的方法来从Context获取值,如果消费者在不同的包中,他们不会知道或关心用于存储信息的key。 但是,这种技术会造成隐患。 在前面的例子中,我们假设HandleResponse存在于另一个名为response的包中,假设ProcessRequest包位于名为pross的包中。 pross包必须导入response包才能调用HandleResponse,但HandleResponse无法访问pross包中定义的访问函数,因为导入会形成循环依赖关系。由于用于Context中存储的key类型对于process包来说是私有的,所以response包无法检索这些数据! 这迫使我们创建以从多个位置导入的数据类型为中心的包。 虽然这不是一件坏事,但它是需要注意的。 context包非常简洁,但依然褒贬不一。在Go社区中一直存在争议。该包的取消操作功能相当受欢迎,但是在Context中存储任意数据的能力以及存储数据的类型不安全的造成了一些分歧。 虽然我们已经部分减缓了访问函数缺乏类型安全性的问题,但是仍然可以通过存储不正确的类型来引入错误。然而,更大的问题在于,开发人员到底应该在Context的实例中存储什么样的数据。 在context包的文档中这样写到: >使用context存储值仅适用于传输进程和API的请求附加数据,而不用于将可选参数传递给函数。 该说明十分含糊,“传输进程和API的请求”实在太过宽泛。我认为最好的解读方法是与开发组一起提出一些约定,并在代码评审中检查它们: 1. **数据应该是由进程传递的或与API相关**。如果你在进程的内存中生成数据,那么除非你也通过API传递数据,否则可能不是一个很好的候选应用程序。 2. **数据应该是不可变的**。如果可变,那么根据定义,你存储的内容肯定不是来自请求。 3. **数据应指向系统简单类型。**我们在上面已经讨论了关于使用安全类型的包导入问题,这个结论是很明显的。 4. **数据应该是纯粹的数据,而不是某种类型的函数。**消费者的逻辑应该是消耗这些数据。 5. **数据应该有助于操作,而不是驱动操作。**如果你的算法根据context中包含或不包含的内容而有所不同,那么就违背了“不用于将可选参数传递”的初衷。 这些不是硬性规定,但如果你发现自己的程序与以上约定由冲突,则可能需要考虑是否存在隐患或使用context是否必要。 另一个需要考虑的方面是该数据在使用之前可能需要经过多少层。如果在接受数据的位置和使用位置之间有几个框架和几十个函数,你可以考虑使用日志,并将数据添加为参数;或者你更愿意将它放在Context中,从而创建一个不可见的依赖关系。每种方法都有优点,最终这由你和你的团队做出决定。 我将这5条约定做成了表格,你可以将之作为参考: :-: ![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/03fa98563db15fb9ea105f420da62a0d_681x234.jpg) 对于是否有必要使用context存储值,这里并没有简单的答案,具体取决于你的业务、算法和团队。 我留给你的最后建议是Context提供的取消功能非常有用,这样轻便的功能如果不用实在是太可惜了。 * * * * * 学识浅薄,错误在所难免。我是长风,欢迎来Golang中国的群(211938256)就本书提出修改意见。
';

队列

最后更新于:2022-04-02 06:51:25

我们在之前的章节列举了管道的各种优点,但有时候,尽管管道没有准备好,我们的程序依然还是要干活的,这种处理方式,被称为“队列”。 这意味着,一旦管道的某个阶段完成了工作,将其存储在内存中的临时位置,以便其他阶段可以稍后检索它,而你无需保持其引用。在“Channels”章节,我们讨论了带缓冲的通道,你可以把它视作队列的一种。 虽然在系统中引入队列功能非常有用,但它通常是优化程序时希望采用的最后一种技术之一。过早地添加队列会隐藏同步问题,例如死锁和活锁,并且,随着程序不断重构,你可能会发现需要更多或更少的队列。 那么使用队列有什么好处呢?队列通常用来尝试解决性能问题。队列几乎不会减少程序的总运行时间,它只会让程序的行为有所不同。 让我们看个简单的管道例子: ``` done := make(chan interface{}) defer close(done) zeros := take(done, 3, repeat(done, 0)) short := sleep(done, 1*time.Second, zeros) long := sleep(done, 4*time.Second, short) pipeline := long ``` 这个管道链共有4个阶段: 1. 间隔0s,不间断生成数据流。 2. 在接收到3条数据后取消前置操作。 3. 休眠1秒,短耗时阶段。 4. 休眠3秒,长耗时阶段。 我们假设阶段1和阶段2是即时的,那么需要关注的是休眠如何影响管道的运行时间。 ![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/1aac976f616e1dfcbac544ba9cfd0386_681x489.jpg) 你可以看到,这个管道耗时13秒。短耗时阶段花费了大约9秒。 如果我们给管道加入缓存会怎么样?让我们试试在长短耗时阶段之间添加个缓冲: ``` done := make(chan interface{}) defer close(done) zeros := take(done, 3, repeat(done, 0)) short := sleep(done, 1*time.Second, zeros) buffer := buffer(done, 2, short) // Buffers sends from short by 2 long := sleep(done, 4*time.Second, short) pipeline := long ``` ![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/6483094be023672db2c5cfb0d332a23f_577x316.jpg) ![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/d9e52fa2faee1e2aac81ce77f98788ec_572x167.jpg) 整个管道依然是13秒,但短耗时阶段时长降低到了3秒,看来加入缓存是有效的。但是如果整个管道仍然需要13秒来执行,这对我们有什么帮助? 我们来看看下面这个操作: ``` p := processRequest(done, acceptConnection(done, httpHandler)) ``` 这条管道会持续运行直到被取消,并且在取消之前会持续接受连接。在这期间,你肯定不希望处理连接的processRequest因acceptConnection接受连接而阻塞,你会希望processRequest是持续可用的,否则程序的用户可能会发现连接请求被拒绝。 因此,队列的价值并不是减少了某个阶段的运行时间,而是减少了它处于阻塞状态的时间。 这可以让程序继续工作。 在这个例子中,用户可能会在他们的请求中感受到延迟,但不会被拒绝服务。 通过这种方式,队列的真正用途是将操作流程分离,以便一个阶段的运行时间不会影响另一个阶段的运行时间。以这种方式解耦来改变整个系统的运行时行为,这取决于你的程序,产生的结果可能是好的也可能是不好的。 接下来我们回到关于队列的讨论。 队列应该放在哪里? 缓冲区大小应该是多少? 这些问题的答案取决于管道的性质。 我们首先分析队列提高系统整体性能适用于哪些情况: * 如果某个阶段执行批处理能够节省时间。 * 如果推迟某个阶段产生结果可以在程序中循环执行。 适用于第一种情况的一个例子是,将输入缓冲到内存而非硬盘中。实际上bufio包就是这么干的。下面这个例子比较了使用缓冲与非缓冲进行写操作: ``` func BenchmarkUnbufferedWrite(b *testing.B) { performWrite(b, tmpFileOrFatal()) } func BenchmarkBufferedWrite(b *testing.B) { bufferredFile := bufio.NewWriter(tmpFileOrFatal()) performWrite(b, bufio.NewWriter(bufferredFile)) } func tmpFileOrFatal() *os.File { file, err := ioutil.TempFile("", "tmp") if err != nil { log.Fatal("error: %v", err) } return file } func performWrite(b *testing.B, writer io.Writer) { done := make(chan interface{}) defer close(done) b.ResetTimer() for bt := range take(done, repeat(done, byte(0)), b.N) { writer.Write([]byte{bt.(byte)}) } } ``` 执行命令行: ``` go test -bench=. src/concurrency-patterns-in-go/queuing/buffering_test.go ``` 这会输出: ![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/78e05c83bd14c27940261aa48159788e_563x118.jpg) 如预期的那样,有缓冲的写入比无缓冲更快。这是因为在bufio.Writer中,写入操作在内部缓冲区中进入队列,直到已经积累了足够长的数据块,该块才被写出。这个过程通常称为分块。 分块速度更快,因为bytes.Buffer必须增加其分配的内存以容纳存储的字节数据。出于各种原因,内存扩张操作代价高昂; 因此,我们需要增长的时间越少,整个系统的整体效率就越高。 于是,队列提高了整个系统的性能。 这只是一个简单的内存分块示例,但是你可能会频繁地进行分块。通常,执行任何操作都存在开销,分块可能会提高系统性能。例如打开数据库事务,计算消息校验和以及分配内存连续空间。 除了分块之外,如果程序算法支持向后查找或排序优化,队列也可以起到帮助作用。 第二种情况,某个阶段的延迟执行导致更多的数据进入管道,但没有被发现,这更加致命,因为它可能导致上游系统的崩溃。 这个想法通常被称为负反馈循环,甚至是死亡螺旋。 这是因为管道与上游系统之间存在经常性关系;上游系统提交新请求的速度在某种程度上与管道的有效性有关。 如果管道的效率降低到某个临界阈值以下,则管道上游的系统开始增加其对管道的输入,这导致管道损失更多效率,并且死亡螺旋开始。 如果没有安全防护,该系统将无法恢复。 通过在管道入口处引入队列,你可以延迟请求来打破反馈循环。从调用者的角度来看,请求似乎正在处理中,但需要很长时间。只要调用者不超时,管道将保持稳定。如果调用方超时,则需要确保你在出列时支持安全检查。如果不这样做,可能会无意中通过处理无效请求创建了另一个负馈循环,从而降低管道的效率。 >如果你曾尝试过一些热门的新系统(例如,新游戏服务器,用于产品发布的网站等),并且尽管开发人员尽了最大的努力,但该网站一直处于不稳定状态,恭喜!你可能目睹了一个负反馈循环。 >开发团队总是在尝试不同的解决方案,直到有人意识到他们需要一个队列,并且匆忙地将其实现。 >然后客户开始抱怨排队时间。 从以上的例子中,我们可以看到一种模式慢慢浮出水面,队列应该满足以下情况: * 在管道的入口处。 * 在某个阶段进行批处理会更高效。 您可能会试图在其他地方添加队列,例如,某个阶段会执行密集计算。要避免这种诱惑!正如我们所知道的那样,只有少数情况下,对立会减少管道的运行时间。为排除干扰而尝试队列会产生灾难性后果。 为了理解为什么,我们必须讨论管道的吞吐量。别担心,这并不困难,这也将帮助我们回答关于如何确定队列应该多大的问题。 在队列理论中,有一条定律(进行足够的抽样)可以预测管道的吞吐量。这就是所谓的"最小原则",你只需要知道几点就可以理解和利用它。 我们以代数的方式定义“最小原则”,它通常表示为:L = λW,其中 * L = 系统中的平均单位数。 * λ = 单位的平均到达率。 * W = 单位在系统中花费的平均时间。 这个等式仅适用于所谓的稳定系统。 在一条管道中,一个稳定的系统就是数据进入管道或入口的速率等于它退出系统或出口的速率。 如果进入速率超过出口速度,那么你的系统就不稳定,并且已经进入死亡螺旋。 如果入口速率小于出口速率,则系统仍然不稳定,因为你的资源没有被完全利用。这不是世界上最糟糕的情况,但是如果发现资源利用严重不足(例如,集群或数据中心),也许你会关心这一点。 假设我们的管道是稳定的。如果我们想要减少单位花费在系统中的平均时间n,只有一个选择:减少系统中平均单位数:L/n = λW / n。 如果提高出口率,我们只能减少系统中的平均单位数量。还要注意,如果我们将队列添加到阶段,我们增加L,这会增加单位的到达率(nL = nλ* W)或增加单位在系统中的平均时间(nL = λ* nW)。通过最小原则,我们可以证明,队列对于减少系统花费的时间帮助不大。 同时请注意,由于我们正在观察整个管道,因此将W减少n倍将分布在我们管道的所有阶段。在我们的案例中,最小原则应该是这样定义的: ``` L = λΣiWi ``` 不分青红皂白的优化,可能导致你的管道完全被最慢的执行阶段影响。 这个原则可以帮助我们分析管道的各个阶段。假设我们的管道有三个阶段。 让我们尝试确定管道每秒可以处理多少个请求。假设我们在管道上启用了采样,发现1个请求(r)需要约1秒才能通过管道。让我们向公式放入这些数字: ``` 3r = λr/s * 1s 3r/s = λr/s λr/s = 3r/s ``` 我们将L设置为3,因为我们管道中的每个阶段都在处理请求。然后我们将W设置为1秒,做一个小代数计算,瞧!在这个管道中,我们每秒可以处理三个请求。 假设采样表明请求需要1 ms来处理。 我们的队列需要处理每秒100000次请求的大小是多少? ``` Lr-3r = 100,000r/s * 0.0001s Lr-3r = 10r Lr = 7r ``` 我们的管道有三个阶段,所以我们将L递减3。将λ设置为100000 r/s,我们发现如果想要处理很多请求,我们的队列应该有7个容量。请记住, 如果增加队列的大小,它需要更长的时间才能完成! 你实际上在延迟降低系统利用率。 但这个公式也存在缺陷,它无法观察对失败的处理。请记住,如果由于某种原因管道发生混乱,你将丢失队列中的所有请求。为了缓解这种情况,你可以将队列大小保持为零,也可以将其移至持久队列中,该队列是一个持续存在的队列,可以在需要时再读取。 队列在你的系统中可能很有用,但由于它的复杂性,它通常是我建议实现的最后优化手段之一。 * * * * * 学识浅薄,错误在所难免。我是长风,欢迎来Golang中国的群(211938256)就本书提出修改意见。
';

bridge-channel

最后更新于:2022-04-02 06:51:23

在某些情况下,你可能会发现自己想要使用一系列通道的值: ``` <-chan <-chan interface{} ``` 这与将某个通道的数据切片合并到一个通道中稍有不同,这种调用方式意味着一系列通道有序的写入操作。这与管道的单个“阶段”类似,其生命周期是间歇性的。按“访问范围约束”章节所提到的,通道由写入它们的goroutine所拥有,每当在新的goroutine中启动一个管道的“阶段”时,就会创建一个新的通道——这意味着我们会得到一个通道队列。我们会在第五章“Goroutines异常行为修复”中详细讨论。 作为消费者,代码可能不关心其值来自于一系列通道的事实。在这种情况下,处理一系列通道中的单个通道可能很麻烦。如果我们定义一个函数,可以将一系列通道拆解为一个简单的通道——我们成为通道桥接(bridge-channle),这使得消费者更容易关注手头的问题: ``` bridge := func(done <-chan interface{}, chanStream <-chan <-chan interface{}) <-chan interface{} { valStream := make(chan interface{}) // 1 go func() { defer close(valStream) for { // 2 var stream <-chan interface{} select { case maybeStream, ok := <-chanStream: if ok == false { return } stream = maybeStream case <-done: return } for val := range orDone(done, stream) { // 3 select { case valStream <- val: case <-done: } } } }() return valStream } ``` 1. 这个通道会返回所有传入bridge的通道。 2. 该循环负责从chanStream中提取通道并将其提供给嵌套循环以供使用。 3. 该循环负责读取已经给出的通道的值,并将这些值重复到valStream中。当前正在循环的流关闭时,我们跳出执行从该通道读取的循环,并继续下一次循环来选择要读取的通道。 这为我们提供了一个不间断的流。 这段代码非常直白。接下来我们来使用它。下面这个例子创建了10个通道,每个通道都写入一个元素,并将这些通道传递给bridge: ``` genVals := func() <-chan <-chan interface{} { chanStream := make(chan (<-chan interface{})) go func() { defer close(chanStream) for i := 0; i < 10; i++ { stream := make(chan interface{}, 1) stream <- i close(stream) chanStream <- stream } }() return chanStream } for v := range bridge(nil, genVals()) { fmt.Printf("%v ", v) } ``` 这会输出: ``` 0 1 2 3 4 5 6 7 8 9 ``` 通过使用bridge,我们可以专注于解构之外的逻辑,而无需去关心大量的通道处理问题。 * * * * * 学识浅薄,错误在所难免。我是长风,欢迎来Golang中国的群(211938256)就本书提出修改意见。
';

tee-channel

最后更新于:2022-04-02 06:51:20

有时候你可能想分割来自通道的多个值,以便将它们发送到两个独立区域。想象一下:你可能想要在一个通道上接收一系列操作指令,将它们发送给执行者,同时记录操作日志。 与Unix系统的tee命令功能类似,我们用tee-channel来实现同样的功能。你可以传递给它一个用作读取的通道,它会返回两个单独的通道: ``` tee := func( done <-chan interface{}, in <-chan interface{}, ) (_, _ <-chan interface{}) { <-chan interface{}) { out1 := make(chan interface{}) out2 := make(chan interface{}) go func() { defer close(out1) defer close(out2) for val := range orDone(done, in) { var out1, out2 = out1, out2 //1 for i := 0; i < 2; i++ { //2 select { case <-done: case out1 <- val: out1 = nil //3 case out2 <- val: out2 = nil //3 } } } }() return out1, out2 } ``` ***注意:原文例子就是这样,反复确认没有贴错。大家就当伪码看吧*** 1. 我们希望使用使用本地的变量,所以建立了他们的副本。 2. 我们将使用一条select语句,以便写入out1和out2不会彼此阻塞。 为了确保两者都顺利写入,我们将执行select语句的两个迭代。 3. 一旦我们写入了通道,我们将其副本设置为零,这样继续写入将阻塞,而另一个通道可以继续执行。 注意写入out1和out2是紧密耦合的。 直到out1和out2都被写入,迭代才能继续。 通常这不是问题,因为无论如何,处理来自每个通道的读取流程的吞吐量应该是tee之外的关注点,但值得注意。 这是一个快速调用示例: ``` done := make(chan interface{}) defer close(done) out1, out2 := tee(done, take(done, repeat(done, 1, 2), 4)) for val1 := range out1 { fmt.Printf("out1: %v, out2: %v\n", val1, <-out2) } ``` 利用这种模式,很容易使用通道作为系统数据的连接点。 * * * * * 学识浅薄,错误在所难免。我是长风,欢迎来Golang中国的群(211938256)就本书提出修改意见。
';

or-done-channel

最后更新于:2022-04-02 06:51:18

有时你会与来自系统不同部分的通道交互。与管道不同的是,当你使用的代码通过done通道取消操作时,你无法对通道的行为方式做出判断。也就是说,你不知道正在执行读取操作的goroutine现在是什么状态。出于这个原因,正如我们在“防止Goroutine泄漏”中所阐述的那样,需要用select语句来封装我们的读取操作和done通道。可以简单的写成这样: ``` for val := range myChan { // 对 val 进行处理 } ``` 展开后可以写成这样: ``` loop: for { select { case <-done: break loop case maybeVal, ok := <-myChan: if ok == false { return // or maybe break from for } // Do something with val } } ``` 这样做可以快速退出嵌套循环。继续使用goroutines编写更清晰的并发代码,而不是过早优化的主题,我们可以用一个goroutine来解决这个问题。 我们封装了细节,以便其他人调用更方便: ``` orDone := func(done, c <-chan interface{}) <-chan interface{} { valStream := make(chan interface{}) go func() { defer close(valStream) for { select { case <-done: return case v, ok := <-c: if ok == false { return } select { case valStream <- v: case <-done: } } } }() return valStream } ``` 这样做允许我们回到简单的循环方式: ``` for val := range orDone(done, myChan) { // Do something with val } ``` 你可能会在代码中发现需要使用一系列select语句的循环代码,但我会鼓励你先尝试提高可读性,并避免过早优化。 * * * * * 学识浅薄,错误在所难免。我是长风,欢迎来Golang中国的群(211938256)就本书提出修改意见。
';

扇入扇出

最后更新于:2022-04-02 06:51:16

那么你已经建立了一条管道。 数据在你的系统中欢畅地流动,并在莫连接在一起的各个阶段发生变化。 它就像一条美丽的溪流; 一个美丽的,缓慢的溪流,哦,我的上帝为什么这需要这么久? 有时候,管道中的各个阶段可能在计算上特别耗费资源。当发生这种情况时,管道中的上游阶段可能会在等待完成时被阻塞。不仅如此,管道本身可能需要很长时间才能整体执行。 我们如何解决这个问题? 管道的一个有趣属性是它的各个阶段相互独立,方便组合。你可以多次重复使用管道的各个阶段。因此,在多个goroutine上重用管道的单个阶段实现并行化,将有助于提高管道的性能。 事实上,这种模式被称为扇入扇出。 扇出(Fan-out)是一个术语,用于描述启动多个goroutines以处理来自管道的输入的过程,并且扇入(fan-in)是描述将多个结果组合到一个通道中的过程的术语。 那么在什么情况下适用于这种模式呢?如果出现以下两种情况,你就可以考虑这么干了: * 不依赖模块之前的计算结果。 * 运行需要很长时间。 运行的独立性是非常重要的,因为你无法保证各阶段的并发程序以何种顺序运行,也无法保证其返回的顺序。 我们来看一个例子。在下面的例子中,构建了一个寻找素数的方法。我们将使用在“管道”中的经验,创建各个阶段,并将它们拼接在一起: ``` rand := func() interface{} { return rand.Intn(50000000) } done := make(chan interface{}) defer close(done) start := time.Now() randIntStream := toInt(done, repeatFn(done, rand)) fmt.Println("Primes:") for prime := range take(done, primeFinder(done, randIntStream), 10) { fmt.Printf("\t%d\n", prime) } fmt.Printf("Search took: %v", time.Since(start)) ``` 这会输出: ``` Primes: 24941317 36122539 6410693 10128161 25511527 2107939 14004383 7190363 45931967 2393161 Search took: 23.437511647s ``` 我们生成一串随机数,最大值为50000000,将数据流转换为整数流,然后将其传入primeFinder。pri meFinder会尝试将输入流提供的数字除以比它小的每个数字。如果不成功,会将该值传递到下一个阶段。当然,这个方法很低效,但它符合我们程序运行时间较长的要求。 在我们的for循环中,搜索找到的素数,在进入时将它们打印出来,并且take在找到10个素数后关闭管道。然后,我们打印出搜索需要多长时间,完成的通道被延迟声明关闭,管道停止 。 为了避免结果中出现重复,我们可以把已找到的素数缓存起来,但为了简单起见,我们将忽略这些。 你可以看到大概需要23秒才能找到10个素数,这实在是有点慢。通常遇到这种情况,我们首先看一下算法本身,也许是拿一本算法书籍,然后看看我们是否能在哪个阶段改进。但是,由于目的是通过扇出来解决该问题,所以算法我们暂时先不去管它。 我们的程序现在有两个阶段:生成随机数和筛选素数。在更大的程序中,你的管道可能由更多的阶段组成,那我们该对什么样的阶段使用扇出模式进行改进?请记住我们之前提出的标准:执行顺序的独立性和执行时间。我们的随机数生成器肯定是与顺序无关的,但运行起来并不需要很长的时间。PrimeFinder阶段也是顺序无关的,因为我们采用的算法效率非常低下,它需要很长时间才能运行完成。因此,我们可以把关注点放在PrimeFinder身上。 为此,我们可以将其操作拆散,就像这样: ``` numFinders := runtime.NumCPU() finders := make([]<-chan int, numFinders) for i := 0; i < numFinders; i++ { finders[i] = primeFinder(done, randIntStream) } ``` 在我的电脑上,runtime.NumCPU()返回8,在生产中,我们可能会做一些经验性的测试来确定CPU的最佳数量,但在这里我们将保持简单,并且假设只有一个findPrimes阶段的CPU会被占用。 这就好像一个班级的作业,原本由1位老师批改,现在变成了8位老师同时批改。 接下来我们遇到的问题是,如何将结果汇总到一起。为此,我们开始考虑使用扇入(fan-in)。 正如我们前面所提到的,扇入意味着将多个数据流复用或合并成一个流。 这样做相对简单: ``` fanIn := func(done <-chan interface{}, channels ...<-chan interface{}) <-chan interface{} { // 1 var wg sync.WaitGroup // 2 multiplexedStream := make(chan interface{}) multiplex := func(c <-chan interface{}) { // 3 defer wg.Done() for i := range c { select { case <-done: return case multiplexedStream <- i: } } } // 从所有的通道中取数据 wg.Add(len(channels)) // 4 for _, c := range channels { go multiplex(c) } // 等待所有数据汇总完毕 go func() { // 5 wg.Wait() close(multiplexedStream) }() return multiplexedStream } ``` 1. 一如既往,我们使用done通道来关闭衍生的goroutine,并接收接口类型的通道切片来汇总数据。 2. 这里我们使用sync.WaitGroup以等待全部通道读取完成。 3. 我们在这里建立函数multiplex,它会读取传入的通道,并把该通道的值放入multiplexedStream。 4. 这里增加等待计数。 5. 这里我们建立一个goroutine等待汇总完毕。***这样函数块可以快速return,不必等待wg.Wait()。这种用法不多见,但在这里很符合场景需求。*** 简而言之,扇入涉及读取多路复用通道,然后为每个传入通道启动一个goroutine,以及在传入通道全部关闭时关闭复用通道。由于我们要创建一个等待N个其他goroutine完成的goroutine,因此创建sync.WaitGroup来协调处理是有意义的。multiplex还通知WaitGroup它已执行完成。 额外提醒,在对返回结果的顺序有要求的情况下扇入扇出可能工作的不是很好。我们没有做任何事情来保证从randIntStream中读取数据的顺序。稍后,我们将看一个维护顺序的例子。 让我们把所有这些改进放在一起,看看运行时长是否有所减少: ``` done := make(chan interface{}) defer close(done) start := time.Now() rand := func() interface{} { return rand.Intn(50000000) } randIntStream := toInt(done, repeatFn(done, rand)) numFinders := runtime.NumCPU() fmt.Printf("Spinning up %d prime finders.\n", numFinders) finders := make([]<-chan interface{}, numFinders) fmt.Println("Primes:") for i := 0; i < numFinders; i++ { finders[i] = primeFinder(done, randIntStream) } for prime := range take(done, fanIn(done, finders...), 10) { fmt.Printf("\t%d\n", prime) } fmt.Printf("Search took: %v", time.Since(start)) ``` 这会输出: ``` Spinning up 8 prime finders. Primes: 6410693 24941317 10128161 36122539 25511527 2107939 14004383 7190363 2393161 45931967 Search took: 5.438491216s ``` 最大降幅23秒,这简直是个壮举。运用扇入扇出可以在不大幅改变程序结构的前提下将运行时间缩短了大约78%。 * * * * * 学识浅薄,错误在所难免。我是长风,欢迎来Golang中国的群(211938256)就本书提出修改意见。
';

便利的生成器

最后更新于:2022-04-02 06:51:13

我早些时候承诺会演示一些可能广泛使用的有趣的生成器。我们来看看一个名为repeat的生成器: ``` repeat := func(done <-chan interface{}, values ...interface{}) <-chan interface{} { valueStream := make(chan interface{}) go func() { defer close(valueStream) for { for _, v := range values { select { case <-done: return case valueStream <- v: } } } }() return valueStream } ``` 这个函数会重复你传给它的值,直到你告诉它停止。 让我们来看看另一个函数take,它在与repeat结合使用时很有用: ``` take := func(done <-chan interface{}, valueStream <-chan interface{}, num int, ) <-chan interface{} { takeStream := make(chan interface{}) go func() { defer close(takeStream) for i := 0; i < num; i++ { select { case <-done: return case takeStream <- <-valueStream: } } }() return takeStream } ``` 这个函数会从其传入的valueStream中取出第一个元素然后退出。二者组合起来会怎么样呢? ``` done := make(chan interface{}) defer close(done) for num := range take(done, repeat(done, 1), 10) { fmt.Printf("%v ", num) } ``` 这会输出: ``` 1 1 1 1 1 1 1 1 1 1 ``` 在这个基本的例子中,我们创建了一个repeat生成器来生成无限数量的重复生成器,但是只取前10个。repeat生成器由take接收。虽然我们可以生成无线数量的流,但只会生成n+1个实例,其中n是我们传入take的数量。 我们可以扩展这一点。让我们创建另一个生成器,但是这次我们创建一个重复调用函数的生成器repeatFn: ``` repeatFn := func(done <-chan interface{}, fn func() interface{}) <-chan interface{} { valueStream := make(chan interface{}) go func() { defer close(valueStream) for { select { case <-done: return case valueStream <- fn(): } } }() return valueStream } ``` 我们用它来生成10个随机数: ``` done := make(chan interface{}) defer close(done) rand := func() interface{} { return rand.Int() } for num := range take(done, repeatFn(done, rand), 10) { fmt.Println(num) } ``` 这会输出: ``` 5577006791947779410 8674665223082153551 6129484611666145821 4037200794235010051 3916589616287113937 6334824724549167320 605394647632969758 1443635317331776148 894385949183117216 2775422040480279449 ``` 您可能想知道为什么所有这些发生器通道类型都是interface{}。 Go中的空接口有点争议,但我认为处理interface的通道方便使用标准的管道模式。 正如我们前面所讨论的,管道的强大来自可重用的阶段。当阶段以适合自身的特异性水平进行操作时,这是最好的。在repeat和repeatFn生成器中,我们需要关注的是通过在列表或运算符上循环来生成数据流。这些操作都不需要关于处理的类型,而只需要知道参数的类型。 当需要处理特定的类型时,可以放置一个执行类型断言的阶段。有一个额外的管道阶段和类型断言的性能开销可以忽略不计,正如我们稍后会看到的。 以下是一个介绍toString管道阶段的小例子: ``` toString := func(done <-chan interface{}, valueStream <-chan interface{}, ) <-chan string { stringStream := make(chan string) go func() { defer close(stringStream) for v := range valueStream { select { case <-done: return case stringStream <- v.(string): } } }() return stringStream } ``` 可以这样使用它: ``` done := make(chan interface{}) defer close(done) var message string for token := range toString(done, take(done, repeat(done, "I", "am."), 5)) { message += token } fmt.Printf("message: %s...", message) ``` 这会输出: ``` message: Iam.Iam.I... ``` 现在让我们证明刚才提到的性能问题。我们将编写两个基准测试函数:一个测试通用阶段,一个测试类型特定阶段: ``` func BenchmarkGeneric(b *testing.B) { done := make(chan interface{}) defer close(done) b.ResetTimer() for range toString(done, take(done, repeat(done, "a"), b.N)) { } } func BenchmarkTyped(b *testing.B) { repeat := func(done <-chan interface{}, values ...string) <-chan string { valueStream := make(chan string) go func() { defer close(valueStream) for { for _, v := range values { select { case <-done: return case valueStream <- v: } } } }() return valueStream } take := func(done <-chan interface{}, valueStream <-chan string, num int, ) <-chan string { takeStream := make(chan string) go func() { defer close(takeStream) for i := num; i > 0 || i == -1; { if i != -1 { i-- } select { case <-done: return case takeStream <- <-valueStream: } } }() return takeStream } done := make(chan interface{}) defer close(done) b.ResetTimer() for range take(done, repeat(done, "a"), b.N) { } } ``` 这会输出: | BenchmarkGeneric-4 | 1000000 | 2266 ns/op | | --- | --- | --- | | BenchmarkTyped-4 | 1000000 | 1181 ns/op | | PASS ok | command-line-arguments | 3.486s | 可以看到,特定类型的速度是接口类型的2倍。一般来说,管道上的限制因素将是生成器,或者是密集计算的某个阶段。如果生成器不像repeat和repeatFn生成器那样从内存中创建流,则可能会受I/O限制。从磁盘或网络读取数据可能会超出此处显示的性能开销。 那么,如果真是在计算上存在性能瓶颈,我们该怎么办?基于这种情况,让我们来讨论扇出扇入技术。 * * * * * 学识浅薄,错误在所难免。我是长风,欢迎来Golang中国的群(211938256)就本书提出修改意见。
';

构建管道的最佳实践

最后更新于:2022-04-02 06:51:11

通道非常适合在Go中构建管道,因为它们满足了我们所有的基本要求。它们可以接收并传递值,它们可以在并发中安全的使用,它们可以被遍历,而且它们被语言给予了完美的支持。让我们用通道将之前的例子改造一下: ``` generator := func(done <-chan interface{}, integers ...int) <-chan int { intStream := make(chan int) go func() { defer close(intStream) for _, i := range integers { select { case <-done: return case intStream <- i: } } }() return intStream } multiply := func(done <-chan interface{}, intStream <-chan int, multiplier int) <-chan int { multipliedStream := make(chan int) go func() { defer close(multipliedStream) for i := range intStream { select { case <-done: return case multipliedStream <- i * multiplier: } } }() return multipliedStream } add := func(done <-chan interface{}, intStream <-chan int, additive int) <-chan int { addedStream := make(chan int) go func() { defer close(addedStream) for i := range intStream { select { case <-done: return case addedStream <- i + additive: } } }() return addedStream } done := make(chan interface{}) defer close(done) intStream := generator(done, 1, 2, 3, 4) pipeline := multiply(done, add(done, multiply(done, intStream, 2), 1), 2) for v := range pipeline { fmt.Println(v) } ``` 这会输出: ``` 6 10 14 18 ``` 看起来我们得到了期望的输出结果,但代价是代码更多了。先,我们来看看我们写的是什么。 现在有三个函数,而不是两个。他们都看起来像是在内部开启一个通道,并使用我们在“防止Goroutine泄漏”中建立的模式,通过一个done通道表示该通道应该退出。他们都看起来像返回通道,其中一些看起来像他们也采用了额外的通道。让我们开始进一步分解: ``` done := make(chan interface{}) defer close(done) ``` 我们的程序首先创建了done通道,并调用close通过defer延迟执行。正如前面所讨论的那样,这可以确保我们的程序干净地退出,而不泄漏goroutines。没有什么新鲜的。接下来,我们来看看函数generator: ``` generator := func(done <-chan interface{}, integers ...int) <-chan int { intStream := make(chan int) go func() { defer close(intStream) for _, i := range integers { select { case <-done: return case intStream <- i: } } }() return intStream } // ... intStream := generator(done, 1, 2, 3, 4) ``` generator接收整数类型的切片,构造整数类型的通道,启动一个goroutine并返回构造的通道。然后,在创建的goroutine通道上发送切片的值。 请注意,通道上的发送与done通道上的选择共享一条select语句。这是我们在“防止Goroutine泄漏”中建立的模式。 简而言之,generator函数将一组离散值转换为一个通道上的数据流。这种操作的函数我们称之为生成器。在使用管道时,你会经常看到这一点,因为在管道开始时,你总是会有一些需要转换为通道的数据。我们将稍微介绍一些有趣的生成器的几个例子,但我们先来完成对这个程序的分析。 接下来,我们构建管道: ``` pipeline := multiply(done, add(done, multiply(done, intStream, 2), 1), 2) ``` 这与本节之前的流程相同:对于一串数字,我们将它们乘以2,加1,然后将结果乘以2。这条管道与我们前面例子中使用函数的管道相似,但它在很重要的方面有所不同。 首先,我们正在使用通道。 这是显而易见的,因为它允许两件事:在我们的管道的末尾,可以使用range语句来提取值,并且在每个阶段我们可以安全地并发执行,因为我们的输入和输出在并发上下文中是安全的。 这给我们带来了第二个区别:管道的每个阶段都在同时执行。 这意味着任何阶段只需要等待其输入,并且能够发送其输出。 事实证明,这会产生巨大的影响,我们将在“扇出,扇入”一节中发现,但现在我们可以简单地注意到它允许各阶段相互独立地执行一段时间。 最后,在我们的例子中,我们对这个管道进行了遍历取值: ``` for v := range pipeline { fmt.Println(v) } ``` 下面是一个表格,演示系统中的每个值如何进入每个通道,以及通道何时关闭。 :-: ![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/7b8189f85ca510d5e09049baca7b6125_681x391.jpg) 让我们更仔细地研究一下这个模式来标示goroutines退出。当处理多个相互依赖的goroutines时,这种模式如何起作用? 如果我们在程序完成执行之前在完成的通道上调用close,会发生什么情况? 要回答这些问题,再来看看管道是构建的这一行: ``` pipeline := multiply(done, add(done, multiply(done, intStream, 2), 1), 2) ``` 管道的各阶段通过两种方式连接在一起:通过默认的done通道,和被传递给后续阶段的通道。换句话说,multiply函数创建的通道被传递给add函数。让我们重新审视前面的表格,并在完成之前,关闭done通道,看看会发生什么: :-: ![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/68e3c8df3f3d759402361da69b91e926_676x362.jpg) 看到关闭done通道是如何影响到管道的了么?这是通过管道每个阶段的两件事情实现的: * 对传入的频道进行遍历。当输入通道关闭时,遍历操作将退出。 * 发送操作与done通道共享select语句。 无论流水线阶段处于等待数据通道的状态,还是处在等待发送通道关闭的状态,都会强制管道各阶段终止。 这里有一个复发关系。在管道开始时,我们已经确定必须将传入的切片值转换为通道。在这个过程中有两点必须是可抢占的: * 在生成器通道上创建值。 * 在其频道上发送离散值。 在我们的例子中,在生成器函数中,离散值是通过遍历切片生成的,它足够快,不需要被抢占。 第二个是通过我们的select语句和done通道处理的,它确保发生器即使被阻塞试图写入intStream也是可抢占的。 在管道的另一端,同样我们可以确保最终阶段的可抢占性。因为我们正在操作的通道在抢占时会被关闭,所以当这种情况发生时,通道将会中断。 最后阶段是可抢占的,因为我们依赖的流是可抢占的。 在管道开始和结束之间,代码总是在一个通道上遍历,并在包含done通道的select语句内的另一个通道上发送。 如果某个阶段在传入通道检索到值时被阻塞,则该通道关闭时它将变为未阻塞状态。 如果某个阶段在发送值时被阻塞,则由于select语句而可抢占。 因此,我们的整个管道始终可以通过关闭done通道来抢占。 * * * * * 学识浅薄,错误在所难免。我是长风,欢迎来Golang中国的群(211938256)就本书提出修改意见。
';

管道

最后更新于:2022-04-02 06:51:09

当你编写一个程序时,你可能不会坐下来写一个长函数——至少我希望你不要! 你会以函数,结构体,方法等形式构造抽象。为什么要这样做? 部分是为了抽象出无关的细节,另一部分是为了能够在不影响其他区域的情况下处理某个代码区域。你没有必要改变整个系统,当发现自己必须调整多个区域才能做出一个合乎逻辑的改变时,说明该系统的抽象实在是很糟糕。 pipline,又名管道,或者叫流水线,是你可以用来在系统中形成抽象的另一种工具。特别是当你的程序需要处理流或批处理数据时,它是一个非常强大的工具。管道这个词被认为是在1856年首次使用的,指将液体从一个地方输送到另一个地方的一系列管道。计算机科学借用了这个术语,因为我们也在从一个地方向另一个地方传输某些东西:数据。管道是个系统,它将一系列数据输入,执行操作并将数据传回。我们称这些操作都是管道的一个阶段。 通过使用管道,你可以分离每个阶段的关注点,这提供了许多好处。你可以独立于彼此修改模块,你可以混合搭配模块的组合方式,而无需修改模块,你可以让每个模块同时处理到上游或下游模块,并且可以扇出或限制你的部分管道。 我们将在"扇出,扇入”一节中介绍扇出,我们将在第5章介绍速率限制。你现在不必担心这些奇怪的术语; 让我们从最简单的开始,尝试构建一个管道。 如前所述,一个阶段只是类似于执行将数据输入,对其进行转换并将数据发回这样的功能。 这是一个可以被视为管道某阶段的例子: ``` multiply := func(values []int, multiplier int) []int { multipliedValues := make([]int, len(values)) for i, v := range values { multipliedValues[i] = v * multiplier } return multipliedValues } ``` 这个函数用取整数切片,循环遍历它们,然后返回一个新的切片。看起来很无聊的功能,对吧? 让我们创建管道的另一个阶段: ``` add := func(values []int, additive int) []int { addedValues := make([]int, len(values)) for i, v := range values { addedValues[i] = v + additive } return addedValues } ``` 跟上个函数类似,只不过把乘法变成了加法。接下来,让我们尝试将它们合并: ``` ints := []int{1, 2, 3, 4} for _, v := range add(multiply(ints, 2), 1) { fmt.Println(v) } ``` 这会输出: ``` 3 5 7 9 ``` 看看我们是如何将他们结合起来的。这些函数就像你每天工作使用的函数一样,但是我们可以将它们组合起来形成一个流水线。 那么我们怎么定义管道的“阶段”呢? * 一个阶段消耗并返回相同的类型。 * 一个阶段必须通过语言来体现,以便它可以被传递。 Go的函数已经实现并很好地适用于这个目的。 那些熟悉函数式编程的人可能会点头并思考像高阶函数和monad这样的术语。 事实上,管道的各阶段与函数式编程密切相关,可以被认为是monad的一个子集。 我不会在这里明确地讨论Monad或函数式编程,但它们本身就是有趣的主题,在尝试理解管道时,对这两个主题的工作知识虽然没有必要,但是对于加强理解是有用的。 在这里,我们的add和multiply满足管道的阶段属性:它们都消耗int切片并返回int切片,并且因为Go支持函数传递,所以我们可以传递add和multiply。 这些属性很有趣:在不改变阶段本身的情况下,我们可以很容易地将我们的阶段结合到更高层次。 例如,如果我们现在想为管道添加一个额外的阶段:乘以2,只需将我们以前的管道包装在一个新的阶段内,就像这样: ``` ints := []int{1, 2, 3, 4} for _, v := range multiply(add(multiply(ints, 2), 1), 2) { fmt.Println(v) } ``` 这会输出: ``` 6 10 14 18 ``` 注意我们是如何做到这一点的。也许你已经开始看到使用管道模式的好处了。 当然,我们也可以在程序上编写此代码: ``` ints := []int{1, 2, 3, 4} for _, v := range ints { fmt.Println(2 * (v*2 + 1)) } ``` 虽然这看起来简单得多,但正如我们接下来会看到的,程序在处理数据流时不会提供与管道相同的好处。 请注意每个阶段如何获取数据切片并返回数据的。这些阶段的行为我们称为批处理。这意味着它们一次性对大块数据进行操作,而不是一次一个单独进行。还有另一种类型的管道,模块每次仅接收和返回单个元素。 批处理和流处理各有优点和缺点,我们稍微讨论一下。现在,请注意,为使原始数据保持不变,每个阶段都必须创建一个等长的新切片来存储其计算结果。这意味着我们的程序在任何时候的内存占用量都是我们发送到管道开始处的切片大小的两倍。 让我们转换为面向流操作,看看会有什么效果: ``` multiply := func(value, multiplier int) int { return value * multiplier } add := func(value, additive int) int { return value + additive } ints := []int{1, 2, 3, 4} for _, v := range ints { fmt.Println(multiply(add(multiply(v, 2), 1), 2)) } ``` 这会输出: ``` 6 10 14 18 ``` 每个阶段都接收并返回一个值,程序的内存占用空间将回落到管道输入数据的大小。但是我们不得不将管道放入到for循环的体内,这让我们的操作变“重”了。这不仅限制了我们对管道的重复使用,而且我们稍后会在本节中看到,这也限制了我们的扩展能力。 实际上,我们因为循环而在每次迭代中实例化我们的管道。尽管进行函数调用耗费的资源很少,但函数的调用次数确实增加了。那么如果涉及到并发性又如何?我之前说过,使用管道的好处之一是能够同时处理数据的各个阶段,并且我提到了一些关于扇出的内容。这些我们在接下来会进一步了解到。 我会扩展add和multiply来介绍这些概念。现在开始学习在Go中构建管道的最佳实践的时候了,先从并发原语通道开始。 * * * * * 学识浅薄,错误在所难免。我是长风,欢迎来Golang中国的群(211938256)就本书提出修改意见。
';