Go 博客
Go 并发模式:管道和取消
引言
Go 的并发原语使得构建能够有效利用 I/O 和多个 CPU 的流式数据管道变得容易。本文将介绍此类管道的示例,重点介绍操作失败时出现的细微之处,并介绍干净地处理故障的技巧。
什么是管道?
Go 中没有管道的正式定义;它只是众多并发程序类型中的一种。非正式地讲,管道是由通道连接的一系列阶段,其中每个阶段都是运行相同函数的 goroutine 组。在每个阶段,goroutine
- 通过入站通道从上游接收值
- 对该数据执行某些函数,通常会生成新值
- 通过出站通道将值发送到*下游*
每个阶段都有任意数量的入站和出站通道,除了第一个和最后一个阶段,它们分别只有出站或入站通道。第一个阶段有时被称为源或生产者;最后一个阶段,汇或消费者。
我们将从一个简单的示例管道开始来解释这些概念和技术。稍后,我们将介绍一个更真实的示例。
平方数
考虑一个具有三个阶段的管道。
第一个阶段 `gen` 是一个将整数列表转换为通道的函数,该通道发出列表中的整数。`gen` 函数启动一个 goroutine,该 goroutine 将整数发送到通道,并在发送完所有值后关闭通道
func gen(nums ...int) <-chan int { out := make(chan int) go func() { for _, n := range nums { out <- n } close(out) }() return out }
第二个阶段 `sq` 从通道接收整数,并返回一个通道,该通道发出每个接收到的整数的平方。在入站通道关闭并且此阶段已将所有值发送到下游后,它会关闭出站通道
func sq(in <-chan int) <-chan int { out := make(chan int) go func() { for n := range in { out <- n * n } close(out) }() return out }
`main` 函数设置管道并运行最后一个阶段:它从第二个阶段接收值并打印每个值,直到通道关闭
func main() { // Set up the pipeline. c := gen(2, 3) out := sq(c) // Consume the output. fmt.Println(<-out) // 4 fmt.Println(<-out) // 9 }
由于 `sq` 的入站和出站通道类型相同,因此我们可以多次组合它。我们还可以像其他阶段一样,将 `main` 重写为 range 循环
func main() { // Set up the pipeline and consume the output. for n := range sq(sq(gen(2, 3))) { fmt.Println(n) // 16 then 81 } }
扇出,扇入
多个函数可以从同一个通道读取,直到该通道关闭;这称为扇出。这提供了一种在一组工作进程之间分发工作的方法,以并行化 CPU 使用和 I/O。
一个函数可以从多个输入读取,并继续处理直到所有输入都关闭,方法是将输入通道多路复用到单个通道上,当所有输入都关闭时,该通道就会关闭。这称为扇入。
我们可以更改管道以运行两个 `sq` 实例,每个实例从同一个输入通道读取。我们引入了一个新函数 _merge_ 来将结果扇入
func main() { in := gen(2, 3) // Distribute the sq work across two goroutines that both read from in. c1 := sq(in) c2 := sq(in) // Consume the merged output from c1 and c2. for n := range merge(c1, c2) { fmt.Println(n) // 4 then 9, or 9 then 4 } }
`merge` 函数通过为每个入站通道启动一个 goroutine,将通道列表转换为单个通道,该 goroutine 将值复制到唯一的出站通道。启动所有 `output` goroutine 后,`merge` 会启动一个额外的 goroutine,在完成对该通道的所有发送后关闭出站通道。
在已关闭的通道上发送会引起恐慌,因此在调用 close 之前确保所有发送都已完成很重要。`sync.WaitGroup
` 类型提供了一种简单的方法来安排此同步
func merge(cs ...<-chan int) <-chan int { var wg sync.WaitGroup out := make(chan int) // Start an output goroutine for each input channel in cs. output // copies values from c to out until c is closed, then calls wg.Done. output := func(c <-chan int) { for n := range c { out <- n } wg.Done() } wg.Add(len(cs)) for _, c := range cs { go output(c) } // Start a goroutine to close out once all the output goroutines are // done. This must start after the wg.Add call. go func() { wg.Wait() close(out) }() return out }
提前终止
我们的管道函数遵循一种模式
- 阶段在完成所有发送操作后关闭其出站通道。
- 阶段一直从入站通道接收值,直到这些通道关闭。*
这种模式允许将每个接收阶段编写为 `range` 循环,并确保所有 goroutine 在所有值成功发送到下游后退出。
但在实际管道中,阶段并不总是接收所有入站值。有时是故意的:接收方可能只需要一部分值才能继续。更常见的是,由于入站值代表早期阶段的错误,因此阶段会提前退出。在这两种情况下,接收方都不应该等待剩余的值到达,并且我们希望早期阶段停止生成下游不需要的值。
在我们的示例管道中,如果一个阶段未能消耗所有入站值,则尝试发送这些值的 goroutine 将无限期阻塞
// Consume the first value from the output. out := merge(c1, c2) fmt.Println(<-out) // 4 or 9 return // Since we didn't receive the second value from out, // one of the output goroutines is hung attempting to send it. }
这是一个资源泄漏:goroutine 会消耗内存和运行时资源,goroutine 堆栈中的堆引用会阻止数据被垃圾回收。goroutine 不会被垃圾回收;它们必须自行退出。
我们需要安排我们的管道的上游阶段退出,即使下游阶段未能接收所有入站值。一种方法是更改出站通道以具有缓冲区。缓冲区可以容纳固定数量的值;如果缓冲区中有空间,发送操作会立即完成
c := make(chan int, 2) // buffer size 2
c <- 1 // succeeds immediately
c <- 2 // succeeds immediately
c <- 3 // blocks until another goroutine does <-c and receives 1
当要在通道创建时知道要发送的值的数量时,缓冲区可以简化代码。例如,我们可以重写 `gen` 来将整数列表复制到缓冲通道中,并避免创建新的 goroutine
func gen(nums ...int) <-chan int { out := make(chan int, len(nums)) for _, n := range nums { out <- n } close(out) return out }
回到我们管道中被阻塞的 goroutine,我们可以考虑为 `merge` 返回的出站通道添加缓冲区
func merge(cs ...<-chan int) <-chan int { var wg sync.WaitGroup out := make(chan int, 1) // enough space for the unread inputs // ... the rest is unchanged ...
虽然这解决了此程序中被阻塞的 goroutine,但这是糟糕的代码。此处缓冲区大小 1 的选择取决于知道 `merge` 将接收到的值的数量以及下游阶段将消耗的值的数量。这很脆弱:如果我们向 `gen` 传递一个额外的值,或者下游阶段读取的值少于该值,我们将再次遇到被阻塞的 goroutine。
相反,我们需要提供一种方法,让下游阶段向发送者指示它们将停止接受输入。
显式取消
当 `main` 决定在未接收完 `out` 的所有值的情况下退出时,它必须告知上游阶段的 goroutine 放弃它们正在尝试发送的值。它通过在名为 `done` 的通道上发送值来做到这一点。它发送两个值,因为可能存在两个被阻塞的发送者
func main() { in := gen(2, 3) // Distribute the sq work across two goroutines that both read from in. c1 := sq(in) c2 := sq(in) // Consume the first value from output. done := make(chan struct{}, 2) out := merge(done, c1, c2) fmt.Println(<-out) // 4 or 9 // Tell the remaining senders we're leaving. done <- struct{}{} done <- struct{}{} }
发送 goroutine 将它们的发送操作替换为 `select` 语句,该语句会在 `out` 上的发送发生时或在它们从 `done` 接收到值时继续。`done` 的值类型是空结构,因为值无关紧要:接收事件表明 `out` 上的发送应被放弃。`output` goroutine 继续在它们的入站通道 `c` 上循环,因此上游阶段不会被阻塞。(我们稍后将讨论如何允许此循环提前返回。)
func merge(done <-chan struct{}, cs ...<-chan int) <-chan int { var wg sync.WaitGroup out := make(chan int) // Start an output goroutine for each input channel in cs. output // copies values from c to out until c is closed or it receives a value // from done, then output calls wg.Done. output := func(c <-chan int) { for n := range c { select { case out <- n: case <-done: } } wg.Done() } // ... the rest is unchanged ...
这种方法有一个问题:*每个*下游接收者都需要知道可能被阻塞的上游发送者的数量,并安排在提前返回时向这些发送者发出信号。跟踪这些计数既繁琐又容易出错。
我们需要一种方法来告诉未知且无界数量的 goroutine 停止将其值发送到下游。在 Go 中,我们可以通过关闭通道来做到这一点,因为在已关闭的通道上执行接收操作始终可以立即进行,并产生元素类型的零值。
这意味着 `main` 仅通过关闭 `done` 通道就可以解除所有发送者的阻塞。此关闭有效地向发送者广播信号。我们将*每个*管道函数扩展为接受 `done` 作为参数,并通过 `defer` 语句安排关闭,以便从 `main` 返回的所有路径都会向管道阶段发出退出信号。
func main() { // Set up a done channel that's shared by the whole pipeline, // and close that channel when this pipeline exits, as a signal // for all the goroutines we started to exit. done := make(chan struct{}) defer close(done) in := gen(done, 2, 3) // Distribute the sq work across two goroutines that both read from in. c1 := sq(done, in) c2 := sq(done, in) // Consume the first value from output. out := merge(done, c1, c2) fmt.Println(<-out) // 4 or 9 // done will be closed by the deferred call. }
我们的每个管道阶段现在都可以尽快返回,只要 `done` 被关闭。`merge` 中的 `output` 例程可以在不耗尽其入站通道的情况下返回,因为它知道上游发送者 `sq` 将在 `done` 关闭时停止尝试发送。`output` 通过 `defer` 语句确保在所有返回路径上都调用 `wg.Done`
func merge(done <-chan struct{}, cs ...<-chan int) <-chan int { var wg sync.WaitGroup out := make(chan int) // Start an output goroutine for each input channel in cs. output // copies values from c to out until c or done is closed, then calls // wg.Done. output := func(c <-chan int) { defer wg.Done() for n := range c { select { case out <- n: case <-done: return } } } // ... the rest is unchanged ...
类似地,`sq` 可以在 `done` 关闭后尽快返回。`sq` 通过 `defer` 语句确保其 `out` 通道在所有返回路径上都已关闭
func sq(done <-chan struct{}, in <-chan int) <-chan int { out := make(chan int) go func() { defer close(out) for n := range in { select { case out <- n * n: case <-done: return } } }() return out }
以下是构建管道的指南
- 阶段在完成所有发送操作后关闭其出站通道。
- 阶段一直从入站通道接收值,直到这些通道关闭或发送者被解除阻塞。
管道通过确保有足够的缓冲区容纳所有发送的值,或者通过在接收方可以放弃通道时显式地向发送者发出信号来解除发送者阻塞。
消化树
让我们考虑一个更真实的管道。
MD5 是一种非常有用的消息摘要算法,可用作文件校验和。`md5sum` 命令行实用程序会打印文件的摘要值。
% md5sum *.go
d47c2bbc28298ca9befdfbc5d3aa4e65 bounded.go
ee869afd31f83cbb2d10ee81b2b831dc parallel.go
b88175e65fdcbc01ac08aaf1fd9b5e96 serial.go
我们的示例程序类似于 `md5sum`,但它接受单个目录作为参数,并打印该目录下的每个常规文件的摘要值,按路径名排序。
% go run serial.go .
d47c2bbc28298ca9befdfbc5d3aa4e65 bounded.go
ee869afd31f83cbb2d10ee81b2b831dc parallel.go
b88175e65fdcbc01ac08aaf1fd9b5e96 serial.go
我们的程序的 `main` 函数调用一个名为 `MD5All` 的辅助函数,该函数返回一个从路径名到摘要值的映射,然后对结果进行排序和打印
func main() { // Calculate the MD5 sum of all files under the specified directory, // then print the results sorted by path name. m, err := MD5All(os.Args[1]) if err != nil { fmt.Println(err) return } var paths []string for path := range m { paths = append(paths, path) } sort.Strings(paths) for _, path := range paths { fmt.Printf("%x %s\n", m[path], path) } }
我们的讨论重点是 `MD5All` 函数。在serial.go中,该实现不使用并发,而是在遍历树时简单地读取和计算每个文件的摘要。
// MD5All reads all the files in the file tree rooted at root and returns a map // from file path to the MD5 sum of the file's contents. If the directory walk // fails or any read operation fails, MD5All returns an error. func MD5All(root string) (map[string][md5.Size]byte, error) { m := make(map[string][md5.Size]byte) err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error { if err != nil { return err } if !info.Mode().IsRegular() { return nil } data, err := ioutil.ReadFile(path) if err != nil { return err } m[path] = md5.Sum(data) return nil }) if err != nil { return nil, err } return m, nil }
并行消化
在parallel.go中,我们将 `MD5All` 分成一个两阶段管道。第一个阶段 `sumFiles` 遍历树,在新 goroutine 中计算每个文件的摘要,并将结果发送到类型为 `result` 的通道
type result struct { path string sum [md5.Size]byte err error }
`sumFiles` 返回两个通道:一个用于 `results`,另一个用于 `filepath.Walk` 返回的错误。walk 函数启动一个新的 goroutine 来处理每个常规文件,然后检查 `done`。如果 `done` 已关闭,则 walk 会立即停止
func sumFiles(done <-chan struct{}, root string) (<-chan result, <-chan error) { // For each regular file, start a goroutine that sums the file and sends // the result on c. Send the result of the walk on errc. c := make(chan result) errc := make(chan error, 1) go func() { var wg sync.WaitGroup err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error { if err != nil { return err } if !info.Mode().IsRegular() { return nil } wg.Add(1) go func() { data, err := ioutil.ReadFile(path) select { case c <- result{path, md5.Sum(data), err}: case <-done: } wg.Done() }() // Abort the walk if done is closed. select { case <-done: return errors.New("walk canceled") default: return nil } }) // Walk has returned, so all calls to wg.Add are done. Start a // goroutine to close c once all the sends are done. go func() { wg.Wait() close(c) }() // No select needed here, since errc is buffered. errc <- err }() return c, errc }
`MD5All` 从 `c` 接收摘要值。`MD5All` 在发生错误时提前返回,通过 `defer` 关闭 `done`
func MD5All(root string) (map[string][md5.Size]byte, error) { // MD5All closes the done channel when it returns; it may do so before // receiving all the values from c and errc. done := make(chan struct{}) defer close(done) c, errc := sumFiles(done, root) m := make(map[string][md5.Size]byte) for r := range c { if r.err != nil { return nil, r.err } m[r.path] = r.sum } if err := <-errc; err != nil { return nil, err } return m, nil }
有界并行
中的 `MD5All` 实现parallel.go为每个文件启动一个新的 goroutine。在包含许多大文件的目录中,这可能会分配比机器上可用内存更多的内存。
我们可以通过限制并行读取文件的数量来限制这些分配。在bounded.go中,我们通过创建固定数量的 goroutine 来读取文件来实现这一点。我们的管道现在有三个阶段:遍历树,读取和计算文件摘要,以及收集摘要。
第一个阶段 `walkFiles` 发出树中常规文件的路径
func walkFiles(done <-chan struct{}, root string) (<-chan string, <-chan error) { paths := make(chan string) errc := make(chan error, 1) go func() { // Close the paths channel after Walk returns. defer close(paths) // No select needed for this send, since errc is buffered. errc <- filepath.Walk(root, func(path string, info os.FileInfo, err error) error { if err != nil { return err } if !info.Mode().IsRegular() { return nil } select { case paths <- path: case <-done: return errors.New("walk canceled") } return nil }) }() return paths, errc }
中间阶段启动固定数量的 `digester` goroutine,这些 goroutine 从 `paths` 接收文件名,并将 `results` 发送到通道 `c`
func digester(done <-chan struct{}, paths <-chan string, c chan<- result) {
for path := range paths {
data, err := ioutil.ReadFile(path)
select {
case c <- result{path, md5.Sum(data), err}:
case <-done:
return
}
}
}
与我们之前的示例不同,`digester` 不会关闭其输出通道,因为有多个 goroutine 在共享通道上发送。相反,`MD5All` 中的代码会在所有 `digester` 完成后安排通道关闭
// Start a fixed number of goroutines to read and digest files. c := make(chan result) var wg sync.WaitGroup const numDigesters = 20 wg.Add(numDigesters) for i := 0; i < numDigesters; i++ { go func() { digester(done, paths, c) wg.Done() }() } go func() { wg.Wait() close(c) }()
我们也可以让每个 digester 创建并返回自己的输出通道,但那样我们就需要额外的 goroutine 来扇入结果。
最后一个阶段从 `c` 接收所有 `results`,然后检查来自 `errc` 的错误。此检查不能提前进行,因为在此之前,`walkFiles` 可能会在发送值到下游时被阻塞
m := make(map[string][md5.Size]byte) for r := range c { if r.err != nil { return nil, r.err } m[r.path] = r.sum } // Check whether the Walk failed. if err := <-errc; err != nil { return nil, err } return m, nil }
结论
本文介绍了在 Go 中构建流式数据管道的技术。处理此类管道中的故障很棘手,因为管道中的每个阶段都可能因尝试将值发送到下游而被阻塞,而下游阶段可能不再关心传入的数据。我们展示了如何关闭通道可以向管道启动的所有 goroutine 广播“完成”信号,并定义了正确构建管道的准则。
延伸阅读
- Go 并发模式(视频)介绍了 Go 并发原语的基础知识和几种应用方法。
- Go 高级并发模式(视频)涵盖了 Go 原语更复杂的用法,特别是 `select`。
- Douglas McIlroy 的论文《Squinting at Power Series》展示了 Go 式并发如何为复杂计算提供优雅的支持。
下一篇文章:Go Gopher
上一篇文章:FOSDEM 2014 Go 会议
博客索引