并发原语之singleFlight

SingleFlight是Go开发组提供的一个并发原语

它的作用是,当有多个goroutine同时去执行一个函数时,最终只会有一个goroutine去执行到这个函数,然后返回结果。在返回结果时其他一起执行的goroutine也都会返回相同的结果。在处理某些并发请求时,使用该并发原语可以有效的减少重复的请求次数。

下面是一个实例:

package main

import (
   "fmt"
   "golang.org/x/sync/singleflight"
   "strconv"
   "sync"
   "time"
)

var tmpVar = "test_key"

func main() {
   var g singleflight.Group
  var wg sync.WaitGroup

  // 创建10个协程
  for i := 0; i < 10; i++ {
      wg.Add(1)
      go func() {
         defer wg.Done()
         //使用singleFlight的Do方法来进行并发控制
         ret, err, _ := g.Do(tmpVar, func() (interface{}, error) {
            //这里是模拟业务处理
            timeRet := getTime()
            return timeRet, nil
      })

      if err != nil {
          fmt.Println("报错了", err.Error())
      }

      fmt.Println(ret)
 }()
}

   wg.Wait()
}

func getTime() string {
   fmt.Println("exec query...")
   return strconv.Itoa(int(time.Now().UnixNano()))
}

最后输出的结果是:

并发原语之singleFlight

只有一个协程进行了业务的处理,其他协程都是返回相同的结果

SingleFlight 原理分析

  • singleFlight是通过 mutex + map 实现的,内部是一个group结构体:
    type Group struct {
     mu sync.Mutex       // protects m
     m  map[string]*call // lazily initialized
    }
  • mutex 是用来提供并发读写时的保护
  • map 是用来保存同一个key正在处理的请求

有三个重要方法

Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool)
DoChan(key string, fn func() (interface{}, error)) <-chan Result
Forget(key string)
  • Do:执行一个函数,并返回函数执行的结果。需要提供一个key,对于同一个key,在同一时间只有一个在执行,同一个key并发的请求会等待。第一个执行的请求返回的结果,就是它的返回结果。函数fn是一个无参的函数,返回一个结果或者error,而Do方法会返回函数执行的结果或者是error,shared会指示v是否返回给多个请求。
  • DoChan:类似Do方法,只不过是返回一个chan,等fn函数执行完,产生了结果以后,就能从这个chan中接收这个结果。
  • Forget:告诉Group忘记这个key。这样一来,之后这个key请求会执行f,而不是等待前一个未完成的fn函数的结果

Do函数的内部分析:

func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
    g.mu.Lock()
    if g.m == nil { // 懒加载
        g.m = make(map[string]*call)
    }
    // 先判断 key 是否已经存在
    if c, ok := g.m[key]; ok { //如果相同的key已经存在,就阻塞等待
        c.dups++       //这里是对同一个key下的N个协程的执行次数加1
        g.mu.Unlock()
        c.wg.Wait() //这里进行等待(直到第一个进来的协程完成时,才会进行释放)
        // 这里区分 panic 错误和 runtime 的错误,避免出现死锁
        if e, ok := c.err.(*panicError); ok {
            panic(e)
        } else if c.err == errGoexit {
            runtime.Goexit()
        }
        //直接返回结果
        return c.val, c.err, true
    }
    //如果key不存在则重新走一遍流程
    c := new(call)
    c.wg.Add(1)
    g.m[key] = c //这里的c,只有最先进来的第一个协程才会执行到
    g.mu.Unlock()

     //调用 doCall 去执行
    g.doCall(c, key, fn)
    return c.val, c.err, c.dups > 0
}

DoChan函数的内部分析:

func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result {
    ch := make(chan Result, 1)
    g.mu.Lock()
    if g.m == nil {
        g.m = make(map[string]*call)
    }
    if c, ok := g.m[key]; ok { //和do函数一样的处理逻辑
        c.dups++
        c.chans = append(c.chans, ch)
        g.mu.Unlock()
        return ch
    }
    c := &call{chans: []chan<- Result{ch}}
    c.wg.Add(1)
    g.m[key] = c
    g.mu.Unlock()

//最后也是调用了doCall函数
    go g.doCall(c, key, fn)
    return ch
}
  • 这个函数可以用来做执行结果的超时控制,防止第一个协程执行太久,后面协程长时间阻塞

  • 比如使用 select + ctx 的方式来做超时控制

    doCall函数内部分析(这是一个内部不公开的函数):

    func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
       // 这两个bool是用于标记结果的返回类型:是正常返回还是发生了painc
       normalReturn := false
       recovered := false
    
       defer func() {
           // 如果既没有正常执行完毕,又没有 recover 就需要直接退出了
           if !normalReturn && !recovered {
               c.err = errGoexit
           }
    
           g.mu.Lock()
           defer g.mu.Unlock()
           c.wg.Done() //这里done执行后,前面wait()的所有协程都会返回了
           if g.m[key] == c { //这个逻辑处理是为了当进行下一轮执行时,可以再重新走一遍新的业务流程,防止返回旧数据
               delete(g.m, key)
           }
    
           if e, ok := c.err.(*panicError); ok {
               // 如果返回的是 panic 错误,为了避免 channel 死锁,是为了确保这个 panic 无法被恢复
               if len(c.chans) > 0 {
                   go panic(e)
                   select {} 
               } else {
                   panic(e)
               }
           } else if c.err == errGoexit {
               // 如果是exitError就直接退出
           } else {
               // 如果没发生错误,就正常往 channel 里写入数据。这里传入的数据就是通过DoChan函数来获取的
               for _, ch := range c.chans {
                   ch <- Result{c.val, c.err, c.dups > 0}
               }
           }
       }()
    
       func() { // 使用匿名函数,保证这个defer能在上一个defer之前执行
           defer func() {
               // 判断是否发生了painc
               if !normalReturn {
                    // 如果panic了就 recover 掉,然后new一个自定义的panic错误信息 
                   if r := recover(); r != nil {
                       c.err = newPanicError(r)
                   }
               }
           }()
    
           //这里就是开始执行我们自己传入的那个匿名函数里的逻辑
           c.val, c.err = fn()
           // 如果正常返回结果,这里就是true
           // 所以最上面那个defer里可以通过这个标记来判定是否 panic 了
           normalReturn = true
       }()
    
       //如果我们传入的fn函数执行发生了painc,这里的值就是true
       if !normalReturn {
           recovered = true
       }
    }

最后一个是Forget函数:

func (g *Group) Forget(key string) {
    g.mu.Lock()
    delete(g.m, key)
    g.mu.Unlock()
}
  • 这个函数就是手动移除某个 key
  • 这个函数的应用场景是啥呢?当并发执行时,如果协程执行过长或者执行失败时,可以定时调用这个函数,删除key,增加重试的执行机会

使用SingleFlight还可以用来解决缓存击穿的问题。当某个key在某些情况下突然在缓存(比如redis)中不存在了,那么大量重复的请求就会被打到数据库,这时数据库压力一下子就变大了,这时候就可以使用singleFlight来解决这个问题了。

本作品采用《CC 协议》,转载必须注明作者和本文链接
fengzi
讨论数量: 0
(= ̄ω ̄=)··· 暂无内容!

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!