当错误发生时,如何关闭多个 goroutine ?
方法一、context#
您可以使用 context
为以下内容创建的包 ("carries deadlines, cancelation signals...").
您可以使用 context.WithCancel()
创建一个可以发布取消信号的上下文 (父上下文可能是 context.Background()
返回的上下文). 这将为您返回一个 cancel()
函数,该函数可用于取消 (或更准确地说,发出取消意图的 信号 ) 在工作的 goroutines.
在工作的 goroutines 中,您必须通过检查 Context.Done()
返回的通道是否关闭来检查是否启动了此意图,最简单的方法是尝试从其接收 (如果关闭则立即进行). 并执行非阻塞检查 (因此,如果未关闭,则可以继续使用), 请将 select
与 default
分支一起使用.
我将使用以下 work()
实现,它模拟 10% 的失败概率,并模拟 1 秒的工作:
func work(i int) (int, error) {
if rand.Intn(100) < 10 { // 10% 概率失败
return 0, errors.New("random error")
}
time.Sleep(time.Second)
return 100 + i, nil
}
而 doAllWork()
可能如下所示:
func doAllWork() error {
var wg sync.WaitGroup
ctx, cancel := context.WithCancel(context.Background())
defer cancel() // 确保它被调用以释放资源,即使没有错误
for i := 0; i < 2; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
for j := 0; j < 10; j++ {
// 检查是否存在其他 gorouties 错误:
select {
case <-ctx.Done():
return // 某处错误, 终止
default: // 默认值必须避免阻塞
}
result, err := work(j)
if err != nil {
fmt.Printf("Worker #%d during %d, error: %v\n", i, j, err)
cancel()
return
}
fmt.Printf("Worker #%d finished %d, result: %d.\n", i, j, result)
}
}(i)
}
wg.Wait()
return ctx.Err()
}
测试方法如下:
func main() {
rand.Seed(time.Now().UnixNano() + 1) // +1 因为 Playground 的时间是固定的
fmt.Printf("doAllWork: %v\n", doAllWork())
}
输出 (在 Go Playground 上尝试):
Worker #0 finished 0, result: 100.
Worker #1 finished 0, result: 100.
Worker #1 finished 1, result: 101.
Worker #0 finished 1, result: 101.
Worker #0 finished 2, result: 102.
Worker #1 finished 2, result: 102.
Worker #1 finished 3, result: 103.
Worker #1 during 4, error: random error
Worker #0 finished 3, result: 103.
doAllWork: context canceled
如果没有错误,例如,使用以下 work()
函数:
func work(i int) (int, error) {
time.Sleep(time.Second)
return 100 + i, nil
}
输出类似 (在 Go Playground 上尝试):
Worker #0 finished 0, result: 100.
Worker #1 finished 0, result: 100.
Worker #1 finished 1, result: 101.
Worker #0 finished 1, result: 101.
Worker #0 finished 2, result: 102.
Worker #1 finished 2, result: 102.
Worker #1 finished 3, result: 103.
Worker #0 finished 3, result: 103.
Worker #0 finished 4, result: 104.
Worker #1 finished 4, result: 104.
Worker #1 finished 5, result: 105.
Worker #0 finished 5, result: 105.
Worker #0 finished 6, result: 106.
Worker #1 finished 6, result: 106.
Worker #1 finished 7, result: 107.
Worker #0 finished 7, result: 107.
Worker #0 finished 8, result: 108.
Worker #1 finished 8, result: 108.
Worker #1 finished 9, result: 109.
Worker #0 finished 9, result: 109.
doAllWork: <nil>
笔记:
如上所示在上述解决方案中我们只是使用了上下文的 Done()
通道,所以看起来我们可以很容易地(确实看起来很容易)使用 done
通道来关闭协程,而不是使用 Context
的 cancel()
关闭通道来执行关闭协程的操作。
但是这不是真的 “关闭”。 这只能在只有一个 goroutine 的时候可以使用,但在我们的例子中,任何成员都去这样关闭 goroutine
。 并尝试关闭已经关闭的通道(这可能会引起 panics)(有疑问请参阅此处的详细信息:[未初始化的 通道行为?](stackoverflow.com/questions/390156...))。 因此,这种操作会导致您必须确保围绕 close(done)
操作进行的同时进行某种同步 / 排除操作,这将使其可读性降低,甚至逻辑更加复杂。 实际上,这应该是 cancel()
函数在幕后所做的事情,把复杂的操作隐藏 / 抽象出来,远离你的眼睛,所以 cancel()
是你首选的操作,要尽量使用此函数,以使你的代码 / 使用更简单,简洁。
如何从工作程序中获取并返回错误?#
为此,你可以使用 error 类型的 channel:
errs := make(chan error, 2) // 2个错误的缓存区
在工作程序内部,在遇到错误时,将其发送到 channel 上而不是打印它:
result, err := work(j)
if err != nil {
errs <- fmt.Errorf("Worker #%d during %d, error: %v\n", i, j, err)
cancel()
return
}
循环之后,如果出现错误,则返回错误 (否则返回 nil
):
// 如果有,返回(第一次)错误
if ctx.Err() != nil {
return <-errs
}
return nil
这次输出 (在 [Go Playground] 上尝试 (play.golang.org/p/j2wmouxitm)):
Worker #0 finished 0, result: 100.
Worker #1 finished 0, result: 100.
Worker #1 finished 1, result: 101.
Worker #0 finished 1, result: 101.
Worker #0 finished 2, result: 102.
Worker #1 finished 2, result: 102.
Worker #1 finished 3, result: 103.
Worker #0 finished 3, result: 103.
doAllWork: Worker #1 during 4, error: random error
请注意,我使用了一个等于 worker 数量的缓冲通道的缓冲区大小,这确保在其发送始终是非阻塞的。 这也使你可以接收和处理所有错误,而不仅仅是一个错误(例如第一个)。 另一种选择是使用缓冲通道只保存 1 个,并对其进行非阻塞发送,如下所示:
errs := make(chan error, 1) // 只缓存第一个错误
// ...内部其他工作程序:
result, err := work(j)
if err != nil {
// 非阻塞发送:
select {
case errs <- fmt.Errorf("Worker #%d during %d, error: %v\n", i, j, err):
default:
}
cancel()
return
}
方法二、errgroup#
一个更清晰的方法是使用 errgroup
(documentation).
包 errgroup
为处理公共任务子任务的 goroutine 组提供同步、错误传播和上下文取消。
你可以在此查看这个例子 (playground):
var g errgroup.Group
var urls = []string{
"http://www.golang.org/",
"http://www.google.com/",
"http://www.somestupidname.com/",
}
for _, url := range urls {
// 启动一个 goroutine 来获取 URL
url := url // https://golang.org/doc/faq#closures_and_goroutines
g.Go(func() error {
// 抓取URL内容
resp, err := http.Get(url)
if err == nil {
resp.Body.Close()
}
return err
})
}
// 等待所有 HTTP 请求获取完成
if err := g.Wait(); err == nil {
fmt.Println("Successfully fetched all URLs.")
} else {
// 全部运行后,至少有一个返回了错误!
// 但所有人都必须完成他们的工作!
// 如果你想在其他 goroutines 失败时停止它,请继续阅读!
fmt.Println("Unsuccessfully fetched URLs.")
}
但请注意:Go
文档 中 返回首个非 nil 的错误将取消该组的运行
这句话有点歧义。
实际上,如果使用上下文(WithContext
函数)去创建 errgroup.Group
,这将会在组内协程返回错误时调用 WithContext
返回的上下文中的 cancel 函数,否则什么也不会发生(请看这里的源码)。
所以,如果需要关闭不同的协程,则必须使用 WithContext
返回的上下文并由其内部自行管理,errgroup
则只会关闭上下文! 这里有个例子
总之,errgroup
可以以不同的方式实现效果,比如这个例子
-
"仅错误",就如上面的例子所示:
Wait
将等待所有协程结束,然后返回其中第一个非 nil 错误,没有错误则返回nil
。 -
在并发情况下:必须使用
WithContext
函数 创建组并使用上下文管理其结束。我在 playgroud 使用 sleep 做了一个案例 必须手动的关闭每个协程,但可通过关闭上下文进而控制它们的关闭。 -
管道(详见示例)。
本文中的所有译文仅用于学习和交流目的,转载请务必注明文章译者、出处、和本文链接
我们的翻译工作遵照 CC 协议,如果我们的工作有侵犯到您的权益,请及时联系我们。
推荐文章: