并发原语之singleFlight
SingleFlight是Go开发组提供的一个并发原语
它的作用是,当有多个goroutine同时去执行一个函数时,最终只会有一个goroutine去执行到这个函数,然后返回结果。在返回结果时其他一起执行的goroutine也都会返回相同的结果。在处理某些并发请求时,使用该并发原语可以有效的减少重复的请求次数。
下面是一个实例:
package main
import (
"fmt"
"golang.org/x/sync/singleflight"
"strconv"
"sync"
"time"
)
var tmpVar = "test_key"
func main() {
var g singleflight.Group
var wg sync.WaitGroup
// 创建10个协程
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
//使用singleFlight的Do方法来进行并发控制
ret, err, _ := g.Do(tmpVar, func() (interface{}, error) {
//这里是模拟业务处理
timeRet := getTime()
return timeRet, nil
})
if err != nil {
fmt.Println("报错了", err.Error())
}
fmt.Println(ret)
}()
}
wg.Wait()
}
func getTime() string {
fmt.Println("exec query...")
return strconv.Itoa(int(time.Now().UnixNano()))
}
最后输出的结果是:
只有一个协程进行了业务的处理,其他协程都是返回相同的结果
SingleFlight 原理分析
- singleFlight是通过 mutex + map 实现的,内部是一个group结构体:
type Group struct { mu sync.Mutex // protects m m map[string]*call // lazily initialized }
- mutex 是用来提供并发读写时的保护
- map 是用来保存同一个key正在处理的请求
有三个重要方法
Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool)
DoChan(key string, fn func() (interface{}, error)) <-chan Result
Forget(key string)
- Do:执行一个函数,并返回函数执行的结果。需要提供一个key,对于同一个key,在同一时间只有一个在执行,同一个key并发的请求会等待。第一个执行的请求返回的结果,就是它的返回结果。函数fn是一个无参的函数,返回一个结果或者error,而Do方法会返回函数执行的结果或者是error,shared会指示v是否返回给多个请求。
- DoChan:类似Do方法,只不过是返回一个chan,等fn函数执行完,产生了结果以后,就能从这个chan中接收这个结果。
- Forget:告诉Group忘记这个key。这样一来,之后这个key请求会执行f,而不是等待前一个未完成的fn函数的结果
Do函数的内部分析:
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
g.mu.Lock()
if g.m == nil { // 懒加载
g.m = make(map[string]*call)
}
// 先判断 key 是否已经存在
if c, ok := g.m[key]; ok { //如果相同的key已经存在,就阻塞等待
c.dups++ //这里是对同一个key下的N个协程的执行次数加1
g.mu.Unlock()
c.wg.Wait() //这里进行等待(直到第一个进来的协程完成时,才会进行释放)
// 这里区分 panic 错误和 runtime 的错误,避免出现死锁
if e, ok := c.err.(*panicError); ok {
panic(e)
} else if c.err == errGoexit {
runtime.Goexit()
}
//直接返回结果
return c.val, c.err, true
}
//如果key不存在则重新走一遍流程
c := new(call)
c.wg.Add(1)
g.m[key] = c //这里的c,只有最先进来的第一个协程才会执行到
g.mu.Unlock()
//调用 doCall 去执行
g.doCall(c, key, fn)
return c.val, c.err, c.dups > 0
}
DoChan函数的内部分析:
func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result {
ch := make(chan Result, 1)
g.mu.Lock()
if g.m == nil {
g.m = make(map[string]*call)
}
if c, ok := g.m[key]; ok { //和do函数一样的处理逻辑
c.dups++
c.chans = append(c.chans, ch)
g.mu.Unlock()
return ch
}
c := &call{chans: []chan<- Result{ch}}
c.wg.Add(1)
g.m[key] = c
g.mu.Unlock()
//最后也是调用了doCall函数
go g.doCall(c, key, fn)
return ch
}
这个函数可以用来做执行结果的超时控制,防止第一个协程执行太久,后面协程长时间阻塞
比如使用 select + ctx 的方式来做超时控制
doCall函数内部分析(这是一个内部不公开的函数):
func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) { // 这两个bool是用于标记结果的返回类型:是正常返回还是发生了painc normalReturn := false recovered := false defer func() { // 如果既没有正常执行完毕,又没有 recover 就需要直接退出了 if !normalReturn && !recovered { c.err = errGoexit } g.mu.Lock() defer g.mu.Unlock() c.wg.Done() //这里done执行后,前面wait()的所有协程都会返回了 if g.m[key] == c { //这个逻辑处理是为了当进行下一轮执行时,可以再重新走一遍新的业务流程,防止返回旧数据 delete(g.m, key) } if e, ok := c.err.(*panicError); ok { // 如果返回的是 panic 错误,为了避免 channel 死锁,是为了确保这个 panic 无法被恢复 if len(c.chans) > 0 { go panic(e) select {} } else { panic(e) } } else if c.err == errGoexit { // 如果是exitError就直接退出 } else { // 如果没发生错误,就正常往 channel 里写入数据。这里传入的数据就是通过DoChan函数来获取的 for _, ch := range c.chans { ch <- Result{c.val, c.err, c.dups > 0} } } }() func() { // 使用匿名函数,保证这个defer能在上一个defer之前执行 defer func() { // 判断是否发生了painc if !normalReturn { // 如果panic了就 recover 掉,然后new一个自定义的panic错误信息 if r := recover(); r != nil { c.err = newPanicError(r) } } }() //这里就是开始执行我们自己传入的那个匿名函数里的逻辑 c.val, c.err = fn() // 如果正常返回结果,这里就是true // 所以最上面那个defer里可以通过这个标记来判定是否 panic 了 normalReturn = true }() //如果我们传入的fn函数执行发生了painc,这里的值就是true if !normalReturn { recovered = true } }
最后一个是Forget函数:
func (g *Group) Forget(key string) {
g.mu.Lock()
delete(g.m, key)
g.mu.Unlock()
}
- 这个函数就是手动移除某个 key
- 这个函数的应用场景是啥呢?当并发执行时,如果协程执行过长或者执行失败时,可以定时调用这个函数,删除key,增加重试的执行机会
使用SingleFlight还可以用来解决缓存击穿的问题。当某个key在某些情况下突然在缓存(比如redis)中不存在了,那么大量重复的请求就会被打到数据库,这时数据库压力一下子就变大了,这时候就可以使用singleFlight来解决这个问题了。
本作品采用《CC 协议》,转载必须注明作者和本文链接