Go语言并发模式:超越Goroutine和Channel的进阶实践
引言
Go语言以轻量级的goroutine和优雅的channel闻名,但真正的高效并发系统往往需要更深入的设计模式。本文将探讨三种超越基础概念的进阶并发模式,帮助你构建更健壮、可控的并发系统。
1. 扇出/扇入模式(Fan-out/Fan-in)
当一个慢速生产者需要被多个消费者并行处理时,扇出模式非常有用。
go
复制
下载
func fanOut(in <-chan int, workers int) []<-chan int {
channels := make([]<-chan int, workers)
for i := 0; i < workers; i++ {
channels[i] = worker(in)
}
return channels
}
func worker(in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
out <- n * n // 模拟耗时计算
}
}()
return out
}
func fanIn(channels …<-chan int) <-chan int {
out := make(chan int)
var wg sync.WaitGroup
for _, ch := range channels {
wg.Add(1)
go func(c <-chan int) {
defer wg.Done()
for v := range c {
out <- v
}
}(ch)
}
go func() {
wg.Wait()
close(out)
}()
return out
}
2. 带取消的Pipeline
使用context实现优雅的取消机制,避免goroutine泄露:
go
复制
下载
type PipelineStage func(context.Context, <-chan int) <-chan int
func WithCancel(stage PipelineStage) PipelineStage {
return func(ctx context.Context, in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for {
select {
case <-ctx.Done():
return
case val, ok := <-in:
if !ok {
return
}
out <- val
}
}
}()
return out
}
}
func Pipeline(ctx context.Context, stages …PipelineStage) {
var ch <-chan int
for _, stage := range stages {
ch = stage(ctx, ch)
}
}
3. 速率限制与令牌桶
在高并发场景中,限流是保护后端服务的必要手段:
go
复制
下载
type RateLimiter struct {
tokens chan struct{}
ticker *time.Ticker
}
func NewRateLimiter(rate int, burst int) *RateLimiter {
rl := &RateLimiter{
tokens: make(chan struct{}, burst),
ticker: time.NewTicker(time.Second / time.Duration(rate)),
}
go func() {
for range rl.ticker.C {
select {
case rl.tokens <- struct{}{}:
default:
}
}
}()
return rl
}
func (rl *RateLimiter) Wait(ctx context.Context) error {
select {
case <-rl.tokens:
return nil
case <-ctx.Done():
return ctx.Err()
}
}
// 使用示例
func processWithRateLimit(requests []Request) {
limiter := NewRateLimiter(100, 20) // 每秒100个请求,突发20个
var wg sync.WaitGroup
for _, req := range requests {
wg.Add(1)
go func(r Request) {
defer wg.Done()
if err := limiter.Wait(context.Background()); err != nil {
return
}
handleRequest(r)
}(req)
}
wg.Wait()
}
最佳实践总结
| 模式 | 适用场景 | 注意事项 |
|---|---|---|
| 扇出/扇入 | CPU密集型任务并行处理 | 注意goroutine数量控制 |
| Pipeline | 数据流处理 | 每个阶段应有清晰的边界 |
| 速率限制 | API限流、资源保护 | 选择合适的算法和参数 |
关于 LearnKu
推荐文章: