Implement Leader Election Algorithm With Go
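Preface: I just finished 6.824 Lab 2A and wrote this article to organize my notes. Start by reading Section 5.2 of the paper. If the English is hard to follow, try this translation. If the translation still doesn't make sense, try this video. If the video doesn't help either, go straight to the code below; sometimes code is easier to understand than prose.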
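Besides the paper, you need some knowledge of Go, or at least a background in another programming language. You should also know what an RPC (remote procedure call) is.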
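This article sets aside the context of the MIT 6.824 course and of the Raft algorithm as a whole, and covers only the implementation of Leader Election. If you are not sure what Leader Election is useful for, read up on disaster recovery and primary/backup (failover) setups first.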
The code shows only the main structure. For functions and fields that are not defined, their names and comments should tell you what they mean.
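The code does not show any concurrency control. A simple approach is to protect every function with one coarse-grained lock; you can also choose other synchronization mechanisms for better performance. Whatever you choose, do not hold a node's lock while waiting for an RPC reply: two Candidates that hold their own locks while calling each other's Canvass handler, which needs the callee's lock, will wait for each other forever.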
```go
// Current node
type Server struct {
	// "Leader" | "Candidate" | "Follower"
	role string
	// Logical clock
	term int
	// ID of the Candidate this node supports (votes for); -1 means none
	support int
	// All nodes, including the current one
	peers []*rpcClient
	// Index of the current node in peers
	me int
}

// Server's main loop
func (srv *Server) ticker() {}

// RPC handler | a Candidate asks this node for its vote
func (srv *Server) Canvass(args *cArgs, reply *cReply) {}

// RPC handler | the Leader asserts its leadership (heartbeat)
func (srv *Server) Dominate(args *dArgs, reply *dReply) {}

func main() {
	srv := MakeServer("Follower")
	go srv.ticker()
	select {} // keep main alive; returning from main would stop ticker()
}
```
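The handlers are invoked by name, e.g. `peer.Call("Server.Dominate", &args, &reply)`. In the 6.824 lab the peers are `labrpc` client ends (presumably what `rpcClient` stands for here). If you want to try the same call shape outside the lab, Go's standard `net/rpc` package works similarly, but it only registers methods that are exported, take an exported (or builtin) argument type plus a pointer reply, and return `error`. The sketch below assumes hypothetical `DArgs`/`DReply` types and connects client and server through an in-memory `net.Pipe` instead of a network socket.

```go
package main

import (
	"fmt"
	"net"
	"net/rpc"
)

// DArgs and DReply are hypothetical exported versions of dArgs/dReply:
// net/rpc only accepts exported (or builtin) argument types.
type DArgs struct{ Term int }
type DReply struct{ Term int }

// Server is cut down to the one field this heartbeat handler touches.
type Server struct{ term int }

// Dominate has the shape net/rpc requires: exported, (args, *reply), returns error.
func (srv *Server) Dominate(args *DArgs, reply *DReply) error {
	if args.Term > srv.term {
		srv.term = args.Term
	}
	reply.Term = srv.term
	return nil
}

// dial serves srv on one end of an in-memory pipe and returns a client for the other end.
func dial(srv *Server) (*rpc.Client, error) {
	rs := rpc.NewServer()
	if err := rs.Register(srv); err != nil {
		return nil, err
	}
	serverConn, clientConn := net.Pipe()
	go rs.ServeConn(serverConn)
	return rpc.NewClient(clientConn), nil
}

func main() {
	client, err := dial(&Server{term: 1})
	if err != nil {
		panic(err)
	}
	defer client.Close()
	reply := DReply{}
	if err := client.Call("Server.Dominate", &DArgs{Term: 3}, &reply); err != nil {
		panic(err)
	}
	fmt.Println("follower term after heartbeat:", reply.Term)
}
```

Running it prints `follower term after heartbeat: 3`. The service name `"Server"` comes from the registered receiver's type name, which is why the method string matches the article's `"Server.Dominate"`.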
The code above shows one node of a distributed system. At any moment a node has exactly one role (Leader, Candidate or Follower); Candidate is an intermediate role on the way from Follower to Leader.
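Initially every node is a Follower. When a Follower has not heard from a Leader for a certain time (the Election Timeout), it becomes a Candidate and asks the other nodes for their votes. Once it has enough votes (more than half of all nodes, counting its own), the Candidate becomes the Leader.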
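The transitions described so far can be summarized as a small state machine. This is a sketch only: `Role` and the `Event` values are hypothetical names (the article stores the role as a plain string), and the `HeardLeader` transition comes from Section 5.2 of the paper rather than from this paragraph: a Candidate that receives a heartbeat from a Leader of its own term goes back to being a Follower.

```go
package main

import "fmt"

// Role mirrors the strings the article stores in Server.role.
type Role string

const (
	Follower  Role = "Follower"
	Candidate Role = "Candidate"
	Leader    Role = "Leader"
)

// Event is a hypothetical label for what a node can observe.
type Event int

const (
	ElectionTimeout Event = iota // no heartbeat from a Leader within the timeout
	WonMajority                  // votes from more than half of all nodes
	HeardLeader                  // heartbeat from a Leader of the current term
)

// next returns the role after an event; any other combination keeps the role.
func next(r Role, e Event) Role {
	switch {
	case e == ElectionTimeout && r != Leader:
		return Candidate // a Follower starts an election, a Candidate starts a new round
	case e == WonMajority && r == Candidate:
		return Leader
	case e == HeardLeader && r == Candidate:
		return Follower
	}
	return r
}

func main() {
	r := Follower
	for _, e := range []Event{ElectionTimeout, ElectionTimeout, WonMajority} {
		r = next(r, e)
		fmt.Println(r)
	}
}
```

It prints `Candidate`, `Candidate`, `Leader`: the second timeout models a split election that has to be retried.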
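The Election Timeout should be randomized to keep several Candidates from appearing at the same time. When they do, the votes may split so that none of them gets a majority, and a new round of elections is needed.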
```go
func (srv *Server) ticker() {
	for srv.alive() {
		time.Sleep(1 * time.Millisecond)
		if srv.role == "Leader" {
			// Heartbeat 10 times per second
			if time.Since(srv.lastBroadcastHeartbeat) > srv.broadcastInterval() {
				srv.broadcast()
			}
		} else {
			// Randomized election timeout of roughly 300 ~ 400ms. electionTimeout()
			// should return the value drawn at the last reset, not a new one per tick
			if time.Since(srv.lastReceiveHeartbeat) > srv.electionTimeout() {
				srv.elect()
			}
		}
	}
}

func (srv *Server) broadcast() {
	srv.lastBroadcastHeartbeat = time.Now()
	for id, peer := range srv.peers {
		if id == srv.me {
			continue
		}
		reply := dReply{}
		peer.Call("Server.Dominate", &dArgs{}, &reply)
	}
}

func (srv *Server) Dominate(args *dArgs, reply *dReply) {
	srv.lastReceiveHeartbeat = time.Now()
}

func (srv *Server) elect() {
	// Reset the timer so that we don't start another election right away
	srv.lastReceiveHeartbeat = time.Now()
	srv.role = "Candidate"
	srv.support = srv.me // vote for myself
	voteCount := 1
	for id, peer := range srv.peers {
		if id == srv.me {
			continue
		}
		reply := cReply{}
		peer.Call("Server.Canvass", &cArgs{CandidateId: srv.me}, &reply)
		if reply.VoteGranted {
			voteCount++
		}
	}
	// Majority of all nodes; srv.peers includes this node
	if voteCount > len(srv.peers)/2 {
		srv.role = "Leader"
		// An old timestamp forces a heartbeat on the next tick
		srv.lastBroadcastHeartbeat = time.Unix(0, 0)
	}
}

func (srv *Server) Canvass(args *cArgs, reply *cReply) {
	reply.VoteGranted = false
	// Each node has only one vote
	if srv.support == -1 {
		srv.support = args.CandidateId
		reply.VoteGranted = true
		srv.lastReceiveHeartbeat = time.Now()
	}
}
```
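You may have noticed that Server.term is not used. I deliberately removed the term-related code so that the basic structure of the Leader Election algorithm is easier to see.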
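The system as a whole should have exactly one Leader, with one exception. When a network partition occurs, each partition carries on as a system of its own. If the old Leader ends up in a minority partition, the partition holding more than half of the nodes elects a new Leader, while the old Leader keeps its role inside its own partition. When the partition heals, two Leaders exist at the same time.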
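The majority rule is what limits a partitioned cluster to at most one new Leader: two disjoint groups cannot both hold more than half of the nodes. A minimal sketch, assuming a hypothetical 5-node cluster and a helper `canElect` that applies the same test as `voteCount > len(srv.peers)/2`:

```go
package main

import "fmt"

// canElect reports whether a partition holding `size` of the cluster's `n` nodes
// can give a Candidate more than n/2 votes, the same test elect() applies.
func canElect(size, n int) bool { return size > n/2 }

func main() {
	// Hypothetical 5-node cluster; the old Leader ends up on the 2-node side.
	n := 5
	for _, size := range []int{2, 3} {
		fmt.Printf("%d of %d nodes can elect a Leader: %v\n", size, n, canElect(size, n))
	}
	// With 4 nodes a 2/2 split elects nobody: 2 > 4/2 is false.
	fmt.Println(canElect(2, 4))
}
```

It prints `false` for the 2-node side, `true` for the 3-node side, and `false` for the 2/2 split. Note what the rule does not do: `broadcast()` never checks how many peers answered, so the old Leader on the 2-node side is never told that it has lost its majority and keeps acting as Leader. The 2/2 case is also why clusters are usually given an odd number of nodes.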
Terms solve this problem. The new Leader's term is larger than the old Leader's, so when the two meet, the one with the smaller term becomes a Follower.
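This rule shows up in four places in the term-based code (broadcast, Dominate, elect and Canvass): whenever a node sees a term larger than its own, it adopts that term, becomes a Follower and clears its vote. Factoring it into one helper makes the rule easier to check. A minimal sketch, assuming a hypothetical `node` type that keeps only the fields the rule touches and hypothetical node IDs 0 and 3:

```go
package main

import "fmt"

// node is a hypothetical cut-down Server with only the fields the term rule touches.
type node struct {
	role    string
	term    int
	support int
}

// observeTerm applies the rule: on seeing a larger term, adopt it, become a
// Follower and clear the vote. It reports whether the node changed state.
func observeTerm(n *node, seen int) bool {
	if seen <= n.term {
		return false
	}
	n.term, n.role, n.support = seen, "Follower", -1
	return true
}

func main() {
	// Each Leader voted for itself when it was elected (IDs 0 and 3).
	oldLeader := &node{role: "Leader", term: 1, support: 0}
	newLeader := &node{role: "Leader", term: 2, support: 3}
	// After the partition heals, each Leader sees the other's term in a heartbeat or reply.
	observeTerm(oldLeader, newLeader.term)
	observeTerm(newLeader, 1)                   // the old Leader's term before it stepped down
	fmt.Println(oldLeader.role, oldLeader.term) // Follower 2
	fmt.Println(newLeader.role, newLeader.term) // Leader 2
}
```

The old Leader steps down as soon as it sees term 2, while the new Leader ignores term 1, so after one exchange in each direction only one Leader is left.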
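A term is a logical clock, represented here by a monotonically increasing integer. A node increments its term when it starts an election (in elect()), which happens when it stops hearing from the Leader, typically because the Leader has died. Each Candidate thereby obtains a new term (the term is produced); the Candidate that wins then propagates that term to all nodes through its heartbeats (the term takes effect).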
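While a term is being produced, i.e. during the election, every node, whatever its role, may support only one Candidate. The vote belongs to the term: support is reset to -1 only when a node moves to a newer term (for example when it first receives a Canvass or a heartbeat carrying that term), never within the same term. This includes the new Leader, which keeps its own vote; if it cleared support on taking office, a rival Candidate of the same term could collect that vote, and two Leaders could be elected in one term.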
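The voting rule can be sketched as a small method. It follows the article's Canvass: reject a stale term, move to a newer term and clear `support`, then grant only if the vote for this term is still free. One addition comes from Figure 2 of the Raft paper, not from the article: a repeated request from the Candidate that already holds the vote is granted again, which makes retried RPCs harmless. The `voter` type is a hypothetical stand-in for the relevant fields of `Server`.

```go
package main

import "fmt"

// voter holds the two fields Canvass reads; support == -1 means no vote yet in this term.
type voter struct {
	term    int
	support int
}

// grant rejects stale terms, moves to a newer term (freeing the vote), and then
// grants only if this term's vote is free or already belongs to the same Candidate.
func (v *voter) grant(term, candidate int) bool {
	if term < v.term {
		return false
	}
	if term > v.term {
		v.term = term
		v.support = -1
	}
	if v.support == -1 || v.support == candidate {
		v.support = candidate
		return true
	}
	return false
}

func main() {
	v := &voter{term: 1, support: -1}
	fmt.Println(v.grant(2, 0)) // true: first request in term 2
	fmt.Println(v.grant(2, 1)) // false: the vote for term 2 already went to node 0
	fmt.Println(v.grant(3, 1)) // true: a newer term frees the vote
	fmt.Println(v.grant(2, 4)) // false: term 2 is stale now
}
```

A Leader is a voter too: as long as it keeps `support` set to itself for the rest of its term, a rival Candidate of the same term cannot collect its vote.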
```go
func (srv *Server) broadcast() {
	srv.lastBroadcastHeartbeat = time.Now()
	for id, peer := range srv.peers {
		if id == srv.me {
			continue
		}
		reply := dReply{}
		peer.Call("Server.Dominate", &dArgs{Term: srv.term}, &reply)
		if reply.Term > srv.term {
			// A newer Leader exists: step down and stop sending heartbeats
			srv.role = "Follower"
			srv.term = reply.Term
			srv.support = -1
			return
		}
	}
}

func (srv *Server) Dominate(args *dArgs, reply *dReply) {
	reply.Term = srv.term
	if args.Term < srv.term {
		return // ignore a stale Leader; it learns the newer term from reply.Term
	}
	// A newer term: adopt it and clear the vote, which belonged to an older term
	if args.Term > srv.term {
		srv.term = args.Term
		srv.support = -1
	}
	// The sender is the legitimate Leader of this term, so a Candidate of the
	// same term gives up as well (Section 5.2 of the paper)
	srv.role = "Follower"
	srv.lastReceiveHeartbeat = time.Now()
}

func (srv *Server) elect() {
	// Reset the timer so that we don't start another election right away
	srv.lastReceiveHeartbeat = time.Now()
	srv.role = "Candidate"
	// Starting an election opens a new term
	srv.term++
	// Vote for myself
	srv.support = srv.me
	voteCount := 1
	maxTerm := 0
	for id, peer := range srv.peers {
		if id == srv.me {
			continue
		}
		reply := cReply{}
		peer.Call("Server.Canvass", &cArgs{Term: srv.term, CandidateId: srv.me}, &reply)
		if reply.VoteGranted {
			voteCount++
		}
		if reply.Term > maxTerm {
			maxTerm = reply.Term
		}
	}
	// The role may have become Follower during the canvass (if handlers can run
	// meanwhile): another Leader or a newer term has appeared
	if srv.role != "Candidate" {
		return
	}
	// A newer term exists: step down
	if maxTerm > srv.term {
		srv.role = "Follower"
		srv.term = maxTerm
		srv.support = -1
		return
	}
	// Majority of all nodes; srv.peers includes this node
	if voteCount > len(srv.peers)/2 {
		srv.role = "Leader"
		// Keep support == srv.me: a vote is never withdrawn within a term
		srv.lastBroadcastHeartbeat = time.Unix(0, 0)
	}
}

func (srv *Server) Canvass(args *cArgs, reply *cReply) {
	reply.VoteGranted = false
	reply.Term = srv.term
	if args.Term < srv.term {
		return // ignore a stale Candidate
	}
	// A newer term: adopt it, become a Follower and free the vote
	if args.Term > srv.term {
		srv.term = args.Term
		srv.role = "Follower"
		srv.support = -1
	}
	// Only one vote per term
	if srv.support == -1 {
		srv.support = args.CandidateId
		reply.VoteGranted = true
		srv.lastReceiveHeartbeat = time.Now()
	}
}
```
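To see the pieces work together, here is an in-memory sketch of the term-based version that runs without any RPC library. The assumptions: peers are called directly instead of through `peer.Call`, there are no timers or locks (the scenario in `main` decides who times out), and `cluster.group` is a hypothetical way to model a network partition. Two choices are made explicit: a heartbeat from a Leader of the same term turns a Candidate into a Follower (Section 5.2 of the paper), and a new Leader keeps its own vote for the rest of its term. Unlike the article's `elect()`, this one steps down as soon as a reply carries a larger term instead of collecting `maxTerm` first.

```go
package main

import "fmt"

// Server keeps only the state the term-based version uses.
type Server struct {
	role    string // "Leader" | "Candidate" | "Follower"
	term    int
	support int // -1: no vote given in the current term
	me      int
	c       *cluster
}

// cluster is a hypothetical harness: direct calls stand in for RPC, and
// node a reaches node b only if group[a] == group[b] (a network partition).
type cluster struct {
	nodes []*Server
	group []int
}

func (c *cluster) reach(a, b int) bool { return a != b && c.group[a] == c.group[b] }

func (s *Server) stepDown(term int) { s.term, s.role, s.support = term, "Follower", -1 }

// Dominate handles a heartbeat carrying the sender's term and returns our term.
func (s *Server) Dominate(term int) int {
	if term > s.term {
		s.stepDown(term)
	} else if term == s.term {
		s.role = "Follower" // the sender is the legitimate Leader of this term
	}
	return s.term
}

// Canvass grants at most one vote per term and returns (granted, our term).
func (s *Server) Canvass(term, candidate int) (bool, int) {
	if term > s.term {
		s.stepDown(term)
	}
	granted := term == s.term && s.support == -1
	if granted {
		s.support = candidate
	}
	return granted, s.term
}

func (s *Server) broadcast() {
	for id, peer := range s.c.nodes {
		if s.c.reach(s.me, id) {
			if t := peer.Dominate(s.term); t > s.term {
				s.stepDown(t) // a newer term exists: stop acting as Leader
				return
			}
		}
	}
}

func (s *Server) elect() {
	s.term++
	s.role, s.support = "Candidate", s.me
	votes := 1
	for id, peer := range s.c.nodes {
		if s.c.reach(s.me, id) {
			granted, t := peer.Canvass(s.term, s.me)
			if t > s.term {
				s.stepDown(t) // someone is already in a newer term
				return
			}
			if granted {
				votes++
			}
		}
	}
	if votes > len(s.c.nodes)/2 {
		s.role = "Leader" // support stays s.me until the term changes
	}
}

func (c *cluster) leaders() (ids []int) {
	for _, s := range c.nodes {
		if s.role == "Leader" {
			ids = append(ids, s.me)
		}
	}
	return ids
}

func main() {
	c := &cluster{group: make([]int, 5)}
	for i := range c.group {
		c.nodes = append(c.nodes, &Server{role: "Follower", support: -1, me: i, c: c})
	}
	c.nodes[0].elect() // node 0's election timeout fires first
	fmt.Println("leaders:", c.leaders(), "term", c.nodes[0].term)
	c.group = []int{0, 0, 1, 1, 1} // partition {0,1} | {2,3,4}
	c.nodes[2].elect()            // the majority side elects node 2
	fmt.Println("leaders:", c.leaders())
	c.group = make([]int, 5) // the partition heals
	c.nodes[0].broadcast()   // the old Leader learns the newer term from a reply
	fmt.Println("leaders:", c.leaders(), "term", c.nodes[0].term)
}
```

Running it prints `leaders: [0] term 1`, then `leaders: [0 2]` while the partition lasts, and finally `leaders: [2] term 2` once the old Leader sees node 2's larger term in a heartbeat reply.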
Term is the core concept of Leader Election. Term and Role always go together, and the whole algorithm runs on the transitions of these two pieces of state. I hope this article helps you.
This work is licensed under a CC license; reposts must credit the author and link to this article.