go并发-工作池模式

开篇

之前写过一篇文章,它有个响亮的名字: Handling 1 Million Requests per Minute with Go。 这是国外的一个作者写的,我做了一篇说明。起的也是这个标题, 阅读量是我最好的一篇,果然文章都是靠标题出彩的…..

今天偶然看到另一篇文章(原文在文末)。两篇文章原理相似:有一批工作任务(job),通过工作池(worker-pool)的方式,达到多 worker 并发处理 job 的效果。

他们还是有很多不同的点,实现上差别也是蛮大的。

首先上一篇文章我放了一张图片,大概就是上篇整体的工作流。 image

  • 每个 worker 处理完任务就好,不关心结果,不对结果做进一步处理。
  • 只要请求不停止,程序就不会停止,没有控制机制,除非宕机。

这篇文章不同点在于:

首先数据会从 generate (生产数据)->并发处理数据->处理结果聚合。 图大概是这样的, image

然后它可以通过 context.context 达到控制工作池停止工作的效果。

最后通过代码,你会发现它不是传统意义上的 worker-pool,后面会说明。

下图能清晰表达整体流程了。 image

顺便说一句,这篇文章实现的代码比 Handling 1 Million Requests per Minute with Go 的代码简单多了。

首先看 job

package wpool

import (
    "context"
)

type JobID string
type jobType string
type jobMetadata map[string]interface{}

type ExecutionFn func(ctx context.Context, args interface{}) (interface{}, error)

type JobDescriptor struct {
    ID       JobID 
    JType    jobType
    Metadata map[string]interface{}
}

type Result struct {
    Value      interface{}
    Err        error
    Descriptor JobDescriptor
}

type Job struct {
    Descriptor JobDescriptor
    ExecFn     ExecutionFn
    Args       interface{}
}

// 处理 job 逻辑,处理结果包装成 Result 结果
func (j Job) execute(ctx context.Context) Result {
    value, err := j.ExecFn(ctx, j.Args)
    if err != nil {
        return Result{
            Err:        err,
            Descriptor: j.Descriptor,
        }
    }

    return Result{
        Value:      value,
        Descriptor: j.Descriptor,
    }
}

这个可以简单过一下。最终每个 job 处理完都会包装成 Result 返回。

下面这段就是核心代码了。

package wpool

import (
    "context"
    "fmt"
    "sync"
)

// 运行中的每个worker
func worker(ctx context.Context, wg *sync.WaitGroup, jobs <-chan Job, results chan<- Result) {
    defer wg.Done()
    for {
        select {
        case job, ok := <-jobs:
            if !ok {
                return
            }
            results <- job.execute(ctx)
        case <-ctx.Done():
            fmt.Printf("cancelled worker. Error detail: %v\n", ctx.Err())
            results <- Result{
                Err: ctx.Err(),
            }
            return
        }
    }
}

type WorkerPool struct {
    workersCount int //worker 数量
    jobs         chan Job // 存储 job 的 channel 
    results      chan Result // 处理完每个 job 对应的 结果集
    Done         chan struct{} //是否结束
}

func New(wcount int) WorkerPool {
    return WorkerPool{
        workersCount: wcount,
        jobs:         make(chan Job, wcount),
        results:      make(chan Result, wcount),
        Done:         make(chan struct{}),
    }
}

func (wp WorkerPool) Run(ctx context.Context) {
    var wg sync.WaitGroup
    for i := 0; i < wp.workersCount; i++ {
        wg.Add(1)
        go worker(ctx, &wg, wp.jobs, wp.results)
    }

    wg.Wait()
    close(wp.Done)
    close(wp.results)
}

func (wp WorkerPool) Results() <-chan Result {
    return wp.results
}

func (wp WorkerPool) GenerateFrom(jobsBulk []Job) {
    for i, _ := range jobsBulk {
        wp.jobs <- jobsBulk[i]
    }
    close(wp.jobs)
}

整个 WorkerPool 结构很简单。 jobs 是一个缓冲 channel。每一个任务都会放入 jobs 中等待处理 woker 处理。

results 也是一个通道类型,它的作用是保存每个 job 处理后产生的结果 Result

首先通过 New 初始化一个 worker-pool 工作池,然后执行 Run 开始运行。

func New(wcount int) WorkerPool {
    return WorkerPool{
        workersCount: wcount,
        jobs:         make(chan Job, wcount),
        results:      make(chan Result, wcount),
        Done:         make(chan struct{}),
    }
}
func (wp WorkerPool) Run(ctx context.Context) {
    var wg sync.WaitGroup

    for i := 0; i < wp.workersCount; i++ {
        wg.Add(1)
        go worker(ctx, &wg, wp.jobs, wp.results)
    }

    wg.Wait()
    close(wp.Done)
    close(wp.results)
}

初始化的时候传入 worker 数,对应每个 g 运行 work(ctx,&wg,wp.jobs,wp.results),组成了 worker-pool。 同时通过 sync.WaitGroup,我们可以等待所有 worker 工作结束,也就意味着 work-pool 结束工作,当然可能是因为任务处理结束,也可能是被停止了。

每个 job 数据源是如何来的?

// job数据源,把每个 job 放入到 jobs channel 中
func (wp WorkerPool) GenerateFrom(jobsBulk []Job) {
    for i, _ := range jobsBulk {
        wp.jobs <- jobsBulk[i]
    }
    close(wp.jobs)
}

对应每个 worker 的工作,

func worker(ctx context.Context, wg *sync.WaitGroup, jobs <-chan Job, results chan<- Result) {
    defer wg.Done()
    for {
        select {
        case job, ok := <-jobs:
            if !ok {
                return
            }
            results <- job.execute(ctx)
        case <-ctx.Done():
            fmt.Printf("cancelled worker. Error detail: %v\n", ctx.Err())
            results <- Result{
                Err: ctx.Err(),
            }
            return
        }
    }
}

每个 worker 都尝试从同一个 jobs 获取数据,这是一个典型的 fan-out 模式。 当对应的 g 获取到 job 进行处理后,会把处理结果发送到同一个 results channel 中,这又是一个 fan-in 模式。 当然我们通过 context.Context 可以对每个 worker 做停止运行控制。

最后是处理结果集合,

// 处理结果集
func (wp WorkerPool) Results() <-chan Result {
    return wp.results
}

那么整体的测试代码就是:

func TestWorkerPool(t *testing.T) {
    wp := New(workerCount)

    ctx, cancel := context.WithCancel(context.TODO())
    defer cancel()

    go wp.GenerateFrom(testJobs())

    go wp.Run(ctx)

    for {
        select {
        case r, ok := <-wp.Results():
            if !ok {
                continue
            }

            i, err := strconv.ParseInt(string(r.Descriptor.ID), 10, 64)
            if err != nil {
                t.Fatalf("unexpected error: %v", err)
            }

            val := r.Value.(int)
            if val != int(i)*2 {
                t.Fatalf("wrong value %v; expected %v", val, int(i)*2)
            }
        case <-wp.Done:
            return
        default:
        }
    }
}

看了代码之后,我们知道,这并不是一个传统意义的 worker-pool。它并不像 Handling 1 Million Requests per Minute with Go 这篇文章一样, 初始化一个真正的 worker-pool,一旦接收到 job,就尝试从池中获取一个 worker, 把对应的 job 交给这个 work 进行处理,等 work 处理完毕,重新进行到工作池中,等待下一次被利用。

go
本作品采用《CC 协议》,转载必须注明作者和本文链接
吴亲库里
讨论数量: 0
(= ̄ω ̄=)··· 暂无内容!

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!
未填写
文章
46
粉丝
117
喜欢
493
收藏
604
排名:176
访问:5.5 万
私信
所有博文
社区赞助商