并发原语之循环栅栏(cyclicbarrier)
循环栅栏(Cyclicbarrier)常用于需要重复进行一组goroutine同时执行的场景
Cyclicbarrier的大概机制是:允许一组goroutine相互等待,当所有的goroutine都到达了同一个执行点时然后放行。放行后的同一组goroutine可以被重复的使用再次执行,所以就叫着循环栅栏
下面是一个简单例子:
package main
import (
"context"
"fmt"
"github.com/marusama/cyclicbarrier"
"strconv"
"sync"
)
func main() {
//需要执行的任务数
cb := cyclicbarrier.New(5)
var wg sync.WaitGroup
//执行的协程数
for i := 0; i < 5; i++ {
wg.Add(1)
i := i
go func() {
defer wg.Done()
fmt.Printf("开始第1阶段执行 #%d\n", i)
_ = cb.Await(context.Background())
fmt.Printf("开始第2阶段执行 #%d\n", i)
}()
}
wg.Wait()
}
执行结果如下:
CyclicBarrier和WaitGroup的功能有点类似。不过,CyclicBarrier更适合用在“固定数量的goroutine等待同一个执行点”的场景中,而且CyclicBarrier可以重复利用,不用担心panic的问题。WaitGroup重用的时候,必须小心翼翼避免panic。
处理可重用的多goroutine等待同一个执行点的场景的时候,CyclicBarrier和WaitGroup方法调用的对应关系如下图:
- waitGroup使用起来更复杂一些,没有CyclicBrrier那么简洁
CyclicBarrier的原理
- 内部就是一个CyclicBarrier结构体,实现了CyclicBarrier接口
type CyclicBarrier interface { // 等待所有的参与者到达,如果被ctx.Done()中断,会返回ErrBrokenBarrier Await(ctx context.Context) error // 重置循环栅栏到初始化状态。如果当前有等待者,那么它们会返回ErrBrokenBarrier Reset() //获取等待者的数量 GetNumberWaiting() int //获取参与者数量 GetParties() int //循环栅栏是否处于中断状态 IsBroken() bool }
CyclicBarrier有两个初始化方法:
- New方法,它只需要一个参数,来指定循环栅栏参与者的数量;
- NewWithAction,它额外提供了一个函数,是在每一次所有协程都到达同一个执行点时执行一次。具体执行逻辑是,最后一个协程到达后,其他协程还没有执行之前执行这个函数。可以利用这个特点,在放行之前做一些操作啥的。比如记录日志后者更新状态等
CyclicBarrier的签名方法
New(parties int) CyclicBarrier
NewWithAction(parties int, barrierAction func() error) CyclicBarrier
New函数的内部分析
func New(parties int) CyclicBarrier {
if parties <= 0 {
panic("parties must be positive number")
}
return &cyclicBarrier{ //就是初始化一个cyclicBarrier结构体
parties: parties,
lock: sync.RWMutex{},
round: &round{
waitCh: make(chan struct{}),
brokeCh: make(chan struct{}),
},
}
}
NewWithAction内部分析
func NewWithAction(parties int, barrierAction func() error) CyclicBarrier {
if parties <= 0 {
panic("parties must be positive number")
}
return &cyclicBarrier{
parties: parties,
lock: sync.RWMutex{},
round: &round{
waitCh: make(chan struct{}),
brokeCh: make(chan struct{}),
},
barrierAction: barrierAction, //就是这里加了一个函数,上面逻辑都一样
}
}
核心Await和reset函数内部分析
func (b *cyclicBarrier) Await(ctx context.Context) error {
var ctxDoneCh <-chan struct{} //这里是初始化一个上下文管理的可写管道
if ctx != nil {
ctxDoneCh = ctx.Done()
}
// check if context is done
select { //这是对执行超时的判断,如果超时了就直接返回错误
case <-ctxDoneCh:
return ctx.Err()
default:
}
b.lock.Lock()
// check if broken
if b.round.isBroken { //这里是判断是否有被破坏的栅栏,如果有返回错误信息
b.lock.Unlock()
return ErrBrokenBarrier
}
// increment count of waiters
b.round.count++ //增加参与者数
// saving in local variables to prevent race
//这是把全局参数存放到局部变量里,防止数据竞争
waitCh := b.round.waitCh
brokeCh := b.round.brokeCh
count := b.round.count
b.lock.Unlock()
if count > b.parties {
panic("CyclicBarrier.Await is called more than count of parties")
}
//这里就是进行栅栏处理的核心逻辑了
if count < b.parties { //参与者数是否大于设置的任务数,如果是就进入到select里
// wait other parties
select {
case <-waitCh: //这里是对所有的协程进行阻塞
return nil
case <-brokeCh: //这里是如果栅栏别破坏了,直接返回错误
return ErrBrokenBarrier
case <-ctxDoneCh: //这里是超时处理
b.breakBarrier(true)
return ctx.Err()
}
} else { //这里的逻辑就是,当最后一个协程执行完后,就会进入到这里来
// 这里就是对是否有自定义函数的判断处理,如果有就先执行自定义的fn
if b.barrierAction != nil {
err := b.barrierAction()
if err != nil {
b.breakBarrier(true)
return err
}
}
//这里就是重置栅栏了
b.reset(true)
return nil
}
}
//重置栅栏
func (b *cyclicBarrier) reset(safe bool) {
b.lock.Lock()
defer b.lock.Unlock()
if safe {
// broadcast to pass waiting goroutines
//这里就是关闭阻塞的管道,当管道关闭时,会释放所有的协程,然后对应的全部协程都会收到通知,起到了放行的效果
close(b.round.waitCh)
} else if b.round.count > 0 {
b.breakBarrier(false)
}
// create new round
b.round = &round{
waitCh: make(chan struct{}),
brokeCh: make(chan struct{}),
}
}
循环栅栏的核心思想就是利用了 同步channel + select + RwMutex来实现的
- channel 来实现阻塞
- select 来对阻塞的channel类型做处理
- RwMutex 实现并发读写时的安全保护
本作品采用《CC 协议》,转载必须注明作者和本文链接