为什么要手撕一个Go协程池,用ants不好吗
为什么要手撕一个Go协程池,用ants不好吗
1、协程池是什么?
在Go语言中,协程是其最大的特性之一。在实际应用中,协程的创建和销毁成本比较高。当需要同时处理大量的任务时,创建大量的协程会导致系统开销变大,进而影响程序的性能。这时候,就需要使用协程池来管理协程的生命周期,将协程的创建和销毁成本降至最小,提高程序的并发性能。
尽管Go有高效的GMP调度模型,理论上支持千万的goroutine,但是G过多,总会对GC、调度造成压力,这会使性能降低。因此,就可以考虑做一个协程池,来控制一定数量的协程。
接下来,我使用Go语言手搓一个简单的协程池。
2、协程池实现
使用自定义的协程池,完成以下任务:
计算斐波那契数列
计算阶乘
模拟长时间的 I/O 操作(比如 API 请求)
package main
import (
"fmt"
"math/big"
"sync"
"time"
)
// Task 任务结构体,包含需要执行的函数
type Task struct {
f func() error
}
// NewTask 创建一个新的任务
func NewTask(f func() error) *Task {
return &Task{f: f}
}
// Pool 协程池结构
type Pool struct {
capacity int64 // 最大并发协程数
tasks chan *Task // 任务通道
wg sync.WaitGroup // 用于等待任务完成
}
// NewPool 创建一个新的协程池
func NewPool(capacity int64) *Pool {
if capacity <= 0 {
capacity = 1
}
p := &Pool{
capacity: capacity,
tasks: make(chan *Task),
}
return p
}
// Start 启动固定数量的 worker
func (p *Pool) Start() {
for i := int64(0); i < p.capacity; i++ {
p.wg.Add(1)
go p.worker()
}
}
// worker 负责执行任务
func (p *Pool) worker() {
defer p.wg.Done()
for task := range p.tasks {
if task != nil {
_ = task.f()
}
}
}
// Submit 提交任务到协程池
func (p *Pool) Submit(task *Task) {
p.tasks <- task
}
// Stop 关闭任务通道,并等待所有任务完成
func (p *Pool) Stop() {
close(p.tasks)
p.wg.Wait()
}
// fib 计算斐波那契数列
func fib(n int) *big.Int {
if n <= 1 {
return big.NewInt(int64(n))
}
a, b := big.NewInt(0), big.NewInt(1)
for i := 2; i <= n; i++ {
a.Add(a, b)
a, b = b, a
}
return b
}
// factorial 计算 n 的阶乘
func factorial(n int) *big.Int {
result := big.NewInt(1)
for i := 2; i <= n; i++ {
result.Mul(result, big.NewInt(int64(i)))
}
return result
}
func main() {
// 创建协程池,最多 5 个并发任务
pool := NewPool(5)
pool.Start()
// 提交任务
for i := 10; i <= 15; i++ {
n := i
pool.Submit(NewTask(func() error {
fmt.Printf("Fib(%d) = %s\n", n, fib(n).String())
return nil
}))
}
for i := 5; i <= 10; i++ {
n := i
pool.Submit(NewTask(func() error {
fmt.Printf("%d! = %s\n", n, factorial(n).String())
return nil
}))
}
// 模拟 I/O 操作
for i := 1; i <= 3; i++ {
id := i
pool.Submit(NewTask(func() error {
fmt.Printf("Task %d: Simulating long I/O operation...\n", id)
time.Sleep(2 * time.Second)
fmt.Printf("Task %d: I/O operation completed.\n", id)
return nil
}))
}
pool.Stop()
fmt.Println("All tasks completed.")
}
总结
- 协程池并发执行任务;
- 任务可以是计算密集型或IO密集型;
- 调用Stop方法确保所有任务执行完毕。
运行结果
Fib(10) = 55
Fib(15) = 610
5! = 120
6! = 720
7! = 5040
8! = 40320
9! = 362880
10! = 3628800
Task 1: Simulating long I/O operation...
Fib(14) = 377
Task 2: Simulating long I/O operation...
Fib(11) = 89
Task 3: Simulating long I/O operation...
Fib(13) = 233
Fib(12) = 144
Task 3: I/O operation completed.
Task 1: I/O operation completed.
Task 2: I/O operation completed.
All tasks completed.
3、使用ants
开源地址:github.com/panjf2000/ants/blob/dev...
安装
go get -u github.com/panjf2000/ants/v2
开源的Go协程池。具有高效的协程池管理。所以,直接使用是最好的选择。
package main
import (
"fmt"
"github.com/panjf2000/ants/v2"
"math/big"
"sync"
"time"
)
// fib 计算斐波那契数列
func fib(n int) *big.Int {
if n <= 1 {
return big.NewInt(int64(n))
}
a, b := big.NewInt(0), big.NewInt(1)
for i := 2; i <= n; i++ {
a.Add(a, b)
a, b = b, a
}
return b
}
// factorial 计算 n 的阶乘
func factorial(n int) *big.Int {
result := big.NewInt(1)
for i := 2; i <= n; i++ {
result.Mul(result, big.NewInt(int64(i)))
}
return result
}
func main() {
var wg sync.WaitGroup
pool, err := ants.NewPool(5)
if err != nil {
return
}
defer pool.Release() // 释放协程池
// 提交斐波那契数列计算任务
for i := 10; i <= 15; i++ {
n := i
wg.Add(1)
_ = pool.Submit(func() {
defer wg.Done()
fmt.Printf("Fib(%d) = %s\n", n, fib(n).String())
})
}
// 提交阶乘计算任务
for i := 5; i <= 10; i++ {
n := i
wg.Add(1)
_ = pool.Submit(func() {
defer wg.Done()
fmt.Printf("%d! = %s\n", n, factorial(n).String())
})
}
// 模拟 I/O 操作
for i := 1; i <= 3; i++ {
id := i
wg.Add(1)
_ = pool.Submit(func() {
defer wg.Done()
fmt.Printf("Task %d: Simulating long I/O operation...\n", id)
time.Sleep(2 * time.Second)
fmt.Printf("Task %d: I/O operation completed.\n", id)
})
}
wg.Wait()
fmt.Println("All tasks completed.")
}
运行结果
Fib(14) = 377
Fib(15) = 610
Fib(10) = 55
Fib(13) = 233
Fib(11) = 89
6! = 720
Fib(12) = 144
5! = 120
10! = 3628800
Task 1: Simulating long I/O operation...
Task 2: Simulating long I/O operation...
8! = 40320
9! = 362880
Task 3: Simulating long I/O operation...
7! = 5040
Task 3: I/O operation completed.
Task 2: I/O operation completed.
Task 1: I/O operation completed.
All tasks completed.
4、生产者消费者模型(如果处理高并发请求?)
Go语言中常见的处理高并发的操作,就是通过创建有缓冲的channel来存放请求任务,然后由协程池去处理这个channel里面的任务,从而避免了创建大量goroutine导致性能下降的问题。
// 定义 Worker 处理的任务
func worker(id int, jobs <-chan int, results chan<- string, wg *sync.WaitGroup) {
defer wg.Done() // 任务完成后通知 WaitGroup
for job := range jobs {
// 模拟任务执行时间
time.Sleep(time.Millisecond * time.Duration(rand.Intn(500)))
results <- fmt.Sprintf("Worker %d processed job %d", id, job)
}
}
func main() {
rand.Seed(time.Now().UnixNano())
numWorkers := 5 // 设定固定的 worker 数量
numJobs := 20 // 任务总数
jobs := make(chan int, numJobs) // 创建有缓冲的 channel
results := make(chan string, numJobs) // 存储处理结果
var wg sync.WaitGroup
// 启动固定数量的 Worker
for i := 1; i <= numWorkers; i++ {
wg.Add(1)
go worker(i, jobs, results, &wg)
}
// 生产任务
for j := 1; j <= numJobs; j++ {
jobs <- j
}
close(jobs) // 关闭 jobs channel,通知 workers 不再有新任务
// 等待所有 worker 完成
wg.Wait()
close(results) // 关闭结果 channel
// 输出所有的处理结果
for result := range results {
fmt.Println(result)
}
}
本作品采用《CC 协议》,转载必须注明作者和本文链接
推荐文章: