当错误发生时,如何关闭多个 goroutine ?

方法一、context

您可以使用 context 为以下内容创建的包 ("carries deadlines, cancelation signals...").

您可以使用 context.WithCancel() 创建一个可以发布取消信号的上下文(父上下文可能是 context.Background()返回的上下文). 这将为您返回一个 cancel() 函数, 该函数可用于取消 (或更准确地说, 发出取消意图的 信号 ) 在工作的 goroutines.
在工作的 goroutines 中, 您必须通过检查 Context.Done() 返回的通道是否关闭来检查是否启动了此意图, 最简单的方法是尝试从其接收 (如果关闭则立即进行). 并执行非阻塞检查 (因此,如果未关闭,则可以继续使用), 请将 select 与 default 分支一起使用.

我将使用以下 work() 实现, 它模拟10%的失败概率, 并模拟1秒的工作:

func work(i int) (int, error) {
    if rand.Intn(100) < 10 { // 10% 概率失败
        return 0, errors.New("random error")
    }
    time.Sleep(time.Second)
    return 100 + i, nil
}

而 doAllWork() 可能如下所示:

func doAllWork() error {
    var wg sync.WaitGroup

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel() // 确保它被调用以释放资源,即使没有错误

    for i := 0; i < 2; i++ {
        wg.Add(1)
        go func(i int) {
            defer wg.Done()

            for j := 0; j < 10; j++ {
                // 检查是否存在其他 gorouties 错误:
                select {
                case <-ctx.Done():
                    return // 某处错误, 终止
                default: // 默认值必须避免阻塞
                }
                result, err := work(j)
                if err != nil {
                    fmt.Printf("Worker #%d during %d, error: %v\n", i, j, err)
                    cancel()
                    return
                }
                fmt.Printf("Worker #%d finished %d, result: %d.\n", i, j, result)
            }
        }(i)
    }
    wg.Wait()

    return ctx.Err()
}

测试方法如下:

func main() {
    rand.Seed(time.Now().UnixNano() + 1) // +1 因为 Playground 的时间是固定的
    fmt.Printf("doAllWork: %v\n", doAllWork())
}

输出 (在 Go Playground 上尝试):

Worker #0 finished 0, result: 100.
Worker #1 finished 0, result: 100.
Worker #1 finished 1, result: 101.
Worker #0 finished 1, result: 101.
Worker #0 finished 2, result: 102.
Worker #1 finished 2, result: 102.
Worker #1 finished 3, result: 103.
Worker #1 during 4, error: random error
Worker #0 finished 3, result: 103.
doAllWork: context canceled

如果没有错误, 例如, 使用以下 work() 函数:

func work(i int) (int, error) {
    time.Sleep(time.Second)
    return 100 + i, nil
}

输出类似 (在 Go Playground 上尝试):

Worker #0 finished 0, result: 100.
Worker #1 finished 0, result: 100.
Worker #1 finished 1, result: 101.
Worker #0 finished 1, result: 101.
Worker #0 finished 2, result: 102.
Worker #1 finished 2, result: 102.
Worker #1 finished 3, result: 103.
Worker #0 finished 3, result: 103.
Worker #0 finished 4, result: 104.
Worker #1 finished 4, result: 104.
Worker #1 finished 5, result: 105.
Worker #0 finished 5, result: 105.
Worker #0 finished 6, result: 106.
Worker #1 finished 6, result: 106.
Worker #1 finished 7, result: 107.
Worker #0 finished 7, result: 107.
Worker #0 finished 8, result: 108.
Worker #1 finished 8, result: 108.
Worker #1 finished 9, result: 109.
Worker #0 finished 9, result: 109.
doAllWork: <nil>

笔记:

如上所示在上述解决方案中我们只是使用了上下文的 Done() 通道,所以看起来我们可以很容易地(确实看起来很容易)使用 done 通道来关闭协程,而不是使用 Contextcancel() 关闭通道来执行关闭协程的操作。

但是这不是真的“关闭”。 这只能在只有一个 goroutine 的时候可以使用,但在我们的例子中,任何成员都去这样关闭 goroutine 并尝试关闭已经关闭的通道(这可能会引起panics)(有疑问请参阅此处的详细信息:[未初始化的 通道行为?](stackoverflow.com/questions/390156...))。 因此,这种操作会导致您必须确保围绕 close(done) 操作进行的同时进行某种同步/排除操作,这将使其可读性降低,甚至逻辑更加复杂。 实际上,这应该是 cancel() 函数在幕后所做的事情,把复杂的操作隐藏/抽象出来,远离你的眼睛,所以 cancel() 是你首选的操作,要尽量使用此函数,以使你的代码/使用更简单,简洁。

如何从工作程序中获取并返回错误?

为此,你可以使用 error 类型的channel:

errs := make(chan error, 2) // 2个错误的缓存区

在工作程序内部,在遇到错误时,将其发送到 channel 上而不是打印它:

result, err := work(j)
if err != nil {
    errs <- fmt.Errorf("Worker #%d during %d, error: %v\n", i, j, err)
    cancel()
    return
}

循环之后,如果出现错误,则返回错误(否则返回nil):

// 如果有,返回(第一次)错误
if ctx.Err() != nil {
    return <-errs
}
return nil

这次输出(在[Go Playground]上尝试(play.golang.org/p/j2wmouxitm)):

Worker #0 finished 0, result: 100.
Worker #1 finished 0, result: 100.
Worker #1 finished 1, result: 101.
Worker #0 finished 1, result: 101.
Worker #0 finished 2, result: 102.
Worker #1 finished 2, result: 102.
Worker #1 finished 3, result: 103.
Worker #0 finished 3, result: 103.
doAllWork: Worker #1 during 4, error: random error

请注意,我使用了一个等于 worker 数量的缓冲通道的缓冲区大小,这确保在其发送始终是非阻塞的。 这也使你可以接收和处理所有错误,而不仅仅是一个错误(例如第一个)。 另一种选择是使用缓冲通道只保存 1个,并对其进行非阻塞发送,如下所示:

errs := make(chan error, 1) // 只缓存第一个错误

// ...内部其他工作程序:

result, err := work(j)
if err != nil {
    // 非阻塞发送:
    select {
    case errs <- fmt.Errorf("Worker #%d during %d, error: %v\n", i, j, err):
    default:
    }
    cancel()
    return
}

方法二、errgroup

一个更清晰的方法是使用errgroup (documentation).

errgroup 为处理公共任务子任务的 goroutine 组提供同步、错误传播和上下文取消。

你可以在此查看这个例子 (playground):

    var g errgroup.Group
    var urls = []string{
        "http://www.golang.org/",
        "http://www.google.com/",
        "http://www.somestupidname.com/",
    }

    for _, url := range urls {
        // 启动一个 goroutine 来获取 URL
        url := url // https://golang.org/doc/faq#closures_and_goroutines

       g.Go(func() error {
            // 抓取URL内容
            resp, err := http.Get(url)
            if err == nil {
                resp.Body.Close()
            }
            return err
        })
    }

    // 等待所有 HTTP 请求获取完成
    if err := g.Wait(); err == nil {
        fmt.Println("Successfully fetched all URLs.")
    } else {
        // 全部运行后,至少有一个返回了错误!
        // 但所有人都必须完成他们的工作!
        // 如果你想在其他 goroutines 失败时停止它,请继续阅读!
        fmt.Println("Unsuccessfully fetched URLs.")
    }

但请注意:Go 文档返回首个非 nil 的错误将取消该组的运行这句话有点歧义。

实际上,如果使用上下文(WithContext 函数)去创建 errgroup.Group,这将会在组内协程返回错误时调用 WithContext 返回的上下文中的 cancel 函数,否则什么也不会发生(请看这里的源码)。

所以,如果需要关闭不同的协程,则必须使用 WithContext 返回的上下文并由其内部自行管理,errgroup 则只会关闭上下文! 这里有个例子

总之,errgroup 可以以不同的方式实现效果,比如这个例子

  1. "仅错误",就如上面的例子所示:Wait 将等待所有协程结束,然后返回其中第一个非 nil 错误,没有错误则返回 nil

  2. 在并发情况下:必须使用 WithContext 函数 创建组并使用上下文管理其结束。我在 playgroud 使用 sleep 做了一个案例 必须手动的关闭每个协程,但可通过关闭上下文进而控制它们的关闭。

  3. 管道(详见示例)。

本文中的所有译文仅用于学习和交流目的,转载请务必注明文章译者、出处、和本文链接
我们的翻译工作遵照 CC 协议,如果我们的工作有侵犯到您的权益,请及时联系我们。

原文地址:https://stackoverflow.com/questions/4550...

译文地址:https://learnku.com/go/t/62115

本帖已被设为精华帖!
本文为协同翻译文章,如您发现瑕疵请点击「改进」按钮提交优化建议
讨论数量: 0
(= ̄ω ̄=)··· 暂无内容!

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!