go

背景

在服务端开发中我们经常会在一个服务中发起请求调用其他服务。很多服务主要逻辑就是根据产品逻辑调用各个rpc请求,再把各个请求的结果组合在一起返回。业内把专注于开发这类服务的开发者称为API Boy。这类服务的特点是io密集,耗时主要是rpc请求。所以优化这类服务的耗时就是优化rpc请求的串并行关系。那么怎样的串并行关系才是最优的呢?其实很简单,只要做到下面两点就可以了。

  • 所有逻辑都在依赖数据准备好的第一时间开始跑,每个rpc请求都在请求数据准备好的第一时间发起。
  • 相同的数据只请求一次。

于是我们就把一个全局的问题分解成各个局部的问题,就可以分而治之,逐个击破。本文主要讨论如何在Go语言开发的服务中管理rpc调用,实现上述两点。

下面我们先讨论几种情况。

情况1 多个相同数据的调用合并为一个

func func1() {
    resp := RPC()
    handle1(resp)
}

func func2() {
    resp := RPC()
    handle2(resp)
}

func Case1() {
    go func1()
    go func2()
}

func1与func2分别调用同一个rpc获得同一个结果,然后分别对结果做处理。这里func1与func2应该合并调用一次rpc,减少下游的压力。一个问题是这里并不确定func1与func2谁会先发起rpc,解决方法是使用sync.Once。

情况2 一个协程生产,一个协程消费

func func1(resp *Resp) {
    resp = RPC()
}

func func2(resp *Resp) {
    handle(resp)
}

func Case2() {
    var resp Resp
    go func1(&resp)
    go func2(&resp)
}

这里func2在handle之前必须保证func1已经生产出resp。在Go,这一般用channel来实现。这里不能用sync.Once是因为func2没有生产能力,func2调用sync.Once会导致RPC调用不了。

最初的想法:混合sync.Once与channel

我想做一个通用的包裹来包住resp,做一个通用的数据获取接口获取resp,使其能囊括上面两种情况。于是我混合了sync.Once与channel,写出了下面的代码。核心是提供了InitAndGet接口来生产数据,Get接口来获取数据。

type DataGetter[T any] struct {
    Data     T
    DataOnce sync.Once
    SelfOnce sync.Once
    IsReady  chan struct{}
}

func (d *DataGetter[T]) InitAndGet(initFunc func(*T)) *T {
    d.InitSelf()
    d.DataOnce.Do(func() {
        DoInit(initFunc)
    })
    <-d.IsReady
    return &d.Data
}

func (d *DataGetter[T]) Get() *T {
    d.InitSelf()
    <-d.IsReady
    return &d.Data
}

func (d *DataGetter[T]) InitSelf() {
    d.SelfOnce.Do(func() {
        d.IsReady = make(chan struct{})
    })
}

func (d *DataGetter[T]) DoInit(initFunc func(*T)) {
    defer func() {
        recover()
        close(d.IsReady)
    }()
    if initFunc != nil {
        initFunc(&d.Data)
    }
}

于是上面两个Case可作如下改写。

func func1(resp *DataGetter[Resp]) {
    handle1(resp.InitAndGet(func(r *Resp) {
        *r = RPC()
    }))
}

func func2(resp *DataGetter[Resp]) {
    handle2(resp.InitAndGet(func(r *Resp) {
        *r = RPC()
    }))
}

func Case1() {
    var resp DataGetter[Resp]
    go func1(&resp)
    go func2(&resp)
}
func func1(resp *DataGetter[Resp]) {
    resp.InitAndGet(func(r *Resp) {
        *r = RPC()
    })
}

func func2(resp *DataGetter[Resp]) {
    handle(resp.Get())
}

func Case2() {
    var resp DataGetter[Resp]
    go func1(&resp)
    go func2(&resp)
}

Get接口的危险

看起来满足需求了。但很快我便发现,Get接口非常危险。我们看下面Case3,我把Case2的func1加了个if。

func func1(resp *DataGetter[Resp]) {
    if Condition() {
        resp.InitAndGet(func(r *Resp) {
            *r = RPC()
        })
    }
}

func func2(resp *DataGetter[Resp]) {
    handle(resp.Get())
}

func Case3() {
    var resp DataGetter[Resp]
    go func1(&resp)
    go func2(&resp)
}

如果Condition()为false,func1没有生产数据,func2就会无限等待。我们要加强Get接口的安全性。首先会想到的是加一个超时,这是应该的,但是超时只是最后的兜底,它不会帮你把数据生产出来。我们要做的是加以限制,从逻辑上避免出现有消费但没有生产数据的情况。

我先简单修一下Case3。

func func1(resp *DataGetter[Resp]) {
    defer resp.InitAndGet(nil)
    if Condition() {
        resp.InitAndGet(func(r *Resp) {
            *r = RPC()
        })
    }
}

func func2(resp *DataGetter[Resp]) {
    handle(resp.Get())
}

func Case3() {
    var resp DataGetter[Resp]
    go func1(&resp)
    go func2(&resp)
}

我在func1中加了一行defer,表示如果Condition()为false,resp就不会生产了,func2的handle就不会永远阻塞,但是要处理没数据的情况。这个fix处理了永远阻塞的情况,但前提是func1一定会运行。

func func1(resp *DataGetter[Resp]) {
    if Condition() {
        resp.InitAndGet(func(r *Resp) {
            *r = RPC()
        })
    }
}

func func2(resp *DataGetter[Resp]) {
    handle(resp.Get())
}

func func3(resp *DataGetter[Resp]) {
    defer resp.InitAndGet(nil)
    if Condition2() {
        func1(resp)
    }
}

func Case4() {
    var resp DataGetter[Resp]
    go func3(&resp)
    go func2(&resp)
}

在Case4中,func1并不是一定会运行的,func3才是一定会运行的,所以defer语句挪到func3。这样产生一个问题,RPC代码在func1不在func3,看func3代码时会觉得莫名其妙。更要命的是,除了梳理代码,没有任何方式来检测我这个defer写的位置究竟对不对?defer究竟应该写在func1还是func3还是别的函数?有没有漏写?写错的话依然会造成无限等待。怎么办呢?

明确知道Init在Get之前

有一个比较简单的case是func2的handle知道,如果resp有数据,resp一定会在hanle之前调用InitAndGet,那么handle只需要再调用一次InitAndGet(nil)就可以保证不无限等待了。由于只有同步操作能保证先后顺序,为了方便handle与resp的获取在不同协程上进行,我在DataGetter增加了一个AsyncInit接口。

func (d *DataGetter[T]) AsyncInit(initFunc func(*T)) {
    d.InitSelf()
    d.DataOnce.Do(func() {
        go DoInit(initFunc)
    })
}

其实这个AsyncInit的实现是有问题的,如果在AsyncInit之前被调用了InitAndGet,AsyncInit就会同步等到InitAndGet结束,不符合AyncInit给人的感觉。这里为了便于理解先这样写着,本文最后会给出修正后的完整代码。Case5是一个典型的应用场景。

func Case5() {
    var resp DataGetter[Resp]
    resp.AsyncInit(func(r *Resp) {
        *r = RPC()
    })
    handle(resp.InitAndGet(nil))
}

promise

对于更一般的情况怎么办呢?于是我借鉴别的编程语言,发现了一个叫promise的东西,我决定把的std::promise抄过来。promise的思想是,我在Get的时候,要指定一个promise,这个promise并不是说保证会生产数据,而是说保证这个数据只会在promise的内生产,也就是说,如果promise的生命周期结束了,那么数据就不会再生产。更准确地说,我对promise提出了下面几个要求。

  • promise在Get之前必须已初始化。
  • promise的生命周期一定会结束,不会无限等待。
  • 如果数据会生产,则一定会在promise的生命周期结束之前调用InitAndGet或AsyncInit。

promise用channel来实现就好了,用make来表示promise的初始化,用close来表示promise生命周期的结束。加上上文提到的要加个超时,因为DataGetter作为一个通用的东西,并不知道超时多少是合理的,于是就交给外部来设定,直接采用context的超时。于是Get接口就改成下面这样。

func (d *DataGetter[T]) Get(ctx context.Context, promise chan struct{}) *T {
    d.InitSelf()
    select {
    case <-promise:
        return d.InitAndGet(nil)
    case <-d.IsReady:
        return &d.Data
    case <-ctx.Done():
        return d.InitAndGet(nil)
    }
}

Case4就改写成下面这样。

func func1(resp *DataGetter[Resp]) {
    if Condition() {
        resp.InitAndGet(func(r *Resp) {
            *r = RPC()
        })
    }
}

func func2(ctx, context.Context, resp *DataGetter[Resp], func3Promise chan struct{}) {
    handle(resp.Get(ctx, func3Promise))
}

func func3(resp *DataGetter[Resp], func3Promise chan struct{}) {
    defer close(func3Promise)
    if Condition2() {
        func1(resp)
    }
}

func Case4() {
    ctx, cancel := context.WithTimeout(context.Background(), time.Second)
    defer cancel()
    var resp DataGetter[Resp]
    func3Promise := make(chan struct{})
    go func3(&resp, func3Promise)
    go func2(ctx, &resp, func3Promise)
}

这样做的好处是:

  • func3Promise是只跟func3有关,跟resp无关的,即使有resp2、resp3等也可以用func3Promise。在func3 close func3Promise也不会觉得奇怪。
  • promise跟resp的生产无关,跟消费有关,是Get的时候选择的。生产可以无压力地写生产代码,不用担心某个ifelse没覆盖到。不同的地方Get的时候可以选择不同的promise。
  • 如果Get选错promise,promise在Get的时候没有初始化,那么程序会panic。我认为panic是比较好定位的错误,因为可以看堆栈。

缺点是Get的时候确实需要谨慎选择promise,使promise满足上面说的promise的几个条件。一般在生产与消费分叉之前令promise初始化,比如Case4函数,在分叉之后的生产方的第一个函数进行close,比如func3。

move promise

有时候并不能简单按上面的promise规则,比如下面的Case6,我把Case4中func3中的func1调用改为了go func1。

func func1(resp *DataGetter[Resp]) {
    if Condition() {
        resp.InitAndGet(func(r *Resp) {
            *r = RPC()
        })
    }
}

func func2(ctx, context.Context, resp *DataGetter[Resp], func3Promise chan struct{}) {
    handle(resp.Get(ctx, func3Promise))
}

func func3(resp *DataGetter[Resp], func3Promise chan struct{}) {
    defer close(func3Promise)
    if Condition2() {
        go func1(resp)
    }
}

func Case6() {
    ctx, cancel := context.WithTimeout(context.Background(), time.Second)
    defer cancel()
    var resp DataGetter[Resp]
    func3Promise := make(chan struct{})
    go func3(&resp, func3Promise)
    go func2(ctx, &resp, func3Promise)
}

于是就出问题了,func3Promise被close的时候可能func1还没被调用,导致resp的生产在func3Promise生命周期之后,不满足promise定义。怎么办呢?我又把目光投向C++,发现还有个std::move的语义,于是我又把它抄过来了。Go语言里一般没有move的概念,我抄什么呢?抄析构函数调用的时机。在C++里,如果一个promise被move到新的变量,那么原变量就不会执行析构函数。我就抄这个逻辑。下面是改写后的Case6。

func func1(resp *DataGetter[Resp], func1Promise chan struct{}) {
    defer close(func1Promise)
    if Condition() {
        resp.InitAndGet(func(r *Resp) {
            *r = RPC()
        })
    }
}

func func2(ctx, context.Context, resp *DataGetter[Resp], func1Promise chan struct{}) {
    handle(resp.Get(ctx, func1Promise))
}

func func3(resp *DataGetter[Resp], func1Promise chan struct{}) {
    isFun1PromiseMoved := false
    defer func() {
        if !isFun1PromiseMoved {
            close(func1Promise)
        }
    }()
    if Condition2() {
        isFun1PromiseMoved = true
        go func1(resp, func1Promise)
    }
}

func Case6() {
    ctx, cancel := context.WithTimeout(context.Background(), time.Second)
    defer cancel()
    var resp DataGetter[Resp]
    func1Promise := make(chan struct{})
    go func3(&resp, func1Promise)
    go func2(ctx, &resp, func1Promise)
}

函数及在函数生命周期发起的协程的总生命周期

问题是解决了,但是,这怎么这么复杂?而且并没有很通用。比如下面的Case7就不行了,func1被调用多次,其中第一次并不发起rpc,第二次才发起。

func func1(resp *DataGetter[Resp], func1Promise chan struct{}, condition bool) {
    defer close(func1Promise)
    if condition {
        resp.InitAndGet(func(r *Resp) {
            *r = RPC()
        })
    }
}

func func2(ctx, context.Context, resp *DataGetter[Resp], func1Promise chan struct{}) {
    handle(resp.Get(ctx, func1Promise))
}

func func3(resp *DataGetter[Resp], func1Promise chan struct{}) {
    isFun1PromiseMoved := false
    defer func() {
        if !isFun1PromiseMoved {
            close(func1Promise)
        }
    }()
    if Condition2() {
        isFun1PromiseMoved = true
        go func1(resp, func1Promise, false)
    }
    if Condition3() {
        isFun1PromiseMoved = true
        go func1(resp, func1Promise, true)
    }
}

func Case7() {
    ctx, cancel := context.WithTimeout(context.Background(), time.Second)
    defer cancel()
    var resp DataGetter[Resp]
    func1Promise := make(chan struct{})
    go func3(&resp, func1Promise)
    go func2(ctx, &resp, func1Promise)
}

于是我又想了一想,发现我可能抄错作业了,有一个更无脑的语义可以用:func3及在func3生命周期里发起的协程的总的生命周期。于是Case7改写成下面这样。

func func1(resp *DataGetter[Resp], func1Promise chan struct{}, condition bool) {
    defer close(func1Promise)
    if condition {
        resp.InitAndGet(func(r *Resp) {
            *r = RPC()
        })
    }
}

func func2(ctx, context.Context, resp *DataGetter[Resp], func3Promise chan struct{}) {
    handle(resp.Get(ctx, func3Promise))
}

func func3(resp *DataGetter[Resp], func3Promise chan struct{}) {
    subPromises := make([]chan struct{}, 0)
    defer func() {
        go func() {
            for _, promise := range subPromises {
                <-promise
            }
            close(func3Promise)
        }()
    }()
    if Condition2() {
        func1Promise := make(chan struct{})
        subPromises = append(subPromises, func1Promise)
        go func1(resp, func1Promise, false)
    }
    if Condition3() {
        func1Promise := make(chan struct{})
        subPromises = append(subPromises, func1Promise)
        go func1(resp, func1Promise, true)
    }
}

func Case7() {
    ctx, cancel := context.WithTimeout(context.Background(), time.Second)
    defer cancel()
    var resp DataGetter[Resp]
    func3Promise := make(chan struct{})
    go func3(&resp, func3Promise)
    go func2(ctx, &resp, func3Promise)
}

这个语义就无脑多了,每见到一个可能生产数据的函数调用就加一个subPromise就行了。

总结

总的来说,promise就是要划定一个范围,从生产者消费者分开之前开始,到所有可能发生生产的路径的范围。大多数时候,我们只需要无脑划定一个很粗的范围就足够了,比如Case4,Case5,Case7(Case4可看作promise在Get调用的时候close掉)。有时候可能真的需要一些很精细的操作去划定一个很精确的范围,不过我暂时还没遇到过,就这样吧,等我遇到了再来补充。

另外提一句,我后来尴尬地发现,其实用DataGetter包住数据的方式是很反人类的,因为这改变了生产者的代码。写生产代码的时候可能并不知道哪个数据需要用DataGetter包着,等到写消费者的时候才知道。所以,像sync.Once这样,在数据之外自行创建一个变量才是更好的方式。所以直接用DataGetter[struct{}]就好了。这样的话实际上不需要用泛型了,可以支持更低版本的Go。

写了这么多,可能大家会疑惑这跟开头说的最优串并行有什么关系啊?其实我写了这么多,就是为了降低门槛,让大家可以随心所欲地在程序的各个地方起协程发起rpc,同时可以随心所欲地获取其他协程的rpc结果。为了达到全局最优,我的方法不是宏观管控,而是做好每一个微观逻辑。一个典型的新需求的开发流程是这样的:在一个合适的地方起一个协程进入新需求的流程,看看新需求依赖的数据在旧代码的哪里生产,在生产的地方加上DataGetter,然后在新需求的协程里等待结果,等到结果马上开始生产新需求的数据,并加上DataGetter,新需求的数据需要合入旧逻辑来改变旧逻辑的输出,那么在需要改变旧逻辑的地方,用DataGetter去等待新需求的数据,一但等到了马上开始改变。你看,这不就满足了最优串并行的要求了吗?如果旧逻辑是全局最优的,那么加入新需求后的逻辑也依然是全局最优的。这可维护性拉满了啊。

完整代码

下面是DataGetter完整的代码,修复了上面说的AsyncInit的问题,去除了泛型。

type DataGetterState = int32

const (
    DataGetterStateNotStarted DataGetterState = 0
    DataGetterStateStarted    DataGetterState = 1
    DataGetterStateDone       DataGetterState = 2
)

type DataGetter struct {
    State    DataGetterState
    SelfOnce sync.Once
    IsReady  chan struct{}
}

func (d *DataGetter) AsyncInit(initFunc func()) {
    if !d.IsMyTurn() {
        return
    }
    d.InitSelf()
    go d.DoInit(initFunc)
}

func (d *DataGetter) IsMyTurn() bool {
    return atomic.CompareAndSwapInt32(&d.State, DataGetterStateNotStarted, DataGetterStateStarted)
}

func (d *DataGetter) Get(ctx context.Context, promise chan struct{}) {
    if atomic.LoadInt32(&d.State) == DataGetterStateDone {
        return
    }
    d.InitSelf()
    select {
    case <-promise:
        d.InitAndGet(nil)
        return
    case <-d.IsReady:
        return
    case <-ctx.Done():
        d.InitAndGet(nil)
        return
    }
}

func (d *DataGetter) InitAndGet(initFunc func()) {
    if atomic.LoadInt32(&d.State) == DataGetterStateDone {
        return
    }
    d.InitSelf()
    if d.IsMyTurn() {
        d.DoInit(initFunc)
    }
    <-d.IsReady
}

func (d *DataGetter) DoInit(initFunc func()) {
    defer func() {
        recover()
        close(d.IsReady)
        atomic.StoreInt32(&d.State, DataGetterStateDone)
    }()
    if initFunc != nil {
        initFunc()
    }
}

func (d *DataGetter) InitSelf() {
    d.SelfOnce.Do(func() {
        d.IsReady = make(chan struct{})
    })
}
本作品采用《CC 协议》,转载必须注明作者和本文链接
BrianWilliam
刘天承
讨论数量: 0
(= ̄ω ̄=)··· 暂无内容!

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!