下次想在Golang中写个并发处理,就用这个模板,准没错!

引言

要说Golang中最引以为傲的特性,那非goroutine莫属,goroutine(协程)很轻量,相比于每个线程要使用1MB的内存,每个go协程只需要1kb左右就够了。

于是,在需要做并发处理的时候,很自然的就想着,go一下就好了吗? 示例代码如下

    for i:=0; i < 5; i++ {
        go func(index int) {
            fmt.Println(index)
        }(i) //这里为什么要把i传进来呢?
    }

这样可以并发处理请求了是不假,但如果其中一个请求出错了,需要退出怎么办了? 一方面,可以自己实现这个错误处理(稍后会写),另一方面,也可以直接用golang官方errorgroup

errorgoup是个好东东

上面的示例代码,如果用errorgroup来重新实现,会是下面这个样子

    g, _ := errgroup.WithContext(context.Background())

    for i := 0; i < 5; i++ {
        index := i
        g.Go(func() error {
            fmt.Println(index)
            return nil // 如果想Mock一些错误,也可以return一个error
        })
    }

    if err = g.Wait(); err != nil {
        return err
    }

是不是还挺简单的?感兴趣的,可以自行搜下源码,除去注释只有大概30行代码,还是很好理解的。

现在错误处理也有了,是不是就完美了呢?

这个问题就要看你并发处理多少请求了,协程虽然很轻量,但也还是要耗费一些资源的,所以如果可以预见到有几百上千的请求的要处理,那就需要协程池来复用协程,达到节省资源的目的了。

网上有很多协程池的实现,大都做的大而全,考虑了很多场景,但实际编码场景中,很可能只是为了解决一个小问题,就引入一个包,实在觉得有些太重了呢,而且可能还不够灵活。

有没有一个简单的模板,可以copy/paste/tweak一下呢?这就来了

一个简单实用的模板

闲话少絮,直接上代码先,关键部分会在代码中加注释解释。

    var (
        err         error
        outputs     []int
        workers     = 4 //协程的数量,可以按需设置,一般不大于runtime.NumCPU()
        workChannel = make(chan int)
        errChannel  = make(chan error, workers)
        wg          = &sync.WaitGroup{}
        mux         = sync.Mutex{}
    )

    worker := func(input int) (int, error){
        retrun input, nil //如果想Mock一些错误,也可以return一个error
    }

    wg.Add(workers)
    for i := 0; i < workers; i++ {
        go func() {
            defer wg.Done()
            for input := range workChannel { // workChannel被close时,这个循环就会退出
                output, err:=worker(input)
                if err != nil {
                    errChannel <- err
                    break
                }

                mux.Lock() //使用lock保护outputs,来搜集执行结果,如果不需要可以删除
                outputs = append(outputs, output) 
                mux.Unlock()
            }
        }()
    }

loop:
    for _, input := range inputs {
        select {
        case workChannel <- input:
        case err = <-errChannel:
            break loop
        }
    }

    close(workChannel) //关闭workChannel,可以让工作协程,在处理完当前任务后退出
    wg.Wait()

    // 以于select case,如果有多个case满足时,会选择随机进入一个case的,所以需要再检查一次,双重保险
    if err == nil {
        select {
        case err = <-errChannel:
        default:
        }
    }

    return outputs, err

代码看着是多了些,但在实际使用过程中,按需要改下worker函数输入和输出的类型即可。

如果copy/paste也不想做,那就只能封装一下了,但是因为现在golang还没正式推出范型,只能用inteface{}了,看着是不大好看,使用时,也要自己转来转去的,不过可以凑合着用啦。

话说,封装好的代码在这 parallel_runner.go,需要的自取了。

关于context

也许有人会问,为什么不用context.WithCancel(),然后在出现错误的时候cancel一下?
讲究一点的话,确实应该用,但那也意味在在worker函数中,你也要检查ctx.Done(), 我不用还是因为懒了……

写在最后

之前还写过一些关于channel的使用的文章,

里面实现的轻量级util都开源在channelx,欢迎大家审阅,如果有你喜欢用的工具,欢迎点个赞或者star :)

本作品采用《CC 协议》,转载必须注明作者和本文链接
讨论数量: 0
(= ̄ω ̄=)··· 暂无内容!

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!