go

背景#

在服务端开发中我们经常会在一个服务中发起请求调用其他服务。很多服务主要逻辑就是根据产品逻辑调用各个 rpc 请求,再把各个请求的结果组合在一起返回。业内把专注于开发这类服务的开发者称为 API Boy。这类服务的特点是 io 密集,耗时主要是 rpc 请求。所以优化这类服务的耗时就是优化 rpc 请求的串并行关系。那么怎样的串并行关系才是最优的呢?其实很简单,只要做到下面两点就可以了。

  • 所有逻辑都在依赖数据准备好的第一时间开始跑,每个 rpc 请求都在请求数据准备好的第一时间发起。
  • 相同的数据只请求一次。

于是我们就把一个全局的问题分解成各个局部的问题,就可以分而治之,逐个击破。本文主要讨论如何在 Go 语言开发的服务中管理 rpc 调用,实现上述两点。

下面我们先讨论几种情况。

情况 1 多个相同数据的调用合并为一个#

func func1() {
    resp := RPC()
    handle1(resp)
}

func func2() {
    resp := RPC()
    handle2(resp)
}

func Case1() {
    go func1()
    go func2()
}

func1 与 func2 分别调用同一个 rpc 获得同一个结果,然后分别对结果做处理。这里 func1 与 func2 应该合并调用一次 rpc,减少下游的压力。一个问题是这里并不确定 func1 与 func2 谁会先发起 rpc,解决方法是使用 sync.Once。

情况 2 一个协程生产,一个协程消费#

func func1(resp *Resp) {
    resp = RPC()
}

func func2(resp *Resp) {
    handle(resp)
}

func Case2() {
    var resp Resp
    go func1(&resp)
    go func2(&resp)
}

这里 func2 在 handle 之前必须保证 func1 已经生产出 resp。在 Go,这一般用 channel 来实现。这里不能用 sync.Once 是因为 func2 没有生产能力,func2 调用 sync.Once 会导致 RPC 调用不了。

最初的想法:混合 sync.Once 与 channel#

我想做一个通用的包裹来包住 resp,做一个通用的数据获取接口获取 resp,使其能囊括上面两种情况。于是我混合了 sync.Once 与 channel,写出了下面的代码。核心是提供了 InitAndGet 接口来生产数据,Get 接口来获取数据。

type DataGetter[T any] struct {
    Data     T
    DataOnce sync.Once
    SelfOnce sync.Once
    IsReady  chan struct{}
}

func (d *DataGetter[T]) InitAndGet(initFunc func(*T)) *T {
    d.InitSelf()
    d.DataOnce.Do(func() {
        DoInit(initFunc)
    })
    <-d.IsReady
    return &d.Data
}

func (d *DataGetter[T]) Get() *T {
    d.InitSelf()
    <-d.IsReady
    return &d.Data
}

func (d *DataGetter[T]) InitSelf() {
    d.SelfOnce.Do(func() {
        d.IsReady = make(chan struct{})
    })
}

func (d *DataGetter[T]) DoInit(initFunc func(*T)) {
    defer func() {
        recover()
        close(d.IsReady)
    }()
    if initFunc != nil {
        initFunc(&d.Data)
    }
}

于是上面两个 Case 可作如下改写。

func func1(resp *DataGetter[Resp]) {
    handle1(resp.InitAndGet(func(r *Resp) {
        *r = RPC()
    }))
}

func func2(resp *DataGetter[Resp]) {
    handle2(resp.InitAndGet(func(r *Resp) {
        *r = RPC()
    }))
}

func Case1() {
    var resp DataGetter[Resp]
    go func1(&resp)
    go func2(&resp)
}
func func1(resp *DataGetter[Resp]) {
    resp.InitAndGet(func(r *Resp) {
        *r = RPC()
    })
}

func func2(resp *DataGetter[Resp]) {
    handle(resp.Get())
}

func Case2() {
    var resp DataGetter[Resp]
    go func1(&resp)
    go func2(&resp)
}

Get 接口的危险#

看起来满足需求了。但很快我便发现,Get 接口非常危险。我们看下面 Case3,我把 Case2 的 func1 加了个 if。

func func1(resp *DataGetter[Resp]) {
    if Condition() {
        resp.InitAndGet(func(r *Resp) {
            *r = RPC()
        })
    }
}

func func2(resp *DataGetter[Resp]) {
    handle(resp.Get())
}

func Case3() {
    var resp DataGetter[Resp]
    go func1(&resp)
    go func2(&resp)
}

如果 Condition () 为 false,func1 没有生产数据,func2 就会无限等待。我们要加强 Get 接口的安全性。首先会想到的是加一个超时,这是应该的,但是超时只是最后的兜底,它不会帮你把数据生产出来。我们要做的是加以限制,从逻辑上避免出现有消费但没有生产数据的情况。

我先简单修一下 Case3。

func func1(resp *DataGetter[Resp]) {
    defer resp.InitAndGet(nil)
    if Condition() {
        resp.InitAndGet(func(r *Resp) {
            *r = RPC()
        })
    }
}

func func2(resp *DataGetter[Resp]) {
    handle(resp.Get())
}

func Case3() {
    var resp DataGetter[Resp]
    go func1(&resp)
    go func2(&resp)
}

我在 func1 中加了一行 defer,表示如果 Condition () 为 false,resp 就不会生产了,func2 的 handle 就不会永远阻塞,但是要处理没数据的情况。这个 fix 处理了永远阻塞的情况,但前提是 func1 一定会运行。

func func1(resp *DataGetter[Resp]) {
    if Condition() {
        resp.InitAndGet(func(r *Resp) {
            *r = RPC()
        })
    }
}

func func2(resp *DataGetter[Resp]) {
    handle(resp.Get())
}

func func3(resp *DataGetter[Resp]) {
    defer resp.InitAndGet(nil)
    if Condition2() {
        func1(resp)
    }
}

func Case4() {
    var resp DataGetter[Resp]
    go func3(&resp)
    go func2(&resp)
}

在 Case4 中,func1 并不是一定会运行的,func3 才是一定会运行的,所以 defer 语句挪到 func3。这样产生一个问题,RPC 代码在 func1 不在 func3,看 func3 代码时会觉得莫名其妙。更要命的是,除了梳理代码,没有任何方式来检测我这个 defer 写的位置究竟对不对?defer 究竟应该写在 func1 还是 func3 还是别的函数?有没有漏写?写错的话依然会造成无限等待。怎么办呢?

明确知道 Init 在 Get 之前#

有一个比较简单的 case 是 func2 的 handle 知道,如果 resp 有数据,resp 一定会在 hanle 之前调用 InitAndGet,那么 handle 只需要再调用一次 InitAndGet (nil) 就可以保证不无限等待了。由于只有同步操作能保证先后顺序,为了方便 handle 与 resp 的获取在不同协程上进行,我在 DataGetter 增加了一个 AsyncInit 接口。

func (d *DataGetter[T]) AsyncInit(initFunc func(*T)) {
    d.InitSelf()
    d.DataOnce.Do(func() {
        go DoInit(initFunc)
    })
}

其实这个 AsyncInit 的实现是有问题的,如果在 AsyncInit 之前被调用了 InitAndGet,AsyncInit 就会同步等到 InitAndGet 结束,不符合 AyncInit 给人的感觉。这里为了便于理解先这样写着,本文最后会给出修正后的完整代码。Case5 是一个典型的应用场景。

func Case5() {
    var resp DataGetter[Resp]
    resp.AsyncInit(func(r *Resp) {
        *r = RPC()
    })
    handle(resp.InitAndGet(nil))
}

promise#

对于更一般的情况怎么办呢?于是我借鉴别的编程语言,发现了一个叫 promise 的东西,我决定把的 std::promise 抄过来。promise 的思想是,我在 Get 的时候,要指定一个 promise,这个 promise 并不是说保证会生产数据,而是说保证这个数据只会在 promise 的内生产,也就是说,如果 promise 的生命周期结束了,那么数据就不会再生产。更准确地说,我对 promise 提出了下面几个要求。

  • promise 在 Get 之前必须已初始化。
  • promise 的生命周期一定会结束,不会无限等待。
  • 如果数据会生产,则一定会在 promise 的生命周期结束之前调用 InitAndGet 或 AsyncInit。

promise 用 channel 来实现就好了,用 make 来表示 promise 的初始化,用 close 来表示 promise 生命周期的结束。加上上文提到的要加个超时,因为 DataGetter 作为一个通用的东西,并不知道超时多少是合理的,于是就交给外部来设定,直接采用 context 的超时。于是 Get 接口就改成下面这样。

func (d *DataGetter[T]) Get(ctx context.Context, promise chan struct{}) *T {
    d.InitSelf()
    select {
    case <-promise:
        return d.InitAndGet(nil)
    case <-d.IsReady:
        return &d.Data
    case <-ctx.Done():
        return d.InitAndGet(nil)
    }
}

Case4 就改写成下面这样。

func func1(resp *DataGetter[Resp]) {
    if Condition() {
        resp.InitAndGet(func(r *Resp) {
            *r = RPC()
        })
    }
}

func func2(ctx, context.Context, resp *DataGetter[Resp], func3Promise chan struct{}) {
    handle(resp.Get(ctx, func3Promise))
}

func func3(resp *DataGetter[Resp], func3Promise chan struct{}) {
    defer close(func3Promise)
    if Condition2() {
        func1(resp)
    }
}

func Case4() {
    ctx, cancel := context.WithTimeout(context.Background(), time.Second)
    defer cancel()
    var resp DataGetter[Resp]
    func3Promise := make(chan struct{})
    go func3(&resp, func3Promise)
    go func2(ctx, &resp, func3Promise)
}

这样做的好处是:

  • func3Promise 是只跟 func3 有关,跟 resp 无关的,即使有 resp2、resp3 等也可以用 func3Promise。在 func3 close func3Promise 也不会觉得奇怪。
  • promise 跟 resp 的生产无关,跟消费有关,是 Get 的时候选择的。生产可以无压力地写生产代码,不用担心某个 ifelse 没覆盖到。不同的地方 Get 的时候可以选择不同的 promise。
  • 如果 Get 选错 promise,promise 在 Get 的时候没有初始化,那么程序会 panic。我认为 panic 是比较好定位的错误,因为可以看堆栈。

缺点是 Get 的时候确实需要谨慎选择 promise,使 promise 满足上面说的 promise 的几个条件。一般在生产与消费分叉之前令 promise 初始化,比如 Case4 函数,在分叉之后的生产方的第一个函数进行 close,比如 func3。

move promise#

有时候并不能简单按上面的 promise 规则,比如下面的 Case6,我把 Case4 中 func3 中的 func1 调用改为了 go func1。

func func1(resp *DataGetter[Resp]) {
    if Condition() {
        resp.InitAndGet(func(r *Resp) {
            *r = RPC()
        })
    }
}

func func2(ctx, context.Context, resp *DataGetter[Resp], func3Promise chan struct{}) {
    handle(resp.Get(ctx, func3Promise))
}

func func3(resp *DataGetter[Resp], func3Promise chan struct{}) {
    defer close(func3Promise)
    if Condition2() {
        go func1(resp)
    }
}

func Case6() {
    ctx, cancel := context.WithTimeout(context.Background(), time.Second)
    defer cancel()
    var resp DataGetter[Resp]
    func3Promise := make(chan struct{})
    go func3(&resp, func3Promise)
    go func2(ctx, &resp, func3Promise)
}

于是就出问题了,func3Promise 被 close 的时候可能 func1 还没被调用,导致 resp 的生产在 func3Promise 生命周期之后,不满足 promise 定义。怎么办呢?我又把目光投向 C++,发现还有个 std::move 的语义,于是我又把它抄过来了。Go 语言里一般没有 move 的概念,我抄什么呢?抄析构函数调用的时机。在 C++ 里,如果一个 promise 被 move 到新的变量,那么原变量就不会执行析构函数。我就抄这个逻辑。下面是改写后的 Case6。

func func1(resp *DataGetter[Resp], func1Promise chan struct{}) {
    defer close(func1Promise)
    if Condition() {
        resp.InitAndGet(func(r *Resp) {
            *r = RPC()
        })
    }
}

func func2(ctx, context.Context, resp *DataGetter[Resp], func1Promise chan struct{}) {
    handle(resp.Get(ctx, func1Promise))
}

func func3(resp *DataGetter[Resp], func1Promise chan struct{}) {
    isFun1PromiseMoved := false
    defer func() {
        if !isFun1PromiseMoved {
            close(func1Promise)
        }
    }()
    if Condition2() {
        isFun1PromiseMoved = true
        go func1(resp, func1Promise)
    }
}

func Case6() {
    ctx, cancel := context.WithTimeout(context.Background(), time.Second)
    defer cancel()
    var resp DataGetter[Resp]
    func1Promise := make(chan struct{})
    go func3(&resp, func1Promise)
    go func2(ctx, &resp, func1Promise)
}

函数及在函数生命周期发起的协程的总生命周期#

问题是解决了,但是,这怎么这么复杂?而且并没有很通用。比如下面的 Case7 就不行了,func1 被调用多次,其中第一次并不发起 rpc,第二次才发起。

func func1(resp *DataGetter[Resp], func1Promise chan struct{}, condition bool) {
    defer close(func1Promise)
    if condition {
        resp.InitAndGet(func(r *Resp) {
            *r = RPC()
        })
    }
}

func func2(ctx, context.Context, resp *DataGetter[Resp], func1Promise chan struct{}) {
    handle(resp.Get(ctx, func1Promise))
}

func func3(resp *DataGetter[Resp], func1Promise chan struct{}) {
    isFun1PromiseMoved := false
    defer func() {
        if !isFun1PromiseMoved {
            close(func1Promise)
        }
    }()
    if Condition2() {
        isFun1PromiseMoved = true
        go func1(resp, func1Promise, false)
    }
    if Condition3() {
        isFun1PromiseMoved = true
        go func1(resp, func1Promise, true)
    }
}

func Case7() {
    ctx, cancel := context.WithTimeout(context.Background(), time.Second)
    defer cancel()
    var resp DataGetter[Resp]
    func1Promise := make(chan struct{})
    go func3(&resp, func1Promise)
    go func2(ctx, &resp, func1Promise)
}

于是我又想了一想,发现我可能抄错作业了,有一个更无脑的语义可以用:func3 及在 func3 生命周期里发起的协程的总的生命周期。于是 Case7 改写成下面这样。

func func1(resp *DataGetter[Resp], func1Promise chan struct{}, condition bool) {
    defer close(func1Promise)
    if condition {
        resp.InitAndGet(func(r *Resp) {
            *r = RPC()
        })
    }
}

func func2(ctx, context.Context, resp *DataGetter[Resp], func3Promise chan struct{}) {
    handle(resp.Get(ctx, func3Promise))
}

func func3(resp *DataGetter[Resp], func3Promise chan struct{}) {
    subPromises := make([]chan struct{}, 0)
    defer func() {
        go func() {
            for _, promise := range subPromises {
                <-promise
            }
            close(func3Promise)
        }()
    }()
    if Condition2() {
        func1Promise := make(chan struct{})
        subPromises = append(subPromises, func1Promise)
        go func1(resp, func1Promise, false)
    }
    if Condition3() {
        func1Promise := make(chan struct{})
        subPromises = append(subPromises, func1Promise)
        go func1(resp, func1Promise, true)
    }
}

func Case7() {
    ctx, cancel := context.WithTimeout(context.Background(), time.Second)
    defer cancel()
    var resp DataGetter[Resp]
    func3Promise := make(chan struct{})
    go func3(&resp, func3Promise)
    go func2(ctx, &resp, func3Promise)
}

这个语义就无脑多了,每见到一个可能生产数据的函数调用就加一个 subPromise 就行了。

总结#

总的来说,promise 就是要划定一个范围,从生产者消费者分开之前开始,到所有可能发生生产的路径的范围。大多数时候,我们只需要无脑划定一个很粗的范围就足够了,比如 Case4,Case5,Case7(Case4 可看作 promise 在 Get 调用的时候 close 掉)。有时候可能真的需要一些很精细的操作去划定一个很精确的范围,不过我暂时还没遇到过,就这样吧,等我遇到了再来补充。

另外提一句,我后来尴尬地发现,其实用 DataGetter 包住数据的方式是很反人类的,因为这改变了生产者的代码。写生产代码的时候可能并不知道哪个数据需要用 DataGetter 包着,等到写消费者的时候才知道。所以,像 sync.Once 这样,在数据之外自行创建一个变量才是更好的方式。所以直接用 DataGetter [struct {}] 就好了。这样的话实际上不需要用泛型了,可以支持更低版本的 Go。

写了这么多,可能大家会疑惑这跟开头说的最优串并行有什么关系啊?其实我写了这么多,就是为了降低门槛,让大家可以随心所欲地在程序的各个地方起协程发起 rpc,同时可以随心所欲地获取其他协程的 rpc 结果。为了达到全局最优,我的方法不是宏观管控,而是做好每一个微观逻辑。一个典型的新需求的开发流程是这样的:在一个合适的地方起一个协程进入新需求的流程,看看新需求依赖的数据在旧代码的哪里生产,在生产的地方加上 DataGetter,然后在新需求的协程里等待结果,等到结果马上开始生产新需求的数据,并加上 DataGetter,新需求的数据需要合入旧逻辑来改变旧逻辑的输出,那么在需要改变旧逻辑的地方,用 DataGetter 去等待新需求的数据,一但等到了马上开始改变。你看,这不就满足了最优串并行的要求了吗?如果旧逻辑是全局最优的,那么加入新需求后的逻辑也依然是全局最优的。这可维护性拉满了啊。

完整代码#

下面是 DataGetter 完整的代码,修复了上面说的 AsyncInit 的问题,去除了泛型。

type DataGetterState = int32

const (
    DataGetterStateNotStarted DataGetterState = 0
    DataGetterStateStarted    DataGetterState = 1
    DataGetterStateDone       DataGetterState = 2
)

type DataGetter struct {
    State    DataGetterState
    SelfOnce sync.Once
    IsReady  chan struct{}
}

func (d *DataGetter) AsyncInit(initFunc func()) {
    if !d.IsMyTurn() {
        return
    }
    d.InitSelf()
    go d.DoInit(initFunc)
}

func (d *DataGetter) IsMyTurn() bool {
    return atomic.CompareAndSwapInt32(&d.State, DataGetterStateNotStarted, DataGetterStateStarted)
}

func (d *DataGetter) Get(ctx context.Context, promise chan struct{}) {
    if atomic.LoadInt32(&d.State) == DataGetterStateDone {
        return
    }
    d.InitSelf()
    select {
    case <-promise:
        d.InitAndGet(nil)
        return
    case <-d.IsReady:
        return
    case <-ctx.Done():
        d.InitAndGet(nil)
        return
    }
}

func (d *DataGetter) InitAndGet(initFunc func()) {
    if atomic.LoadInt32(&d.State) == DataGetterStateDone {
        return
    }
    d.InitSelf()
    if d.IsMyTurn() {
        d.DoInit(initFunc)
    }
    <-d.IsReady
}

func (d *DataGetter) DoInit(initFunc func()) {
    defer func() {
        recover()
        close(d.IsReady)
        atomic.StoreInt32(&d.State, DataGetterStateDone)
    }()
    if initFunc != nil {
        initFunc()
    }
}

func (d *DataGetter) InitSelf() {
    d.SelfOnce.Do(func() {
        d.IsReady = make(chan struct{})
    })
}
本作品采用《CC 协议》,转载必须注明作者和本文链接
BrianWilliam
刘天承