Go 并发模式:管道和撤销
Sameer Ajmani
13 March 2014
介绍
Go的并发原语使构建流数据管道变得容易,这些流水数据管道可有效利用I/O和多个CPU。本文介绍了此类管道的示例,重点介绍了操作失败时出现的细微差别,并介绍了干净处理错误的技术。
什么是管道?
Go中没有管道的正式定义。它只是多种并发程序中的一种。非正式地,管道是由 channel 连接的一系列的 阶段,其中每个阶段都是一组运行相同函数的 goroutine。在每个阶段,goroutines
- 通过 入站(inbound) channel 从 上游(upstream) 接收数据
- 在这些数据上执行一些函数,常常产生新的值
- 通过 出站(outbound) channel 发送 下游(downstream) 数据
除了第一和最后阶段(分别只有出站或入站的 channel)之外,每个阶段都有任意数量的入站和出站 channel。第一阶段有时称为 来源 或 生产者; 最后阶段是 接收器 或 消费者。
我们将从一个简单的管道例子开始,以解释这些思想和技术。稍后,我们将提供一个更现实的示例。
平方数
考虑具有三个阶段的管道。
第一个阶段 gen
是一个函数,它将一个整数列表里的数发送到一个整数的 channel 中。 gen
函数启动一个goroutine,在 channel 上发送整数,并在发送所有值后关闭 channel:
func gen(nums ...int) <-chan int {
out := make(chan int)
go func() {
for _, n := range nums {
out <- n
}
close(out)
}()
return out
}
第二阶段 sq
从 channel 接收整数并返回一个 channel,该 channel 发出每个接收到的整数的平方。在入站 channel 关闭后,此阶段已将所有值发送到下游,它将关闭出站通道:
func sq(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
out <- n * n
}
close(out)
}()
return out
}
main
函数建立一个管道并运行最后的阶段:它从第二阶段接收值并打印每一个,直到 channel 关闭:
func main() {
// 建立管道.
c := gen(2, 3)
out := sq(c)
// Consume the output.
fmt.Println(<-out) // 4
fmt.Println(<-out) // 9
}
由于 sq
有相同类型的进站和出站通道,我们能够任意次数组合它。我们可以像其他阶段一样,重写 main
函数实现一个range 循环:
func main() {
// 建立管道并打印输出.
for n := range sq(sq(gen(2, 3))) {
fmt.Println(n) // 16 then 81
}
}
扇出,扇入(Fan-out, fan-in)
多个函数能够从相同的 channel 读取直到 channel 关闭,这称之为 fan-out。这样提供了一种分配任务的方法,可以并行化使用 CPU 和 I/O。
通过在一个单独的 channel 上进行多路复用多个 input channel, 这样一个函数能够从多个input channel 读取并处理,直到所有 input channel 全部关闭为止。当单独的 channel 关闭,所有input channel 也关闭。这称之为 fan-in。
我们能够改变我们的管道来运行两个 sq
的实例,每一个都从相同的input channel 读取。我们介绍一个新函数 merge , 来 fan in 结果:
func main() {
in := gen(2, 3)
// 通过两个都在 in 读取的 goroutinue 给 sq 分配工作
c1 := sq(in)
c2 := sq(in)
// 合并从 c1和c2 的 output 并消耗掉。
for n := range merge(c1, c2) {
fmt.Println(n) // 4 then 9, or 9 then 4
}
}
merge
函数通过为每个入站 channel 启动一个 goroutine,复制里面的值发送给单一的出站 channel,来实现把一个 channel 列表转化为一个单独的 channel。一旦所有 output
goroutine开启后,在该 channel 上的所有发送完成后,merge
将开启另一个 goroutine 关闭出站 channel。
向关闭的 channel 发送数据会 panic,因此务必在调用 close 之前确保所有发送都已完成。 sync.WaitGroup
类型提供了一种安排这种同步的简单方法:
func merge(cs ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)
// 为cs 中每一个 input channel 启动一个 output goroutine。
// output 函数从 c 中复制值到 out 直到 c 关闭,然后调用 wg.Done。
output := func(c <-chan int) {
for n := range c {
out <- n
}
wg.Done()
}
wg.Add(len(cs))
for _, c := range cs {
go output(c)
}
// 当所有的 out goroutine 结束,开启一个 goroutine 来关闭 out。
// 这之前必须先调用 wg.Add。
go func() {
wg.Wait()
close(out)
}()
return out
}
止损
我们的管道函数有一种模式:
- 在某些阶段,当所有的发送操作完成,关闭其出站 channel。
- 在某些阶段,一直从入站 channel 接收值,直到这些 channel 关闭为止。
这种模式允许将每个接收阶段写为一个 range
循环,并确保一旦将所有值成功的发送到下游,所有 goroutine 都将退出。
但是在实际的管道中,并不总是接收所有入站值。有时这是设计使然:接收器可能只需要值的子集即可。更多时候,由于入站值表示较早阶段中的错误,因此更早退出。在任何一种情况下,接收器都不必等待剩余的值到达,并且我们希望早期停止产生后期不需要的值。
在我们的示例管道中,如果一个阶段无法消耗掉所有的入站值,则尝试发送这些值的 goroutine 将无限期地阻塞:
// 从 output 中消耗掉第一个值
out := merge(c1, c2)
fmt.Println(<-out) // 4 or 9
return
// 由于我们没有从 out 接收到第二个值,
// 其中一个output goroutine 挂起并尝试发送。
}
这里有个资源泄漏:goroutines 消耗内存和运行时资源,goroutine 堆栈中的堆引用阻止数据被垃圾回收。 Goroutine 也不会被垃圾收集;他们必须自己退出。
即使下游阶段无法接收所有入站值,我们也需要安排管道的上游阶段退出。 一种方法是将出站 channel 改为具有含有缓冲区的 channel 。缓冲区可以容纳固定数量的值;如果缓冲区中有空间,发送操作将立即完成:
c := make(chan int, 2) // 2个大小的缓冲区
c <- 1 // 立即成功
c <- 2 // 立即成功
c <- 3 // 阻塞,直到其他的 goroutine 执行 <-c 并接收 1
当 channel 创建时已知要被发送的值的数量时,缓冲区可以简化代码。例如,我们可以重写 gen
以将整数列表复制到缓冲 channel 中并避免创建新的goroutine:
func gen(nums ...int) <-chan int {
out := make(chan int, len(nums))
for _, n := range nums {
out <- n
}
close(out)
return out
}
返回管道中阻塞的 goroutine,我们可以考虑将缓冲区添加到 merge
返回的出站 channel 中:
func merge(cs ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int, 1) // 对于未读的 input,要有够的空间
// ... 余下代码的未变 ...
虽然这可以修复该程序中阻塞的 goroutine,但这是错误的代码。此处缓冲区的大小选择为 1 取决于知道 merge
要接收的值的数量以及下游阶段将消耗的值的数量。 这很脆弱:如果我们向 gen
传递一个额外的值,或者如果下游阶段读取的值更少,那么我们将会再次阻塞 goroutines。
相反,我们需要为下游阶段提供一种方法,以向发送方指示他们将停止接受输入。
显式取消
当 main
函数在没有从 out
接收到所有值的情况下就决定退出时,它必须告诉上游阶段的 goroutines 丢弃它们尝试发送的值。它是通过在称为 done
的 channel 上发送值来实现的。它发送两个值,因为有两个潜在的阻塞的发送者:
func main() {
in := gen(2, 3)
// 通过两个都在 in 读取的 goroutinue 给 sq 分配工作。
c1 := sq(in)
c2 := sq(in)
// 从 output 消耗掉第一个值
done := make(chan struct{}, 2)
out := merge(done, c1, c2)
fmt.Println(<-out) // 4 or 9
// 告诉剩余的发送者我们将要离开
done <- struct{}{}
done <- struct{}{}
}
发送goroutine会将其发送操作替换为 select
语句,该语句在 out
上发送发生时,或从 done
接收到值时继续。 done
的值类型为空结构,因为该值无关紧要:它只是接收事件,指示应放弃在 out
上发送 。output
goroutine 继续在其入站 channel c
上循环,因此上游阶段不会被阻塞。(稍后我们将讨论如何让此循环尽早返回。)
func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)
// 为cs 中的每个 input channel 开启一个 output goroutine。
// output 从 c 中复制值到 out 直到 c 关闭或者从 done 接收到一个值,output 调用 wg.Done。
output := func(c <-chan int) {
for n := range c {
select {
case out <- n:
case <-done:
}
}
wg.Done()
}
// ... 余下代码的未变 ...
这种方法存在一个问题:each 下游接收者都需要知道可能阻塞的上游发送者的数量,并安排在早期返回时向这些发送者发出信号。 跟踪这些计数是乏味且容易出错的。
我们需要一种方法来告知未知数量的 goroutine,以停止向下游发送其值。 在 Go 中,我们可以通过关闭 channel 来完成此操作,因为在关闭的 channel上的接收操作始终可以立即执...
这意味着 main
函数可以简单地通过关闭 done
通道来解除对所有发件人的阻塞。 此关闭实际上是向发送者的广播信号。 我们扩展 each 我们的管道函数以接收 done
作为参数,并通过 defer
语句安排关闭,以便所有来自 main
函数的返回路径都将指示管道阶段退出了。
func main() {
// 创建一个 done channel 在整个管道中共享,
// 当管道退出时,关闭 channel,作为对所有 goroutine
// 发出的我们要退出的信号。
done := make(chan struct{})
defer close(done)
in := gen(done, 2, 3)
// 通过两个都在 in 读取的 goroutinue 给 sq 分配工作。
c1 := sq(done, in)
c2 := sq(done, in)
// 从 output 消耗掉第一个值
out := merge(done, c1, c2)
fmt.Println(<-out) // 4 or 9
// done 通过延时调用来关闭。
}
现在,一旦 done
关闭,我们的每个管道阶段都可以自由返回。merge
中的 output
例程可以返回而不会耗尽其入站 channel,因为它知道上游发送者 sq
在关闭完成后将停止尝试发送。通过 defer
语句, output
确保在所有返回路径上 wg.Done
被调用:
func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)
// 为cs 中的每个 input channel 开启一个 output goroutine。
// output 从 c 中复制值到 out 直到 c 关闭或者从 done 接收到一个值,output 调用 wg.Done。
output := func(c <-chan int) {
defer wg.Done()
for n := range c {
select {
case out <- n:
case <-done:
return
}
}
}
// ... 余下代码的未变 ...
同样,sq
可以在 done
关闭后立即返回。 sq
通过 defer
语句确保在所有返回路径上关闭其 out
channel:
func sq(done <-chan struct{}, in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
select {
case out <- n * n:
case <-done:
return
}
}
}()
return out
}
以下是管道构建的指导原则:
- 在某些阶段,完成所有发送操作后,将关闭其出站 channel。
- 在某些阶段,将继续从入站 channel 接收值,直到这些 channel 关闭或发送方被阻塞为止。
管道通过确保有足够的缓冲区来存储所有已发送的值,或者在接收方可能放弃 channel 时显式发信号通知发送方,从而解除发送方的阻塞。
计算一棵树的摘要
让我们考虑一个更现实的管道。
MD5是一种消息摘要算法,可用作文件校验和。 命令行实用程序 md5sum
打印摘要值以获取文件列表。
% md5sum *.go
d47c2bbc28298ca9befdfbc5d3aa4e65 bounded.go
ee869afd31f83cbb2d10ee81b2b831dc parallel.go
b88175e65fdcbc01ac08aaf1fd9b5e96 serial.go
我们的示例程序就像 md5sum
一样,但是取一个目录作为参数,并打印该目录下每个常规文件的摘要值,并按路径名排序。
% go run serial.go .
d47c2bbc28298ca9befdfbc5d3aa4e65 bounded.go
ee869afd31f83cbb2d10ee81b2b831dc parallel.go
b88175e65fdcbc01ac08aaf1fd9b5e96 serial.go
我们程序的 main 函数调用了一个辅助函数 MD5All
,该函数返回从路径名到摘要值的map,然后对结果进行排序和打印:
func main() {
// 计算特定目录下的所有文件的 MD5 sum,
// 然后根据路径名排序并打印
m, err := MD5All(os.Args[1])
if err != nil {
fmt.Println(err)
return
}
var paths []string
for path := range m {
paths = append(paths, path)
}
sort.Strings(paths)
for _, path := range paths {
fmt.Printf("%x %s.", m[path], path)
}
}
函数 MD5All
是我们讨论的重点。在 serial.go中,实现不使用并发性,只是在遍历树时,简单的读取并对每个文件计算校验和。
// MD5All 读取以 root 为根的文件树中的所有文件,并返回一个
// 从文件路径到文件内容的MD5 sum 的 map。如果遍历目录失败
// 或者任何读操作失败,MD5All 返回一个 error.
func MD5All(root string) (map[string][md5.Size]byte, error) {
m := make(map[string][md5.Size]byte)
err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if !info.Mode().IsRegular() {
return nil
}
data, err := ioutil.ReadFile(path)
if err != nil {
return err
}
m[path] = md5.Sum(data)
return nil
})
if err != nil {
return nil, err
}
return m, nil
}
并行计算摘要
在 parallel.go中,我们将 MD5All
分成两个阶段的管道。第一阶段, sumFiles
遍历树,在一个新的 goroutinue 中计算每个文件的摘要,并把结果发送到使用 result
类型值的 channel中:
type result struct {
path string
sum [md5.Size]byte
err error
}
sumFiles
返回两个channel:一个用于 results
,另一个用于由 filepath.Walk
返回的错误。walk 函数启动一个新的 goroutine 来处理每个常规文件,然后检查 done
。 如果done
已关闭,walk 将立即停止:
func sumFiles(done <-chan struct{}, root string) (<-chan result, <-chan error) {
// 为每个常规文件,启动一个 goroutine 来计算文件的 sum并把结果发送
// 给 c。并发送 walk 的结果给 errc。
c := make(chan result)
errc := make(chan error, 1)
go func() {
var wg sync.WaitGroup
err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if !info.Mode().IsRegular() {
return nil
}
wg.Add(1)
go func() {
data, err := ioutil.ReadFile(path)
select {
case c <- result{path, md5.Sum(data), err}:
case <-done:
}
wg.Done()
}()
// 如果 done 关闭,则终止 walk。
select {
case <-done:
return errors.New("walk canceled")
default:
return nil
}
})
// Walk 返回,所有wg.Add 调用都已完成。当所有发送完成后
// 启动一个 goroutine 来关闭 c 。
go func() {
wg.Wait()
close(c)
}()
// 这里不需要 select,因为 errc 是 buffered channel。
errc <- err
}()
return c, errc
}
MD5All
从 c
中接收摘要值。在发生错误时尽早返回,通过 defer
关闭 done
:
func MD5All(root string) (map[string][md5.Size]byte, error) {
// MD5All 返回时,关闭 done channel ;它可能
// 会在从 c 和 errc 接收所有值之前这样做。
done := make(chan struct{})
defer close(done)
c, errc := sumFiles(done, root)
m := make(map[string][md5.Size]byte)
for r := range c {
if r.err != nil {
return nil, r.err
}
m[r.path] = r.sum
}
if err := <-errc; err != nil {
return nil, err
}
return m, nil
}
有界并行
在parallel.go的 MD5All
实现中,为每个文件启动一个新的 goroutine 。当一个目录中有许多大型文件,这可能会分配比计算机上可用内存更多的内存。
我们可以通过限制并行读取的文件数来限制这些分配。在 bounded.go 中,我们通过创建固定数量的goroutine 来读取文件来实现。 我们的管道现在分为三个阶段:遍历树,读取和计算文件摘要,以及收集这些摘要。
在第一阶段, walkFiles
, 给出树中常规文件的路径:
func walkFiles(done <-chan struct{}, root string) (<-chan string, <-chan error) {
paths := make(chan string)
errc := make(chan error, 1)
go func() {
// 当 Walk 返回后,关闭 paths channel。
defer close(paths)
// 这里的 send 不需要 select,因为 errc 是 buffered channel。
errc <- filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if !info.Mode().IsRegular() {
return nil
}
select {
case paths <- path:
case <-done:
return errors.New("walk canceled")
}
return nil
})
}()
return paths, errc
}
中间的阶段启动固定数量的 digester
goroutines 从 paths
中接收文件名字并把 results
发送给 channel c
:
func digester(done <-chan struct{}, paths <-chan string, c chan<- result) {
for path := range paths {
data, err := ioutil.ReadFile(path)
select {
case c <- result{path, md5.Sum(data), err}:
case <-done:
return
}
}
}
与我们之前的示例不同,由于多个 goroutine 在共享 channel 上发送消息,因此 digester
不会关闭其 output channel 。取而代之的是,在 MD5All
中的代码安排所有 digesters
完成后,关闭这个channel :
// 开启固定数量的 goroutines 来读取和计算文件摘要。
c := make(chan result)
var wg sync.WaitGroup
const numDigesters = 20
wg.Add(numDigesters)
for i := 0; i < numDigesters; i++ {
go func() {
digester(done, paths, c)
wg.Done()
}()
}
go func() {
wg.Wait()
close(c)
}()
相反,我们可以让每个 digester 创建并返回其自己的 output channel,但是随后我们将需要其他 goroutine 来 fan-in 结果。
最后阶段从 c
接收所有 results
,然后从 errc
检查错误。这项检查不能更早进行,因为在此之前,walkFiles
可能向下游发送值会阻塞:
m := make(map[string][md5.Size]byte)
for r := range c {
if r.err != nil {
return nil, r.err
}
m[r.path] = r.sum
}
// 检查 Walk 是否失败。
if err := <-errc; err != nil {
return nil, err
}
return m, nil
}
总结
本文介绍了用于在 Go 中构造流数据管道的技术。处理此类管道中的故障非常棘手,因为管道中的每个阶段都可能阻塞尝试向下游发送值,并且下游阶段可能不再关心传入的数据。我们展示了关闭 channel 如何向管道开始的所有 goroutine 广播 "done" 信号,并定义了正确构建管道的准则。
进一步阅读:
- Go Concurrency Patterns (video) 介绍了 Go 并发原语的基础知识以及几种应用它们的方法。
- Advanced Go Concurrency Patterns (video) 涵盖了 Go 原语的更复杂用法,尤其是
select
。 - Douglas McIlroy's paper Squinting at Power Series 展示了类似 Go 的并发如何为复杂的计算提供优雅的支持。
本译文仅用于学习和交流目的,转载请务必注明文章译者、出处、和本文链接
我们的翻译工作遵照 CC 协议,如果我们的工作有侵犯到您的权益,请及时联系我们。
推荐文章: