Go 并发模式:管道和撤销

未匹配的标注

本文为官方 Go Blog 的中文翻译,详见 翻译说明

Sameer Ajmani

13 March 2014

介绍

Go的并发原语使构建流数据管道变得容易,这些流水数据管道可有效利用I/O和多个CPU。本文介绍了此类管道的示例,重点介绍了操作失败时出现的细微差别,并介绍了干净处理错误的技术。

什么是管道?

Go中没有管道的正式定义。它只是多种并发程序中的一种。非正式地,管道是由 channel 连接的一系列的 阶段,其中每个阶段都是一组运行相同函数的 goroutine。在每个阶段,goroutines

  • 通过 入站(inbound) channel 从 上游(upstream) 接收数据
  • 在这些数据上执行一些函数,常常产生新的值
  • 通过 出站(outbound) channel 发送 下游(downstream) 数据

除了第一和最后阶段(分别只有出站或入站的 channel)之外,每个阶段都有任意数量的入站和出站 channel。第一阶段有时称为 来源生产者; 最后阶段是 接收器消费者

我们将从一个简单的管道例子开始,以解释这些思想和技术。稍后,我们将提供一个更现实的示例。

平方数

考虑具有三个阶段的管道。

第一个阶段 gen 是一个函数,它将一个整数列表里的数发送到一个整数的 channel 中。 gen 函数启动一个goroutine,在 channel 上发送整数,并在发送所有值后关闭 channel:

func gen(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        for _, n := range nums {
            out <- n
        }
        close(out)
    }()
    return out
}

第二阶段 sq 从 channel 接收整数并返回一个 channel,该 channel 发出每个接收到的整数的平方。在入站 channel 关闭后,此阶段已将所有值发送到下游,它将关闭出站通道:

func sq(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        for n := range in {
            out <- n * n
        }
        close(out)
    }()
    return out
}

main 函数建立一个管道并运行最后的阶段:它从第二阶段接收值并打印每一个,直到 channel 关闭:

func main() {
    // 建立管道.
    c := gen(2, 3)
    out := sq(c)

    // Consume the output.
    fmt.Println(<-out) // 4
    fmt.Println(<-out) // 9
}

由于 sq 有相同类型的进站和出站通道,我们能够任意次数组合它。我们可以像其他阶段一样,重写 main 函数实现一个range 循环:

func main() {
    // 建立管道并打印输出.
    for n := range sq(sq(gen(2, 3))) {
        fmt.Println(n) // 16 then 81
    }
}

扇出,扇入(Fan-out, fan-in)

多个函数能够从相同的 channel 读取直到 channel 关闭,这称之为 fan-out。这样提供了一种分配任务的方法,可以并行化使用 CPU 和 I/O。

通过在一个单独的 channel 上进行多路复用多个 input channel, 这样一个函数能够从多个input channel 读取并处理,直到所有 input channel 全部关闭为止。当单独的 channel 关闭,所有input channel 也关闭。这称之为 fan-in

我们能够改变我们的管道来运行两个 sq 的实例,每一个都从相同的input channel 读取。我们介绍一个新函数 merge , 来 fan in 结果:

func main() {
    in := gen(2, 3)

    // 通过两个都在 in 读取的 goroutinue 给 sq 分配工作
    c1 := sq(in)
    c2 := sq(in)

    // 合并从 c1和c2 的 output 并消耗掉。
    for n := range merge(c1, c2) {
        fmt.Println(n) // 4 then 9, or 9 then 4
    }
}

merge 函数通过为每个入站 channel 启动一个 goroutine,复制里面的值发送给单一的出站 channel,来实现把一个 channel 列表转化为一个单独的 channel。一旦所有 output goroutine开启后,在该 channel 上的所有发送完成后,merge 将开启另一个 goroutine 关闭出站 channel。

向关闭的 channel 发送数据会 panic,因此务必在调用 close 之前确保所有发送都已完成。 sync.WaitGroup类型提供了一种安排这种同步的简单方法:

func merge(cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)

    // 为cs 中每一个 input channel 启动一个 output goroutine。  
    // output 函数从 c 中复制值到 out 直到 c 关闭,然后调用 wg.Done。 
    output := func(c <-chan int) {
        for n := range c {
            out <- n
        }
        wg.Done()
    }
    wg.Add(len(cs))
    for _, c := range cs {
        go output(c)
    }

    // 当所有的 out goroutine 结束,开启一个 goroutine 来关闭 out。
    // 这之前必须先调用 wg.Add。
    go func() {
        wg.Wait()
        close(out)
    }()
    return out
}

止损

我们的管道函数有一种模式:

  • 在某些阶段,当所有的发送操作完成,关闭其出站 channel。
  • 在某些阶段,一直从入站 channel 接收值,直到这些 channel 关闭为止。

这种模式允许将每个接收阶段写为一个 range 循环,并确保一旦将所有值成功的发送到下游,所有 goroutine 都将退出。

但是在实际的管道中,并不总是接收所有入站值。有时这是设计使然:接收器可能只需要值的子集即可。更多时候,由于入站值表示较早阶段中的错误,因此更早退出。在任何一种情况下,接收器都不必等待剩余的值到达,并且我们希望早期停止产生后期不需要的值。

在我们的示例管道中,如果一个阶段无法消耗掉所有的入站值,则尝试发送这些值的 goroutine 将无限期地阻塞:

    // 从 output 中消耗掉第一个值
    out := merge(c1, c2)
    fmt.Println(<-out) // 4 or 9
    return

    // 由于我们没有从 out 接收到第二个值,
    // 其中一个output goroutine 挂起并尝试发送。
}

这里有个资源泄漏:goroutines 消耗内存和运行时资源,goroutine 堆栈中的堆引用阻止数据被垃圾回收。 Goroutine 也不会被垃圾收集;他们必须自己退出。

即使下游阶段无法接收所有入站值,我们也需要安排管道的上游阶段退出。 一种方法是将出站 channel 改为具有含有缓冲区的 channel 。缓冲区可以容纳固定数量的值;如果缓冲区中有空间,发送操作将立即完成:

c := make(chan int, 2) // 2个大小的缓冲区
c <- 1  // 立即成功
c <- 2  // 立即成功
c <- 3  // 阻塞,直到其他的 goroutine 执行 <-c 并接收 1

当 channel 创建时已知要被发送的值的数量时,缓冲区可以简化代码。例如,我们可以重写 gen 以将整数列表复制到缓冲 channel 中并避免创建新的goroutine:

func gen(nums ...int) <-chan int {
    out := make(chan int, len(nums))
    for _, n := range nums {
        out <- n
    }
    close(out)
    return out
}

返回管道中阻塞的 goroutine,我们可以考虑将缓冲区添加到 merge 返回的出站 channel 中:

func merge(cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int, 1) // 对于未读的 input,要有够的空间
    // ... 余下代码的未变 ...

虽然这可以修复该程序中阻塞的 goroutine,但这是错误的代码。此处缓冲区的大小选择为 1 取决于知道 merge 要接收的值的数量以及下游阶段将消耗的值的数量。 这很脆弱:如果我们向 gen 传递一个额外的值,或者如果下游阶段读取的值更少,那么我们将会再次阻塞 goroutines。

相反,我们需要为下游阶段提供一种方法,以向发送方指示他们将停止接受输入。

显式取消

main 函数在没有从 out 接收到所有值的情况下就决定退出时,它必须告诉上游阶段的 goroutines 丢弃它们尝试发送的值。它是通过在称为 done 的 channel 上发送值来实现的。它发送两个值,因为有两个潜在的阻塞的发送者:

func main() {
    in := gen(2, 3)

    // 通过两个都在 in 读取的 goroutinue 给 sq 分配工作。
    c1 := sq(in)
    c2 := sq(in)

    // 从 output 消耗掉第一个值
    done := make(chan struct{}, 2)
    out := merge(done, c1, c2)
    fmt.Println(<-out) // 4 or 9

    // 告诉剩余的发送者我们将要离开
    done <- struct{}{}
    done <- struct{}{}
}

发送goroutine会将其发送操作替换为 select 语句,该语句在 out 上发送发生时,或从 done 接收到值时继续。 done 的值类型为空结构,因为该值无关紧要:它只是接收事件,指示应放弃在 out 上发送 。output goroutine 继续在其入站 channel c 上循环,因此上游阶段不会被阻塞。(稍后我们将讨论如何让此循环尽早返回。)

func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)

    // 为cs 中的每个 input channel  开启一个 output goroutine。
    // output 从 c 中复制值到 out 直到 c 关闭或者从 done 接收到一个值,output 调用 wg.Done。
    output := func(c <-chan int) {
        for n := range c {
            select {
            case out <- n:
            case <-done:
            }
        }
        wg.Done()
    }
    // ... 余下代码的未变 ...

这种方法存在一个问题:each 下游接收者都需要知道可能阻塞的上游发送者的数量,并安排在早期返回时向这些发送者发出信号。 跟踪这些计数是乏味且容易出错的。

我们需要一种方法来告知未知数量的 goroutine,以停止向下游发送其值。 在 Go 中,我们可以通过关闭 channel 来完成此操作,因为在关闭的 channel上的接收操作始终可以立即执...

这意味着 main函数可以简单地通过关闭 done 通道来解除对所有发件人的阻塞。 此关闭实际上是向发送者的广播信号。 我们扩展 each 我们的管道函数以接收 done  作为参数,并通过 defer 语句安排关闭,以便所有来自 main  函数的返回路径都将指示管道阶段退出了。

func main() {
    // 创建一个 done channel 在整个管道中共享,
    // 当管道退出时,关闭 channel,作为对所有 goroutine
    // 发出的我们要退出的信号。
    done := make(chan struct{})
    defer close(done)

    in := gen(done, 2, 3)

    // 通过两个都在 in 读取的 goroutinue 给 sq 分配工作。
    c1 := sq(done, in)
    c2 := sq(done, in)

    //  从 output 消耗掉第一个值
    out := merge(done, c1, c2)
    fmt.Println(<-out) // 4 or 9

    // done 通过延时调用来关闭。
}

现在,一旦 done 关闭,我们的每个管道阶段都可以自由返回。merge 中的 output  例程可以返回而不会耗尽其入站 channel,因为它知道上游发送者 sq 在关闭完成后将停止尝试发送。通过 defer 语句, output 确保在所有返回路径上 wg.Done 被调用:

func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)

    // 为cs 中的每个 input channel  开启一个 output goroutine。
    // output 从 c 中复制值到 out 直到 c 关闭或者从 done 接收到一个值,output 调用 wg.Done。
    output := func(c <-chan int) {
        defer wg.Done()
        for n := range c {
            select {
            case out <- n:
            case <-done:
                return
            }
        }
    }
    // ... 余下代码的未变 ...

同样,sq 可以在 done  关闭后立即返回。 sq 通过 defer 语句确保在所有返回路径上关闭其 out channel:

func sq(done <-chan struct{}, in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            select {
            case out <- n * n:
            case <-done:
                return
            }
        }
    }()
    return out
}

以下是管道构建的指导原则:

  • 在某些阶段,完成所有发送操作后,将关闭其出站 channel。
  • 在某些阶段,将继续从入站 channel 接收值,直到这些 channel 关闭或发送方被阻塞为止。

管道通过确保有足够的缓冲区来存储所有已发送的值,或者在接收方可能放弃 channel 时显式发信号通知发送方,从而解除发送方的阻塞。

计算一棵树的摘要

让我们考虑一个更现实的管道。

MD5是一种消息摘要算法,可用作文件校验和。 命令行实用程序 md5sum 打印摘要值以获取文件列表。

% md5sum *.go
d47c2bbc28298ca9befdfbc5d3aa4e65  bounded.go
ee869afd31f83cbb2d10ee81b2b831dc  parallel.go
b88175e65fdcbc01ac08aaf1fd9b5e96  serial.go

我们的示例程序就像 md5sum 一样,但是取一个目录作为参数,并打印该目录下每个常规文件的摘要值,并按路径名排序。

% go run serial.go .
d47c2bbc28298ca9befdfbc5d3aa4e65  bounded.go
ee869afd31f83cbb2d10ee81b2b831dc  parallel.go
b88175e65fdcbc01ac08aaf1fd9b5e96  serial.go

我们程序的 main 函数调用了一个辅助函数 MD5All,该函数返回从路径名到摘要值的map,然后对结果进行排序和打印:

func main() {
    // 计算特定目录下的所有文件的 MD5 sum,
    // 然后根据路径名排序并打印
    m, err := MD5All(os.Args[1])
    if err != nil {
        fmt.Println(err)
        return
    }
    var paths []string
    for path := range m {
        paths = append(paths, path)
    }
    sort.Strings(paths)
    for _, path := range paths {
        fmt.Printf("%x  %s.", m[path], path)
    }
}

函数 MD5All 是我们讨论的重点。在 serial.go中,实现不使用并发性,只是在遍历树时,简单的读取并对每个文件计算校验和。

// MD5All 读取以 root 为根的文件树中的所有文件,并返回一个
// 从文件路径到文件内容的MD5 sum 的 map。如果遍历目录失败
// 或者任何读操作失败,MD5All 返回一个 error.
func MD5All(root string) (map[string][md5.Size]byte, error) {
    m := make(map[string][md5.Size]byte)
    err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
        if err != nil {
            return err
        }
        if !info.Mode().IsRegular() {
            return nil
        }
        data, err := ioutil.ReadFile(path)
        if err != nil {
            return err
        }
        m[path] = md5.Sum(data)
        return nil
    })
    if err != nil {
        return nil, err
    }
    return m, nil
}

并行计算摘要

在 parallel.go中,我们将 MD5All 分成两个阶段的管道。第一阶段, sumFiles 遍历树,在一个新的 goroutinue 中计算每个文件的摘要,并把结果发送到使用 result 类型值的 channel中:

type result struct {
    path string
    sum  [md5.Size]byte
    err  error
}

sumFiles 返回两个channel:一个用于 results,另一个用于由 filepath.Walk 返回的错误。walk 函数启动一个新的 goroutine 来处理每个常规文件,然后检查 done。 如果done已关闭,walk 将立即停止:

func sumFiles(done <-chan struct{}, root string) (<-chan result, <-chan error) {
    // 为每个常规文件,启动一个 goroutine 来计算文件的 sum并把结果发送
    // 给 c。并发送 walk 的结果给 errc。
    c := make(chan result)
    errc := make(chan error, 1)
    go func() {
        var wg sync.WaitGroup
        err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
            if err != nil {
                return err
            }
            if !info.Mode().IsRegular() {
                return nil
            }
            wg.Add(1)
            go func() {
                data, err := ioutil.ReadFile(path)
                select {
                case c <- result{path, md5.Sum(data), err}:
                case <-done:
                }
                wg.Done()
            }()
            // 如果 done 关闭,则终止 walk。
            select {
            case <-done:
                return errors.New("walk canceled")
            default:
                return nil
            }
        })
        // Walk 返回,所有wg.Add 调用都已完成。当所有发送完成后
        // 启动一个 goroutine 来关闭 c 。
        go func() {
            wg.Wait()
            close(c)
        }()
        // 这里不需要 select,因为 errc 是 buffered channel。
        errc <- err
    }()
    return c, errc
}

MD5All 从  c 中接收摘要值。在发生错误时尽早返回,通过 defer 关闭 done :

func MD5All(root string) (map[string][md5.Size]byte, error) {
    // MD5All 返回时,关闭 done channel ;它可能
    // 会在从 c 和 errc 接收所有值之前这样做。
    done := make(chan struct{})
    defer close(done)

    c, errc := sumFiles(done, root)

    m := make(map[string][md5.Size]byte)
    for r := range c {
        if r.err != nil {
            return nil, r.err
        }
        m[r.path] = r.sum
    }
    if err := <-errc; err != nil {
        return nil, err
    }
    return m, nil
}

有界并行

parallel.goMD5All 实现中,为每个文件启动一个新的 goroutine 。当一个目录中有许多大型文件,这可能会分配比计算机上可用内存更多的内存。

我们可以通过限制并行读取的文件数来限制这些分配。在 bounded.go 中,我们通过创建固定数量的goroutine 来读取文件来实现。 我们的管道现在分为三个阶段:遍历树,读取和计算文件摘要,以及收集这些摘要。

在第一阶段, walkFiles, 给出树中常规文件的路径:

func walkFiles(done <-chan struct{}, root string) (<-chan string, <-chan error) {
    paths := make(chan string)
    errc := make(chan error, 1)
    go func() {
        // 当 Walk 返回后,关闭 paths channel。
        defer close(paths)
        // 这里的 send 不需要 select,因为 errc 是 buffered channel。
        errc <- filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
            if err != nil {
                return err
            }
            if !info.Mode().IsRegular() {
                return nil
            }
            select {
            case paths <- path:
            case <-done:
                return errors.New("walk canceled")
            }
            return nil
        })
    }()
    return paths, errc
}

中间的阶段启动固定数量的 digester goroutines 从 paths 中接收文件名字并把 results  发送给 channel c

func digester(done <-chan struct{}, paths <-chan string, c chan<- result) {
    for path := range paths {
        data, err := ioutil.ReadFile(path)
        select {
        case c <- result{path, md5.Sum(data), err}:
        case <-done:
            return
        }
    }
}

与我们之前的示例不同,由于多个 goroutine 在共享 channel 上发送消息,因此 digester 不会关闭其 output channel 。取而代之的是,在 MD5All 中的代码安排所有 digesters 完成后,关闭这个channel :

    // 开启固定数量的 goroutines 来读取和计算文件摘要。
    c := make(chan result)
    var wg sync.WaitGroup
    const numDigesters = 20
    wg.Add(numDigesters)
    for i := 0; i < numDigesters; i++ {
        go func() {
            digester(done, paths, c)
            wg.Done()
        }()
    }
    go func() {
        wg.Wait()
        close(c)
    }()

相反,我们可以让每个 digester 创建并返回其自己的 output channel,但是随后我们将需要其他 goroutine 来 fan-in 结果。

最后阶段从  c 接收所有 results,然后从 errc 检查错误。这项检查不能更早进行,因为在此之前,walkFiles 可能向下游发送值会阻塞:

    m := make(map[string][md5.Size]byte)
    for r := range c {
        if r.err != nil {
            return nil, r.err
        }
        m[r.path] = r.sum
    }
    // 检查 Walk 是否失败。
    if err := <-errc; err != nil {
        return nil, err
    }
    return m, nil
}

总结

本文介绍了用于在 Go 中构造流数据管道的技术。处理此类管道中的故障非常棘手,因为管道中的每个阶段都可能阻塞尝试向下游发送值,并且下游阶段可能不再关心传入的数据。我们展示了关闭 channel 如何向管道开始的所有 goroutine 广播 "done" 信号,并定义了正确构建管道的准则。

进一步阅读:

本文章首发在 LearnKu.com 网站上。

本译文仅用于学习和交流目的,转载请务必注明文章译者、出处、和本文链接
我们的翻译工作遵照 CC 协议,如果我们的工作有侵犯到您的权益,请及时联系我们。

原文地址:https://learnku.com/docs/go-blog/pipelin...

译文地址:https://learnku.com/docs/go-blog/pipelin...

上一篇 下一篇
Summer
贡献者:3
讨论数量: 0
发起讨论 只看当前版本


暂无话题~