Go 实现 Raft 第三篇:命令和日志复制

客户端交互

在第一篇文章中我们简要讨论了客户端交互,如果不清晰建议可以再回顾一下。这里我们先不关注客户端如何找到领导者,将重点讨论当找到一个领导者时会发生什么。

  1. 首先,客户端将命令提交给领导者。在Raft集群中,命令通常只提交给单个节点。

  2. 领导者将命令复制到其跟随者。

  3. 最后,如果大多数集群节点都承认在其日志中有该命令,该命令将被提交,并向所有客户端通知新的提交。

注意提交和提交命令之间的不对称性 - 在检查我们即将讨论的实现决策时,这一点很重要。命令被提交到单个Raft节点,但是多个节点(特别是所有已连接/活动的节点啊)会在一段时间后将其提交并通知其客户端。

回顾此图:

Go 实现 Raft 第三篇:命令和日志复制

状态机代表使用Raft进行复制的任意服务。

然后我们在Raft ConsensusModule模块的上下文中讨论客户端,我们通常指的是此服务,因为这是将提交报告到的地方。换句话说,从 Consensus 模块到服务状态机的黑色箭头就是该通知。

提交管道

在我们的实现中,当一个 ConsensusModule 被创建时,它接受一个提交管道 - 一个用来向调用者发送提交命令的通道:commitChan chan<-CommitEntry。定义如下:

// CommitEntry is the data reported by Raft to the commit channel. Each commit
// entry notifies the client that consensus was reached on a command and it can
// be applied to the client's state machine.
type CommitEntry struct {
  // Command is the client command being committed.
  Command interface{}

  // Index is the log index at which the client command is committed.
  Index int

  // Term is the Raft term at which the client command is committed.
  Term int
}

使用通道是一种设计选择,但不是唯一方式。也可以改用回调。创建ConsensusModule时,调用者将注册一个回调函数,只要有要提交的命令,就会调用该回调函数。

在实现通道上发送条目的功能之前。我们需要先讨论 Raft 服务器如何复制命令并确定命令是否已提交。

Raft日志

在文章中多次提到 Raft 日志,但还没有详细介绍。日志只是应该应用于状态机的线性命令序列;如果有需要,日志应该足以从某个开始状态“重放”状态机。在正常运行期间,所有 Raft 节点的日志都是相同的;当领导者收到新命令时,将其存放在自己的日志中,然后复制到跟随者。跟随者将命令放在日志中,并确认给领导者,领导者将保留已安全复制到群集中大多数服务器的最新日志索引的计数。

Go 实现 Raft 第三篇:命令和日志复制

每个框都是一个日志条目;框顶部的数字是将其添加到日志中的任期。底部是此日志包含的键值命令。每个日志条目都有一个线性索引。框的颜色是任期的另一种表示形式。

如果将此日志应用于空键值存储,则最终结果将具有值x = 4,y = 7。

如果不懂日志储存方式,传送门
在我们的实现中,日志条目由以下形式表示:

type LogEntry struct {
    Command interface{}
    Term int
}

每个ConsensusModule的日志都只是log []LogEntry。用户端通常不在乎任期。任期对Raft的正确性至关重要,在阅读代码时务必牢记。

提交新的命令

新的 Submit 方法,使客户端可以提交新命令:

func (cm *ConsensusModule) Submit(command interface{}) bool {
  cm.mu.Lock()
  defer cm.mu.Unlock()

  cm.dlog("Submit received by %v: %v", cm.state, command)
  if cm.state == Leader {
    cm.log = append(cm.log, LogEntry{Command: command, Term: cm.currentTerm})
    cm.dlog("... log=%v", cm.log)
    return true
  }
  return false
}

很简单,如果此CM是领导者,则将新命令附加到日志中并返回true。否则,将被忽略并返回false。

问:“提交”返回的真实值是否表明客户端已向领导者提交了命令?

答:在极少数情况下,领导者可能会与其他 Raft 服务器分开,而后者在一段时间后会继续选举新的领导者。但是,客户可能仍在与旧的领导者通信。客户端应等待一段合理的时间,以使其提交的命令出现在提交通道上;如果不是,则表示它联系了错误的领导者,应与其他领导者重试。

复制日志条目

我们看到,提交给领导者的新命令被添加到日志的末尾。这个新命令如何到达跟随者?领导者遵循的步骤在Raft论文中进行了精确描述。我们在 leaderSendHeartbeats 中完成实现。

func (cm *ConsensusModule) leaderSendHeartbeats() {
  cm.mu.Lock()
  savedCurrentTerm := cm.currentTerm
  cm.mu.Unlock()

  for _, peerId := range cm.peerIds {
    go func(peerId int) {
      cm.mu.Lock()
      ni := cm.nextIndex[peerId]
      prevLogIndex := ni - 1
      prevLogTerm := -1
      if prevLogIndex >= 0 {
        prevLogTerm = cm.log[prevLogIndex].Term
      }
      entries := cm.log[ni:]

      args := AppendEntriesArgs{
        Term:         savedCurrentTerm,
        LeaderId:     cm.id,
        PrevLogIndex: prevLogIndex,
        PrevLogTerm:  prevLogTerm,
        Entries:      entries,
        LeaderCommit: cm.commitIndex,
      }
      cm.mu.Unlock()
      cm.dlog("sending AppendEntries to %v: ni=%d, args=%+v", peerId, ni, args)
      var reply AppendEntriesReply
      if err := cm.server.Call(peerId, "ConsensusModule.AppendEntries", args, &reply); err == nil {
        cm.mu.Lock()
        defer cm.mu.Unlock()
        if reply.Term > savedCurrentTerm {
          cm.dlog("term out of date in heartbeat reply")
          cm.becomeFollower(reply.Term)
          return
        }

        if cm.state == Leader && savedCurrentTerm == reply.Term {
          if reply.Success {
            cm.nextIndex[peerId] = ni + len(entries)
            cm.matchIndex[peerId] = cm.nextIndex[peerId] - 1
            cm.dlog("AppendEntries reply from %d success: nextIndex := %v, matchIndex := %v", peerId, cm.nextIndex, cm.matchIndex)

            savedCommitIndex := cm.commitIndex
            for i := cm.commitIndex + 1; i < len(cm.log); i++ {
              if cm.log[i].Term == cm.currentTerm {
                matchCount := 1
                for _, peerId := range cm.peerIds {
                  if cm.matchIndex[peerId] >= i {
                    matchCount++
                  }
                }
                if matchCount*2 > len(cm.peerIds)+1 {
                  cm.commitIndex = i
                }
              }
            }
            if cm.commitIndex != savedCommitIndex {
              cm.dlog("leader sets commitIndex := %d", cm.commitIndex)
              cm.newCommitReadyChan <- struct{}{}
            }
          } else {
            cm.nextIndex[peerId] = ni - 1
            cm.dlog("AppendEntries reply from %d !success: nextIndex := %d", peerId, ni-1)
          }
        }
      }
    }(peerId)
  }
}

这比我们在上一部分中所做的要复杂得多,但实际上它仅遵循本文的图2。关于此代码的一些注意事项:

  • 现在已完全填充了AE RPC的字段:有关其含义,请参见本文中的图2。

  • AE响应有一个 success 字段,该字段告诉领导者跟随者是否看到prevLogIndex 和 prevLogTerm 匹配。领导者基于此字段更新此跟随者的nextIndex。

  • commitIndex 根据复制特定日志索引的关注者的数量进行更新。如果索引被多数复制,则 commitIndex 前进到该索引。

与我们之前讨论的用户端交互有关,这部分代码特别重要:

if cm.commitIndex != savedCommitIndex {
  cm.dlog("leader sets commitIndex := %d", cm.commitIndex)
  cm.newCommitReadyChan <- struct{}{}
}

newCommitReadyChan 是CM内部使用的通道,用于指示已准备好将新条目通过提交通道发送到客户端。它由在CM启动时在goroutine中运行的以下方法起作用:

func (cm *ConsensusModule) commitChanSender() {
  for range cm.newCommitReadyChan {
    // Find which entries we have to apply.
    cm.mu.Lock()
    savedTerm := cm.currentTerm
    savedLastApplied := cm.lastApplied
    var entries []LogEntry
    if cm.commitIndex > cm.lastApplied {
      entries = cm.log[cm.lastApplied+1 : cm.commitIndex+1]
      cm.lastApplied = cm.commitIndex
    }
    cm.mu.Unlock()
    cm.dlog("commitChanSender entries=%v, savedLastApplied=%d", entries, savedLastApplied)

    for i, entry := range entries {
      cm.commitChan <- CommitEntry{
        Command: entry.Command,
        Index:   savedLastApplied + i + 1,
        Term:    savedTerm,
      }
    }
  }
  cm.dlog("commitChanSender done")
}

此方法更新 lastApplied 状态变量以确定哪些条目已经发送到客户端,并且仅发送新条目。

更新跟随者的日志

我们已经看到了领导者如何处理新的日志条目。现在介绍跟随者的代码实现。特别是 AppendEntries RPC。

func (cm *ConsensusModule) AppendEntries(args AppendEntriesArgs, reply *AppendEntriesReply) error {
  cm.mu.Lock()
  defer cm.mu.Unlock()
  if cm.state == Dead {
    return nil
  }
  cm.dlog("AppendEntries: %+v", args)

  if args.Term > cm.currentTerm {
    cm.dlog("... term out of date in AppendEntries")
    cm.becomeFollower(args.Term)
  }

  reply.Success = false
  if args.Term == cm.currentTerm {
    if cm.state != Follower {
      cm.becomeFollower(args.Term)
    }
    cm.electionResetEvent = time.Now()

    // Does our log contain an entry at PrevLogIndex whose term matches
    // PrevLogTerm? Note that in the extreme case of PrevLogIndex=-1 this is
    // vacuously true.
    if args.PrevLogIndex == -1 ||
      (args.PrevLogIndex < len(cm.log) && args.PrevLogTerm == cm.log[args.PrevLogIndex].Term) {
      reply.Success = true

      // Find an insertion point - where there's a term mismatch between
      // the existing log starting at PrevLogIndex+1 and the new entries sent
      // in the RPC.
      logInsertIndex := args.PrevLogIndex + 1
      newEntriesIndex := 0

      for {
        if logInsertIndex >= len(cm.log) || newEntriesIndex >= len(args.Entries) {
          break
        }
        if cm.log[logInsertIndex].Term != args.Entries[newEntriesIndex].Term {
          break
        }
        logInsertIndex++
        newEntriesIndex++
      }
      // At the end of this loop:
      // - logInsertIndex points at the end of the log, or an index where the
      //   term mismatches with an entry from the leader
      // - newEntriesIndex points at the end of Entries, or an index where the
      //   term mismatches with the corresponding log entry
      if newEntriesIndex < len(args.Entries) {
        cm.dlog("... inserting entries %v from index %d", args.Entries[newEntriesIndex:], logInsertIndex)
        cm.log = append(cm.log[:logInsertIndex], args.Entries[newEntriesIndex:]...)
        cm.dlog("... log is now: %v", cm.log)
      }

      // Set commit index.
      if args.LeaderCommit > cm.commitIndex {
        cm.commitIndex = intMin(args.LeaderCommit, len(cm.log)-1)
        cm.dlog("... setting commitIndex=%d", cm.commitIndex)
        cm.newCommitReadyChan <- struct{}{}
      }
    }
  }

  reply.Term = cm.currentTerm
  cm.dlog("AppendEntries reply: %+v", *reply)
  return nil
}

选举安全

目前为止,我们已经研究了添加的新代码以支持日志复制。但是,日志也会影响Raft的选举。Raft使用选举程序来防止候选人赢得选举,除非其日志至少与集群中大多数节点的日志一样。

因此,RV包含 lastLogIndex 和 lastLogTerm 字段。当候选人发出RV时,将使用有关其最后一个日志条目的信息填充这些RV。跟随者将这些字段与自己的字段进行比较,并确定候选人是否是最新的才可以被选举

func (cm *ConsensusModule) startElection() {
  cm.state = Candidate
  cm.currentTerm += 1
  savedCurrentTerm := cm.currentTerm
  cm.electionResetEvent = time.Now()
  cm.votedFor = cm.id
  cm.dlog("becomes Candidate (currentTerm=%d); log=%v", savedCurrentTerm, cm.log)

  var votesReceived int32 = 1

  // Send RequestVote RPCs to all other servers concurrently.
  for _, peerId := range cm.peerIds {
    go func(peerId int) {
      cm.mu.Lock()
      savedLastLogIndex, savedLastLogTerm := cm.lastLogIndexAndTerm()
      cm.mu.Unlock()

      args := RequestVoteArgs{
        Term:         savedCurrentTerm,
        CandidateId:  cm.id,
        LastLogIndex: savedLastLogIndex,
        LastLogTerm:  savedLastLogTerm,
      }

      cm.dlog("sending RequestVote to %d: %+v", peerId, args)
      var reply RequestVoteReply
      if err := cm.server.Call(peerId, "ConsensusModule.RequestVote", args, &reply); err == nil {
        cm.mu.Lock()
        defer cm.mu.Unlock()
        cm.dlog("received RequestVoteReply %+v", reply)

        if cm.state != Candidate {
          cm.dlog("while waiting for reply, state = %v", cm.state)
          return
        }

        if reply.Term > savedCurrentTerm {
          cm.dlog("term out of date in RequestVoteReply")
          cm.becomeFollower(reply.Term)
          return
        } else if reply.Term == savedCurrentTerm {
          if reply.VoteGranted {
            votes := int(atomic.AddInt32(&votesReceived, 1))
            if votes*2 > len(cm.peerIds)+1 {
              // Won the election!
              cm.dlog("wins election with %d votes", votes)
              cm.startLeader()
              return
            }
          }
        }
      }
    }(peerId)
  }

  // Run another election timer, in case this election is not successful.
  go cm.runElectionTimer()
}

lastLogIndexAndTerm是一个新的帮助器方法:

// lastLogIndexAndTerm returns the last log index and the last log entry's term
// (or -1 if there's no log) for this server.
// Expects cm.mu to be locked.
func (cm *ConsensusModule) lastLogIndexAndTerm() (int, int) {
  if len(cm.log) > 0 {
    lastIndex := len(cm.log) - 1
    return lastIndex, cm.log[lastIndex].Term
  } else {
    return -1, -1
  }
}

我们的实现是基于0的索引,而不是基于1的Raft索引。因此-1经常作为一个标记值。

这是一个更新的 RV 处理程序,实现选举安全检查:

func (cm *ConsensusModule) RequestVote(args RequestVoteArgs, reply *RequestVoteReply) error {
  cm.mu.Lock()
  defer cm.mu.Unlock()
  if cm.state == Dead {
    return nil
  }
  lastLogIndex, lastLogTerm := cm.lastLogIndexAndTerm()
  cm.dlog("RequestVote: %+v [currentTerm=%d, votedFor=%d, log index/term=(%d, %d)]", args, cm.currentTerm, cm.votedFor, lastLogIndex, lastLogTerm)

  if args.Term > cm.currentTerm {
    cm.dlog("... term out of date in RequestVote")
    cm.becomeFollower(args.Term)
  }

  if cm.currentTerm == args.Term &&
    (cm.votedFor == -1 || cm.votedFor == args.CandidateId) &&
    (args.LastLogTerm > lastLogTerm ||
      (args.LastLogTerm == lastLogTerm && args.LastLogIndex >= lastLogIndex)) {
    reply.VoteGranted = true
    cm.votedFor = args.CandidateId
    cm.electionResetEvent = time.Now()
  } else {
    reply.VoteGranted = false
  }
  reply.Term = cm.currentTerm
  cm.dlog("... RequestVote reply: %+v", reply)
  return nil
}

我们的实现是基于0的索引,而不是基于1的Raft索引。因此-1经常作为一个标记值。

这是一个更新的RV处理程序,实现选举安全检查:

func (cm *ConsensusModule) RequestVote(args RequestVoteArgs, reply *RequestVoteReply) error {
  cm.mu.Lock()
  defer cm.mu.Unlock()
  if cm.state == Dead {
    return nil
  }
  lastLogIndex, lastLogTerm := cm.lastLogIndexAndTerm()
  cm.dlog("RequestVote: %+v [currentTerm=%d, votedFor=%d, log index/term=(%d, %d)]", args, cm.currentTerm, cm.votedFor, lastLogIndex, lastLogTerm)

  if args.Term > cm.currentTerm {
    cm.dlog("... term out of date in RequestVote")
    cm.becomeFollower(args.Term)
  }

  if cm.currentTerm == args.Term &&
    (cm.votedFor == -1 || cm.votedFor == args.CandidateId) &&
    (args.LastLogTerm > lastLogTerm ||
      (args.LastLogTerm == lastLogTerm && args.LastLogIndex >= lastLogIndex)) {
    reply.VoteGranted = true
    cm.votedFor = args.CandidateId
    cm.electionResetEvent = time.Now()
  } else {
    reply.VoteGranted = false
  }
  reply.Term = cm.currentTerm
  cm.dlog("... RequestVote reply: %+v", reply)
  return nil
}

参考

在目前的Raft实现中,有一个问题是没有进行持久化操作。如果服务器故障重启,将会造成信息丢失。为此,我们将在下一部分增加持久化操作,以及对本篇中部分功能进行优化。敬请关注!

Raft参考:https://raft.github.io/raft.pdf

代码参考:https://github.com/eliben/raft

转载自 平台开发 360云计算

本作品采用《CC 协议》,转载必须注明作者和本文链接
快乐就是解决一个又一个的问题!
CrazyZard
讨论数量: 0
(= ̄ω ̄=)··· 暂无内容!

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!