为什么要手撕一个Go协程池,用ants不好吗

为什么要手撕一个Go协程池,用ants不好吗

1、协程池是什么?

在Go语言中,协程是其最大的特性之一。在实际应用中,协程的创建和销毁成本比较高。当需要同时处理大量的任务时,创建大量的协程会导致系统开销变大,进而影响程序的性能。这时候,就需要使用协程池来管理协程的生命周期,将协程的创建和销毁成本降至最小,提高程序的并发性能。

尽管Go有高效的GMP调度模型,理论上支持千万的goroutine,但是G过多,总会对GC、调度造成压力,这会使性能降低。因此,就可以考虑做一个协程池,来控制一定数量的协程。

接下来,我使用Go语言手搓一个简单的协程池。

2、协程池实现

使用自定义的协程池,完成以下任务:

  • 计算斐波那契数列

  • 计算阶乘

  • 模拟长时间的 I/O 操作(比如 API 请求)

package main

import (
    "fmt"
    "math/big"
    "sync"
    "time"
)

// Task 任务结构体,包含需要执行的函数
type Task struct {
    f func() error
}

// NewTask 创建一个新的任务
func NewTask(f func() error) *Task {
    return &Task{f: f}
}

// Pool 协程池结构
type Pool struct {
    capacity int64          // 最大并发协程数
    tasks    chan *Task     // 任务通道
    wg       sync.WaitGroup // 用于等待任务完成
}

// NewPool 创建一个新的协程池
func NewPool(capacity int64) *Pool {
    if capacity <= 0 {
        capacity = 1
    }
    p := &Pool{
        capacity: capacity,
        tasks:    make(chan *Task),
    }
    return p
}

// Start 启动固定数量的 worker
func (p *Pool) Start() {
    for i := int64(0); i < p.capacity; i++ {
        p.wg.Add(1)
        go p.worker()
    }
}

// worker 负责执行任务
func (p *Pool) worker() {
    defer p.wg.Done()
    for task := range p.tasks {
        if task != nil {
            _ = task.f()
        }
    }
}

// Submit 提交任务到协程池
func (p *Pool) Submit(task *Task) {
    p.tasks <- task
}

// Stop 关闭任务通道,并等待所有任务完成
func (p *Pool) Stop() {
    close(p.tasks)
    p.wg.Wait()
}

// fib 计算斐波那契数列
func fib(n int) *big.Int {
    if n <= 1 {
        return big.NewInt(int64(n))
    }
    a, b := big.NewInt(0), big.NewInt(1)
    for i := 2; i <= n; i++ {
        a.Add(a, b)
        a, b = b, a
    }
    return b
}

// factorial 计算 n 的阶乘
func factorial(n int) *big.Int {
    result := big.NewInt(1)
    for i := 2; i <= n; i++ {
        result.Mul(result, big.NewInt(int64(i)))
    }
    return result
}

func main() {
    // 创建协程池,最多 5 个并发任务
    pool := NewPool(5)
    pool.Start()

    // 提交任务
    for i := 10; i <= 15; i++ {
        n := i
        pool.Submit(NewTask(func() error {
            fmt.Printf("Fib(%d) = %s\n", n, fib(n).String())
            return nil
        }))
    }

    for i := 5; i <= 10; i++ {
        n := i
        pool.Submit(NewTask(func() error {
            fmt.Printf("%d! = %s\n", n, factorial(n).String())
            return nil
        }))
    }

    // 模拟 I/O 操作
    for i := 1; i <= 3; i++ {
        id := i
        pool.Submit(NewTask(func() error {
            fmt.Printf("Task %d: Simulating long I/O operation...\n", id)
            time.Sleep(2 * time.Second)
            fmt.Printf("Task %d: I/O operation completed.\n", id)
            return nil
        }))
    }

    pool.Stop()
    fmt.Println("All tasks completed.")
}

总结

  1. 协程池并发执行任务;
  2. 任务可以是计算密集型或IO密集型;
  3. 调用Stop方法确保所有任务执行完毕。

运行结果

Fib(10) = 55
Fib(15) = 610
5! = 120
6! = 720
7! = 5040
8! = 40320
9! = 362880
10! = 3628800
Task 1: Simulating long I/O operation...
Fib(14) = 377
Task 2: Simulating long I/O operation...
Fib(11) = 89
Task 3: Simulating long I/O operation...
Fib(13) = 233
Fib(12) = 144
Task 3: I/O operation completed.
Task 1: I/O operation completed.
Task 2: I/O operation completed.
All tasks completed.

3、使用ants

开源地址:github.com/panjf2000/ants/blob/dev...

安装

go get -u github.com/panjf2000/ants/v2

开源的Go协程池。具有高效的协程池管理。所以,直接使用是最好的选择。

package main

import (
    "fmt"
    "github.com/panjf2000/ants/v2"
    "math/big"
    "sync"
    "time"
)

// fib 计算斐波那契数列
func fib(n int) *big.Int {
    if n <= 1 {
        return big.NewInt(int64(n))
    }
    a, b := big.NewInt(0), big.NewInt(1)
    for i := 2; i <= n; i++ {
        a.Add(a, b)
        a, b = b, a
    }
    return b
}

// factorial 计算 n 的阶乘
func factorial(n int) *big.Int {
    result := big.NewInt(1)
    for i := 2; i <= n; i++ {
        result.Mul(result, big.NewInt(int64(i)))
    }
    return result
}

func main() {
    var wg sync.WaitGroup

    pool, err := ants.NewPool(5)
    if err != nil {
        return
    }
    defer pool.Release() // 释放协程池

    // 提交斐波那契数列计算任务
    for i := 10; i <= 15; i++ {
        n := i
        wg.Add(1)
        _ = pool.Submit(func() {
            defer wg.Done()
            fmt.Printf("Fib(%d) = %s\n", n, fib(n).String())
        })
    }
    // 提交阶乘计算任务
    for i := 5; i <= 10; i++ {
        n := i
        wg.Add(1)
        _ = pool.Submit(func() {
            defer wg.Done()
            fmt.Printf("%d! = %s\n", n, factorial(n).String())
        })
    }

    // 模拟 I/O 操作
    for i := 1; i <= 3; i++ {
        id := i
        wg.Add(1)
        _ = pool.Submit(func() {
            defer wg.Done()
            fmt.Printf("Task %d: Simulating long I/O operation...\n", id)
            time.Sleep(2 * time.Second)
            fmt.Printf("Task %d: I/O operation completed.\n", id)
        })
    }

    wg.Wait()
    fmt.Println("All tasks completed.")
}

运行结果

Fib(14) = 377
Fib(15) = 610
Fib(10) = 55
Fib(13) = 233
Fib(11) = 89
6! = 720
Fib(12) = 144
5! = 120
10! = 3628800
Task 1: Simulating long I/O operation...
Task 2: Simulating long I/O operation...
8! = 40320
9! = 362880
Task 3: Simulating long I/O operation...
7! = 5040
Task 3: I/O operation completed.
Task 2: I/O operation completed.
Task 1: I/O operation completed.
All tasks completed.

4、生产者消费者模型(如果处理高并发请求?)

Go语言中常见的处理高并发的操作,就是通过创建有缓冲的channel来存放请求任务,然后由协程池去处理这个channel里面的任务,从而避免了创建大量goroutine导致性能下降的问题。

// 定义 Worker 处理的任务
func worker(id int, jobs <-chan int, results chan<- string, wg *sync.WaitGroup) {
    defer wg.Done() // 任务完成后通知 WaitGroup

    for job := range jobs {
        // 模拟任务执行时间
        time.Sleep(time.Millisecond * time.Duration(rand.Intn(500)))
        results <- fmt.Sprintf("Worker %d processed job %d", id, job)
    }
}

func main() {
    rand.Seed(time.Now().UnixNano())

    numWorkers := 5 // 设定固定的 worker 数量
    numJobs := 20   // 任务总数

    jobs := make(chan int, numJobs)       // 创建有缓冲的 channel
    results := make(chan string, numJobs) // 存储处理结果

    var wg sync.WaitGroup

    // 启动固定数量的 Worker
    for i := 1; i <= numWorkers; i++ {
        wg.Add(1)
        go worker(i, jobs, results, &wg)
    }

    // 生产任务
    for j := 1; j <= numJobs; j++ {
        jobs <- j
    }
    close(jobs) // 关闭 jobs channel,通知 workers 不再有新任务

    // 等待所有 worker 完成
    wg.Wait()
    close(results) // 关闭结果 channel

    // 输出所有的处理结果
    for result := range results {
        fmt.Println(result)
    }
}
go
本作品采用《CC 协议》,转载必须注明作者和本文链接
讨论数量: 0
(= ̄ω ̄=)··· 暂无内容!

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!