GRPC断线重连

GRPC断线重连

refer

github.com/grpc/grpc/blob/master/d...

github.com/jpillora/backoff

zacard.net/2021/01/13/log-agent-ba...

stackoverflow.com/questions/371259...

当grpc服务端关闭,过了一会重启后,客户端不用重启,总是能够再次连接到服务端。

从源码分析,探究其中发生了什么。

先说结论

当服务端关闭,客户端并不会去主动建立连接,会更新picker,为下次建立连接做好准备。只有当客户端下次调用接口的时候,客户端才会开一个新的协程主动去建立连接(后面调用接口,不会开启协程创建连接),当建立连接失败的时候,会往一个chan中塞一个函数,另一个协程收到函数后,会调用此函数重试建立连接。具体的时候间隔采用指数退避的方式,总体就是失败的越多,下次等待的时候越长。具体的算法参考。ac.dopts.bs.Backoff(ac.backoffIdx)

前置操作

grpc版本v1.5.9

设置为grpc logging 设置为debug

设置环境变量

export GRPC_GO_LOG_VERBOSITY_LEVEL=99
export GRPC_GO_LOG_SEVERITY_LEVEL=info

具体grpc服务端客户端代码

type GrpcDemoServer struct {
    grpcdemo.UnimplementedGrpcDemoServer
}
func (GrpcDemoServer) Call(ctx context.Context,req *grpcdemo.NoticeReaderReq) (*emptypb.Empty, error) {
    return nil, status.Errorf(codes.Unimplemented, "method Call not implemented")
}
func TestServer(t *testing.T) {
    listener, err := net.Listen("tcp", "0.0.0.0:8899")
    if err != nil {
        log.Println("net listen err ", err)
        return
    }
    s := grpc.NewServer()
    grpcdemo.RegisterGrpcDemoServer(s,new(GrpcDemoServer))
    if err := s.Serve(listener); err != nil {
        log.Println("failed to serve...", err)
        return
    }
}
func TestClient(t *testing.T) {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
   conn, err := grpc.DialContext(ctx, "127.0.0.1:8899", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithConnectParams(grpc.ConnectParams{
        Backoff: backoff.Config{
            BaseDelay:  1.0 * time.Second,
            Multiplier: 1.6,
            Jitter:     0.2,
            MaxDelay:   30 * time.Second,
        },
    }))
if err != nil {
        log.Printf("DialContext failed %v", err)
        return
    }
    for {
        time.Sleep(time.Second * 40)
        cli := grpcdemo.NewGrpcDemoClient(conn)
        result, err := cli.Call(context.Background(), &grpcdemo.NoticeReaderReq{
            Msg:       "",
            NoticeWay: &grpcdemo.NoticeReaderReq_Email{Email: "test"},
        })
        if err != nil {
            log.Printf("Call  failed %v", err)
        }
        log.Printf("================================result================================== %v", result)
    }
}

##

日志

2023/11/26 23:49:27 INFO: [core] [Channel #1 SubChannel #2] Subchannel created
2023/11/26 23:49:29 INFO: [core] [Channel #1] Channel Connectivity change to CONNECTING
2023/11/26 23:49:29 INFO: [core] [Channel #1 SubChannel #2] Subchannel Connectivity change to CONNECTING
2023/11/26 23:49:29 INFO: [core] [Channel #1 SubChannel #2] Subchannel picks a new address "127.0.0.1:8899" to connect
2023/11/26 23:49:29 INFO: [core] [pick-first-lb 0xc00027ea80] Received SubConn state update: 0xc00027ebd0, {ConnectivityState:CONNECTING ConnectionError:<nil>}
2023/11/26 23:49:30 INFO: [core] [Channel #1 SubChannel #2] Subchannel Connectivity change to READY
2023/11/26 23:49:30 INFO: [core] [pick-first-lb 0xc00027ea80] Received SubConn state update: 0xc00027ebd0, {ConnectivityState:READY ConnectionError:<nil>}
2023/11/26 23:49:31 INFO: [core] [Channel #1] Channel Connectivity change to READY
//关闭服务端
2023/11/26 23:49:38 INFO: [transport] [client-transport 0xc000350000] Closing: connection error: desc = "error reading from server: read tcp 127.0.0.1:60811->127.0.0.1:8899: wsarecv: An existing connection was forcibly closed by the remote host."
2023/11/26 23:49:38 INFO: [core] [Channel #1 SubChannel #2] Subchannel Connectivity change to IDLE
2023/11/26 23:49:38 INFO: [core] [pick-first-lb 0xc00027ea80] Received SubConn state update: 0xc00027ebd0, {ConnectivityState:IDLE ConnectionError:<nil>}
2023/11/26 23:49:39 INFO: [transport] [client-transport 0xc000350000] loopyWriter exiting with error: transport closed by client
2023/11/26 23:49:39 INFO: [core] [Channel #1] Channel Connectivity change to IDLE

//客户端调用方法
2023/11/26 23:53:17 INFO: [core] [Channel #1 SubChannel #2] Subchannel Connectivity change to CONNECTING
2023/11/26 23:53:17 INFO: [core] [Channel #1 SubChannel #2] Subchannel picks a new address "127.0.0.1:8899" to connect
2023/11/26 23:53:17 INFO: [core] [pick-first-lb 0xc00027ea80] Received SubConn state update: 0xc00027ebd0, {ConnectivityState:CONNECTING ConnectionError:<nil>}
2023/11/26 23:53:18 INFO: [core] [Channel #1] Channel Connectivity change to CONNECTING
2023/11/26 23:53:20 INFO: [core] Creating new client transport to "{Addr: \"127.0.0.1:8899\", ServerName: \"127.0.0.1:8899\", }": connection error: desc = "transport: Error while dialing: dial tcp 127.0.0.1:8899: connectex: No connection could be made because the target machine actively refused it."
2023/11/26 23:53:20 WARNING: [core] [Channel #1 SubChannel #2] grpc: addrConn.createTransport failed to connect to {Addr: "127.0.0.1:8899", ServerName: "127.0.0.1:8899", }. Err: connection error: desc = "transport: Error while dialing: dial tcp 127.0.0.1:8899: connectex: No connection could be made because the target machine actively refused it."
2023/11/26 23:53:20 INFO: [core] [Channel #1 SubChannel #2] Subchannel Connectivity change to TRANSIENT_FAILURE, last error: connection error: desc = "transport: Error while dialing: dial tcp 127.0.0.1:8899: connectex: No connection could be made because the target machine actively refused it."
2023/11/26 23:53:20 INFO: [core] [pick-first-lb 0xc00027ea80] Received SubConn state update: 0xc00027ebd0, {ConnectivityState:TRANSIENT_FAILURE ConnectionError:connection error: desc = "transport: Error while dialing: dial tcp 127.0.0.1:8899: connectex: No connection could be made because the target machine actively refused it."}
2023/11/26 23:53:23 INFO: [core] [Channel #1] Channel Connectivity change to TRANSIENT_FAILURE
2023/11/26 23:53:23 INFO: [core] [Channel #1 SubChannel #2] Subchannel Connectivity change to IDLE, last error: connection error: desc = "transport: Error while dialing: dial tcp 127.0.0.1:8899: connectex: No connection could be made because the target machine actively refused it."
2023/11/26 23:53:23 Call  failed rpc error: code = Unavailable desc = connection error: desc = "transport: Error while dialing: dial tcp 127.0.0.1:8899: connectex: No connection could be made because the target machine actively refused it."

从日志中分析,当关闭连接的时候客户端并没有主动去重试建立连接,只有当调用了具体的方法的时候客户端才主动去建立的建立连接。

调用链分析

关闭服务端,断开连接

当关闭服务端,断开连接。具体调用链如下

func (t *http2Client) reader(errCh chan<- error) {
    defer close(t.readerDone)

    if err := t.readServerPreface(); err != nil {
        errCh <- err
        return
    }
    close(errCh)
    if t.keepaliveEnabled {
        atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
    }

    // loop to keep reading incoming messages on this transport.
    //循环读取
    for {
        t.controlBuf.throttle()
        frame, err := t.framer.fr.ReadFrame()
        if t.keepaliveEnabled {
            atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
        }
        if err != nil {
            // Abort an active stream if the http2.Framer returns a
            // http2.StreamError. This can happen only if the server's response
            // is malformed http2.
            if se, ok := err.(http2.StreamError); ok {
                t.mu.Lock()
                s := t.activeStreams[se.StreamID]
                t.mu.Unlock()
                if s != nil {
                    // use error detail to provide better err message
                    code := http2ErrConvTab[se.Code]
                    errorDetail := t.framer.fr.ErrorDetail()
                    var msg string
                    if errorDetail != nil {
                        msg = errorDetail.Error()
                    } else {
                        msg = "received invalid frame"
                    }
                    t.closeStream(s, status.Error(code, msg), true, http2.ErrCodeProtocol, status.New(code, msg), nil, false)
                }
                continue
            } else {
                // Transport error.
                  //===============调用此函数 出现错误触发此函数===================
                t.Close(connectionErrorf(true, err, "error reading from server: %v", err))
                return
            }
        }
// Close kicks off the shutdown process of the transport. This should be called
// only once on a transport. Once it is called, the transport should not be
// accessed any more.
func (t *http2Client) Close(err error) {
    t.mu.Lock()
    // Make sure we only close once.
    if t.state == closing {
        t.mu.Unlock()
        return
    }
    if t.logger.V(logLevel) {
        t.logger.Infof("Closing: %v", err)
    }
    // Call t.onClose ASAP to prevent the client from attempting to create new
    // streams.
    if t.state != draining {
          //===============调用此函数===================
        t.onClose(GoAwayInvalid)
    }
    t.state = closing
    streams := t.activeStreams
    t.activeStreams = nil
    if t.kpDormant {
        // If the keepalive goroutine is blocked on this condition variable, we
        // should unblock it so that the goroutine eventually exits.
        t.kpDormancyCond.Signal()
    }
    t.mu.Unlock()
    t.controlBuf.finish()
    t.cancel()
    t.conn.Close()
    onClose := func(r transport.GoAwayReason) {
        ac.mu.Lock()
        defer ac.mu.Unlock()
        // adjust params based on GoAwayReason
        ac.adjustParams(r)
        if ctx.Err() != nil {
            // Already shut down or connection attempt canceled.  tearDown() or
            // updateAddrs() already cleared the transport and canceled hctx
            // via ac.ctx, and we expected this connection to be closed, so do
            // nothing here.
            return
        }
        hcancel()
        if ac.transport == nil {
            // We're still connecting to this address, which could error.  Do
            // not update the connectivity state or resolve; these will happen
            // at the end of the tryAllAddrs connection loop in the event of an
            // error.
            return
        }
        ac.transport = nil
        // Refresh the name resolver on any connection loss.
        ac.cc.resolveNow(resolver.ResolveNowOptions{})
        // Always go idle and wait for the LB policy to initiate a new
        // connection attempt.
        //===============更新状态===================
        ac.updateConnectivityState(connectivity.Idle, nil)
    }
// Note: this requires a lock on ac.mu.
func (ac *addrConn) updateConnectivityState(s connectivity.State, lastErr error) {
    if ac.state == s {
        return
    }
    // When changing states, reset the state change channel.
    close(ac.stateChan)
    ac.stateChan = make(chan struct{})
    ac.state = s
    if lastErr == nil {
        channelz.Infof(logger, ac.channelzID, "Subchannel Connectivity change to %v", s)
    } else {
        channelz.Infof(logger, ac.channelzID, "Subchannel Connectivity change to %v, last error: %s", s, lastErr)
    }
       //===============更新服务状态===================
    ac.cc.handleSubConnStateChange(ac.acbw, s, lastErr)
}
// updateSubConnState is invoked by grpc to push a subConn state update to the
// underlying balancer.
func (ccb *ccBalancerWrapper) updateSubConnState(sc balancer.SubConn, s connectivity.State, err error) {
    ccb.mu.Lock()
     //===============添加此函数作为信号===================
    ccb.serializer.Schedule(func(_ context.Context) {
        // Even though it is optional for balancers, gracefulswitch ensures
        // opts.StateListener is set, so this cannot ever be nil.

        sc.(*acBalancerWrapper).stateListener(balancer.SubConnState{ConnectivityState: s, ConnectionError: err})
    })
    ccb.mu.Unlock()
}
// Schedule adds a callback to be scheduled after existing callbacks are run.
//
// Callbacks are expected to honor the context when performing any blocking
// operations, and should return early when the context is canceled.
//
// Return value indicates if the callback was successfully added to the list of
// callbacks to be executed by the serializer. It is not possible to add
// callbacks once the context passed to NewCallbackSerializer is cancelled.
func (cs *CallbackSerializer) Schedule(f func(ctx context.Context)) bool {
    cs.closedMu.Lock()
    defer cs.closedMu.Unlock()

    if cs.closed {
        return false
    }
      //===============添加此函数作为信号===================
    cs.callbacks.Put(f)
    return true
}
// Put adds t to the unbounded buffer.
func (b *Unbounded) Put(t any) error {
    b.mu.Lock()
    defer b.mu.Unlock()
    if b.closing {
        return errBufferClosed
    }
    if len(b.backlog) == 0 {
        select {
        case b.c <- t:
            return nil
        default:
        }
    }
    b.backlog = append(b.backlog, t)
    return nil
}
func (cs *CallbackSerializer) run(ctx context.Context) {
    defer close(cs.done)

    // TODO: when Go 1.21 is the oldest supported version, this loop and Close
    // can be replaced with:
    //
    // context.AfterFunc(ctx, cs.callbacks.Close)
    for ctx.Err() == nil {
        select {
        case <-ctx.Done():
            // Do nothing here. Next iteration of the for loop will not happen,
            // since ctx.Err() would be non-nil.
            //===============收到信号,获取推过来的函数===================
        case cb := <-cs.callbacks.Get():
            cs.callbacks.Load()
             //===============调用推过来的函数===================
            cb.(func(context.Context))(ctx)
        }
    }

    // Close the buffer to prevent new callbacks from being added.
    cs.callbacks.Close()

    // Run all pending callbacks.
    for cb := range cs.callbacks.Get() {
        cs.callbacks.Load()
        cb.(func(context.Context))(ctx)
    }
}
// updateSubConnState is invoked by grpc to push a subConn state update to the
// underlying balancer.
func (ccb *ccBalancerWrapper) updateSubConnState(sc balancer.SubConn, s connectivity.State, err error) {
    ccb.mu.Lock()
    ccb.serializer.Schedule(func(_ context.Context) {
        // Even though it is optional for balancers, gracefulswitch ensures
        // 这个是具体的函数。
        sc.(*acBalancerWrapper).stateListener(balancer.SubConnState{ConnectivityState: s, ConnectionError: err})
    })
    ccb.mu.Unlock()
}
// updateSubConnState forwards the update to the appropriate child.
func (gsb *Balancer) updateSubConnState(sc balancer.SubConn, state balancer.SubConnState, cb func(balancer.SubConnState)) {
    gsb.currentMu.Lock()
    defer gsb.currentMu.Unlock()
    gsb.mu.Lock()
    // Forward update to the appropriate child.  Even if there is a pending
    // balancer, the current balancer should continue to get SubConn updates to
    // maintain the proper state while the pending is still connecting.
    var balToUpdate *balancerWrapper
    if gsb.balancerCurrent != nil && gsb.balancerCurrent.subconns[sc] {
        balToUpdate = gsb.balancerCurrent
    } else if gsb.balancerPending != nil && gsb.balancerPending.subconns[sc] {
        balToUpdate = gsb.balancerPending
    }
    if balToUpdate == nil {
        // SubConn belonged to a stale lb policy that has not yet fully closed,
        // or the balancer was already closed.
        gsb.mu.Unlock()
        return
    }
    if state.ConnectivityState == connectivity.Shutdown {
        delete(balToUpdate.subconns, sc)
    }
    gsb.mu.Unlock()
    if cb != nil {
        //======================走到了这里=========================
        cb(state)
    } else {
        balToUpdate.UpdateSubConnState(sc, state)
    }
}
func (b *pickfirstBalancer) UpdateClientConnState(state balancer.ClientConnState) error {
    addrs := state.ResolverState.Addresses
    if len(addrs) == 0 {
        // The resolver reported an empty address list. Treat it like an error by
        // calling b.ResolverError.
        if b.subConn != nil {
            // Shut down the old subConn. All addresses were removed, so it is
            // no longer valid.
            b.subConn.Shutdown()
            b.subConn = nil
        }
        b.ResolverError(errors.New("produced zero addresses"))
        return balancer.ErrBadResolverState
    }

    // We don't have to guard this block with the env var because ParseConfig
    // already does so.
    cfg, ok := state.BalancerConfig.(pfConfig)
    if state.BalancerConfig != nil && !ok {
        return fmt.Errorf("pickfirst: received illegal BalancerConfig (type %T): %v", state.BalancerConfig, state.BalancerConfig)
    }
    if cfg.ShuffleAddressList {
        addrs = append([]resolver.Address{}, addrs...)
        grpcrand.Shuffle(len(addrs), func(i, j int) { addrs[i], addrs[j] = addrs[j], addrs[i] })
    }

    if b.logger.V(2) {
        b.logger.Infof("Received new config %s, resolver state %s", pretty.ToJSON(cfg), pretty.ToJSON(state.ResolverState))
    }

    if b.subConn != nil {
        b.cc.UpdateAddresses(b.subConn, addrs)
        return nil
    }

    var subConn balancer.SubConn
    subConn, err := b.cc.NewSubConn(addrs, balancer.NewSubConnOptions{
        StateListener: func(state balancer.SubConnState) {
            //===============走到了这里=================
            b.updateSubConnState(subConn, state)
        },
    })
func (b *pickfirstBalancer) updateSubConnState(subConn balancer.SubConn, state balancer.SubConnState) {
    if b.logger.V(2) {
        b.logger.Infof("Received SubConn state update: %p, %+v", subConn, state)
    }
    if b.subConn != subConn {
        if b.logger.V(2) {
            b.logger.Infof("Ignored state change because subConn is not recognized")
        }
        return
    }
    if state.ConnectivityState == connectivity.Shutdown {
        b.subConn = nil
        return
    }

    switch state.ConnectivityState {
    case connectivity.Ready:
        b.cc.UpdateState(balancer.State{
            ConnectivityState: state.ConnectivityState,
            Picker:            &picker{result: balancer.PickResult{SubConn: subConn}},
        })
    case connectivity.Connecting:
        if b.state == connectivity.TransientFailure {
            // We stay in TransientFailure until we are Ready. See A62.
            return
        }
        b.cc.UpdateState(balancer.State{
            ConnectivityState: state.ConnectivityState,
            Picker:            &picker{err: balancer.ErrNoSubConnAvailable},
        })
    case connectivity.Idle:
        if b.state == connectivity.TransientFailure {
            // We stay in TransientFailure until we are Ready. Also kick the
            // subConn out of Idle into Connecting. See A62.
            b.subConn.Connect()
            return
        }
        //===========走到了这里================
        b.cc.UpdateState(balancer.State{
            ConnectivityState: state.ConnectivityState,
            Picker:            &idlePicker{subConn: subConn},
        })
    case connectivity.TransientFailure:
        b.cc.UpdateState(balancer.State{
            ConnectivityState: state.ConnectivityState,
            Picker:            &picker{err: state.ConnectionError},
        })
    }
    b.state = state.ConnectivityState
}
func (ccb *ccBalancerWrapper) UpdateState(s balancer.State) {
    if ccb.isIdleOrClosed() {
        return
    }

    // Update picker before updating state.  Even though the ordering here does
    // not matter, it can lead to multiple calls of Pick in the common start-up
    // case where we wait for ready and then perform an RPC.  If the picker is
    // updated later, we could call the "connecting" picker when the state is
    // updated, and then call the "ready" picker after the picker gets updated.
    //走到了这里更新了picker
    ccb.cc.pickerWrapper.updatePicker(s.Picker)
    ccb.cc.csMgr.updateState(s.ConnectivityState)
}

小结,当出现服务端关闭的时候,会触发更新picker,当我们下次调用grpc接口的时候会重连。

客户端调用接口

当我们调用grpc的接口的时候,调用invoke方法的调用链

//不重要的地方简单说明
func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) {}

func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, mc serviceconfig.MethodConfig, onCommit, doneFunc func(), opts ...CallOption) (_ iresolver.ClientStream, err error) {
// Pick the transport to use and create a new stream on the transport.
    // Assign cs.attempt upon success.
    op := func(a *csAttempt) error {
        //走到了这里
        if err := a.getTransport(); err != nil {
            return err
        }
        if err := a.newStream(); err != nil {
            return err
        }
        // Because this operation is always called either here (while creating
        // the clientStream) or by the retry code while locked when replaying
        // the operation, it is safe to access cs.attempt directly.
        cs.attempt = a
        return nil
    }
    if err := cs.withRetry(op, func() { cs.bufferForRetryLocked(0, op) }); err != nil {
        return nil, err
    }


}

func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer.PickInfo) (transport.ClientTransport, balancer.PickResult, error) {


            // If the channel is set, it means that the pick call had to wait for a
        // new picker at some point. Either it's the first iteration and this
        // function received the first picker, or a picker errored with
        // ErrNoSubConnAvailable or errored with failfast set to false, which
        // will trigger a continue to the next iteration. In the first case this
        // conditional will hit if this call had to block (the channel is set).
        // In the second case, the only way it will get to this conditional is
        // if there is a new picker.
        if ch != nil {
            for _, sh := range pw.statsHandlers {
                sh.HandleRPC(ctx, &stats.PickerUpdated{})
            }
        }

        ch = pw.blockingCh
        p := pw.picker
        pw.mu.Unlock()
        //走到了这里
        pickResult, err := p.Pick(info)
}
func (i *idlePicker) Pick(balancer.PickInfo) (balancer.PickResult, error) {
    //走到了这里
    i.subConn.Connect()
    return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
}
func (acbw *acBalancerWrapper) Connect() {
    go acbw.ac.connect()
}
// connect starts creating a transport.
// It does nothing if the ac is not IDLE.
// TODO(bar) Move this to the addrConn section.
func (ac *addrConn) connect() error {
    ac.mu.Lock()
    if ac.state == connectivity.Shutdown {
        if logger.V(2) {
            logger.Infof("connect called on shutdown addrConn; ignoring.")
        }
        ac.mu.Unlock()
        return errConnClosing
    }
    if ac.state != connectivity.Idle {
        if logger.V(2) {
            logger.Infof("connect called on addrConn in non-idle state (%v); ignoring.", ac.state)
        }
        ac.mu.Unlock()
        return nil
    }
    ac.mu.Unlock()
//走到了这里
    ac.resetTransport()
    return nil
}
func (ac *addrConn) resetTransport() {
    ac.mu.Lock()
    acCtx := ac.ctx
    if acCtx.Err() != nil {
        ac.mu.Unlock()
        return
    }

    addrs := ac.addrs
    ////走到了这里,指数避退,失败次数越多下一次等待调用的时间越长
    backoffFor := ac.dopts.bs.Backoff(ac.backoffIdx)
    // This will be the duration that dial gets to finish.
    dialDuration := minConnectTimeout
    if ac.dopts.minConnectTimeout != nil {
        dialDuration = ac.dopts.minConnectTimeout()
    }

    if dialDuration < backoffFor {
        // Give dial more time as we keep failing to connect.
        dialDuration = backoffFor
    }
    // We can potentially spend all the time trying the first address, and
    // if the server accepts the connection and then hangs, the following
    // addresses will never be tried.
    //
    // The spec doesn't mention what should be done for multiple addresses.
    // https://github.com/grpc/grpc/blob/master/doc/connection-backoff.md#proposed-backoff-algorithm
    connectDeadline := time.Now().Add(dialDuration)

    ac.updateConnectivityState(connectivity.Connecting, nil)
    ac.mu.Unlock()
    // =======================重连=================================
    if err := ac.tryAllAddrs(acCtx, addrs, connectDeadline); err != nil {
        //如果失败还是走到这里
        ac.cc.resolveNow(resolver.ResolveNowOptions{})
        ac.mu.Lock()
        if acCtx.Err() != nil {
            // addrConn was torn down.
            ac.mu.Unlock()
            return
        }
        // After exhausting all addresses, the addrConn enters
        // TRANSIENT_FAILURE.
        ac.updateConnectivityState(connectivity.TransientFailure, err)

        // Backoff.
        b := ac.resetBackoff
        ac.mu.Unlock()

        timer := time.NewTimer(backoffFor)
        select {
        case <-timer.C:
            ac.mu.Lock()
            ac.backoffIdx++
            ac.mu.Unlock()
        case <-b:
            timer.Stop()
        case <-acCtx.Done():
            timer.Stop()
            return
        }

        ac.mu.Lock()
        if acCtx.Err() == nil {
            //重连失败了。走到了这里
            ac.updateConnectivityState(connectivity.Idle, err)
        }
        ac.mu.Unlock()
        return
    }
// Note: this requires a lock on ac.mu.
func (ac *addrConn) updateConnectivityState(s connectivity.State, lastErr error) {
    if ac.state == s {
        return
    }
    // When changing states, reset the state change channel.
    close(ac.stateChan)
    ac.stateChan = make(chan struct{})
    ac.state = s
    if lastErr == nil {
        channelz.Infof(logger, ac.channelzID, "Subchannel Connectivity change to %v", s)
    } else {
        channelz.Infof(logger, ac.channelzID, "Subchannel Connectivity change to %v, last error: %s", s, lastErr)
    }
    //============================走到了这里======================
    ac.cc.handleSubConnStateChange(ac.acbw, s, lastErr)
}
// updateSubConnState is invoked by grpc to push a subConn state update to the
// underlying balancer.
func (ccb *ccBalancerWrapper) updateSubConnState(sc balancer.SubConn, s connectivity.State, err error) {
    ccb.mu.Lock()
    //========================走到了这里,发送信号,触发重试。============================
    // 收到函数
    ccb.serializer.Schedule(func(_ context.Context) {
        // Even though it is optional for balancers, gracefulswitch ensures
        // opts.StateListener is set, so this cannot ever be nil.
        //
        sc.(*acBalancerWrapper).stateListener(balancer.SubConnState{ConnectivityState: s, ConnectionError: err})
    })
    ccb.mu.Unlock()
}
func (cs *CallbackSerializer) run(ctx context.Context) {
    defer close(cs.done)

    // TODO: when Go 1.21 is the oldest supported version, this loop and Close
    // can be replaced with:
    //
    // context.AfterFunc(ctx, cs.callbacks.Close)
    for ctx.Err() == nil {
        select {
        case <-ctx.Done():
            // Do nothing here. Next iteration of the for loop will not happen,
            // since ctx.Err() would be non-nil.
            //===============收到信号,获取推过来的函===================
        case cb := <-cs.callbacks.Get():
            cs.callbacks.Load()
             //===============执行推过来的函数,具体的调用链和断线执行的时候执行差不多===================
            //只有再执行下面的函数有不同
            cb.(func(context.Context))(ctx)
        }
    }
func (b *pickfirstBalancer) updateSubConnState(subConn balancer.SubConn, state balancer.SubConnState) {
    if b.logger.V(2) {
        b.logger.Infof("Received SubConn state update: %p, %+v", subConn, state)
    }
    if b.subConn != subConn {
        if b.logger.V(2) {
            b.logger.Infof("Ignored state change because subConn is not recognized")
        }
        return
    }
    if state.ConnectivityState == connectivity.Shutdown {
        b.subConn = nil
        return
    }

    switch state.ConnectivityState {
    case connectivity.Ready:
        b.cc.UpdateState(balancer.State{
            ConnectivityState: state.ConnectivityState,
            Picker:            &picker{result: balancer.PickResult{SubConn: subConn}},
        })
    case connectivity.Connecting:
        if b.state == connectivity.TransientFailure {
            // We stay in TransientFailure until we are Ready. See A62.
            return
        }
        b.cc.UpdateState(balancer.State{
            ConnectivityState: state.ConnectivityState,
            Picker:            &picker{err: balancer.ErrNoSubConnAvailable},
        })
    case connectivity.Idle:
        if b.state == connectivity.TransientFailure {
            // We stay in TransientFailure until we are Ready. Also kick the
            // subConn out of Idle into Connecting. See A62.
             //===========走到了这里,重新建立连接================
            b.subConn.Connect()
            return
        }

        b.cc.UpdateState(balancer.State{
            ConnectivityState: state.ConnectivityState,
            Picker:            &idlePicker{subConn: subConn},
        })
    case connectivity.TransientFailure:
        b.cc.UpdateState(balancer.State{
            ConnectivityState: state.ConnectivityState,
            Picker:            &picker{err: state.ConnectionError},
        })
    }
    b.state = state.ConnectivityState
}

整个调用链完成。当建立连接失败的时候会往chan中塞一个函数来重试。

backoff机制

func (bc Exponential) Backoff(retries int) time.Duration {
    if retries == 0 {
        return bc.Config.BaseDelay
    }
    backoff, max := float64(bc.Config.BaseDelay), float64(bc.Config.MaxDelay)
    for backoff < max && retries > 0 {
        backoff *= bc.Config.Multiplier
        retries--
    }
    if backoff > max {
        backoff = max
    }
    // Randomize backoff delays so that if a cluster of requests start at
    // the same time, they won't operate in lockstep.
    //会随机加或减随机时间。
    backoff *= 1 + bc.Config.Jitter*(grpcrand.Float64()*2-1)
    if backoff < 0 {
        return 0
    }
    return time.Duration(backoff)
}

// DefaultConfig is a backoff configuration with the default values specfied
// at https://github.com/grpc/grpc/blob/master/doc/connection-backoff.md.
//
// This should be useful for callers who want to configure backoff with
// non-default values only for a subset of the options.
var DefaultConfig = Config{
    BaseDelay:  1.0 * time.Second, //第一次失败等待多久
    Multiplier: 1.6, //乘以倍数
    Jitter:     0.2, //随机的范围。
    MaxDelay:   120 * time.Second, //最大等待时间
}
//可以通过这个grpc.WithConnectParams参数设置。
conn, err := grpc.DialContext(ctx, "127.0.0.1:8899", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithConnectParams(grpc.ConnectParams{
    Backoff: backoff.Config{
        BaseDelay:  1.0 * time.Second,
        Multiplier: 1.6,
        Jitter:     0.2,
        MaxDelay:   30 * time.Second,
    },
}))
if err != nil {
    log.Printf("DialContext failed %v", err)
    return
}
go
本作品采用《CC 协议》,转载必须注明作者和本文链接
讨论数量: 0
(= ̄ω ̄=)··· 暂无内容!

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!