并发原语之循环栅栏(cyclicbarrier)

循环栅栏(Cyclicbarrier)常用于需要重复进行一组goroutine同时执行的场景

Cyclicbarrier的大概机制是:允许一组goroutine相互等待,当所有的goroutine都到达了同一个执行点时然后放行。放行后的同一组goroutine可以被重复的使用再次执行,所以就叫着循环栅栏

下面是一个简单例子:

package main

import (
    "context"
    "fmt"
    "github.com/marusama/cyclicbarrier"
    "strconv"
    "sync"
)

func main() {
    //需要执行的任务数
    cb := cyclicbarrier.New(5)

    var wg sync.WaitGroup

    //执行的协程数
    for i := 0; i < 5; i++ {
        wg.Add(1)
        i := i
        go func() {
            defer wg.Done()
            fmt.Printf("开始第1阶段执行 #%d\n", i)
            _ = cb.Await(context.Background())

            fmt.Printf("开始第2阶段执行 #%d\n", i)
        }()
    }

    wg.Wait()
}

执行结果如下:

并发原语之循环栅栏(cyclicbarrier)

CyclicBarrier和WaitGroup的功能有点类似。不过,CyclicBarrier更适合用在“固定数量的goroutine等待同一个执行点”的场景中,而且CyclicBarrier可以重复利用,不用担心panic的问题。WaitGroup重用的时候,必须小心翼翼避免panic。

处理可重用的多goroutine等待同一个执行点的场景的时候,CyclicBarrier和WaitGroup方法调用的对应关系如下图:

并发原语之循环栅栏(cyclicbarrier)

  • waitGroup使用起来更复杂一些,没有CyclicBrrier那么简洁

CyclicBarrier的原理

  • 内部就是一个CyclicBarrier结构体,实现了CyclicBarrier接口
    type CyclicBarrier interface {
    // 等待所有的参与者到达,如果被ctx.Done()中断,会返回ErrBrokenBarrier
      Await(ctx context.Context) error
    // 重置循环栅栏到初始化状态。如果当前有等待者,那么它们会返回ErrBrokenBarrier
      Reset()
    //获取等待者的数量
      GetNumberWaiting() int
    //获取参与者数量
      GetParties() int
    //循环栅栏是否处于中断状态
      IsBroken() bool
    }

CyclicBarrier有两个初始化方法:

  • New方法,它只需要一个参数,来指定循环栅栏参与者的数量;
  • NewWithAction,它额外提供了一个函数,是在每一次所有协程都到达同一个执行点时执行一次。具体执行逻辑是,最后一个协程到达后,其他协程还没有执行之前执行这个函数。可以利用这个特点,在放行之前做一些操作啥的。比如记录日志后者更新状态等

CyclicBarrier的签名方法

 New(parties int) CyclicBarrier
 NewWithAction(parties int, barrierAction func() error) CyclicBarrier

New函数的内部分析

func New(parties int) CyclicBarrier {
    if parties <= 0 {
        panic("parties must be positive number")
    }
    return &cyclicBarrier{  //就是初始化一个cyclicBarrier结构体
        parties: parties,
        lock:    sync.RWMutex{},
        round: &round{
            waitCh:  make(chan struct{}),
            brokeCh: make(chan struct{}),
        },
    }
}

NewWithAction内部分析

func NewWithAction(parties int, barrierAction func() error) CyclicBarrier {
    if parties <= 0 {
        panic("parties must be positive number")
    }
    return &cyclicBarrier{
        parties: parties,
        lock:    sync.RWMutex{},
        round: &round{
            waitCh:  make(chan struct{}),
            brokeCh: make(chan struct{}),
        },
        barrierAction: barrierAction,  //就是这里加了一个函数,上面逻辑都一样
    }
}

核心Await和reset函数内部分析

func (b *cyclicBarrier) Await(ctx context.Context) error {
    var     ctxDoneCh <-chan struct{} //这里是初始化一个上下文管理的可写管道

    if ctx != nil {
        ctxDoneCh = ctx.Done()
    }

    // check if context is done
    select {  //这是对执行超时的判断,如果超时了就直接返回错误
    case <-ctxDoneCh:
        return ctx.Err()
    default:
    }

    b.lock.Lock()

    // check if broken
    if b.round.isBroken { //这里是判断是否有被破坏的栅栏,如果有返回错误信息
        b.lock.Unlock()
        return ErrBrokenBarrier
    }

    // increment count of waiters
    b.round.count++     //增加参与者数

    // saving in local variables to prevent race
    //这是把全局参数存放到局部变量里,防止数据竞争
    waitCh := b.round.waitCh
    brokeCh := b.round.brokeCh
    count := b.round.count

    b.lock.Unlock()

    if count > b.parties {
        panic("CyclicBarrier.Await is called more than count of parties")
    }

//这里就是进行栅栏处理的核心逻辑了
    if count < b.parties { //参与者数是否大于设置的任务数,如果是就进入到select里
        // wait other parties
        select {
        case <-waitCh: //这里是对所有的协程进行阻塞
            return nil
        case <-brokeCh:  //这里是如果栅栏别破坏了,直接返回错误
            return ErrBrokenBarrier
        case <-ctxDoneCh: //这里是超时处理
            b.breakBarrier(true)
            return ctx.Err()
        }
    } else {  //这里的逻辑就是,当最后一个协程执行完后,就会进入到这里来
    // 这里就是对是否有自定义函数的判断处理,如果有就先执行自定义的fn
        if b.barrierAction != nil {
            err := b.barrierAction()
            if err != nil {
                b.breakBarrier(true)
                return err
            }
        }
        //这里就是重置栅栏了
        b.reset(true)
        return nil
    }
}
//重置栅栏
func (b *cyclicBarrier) reset(safe bool) {
   b.lock.Lock()
   defer b.lock.Unlock()

   if safe {
      // broadcast to pass waiting goroutines
      //这里就是关闭阻塞的管道,当管道关闭时,会释放所有的协程,然后对应的全部协程都会收到通知,起到了放行的效果
  close(b.round.waitCh)

   } else if b.round.count > 0 {
      b.breakBarrier(false)
   }

   // create new round
  b.round = &round{
      waitCh:  make(chan struct{}),
  brokeCh: make(chan struct{}),
  }
}

循环栅栏的核心思想就是利用了 同步channel + select + RwMutex来实现的

  • channel 来实现阻塞
  • select 来对阻塞的channel类型做处理
  • RwMutex 实现并发读写时的安全保护
本作品采用《CC 协议》,转载必须注明作者和本文链接
fengzi
讨论数量: 0
(= ̄ω ̄=)··· 暂无内容!

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!