Go 实现 Raft 第四篇:持久化和调优

持久化

像 Raft 这样的共识算法的目标是通过在隔离的服务器之间复制任务来创建一个比其各个部分具有更高可用性的系统。到目前为止,我们一直专注于网络分区的故障情况,其中群集中的某些服务器与其他服务器(或与客户端)断开连接。 失败的另一种模式是崩溃,其中服务器停止工作并重新启动。

对于其他服务器,它看起来像一个网络分区-服务器暂时断开连接,而对于崩溃的服务器本身,情况则大不相同,因为重新启动后,其所有内存状态都会丢失。

正是由于这个原因,Raft 论文中的图2清楚地标记了哪个状态应该保持不变;持久状态将在每次更新时写入并刷新到持久化存储中。在服务器发出下一个 RPC 或答复正在进行的 RPC 之前,服务器必须保留的任何状态都将保留。

Raft 只能通过保留其状态的子集来实现,即:

  • currentTerm - 此服务器观察到的最新任期

  • votedFor - 此服务器在最新任期为其投票的节点 ID

  • log - Raft 日志条目

命令传递语义

在 Raft 中,视不同情况,一个命令可以多次传递给客户端。有几种可能发生这种情况的场景,包括崩溃导致重新启动(再次重播日志时)。

就消息传递语义而言,Raft 选择的是”至少一次”。提交命令后,它将最终复制到所有客户端,但是某些客户端可能多次看到同一命令。因此,建议命令带有唯一的 ID,并且客户端应忽略已交付的命令。这在 Raft 论文中的第8节有更详细的描述。

存储接口

为了实现持久性,我们在代码中添加了以下接口:

type Storage interface {
  Set(key string, value []byte)

  Get(key string) ([]byte, bool)

  // HasData returns true iff any Sets were made on this Storage.
  HasData() bool
}

可以将它看作是一个映射,从字符串映射到一个由持久存储支持的通用字节切片。

恢复和保存状态

CM 构造函数现在将接受一个 Storage 作为参数并调用:

if cm.storage.HasData() {
cm.restoreFromStorage(cm.storage)
}

restoreFromStorage 方法也是新增。它从存储中加载持久状态变量,使用标准的 encoding/gob 包对它们进行反序列化

func (cm *ConsensusModule) restoreFromStorage(storage Storage) {
  if termData, found := cm.storage.Get("currentTerm"); found {
    d := gob.NewDecoder(bytes.NewBuffer(termData))
    if err := d.Decode(&cm.currentTerm); err != nil {
      log.Fatal(err)
    }
  } else {
    log.Fatal("currentTerm not found in storage")
  }
  if votedData, found := cm.storage.Get("votedFor"); found {
    d := gob.NewDecoder(bytes.NewBuffer(votedData))
    if err := d.Decode(&cm.votedFor); err != nil {
      log.Fatal(err)
    }
  } else {
    log.Fatal("votedFor not found in storage")
  }
  if logData, found := cm.storage.Get("log"); found {
    d := gob.NewDecoder(bytes.NewBuffer(logData))
    if err := d.Decode(&cm.log); err != nil {
      log.Fatal(err)
    }
  } else {
    log.Fatal("log not found in storage")
  }
}

镜像方法为 persistToStorage - 将所有这些状态变量编码并保存到提供的 Storage 中:

func (cm *ConsensusModule) persistToStorage() {
  var termData bytes.Buffer
  if err := gob.NewEncoder(&termData).Encode(cm.currentTerm); err != nil {
    log.Fatal(err)
  }
  cm.storage.Set("currentTerm", termData.Bytes())

  var votedData bytes.Buffer
  if err := gob.NewEncoder(&votedData).Encode(cm.votedFor); err != nil {
    log.Fatal(err)
  }
  cm.storage.Set("votedFor", votedData.Bytes())

  var logData bytes.Buffer
  if err := gob.NewEncoder(&logData).Encode(cm.log); err != nil {
    log.Fatal(err)
  }
  cm.storage.Set("log", logData.Bytes())
}

我们只需在这些状态变量发生变化的每个点调用 pesistToStorage 来实现持久化。如果看一下第2部分中CM的代码与本部分之间的区别,会发现它们散布在少数地方。

当然,这不是实现持久性的最有效的方法,但是简单有效,所以足以满足我们的需要。效率最低的是保存整个日志,这在实际应用中可能很大。为了真正解决这个问题,Raft 有一个日志压缩机制,该机制在本文的第7节中进行了描述。我们不打算实现压缩,但是可以将其作为练习添加到我们的实现中。

崩溃伸缩

实施持久性后,我们的 Raft 集群在一定程度上可以应对崩溃。只要集群中的少数节点崩溃并在以后的某个时间点重新启动,集群就将对客户端保持可用。具有2N + 1个服务器的 Raft 群集将容忍N台故障服务器,并且只要其他 N + 1 台服务器仍保持相互连接,便会保持可用。

如果查看此部分的测试,会注意到添加了许多新测试。崩溃伸缩可以测试更大范围的人为情况组合,本文中也对此进行了一定程度的描述

不可靠的RPC交付

需要注意的另一个方面是不可靠的 RPC 交付。到目前为止,我们已经假设在连接的服务器之间发送的 RPC 可能到达目的地的时间很短。如果查看 server.go ,会注意到它使用了一种称为 RPCProxy 的类型来实现这些延迟。每个 RPC 都会延迟1-5毫秒,以模拟位于同一数据中心的节点的真实性。

RPCProxy 让我们实现的另一件事是可选的不可靠交付。启用 RAFT_UNRELIABLE_RPC 环境变量后,RPC 有时会明显延迟(延迟75毫秒)或完全中断。模拟了实际的网络故障。

我们可以在 RAFT_UNRELIABLE_RPC 开启的情况下重新运行所有测试,并观察 Raft 群集在出现这些故障时的行为。如果有兴趣,可以尝试调整 RPCProxy,不仅让 RPC 请求延迟,还可以让 RPC 答复延迟

优化发送 AppendEntries

正如在第2部分中简要提到的,当前的领导者执行效率很低。领导者在 LeaderSendHeartbeats 中发送AE,定时器每隔50毫秒调用一次。假设提交了一条新命令;领导者将等到下一个50毫秒的边界,而不是立即通知跟随者。更糟的是,因为需要两次AE往返来通知跟随者命令已提交。如图:

Go实现Raft第四篇:持久化和调优

在时间(1),领导者将心跳 AE 发送给跟随者,并在几毫秒内获得响应。例如,在35毫秒后提交了新命令。领导者一直等到下一个50毫秒边界(2)才将更新的日志发送给跟随者。跟随者答复该命令已成功添加到日志(3)。此时,领导者已经提高了提交索引(假设它获得了多数),可以立即通知跟随者,但是它一直等到下一个50毫秒边界(4)为止。最后,当跟随者收到更新的 leaderCommit时,它可以将新提交的命令通知其自己的客户端。

在领导者的 Submit(X) 和跟随者的 commitChan <-X 之间经过的大部分时间对于实现来讲都是不必要的。

真正想要的是使序列看起来像这样:

Go实现Raft第四篇:持久化和调优

看一下实现的新部分,从startLeader开始。

func (cm *ConsensusModule) startLeader() {
  cm.state = Leader

  for _, peerId := range cm.peerIds {
    cm.nextIndex[peerId] = len(cm.log)
    cm.matchIndex[peerId] = -1
  }
  cm.dlog("becomes Leader; term=%d, nextIndex=%v, matchIndex=%v; log=%v", cm.currentTerm, cm.nextIndex, cm.matchIndex, cm.log)

  // This goroutine runs in the background and sends AEs to peers:
  // * Whenever something is sent on triggerAEChan
  // * ... Or every 50 ms, if no events occur on triggerAEChan
  go func(heartbeatTimeout time.Duration) {
    // Immediately send AEs to peers.
    cm.leaderSendAEs()

    t := time.NewTimer(heartbeatTimeout)
    defer t.Stop()
    for {
      doSend := false
      select {
      case <-t.C:
        doSend = true

        // Reset timer to fire again after heartbeatTimeout.
        t.Stop()
        t.Reset(heartbeatTimeout)
      case _, ok := <-cm.triggerAEChan:
        if ok {
          doSend = true
        } else {
          return
        }

        // Reset timer for heartbeatTimeout.
        if !t.Stop() {
          <-t.C
        }
        t.Reset(heartbeatTimeout)
      }

      if doSend {
        cm.mu.Lock()
        if cm.state != Leader {
          cm.mu.Unlock()
          return
        }
        cm.mu.Unlock()
        cm.leaderSendAEs()
      }
    }
  }(50 * time.Millisecond)
}

不仅要等待50 ms的计时,startLeader中的循环还要等待两个可能的事件之一:

  • 在cm.triggerAEChan上发送
  • 计时器计数50毫秒

我们将很快看到触发 cm.triggerAEChan 的原因。这是现在应该发送AE的信号。每当触发通道时,计时器都会重置,并执行心跳逻辑-如果领导者没有新的要报告的内容,则最多等待50毫秒。

还要注意,实际发送AE的方法已从 leaderSendHeartbeats 重命名为 leaderSendAE,可以更好地在新代码中反映其目的。

我们所期望的,触发cm.triggerAEChan的方法之一是SubmitL:

func (cm *ConsensusModule) Submit(command interface{}) bool {
  cm.mu.Lock()
  cm.dlog("Submit received by %v: %v", cm.state, command)
  if cm.state == Leader {
    cm.log = append(cm.log, LogEntry{Command: command, Term: cm.currentTerm})
    cm.persistToStorage()
    cm.dlog("... log=%v", cm.log)
    cm.mu.Unlock()
    cm.triggerAEChan <- struct{}{}
    return true
  }

  cm.mu.Unlock()
  return false
}

修改成:

每当提交新命令时,都会调用cm.persistToStorage来保留新的日志条目。

一个空结构在 cm.triggerAEChan 上发送。将通知领导者goroutine中的循环。

锁定处理将重新排序;在发送cm.triggerAEChan时不想保持锁定,因为在某些情况下可能导致死锁。
在领导者中处理AE答复并推进提交索引的代码中 cm.triggerAEChan 将被通知。

if cm.commitIndex != savedCommitIndex {
    cm.dlog("leader sets commitIndex := %d", cm.commitIndex)
    // Commit index changed: the leader considers new entries to be
    // committed. Send new entries on the commit channel to this
    // leader's clients, and notify followers by sending them AEs.
    cm.newCommitReadyChan <- struct{}{}
    cm.triggerAEChan <- struct{}{}
}

这个优化很重要,它使实现比以前对新命令的响应速度更快。

批量处理命令提交

现在,每次调用 Submit 都会触发很多活动 - 领导者立即向所有跟随者广播 RPC。如果想一次提交多个命令,连接Raft群集的网络可能会被RPC 淹没。

尽管它看起来效率低,但是安全。Raft的 RPC 都是幂等的,也就是说多次获得具有基本相同信息的 RPC 不会造成任何危害。

如果担心一次要频繁提交许多命令时的网络流量,那么批处理应该很容易实现。最简单的方法是提供一种将整个命令片段传递到 Submit 的方法。这样 Raft 实现中的代码改动会很小,并且客户端将能够提交整个命令组,而不会产生太多的 RPC 通信。有兴趣的可以尝试一下!

摘抄: 360云计算

本作品采用《CC 协议》,转载必须注明作者和本文链接
快乐就是解决一个又一个的问题!
CrazyZard
讨论数量: 0
(= ̄ω ̄=)··· 暂无内容!

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!