Implementing the Leader Election Algorithm in Go

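Preface: I just finished 6.824 Lab 2A and am writing this article to organize what I learned. First read Section 5.2 of the paper. The English can be hard going, so you may want to read this translation. If the translation still doesn't make it click, watch this video. If the video doesn't help either, go straight to the code below; sometimes code is easier to understand than prose.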

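Besides the paper, you need some knowledge of Go, or at least a background in another programming language. You should also know what RPC (Remote Procedure Call) is.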

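This article sets aside the context of the MIT 6.824 course and of the Raft algorithm as a whole, and covers only the implementation of Leader Election. If you are not sure what leader election is used for, read up on disaster recovery and primary/backup (active/standby) replication.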

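The code in this article shows only the main structure. For functions and fields that are not defined here, their meaning should be clear from their names or comments.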

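The code does not show any concurrency control. A simple approach is to guard every function with one coarse-grained lock; you can also use other synchronization mechanisms to get better concurrent performance. Whichever you choose, do not hold the lock while waiting for an RPC reply: if two nodes each hold their own lock while calling the other's handler, both block forever.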
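A minimal sketch of the coarse-grained locking approach, assuming a single `sync.Mutex` per node. The `node` type, `observeTerm`, and `startElection` are hypothetical stand-ins for `Server` and its methods, and the `call` function parameter stands in for `peer.Call(...)`. The point is the "snapshot, unlock, call, relock, re-check" pattern around the RPC.

```go
package main

import (
	"fmt"
	"sync"
)

// node is a hypothetical stand-in for Server with only the fields
// needed to illustrate locking.
type node struct {
	mu   sync.Mutex // one coarse-grained lock guarding every field below
	term int
	role string
}

// observeTerm is the kind of update an RPC handler performs: the whole
// read-modify-write happens while holding the lock.
func (n *node) observeTerm(term int) {
	n.mu.Lock()
	defer n.mu.Unlock()
	if term > n.term {
		n.term = term
		n.role = "Follower"
	}
}

// startElection does NOT hold the lock while waiting on the (simulated)
// RPC, so two nodes calling each other's handlers cannot deadlock.
func (n *node) startElection(call func(term int) bool) bool {
	n.mu.Lock()
	n.term++
	n.role = "Candidate"
	term := n.term // snapshot what the RPC needs
	n.mu.Unlock()

	granted := call(term) // stands in for peer.Call("Server.Canvass", ...)

	n.mu.Lock()
	defer n.mu.Unlock()
	// State may have changed while unlocked; only trust the reply if we are
	// still a Candidate in the same term.
	return granted && n.role == "Candidate" && n.term == term
}

func main() {
	n := &node{role: "Follower"}
	var wg sync.WaitGroup
	for t := 1; t <= 100; t++ {
		wg.Add(1)
		go func(t int) {
			defer wg.Done()
			n.observeTerm(t)
		}(t)
	}
	wg.Wait()
	fmt.Println(n.term, n.role) // 100 Follower

	ok := n.startElection(func(term int) bool { return true })
	fmt.Println(ok, n.term, n.role) // true 101 Candidate
}
```

Because every handler takes the same lock, the 100 concurrent `observeTerm` calls cannot interleave their read-modify-write steps, and the final term is the maximum seen.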

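// Current Node
type Server struct {
    // "Leader" | "Candidate" | "Follower"
    role    string
    // Logical clock
    term    int
    // ID of the Candidate this node supports (has voted for); -1 if none
    support int
    // RPC clients for all nodes, including this one (indexed by node ID)
    peers   []*rpcClient
    // This node's ID, i.e. its index in peers
    me      int
    // Last time a heartbeat (or a granted vote) reset the election timer
    lastReceiveHeartbeat   time.Time
    // Last time this node, as Leader, broadcast heartbeats
    lastBroadcastHeartbeat time.Time
}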

// Server's main loop
func (srv *Server) ticker() {}

// RPC handler | a Candidate asks this node for its vote
func (srv *Server) Canvass(args *cArgs, reply *cReply) {}

// RPC handler | the Leader asserts its authority (heartbeat)
func (srv *Server) Dominate(args *dArgs, reply *dReply) {}

func main() {
    srv := MakeServer("Follower")
    go srv.ticker()
    // Block forever; otherwise main returns and the process exits immediately
    select {}
}

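The code above outlines a single node in a distributed system. At any given moment a node holds exactly one role (Leader, Candidate, or Follower); Candidate is the intermediate role a node passes through on its way from Follower to Leader.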
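The article stores roles as strings. As an illustrative alternative, here is a sketch using a typed Go enum plus a hypothetical `canTransition` helper that encodes which role changes the algorithm allows. `Role`, its constants, and `canTransition` are not part of the original code.

```go
package main

import "fmt"

// Role is a hypothetical typed replacement for the string roles.
type Role int

const (
	Follower Role = iota
	Candidate
	Leader
)

func (r Role) String() string {
	switch r {
	case Follower:
		return "Follower"
	case Candidate:
		return "Candidate"
	case Leader:
		return "Leader"
	}
	return fmt.Sprintf("Role(%d)", int(r))
}

// canTransition reports whether a node may move from one role to another
// (including a Candidate starting another round). Candidate sits between
// Follower and Leader: a Follower never jumps straight to Leader.
func canTransition(from, to Role) bool {
	switch from {
	case Follower:
		return to == Candidate // election timeout fired
	case Candidate:
		// won a majority, saw a Leader or a newer term, or timed out again
		return to == Leader || to == Follower || to == Candidate
	case Leader:
		return to == Follower // saw a newer term
	}
	return false
}

func main() {
	fmt.Println(canTransition(Follower, Leader))    // false
	fmt.Println(canTransition(Follower, Candidate)) // true
	fmt.Println(canTransition(Leader, Follower))    // true
	fmt.Println(Candidate)                          // Candidate
}
```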

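Initially every node is a Follower. When a Follower has not heard from a Leader for a certain period (the Election Timeout), it becomes a Candidate and asks the other nodes for their votes. Once it collects enough votes (more than half of the cluster), the Candidate becomes the Leader.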
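"More than half" is a strict majority of the whole cluster, counting the Candidate's own vote. A small sketch of that check (`hasMajority` is a hypothetical helper; `clusterSize` corresponds to `len(srv.peers)` in the article, since `peers` includes the node itself). Note that with an even cluster size, exactly half is not enough: two Candidates could otherwise each hold half.

```go
package main

import "fmt"

// hasMajority reports whether votes is strictly more than half of
// clusterSize, where clusterSize counts every node including the Candidate.
func hasMajority(votes, clusterSize int) bool {
	return votes > clusterSize/2
}

func main() {
	for _, c := range []struct{ votes, size int }{
		{3, 5}, {2, 5}, {2, 4}, {3, 4}, {1, 1},
	} {
		fmt.Printf("%d of %d -> %v\n", c.votes, c.size, hasMajority(c.votes, c.size))
	}
	// 3 of 5 -> true
	// 2 of 5 -> false
	// 2 of 4 -> false
	// 3 of 4 -> true
	// 1 of 1 -> true
}
```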

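The Election Timeout should be chosen randomly. If several Followers time out at the same moment, several Candidates appear and split the vote; none of them may reach a majority, and another round of elections is needed.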

func (srv *Server) ticker() {
    for srv.alive() {
        time.Sleep(1 * time.Millisecond)
        if srv.role == "Leader" {
            // 10 times per second
            if time.Since(srv.lastBroadcastHeartbeat) > srv.broadcastInterval() {
                srv.broadcast()
            }
        } else {
            // random election timeout about 300 ~ 400ms
            if time.Since(srv.lastReceiveHeartbeat) > srv.electionTimeout() {
                srv.elect()
            }
        }
    }
}

func (srv *Server) broadcast() {
    srv.lastBroadcastHeartbeat = time.Now()
    for id, peer := range srv.peers {
        if id == srv.me {
            continue
        }
        reply := dReply{}
        peer.Call("Server.Dominate", &dArgs{}, &reply)
    }
}

func (srv *Server) Dominate(args *dArgs, reply *dReply) {
    srv.lastReceiveHeartbeat = time.Now()
}

func (srv *Server) elect() {
    // Reset the election timer so this node doesn't start another election right away
    srv.lastReceiveHeartbeat = time.Now()
    srv.role = "Candidate"

    srv.support = srv.me // vote for myself
    voteCount := 1
    for id, peer := range srv.peers {
        if id == srv.me {
            continue
        }
        reply := cReply{}
        peer.Call("Server.Canvass", &cArgs{CandidateId: srv.me}, &reply)
        if reply.VoteGranted {
            voteCount++
        }
    }

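    // peers includes this node, so this is a strict majority of the cluster
    if voteCount > len(srv.peers)/2 {
        srv.role = "Leader"
        // Zero the broadcast time so the new Leader sends heartbeats on the next tick
        srv.lastBroadcastHeartbeat = time.Unix(0, 0)
    }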
}

func (srv *Server) Canvass(args *cArgs, reply *cReply) {
    reply.VoteGranted = false
    // Each node has only one vote
    if srv.support == -1 {
        srv.support = args.CandidateId
        reply.VoteGranted = true
        srv.lastReceiveHeartbeat = time.Now()
    }
}

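You may have noticed that Server.term is never used. I deliberately removed the term-related code so that the basic skeleton of the Leader Election algorithm is easier to see.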

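For the system as a whole there should be exactly one Leader, but there is an exception. When a network partition occurs, each side operates as an independent system: the side holding more than half of the nodes elects a new Leader, while the old Leader (if it ended up on the minority side) keeps acting as Leader within its own partition. Once the partition heals, two Leaders exist at the same time.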

Terms solve this problem. The new Leader's term is greater than the old Leader's, so when the two meet, the Leader with the smaller term simply steps down to Follower.
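To see how terms resolve the two-Leader situation, here is a hypothetical in-process simulation. Direct method calls stand in for RPCs, the `cluster` type and its `group` map stand in for the network partition, and `dominate`/`canvass` are simplified mirrors of the article's `Dominate`/`Canvass` handlers (this `dominate` also turns a node into a Follower when it hears from a Leader of its own term). None of these names come from the original code.

```go
package main

import "fmt"

type node struct {
	id, term, support int
	role              string
}

type cluster struct {
	nodes []*node
	group map[int]int // node id -> partition id; same id means reachable
}

func (c *cluster) reachable(a, b int) bool { return c.group[a] == c.group[b] }

// dominate: reject stale Leaders (reporting our newer term), adopt newer terms.
func (n *node) dominate(term int) int {
	if term < n.term {
		return n.term
	}
	if term > n.term {
		n.term, n.support = term, -1
	}
	n.role = "Follower"
	return n.term
}

// canvass: reject stale terms, reset support on a newer term, one vote per term.
func (n *node) canvass(term, candidate int) bool {
	if term < n.term {
		return false
	}
	if term > n.term {
		n.term, n.role, n.support = term, "Follower", -1
	}
	if n.support == -1 {
		n.support = candidate
		return true
	}
	return false
}

func (c *cluster) elect(id int) {
	me := c.nodes[id]
	me.term++
	me.role, me.support = "Candidate", id
	votes := 1
	for _, p := range c.nodes {
		if p.id != id && c.reachable(id, p.id) && p.canvass(me.term, id) {
			votes++
		}
	}
	if votes > len(c.nodes)/2 {
		me.role = "Leader"
	}
}

func (c *cluster) broadcast(id int) {
	me := c.nodes[id]
	for _, p := range c.nodes {
		if p.id == id || !c.reachable(id, p.id) {
			continue
		}
		if t := p.dominate(me.term); t > me.term {
			me.term, me.role, me.support = t, "Follower", -1 // newer term exists
			return
		}
	}
}

func main() {
	c := &cluster{group: map[int]int{}}
	for i := 0; i < 5; i++ {
		c.nodes = append(c.nodes, &node{id: i, term: 1, support: -1, role: "Follower"})
	}
	c.nodes[0].role = "Leader" // node 0 leads term 1

	c.group = map[int]int{2: 1, 3: 1, 4: 1} // partition {0,1} | {2,3,4}
	c.elect(2)                              // majority side elects node 2 in term 2
	c.broadcast(0)                          // old Leader still rules {0,1}
	fmt.Println(c.nodes[0].role, c.nodes[2].role) // Leader Leader

	c.group = map[int]int{} // partition heals
	c.broadcast(0)          // node 2 replies with term 2
	fmt.Println(c.nodes[0].role, c.nodes[0].term) // Follower 2
}
```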

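A term is a logical clock, represented here by a monotonically increasing integer. The term increases only after the Leader dies (more precisely, when a node's Election Timeout fires and it concludes the Leader is gone): each Candidate takes a new term (the term is created), and the Candidate that wins propagates it to all nodes after taking office (the term takes effect).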

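While a term is being created, that is, during the election, every node, whatever its role, may support only one Candidate. support goes back to -1 only when a node moves to a newer term, so a vote cast in a term can never be reused within that same term, not even after the Leader takes office.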
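A sketch of the one-vote-per-term rule as enforced by the `Canvass` handler: stale terms are rejected, a newer term clears `support`, and then at most one vote is granted. The `voter` type and `vote` method are hypothetical; they model only the `term` and `support` fields of `Server`.

```go
package main

import "fmt"

// voter models only the fields the voting rule needs.
type voter struct {
	term    int
	support int // Candidate ID voted for in term, or -1
}

// vote rejects stale terms, resets support when a newer term is seen,
// then grants at most one vote per term.
func (v *voter) vote(term, candidate int) bool {
	if term < v.term {
		return false
	}
	if term > v.term {
		v.term, v.support = term, -1 // new term: the ballot is fresh again
	}
	if v.support == -1 {
		v.support = candidate
		return true
	}
	return false
}

func main() {
	v := &voter{term: 1, support: -1}
	fmt.Println(v.vote(2, 7)) // true: first request in term 2
	fmt.Println(v.vote(2, 8)) // false: already supported 7 in term 2
	fmt.Println(v.vote(1, 9)) // false: stale term
	fmt.Println(v.vote(3, 8)) // true: newer term resets support
}
```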

func (srv *Server) broadcast() {
    srv.lastBroadcastHeartbeat = time.Now()
    for id, peer := range srv.peers {
        if id == srv.me {
            continue
        }
        reply := dReply{}
        peer.Call("Server.Dominate", &dArgs{Term: srv.term}, &reply)
        if reply.Term > srv.term {
            // A newer term exists: step down and stop sending heartbeats
            srv.role = "Follower"
            srv.term = reply.Term
            srv.support = -1
            return
        }
    }
}

func (srv *Server) Dominate(args *dArgs, reply *dReply) {
    reply.Term = srv.term
    if args.Term < srv.term {
        return    // ignore old Leader
    }
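    // A newer term appeared: adopt it and clear the vote from the old term
    if args.Term > srv.term {
        srv.term = args.Term
        srv.support = -1
    }
    // A legitimate Leader exists for this term, so a Candidate here steps down too
    srv.role = "Follower"
    srv.lastReceiveHeartbeat = time.Now()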
}

func (srv *Server) elect() {
    // Reset the election timer so this node doesn't start another election right away
    srv.lastReceiveHeartbeat = time.Now()
    srv.role = "Candidate"
    // Start a new term: this node assumes the old Leader is dead
    srv.term++
    // vote for self
    srv.support = srv.me
    voteCount := 1

    maxTerm := 0
    for id, peer := range srv.peers {
        if id == srv.me {
            continue
        }
        reply := cReply{}
        peer.Call("Server.Canvass", &cArgs{Term: srv.term, CandidateId: srv.me}, &reply)
        if reply.VoteGranted {
            voteCount++
        }
        if reply.Term > maxTerm {
            maxTerm = reply.Term
        }
    }

    // The role may have changed to Follower while votes were being collected,
    // which means a newer term or a Leader appeared
    if srv.role != "Candidate" {
        return
    }
    // Some node is already in a newer term: adopt it and step down
    if maxTerm > srv.term {
        srv.role = "Follower"
        srv.term = maxTerm
        srv.support = -1
        return
    }
    if voteCount > len(srv.peers)/2 {
        srv.role = "Leader"
        // Keep support unchanged: the vote cast in this term must not be reused in the same term
        srv.lastBroadcastHeartbeat = time.Unix(0, 0)
    }
}

func (srv *Server) Canvass(args *cArgs, reply *cReply) {
    reply.VoteGranted = false
    reply.Term = srv.term

    if args.Term < srv.term {
        return    // ignore old Candidate
    }
    // A newer term appeared: adopt it and step down
    if args.Term > srv.term {
        srv.term = args.Term
        srv.role = "Follower"
        srv.support = -1
    }
    // Only one vote per term
    if srv.support == -1 {
        srv.support = args.CandidateId
        reply.VoteGranted = true
        srv.lastReceiveHeartbeat = time.Now()
    }
}

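Term is the core concept of Leader Election. Term and Role always go together, and the algorithm as a whole works through the transitions of these two pieces of state. I hope this article is helpful to you.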

This work is licensed under a CC license; any reproduction must credit the author and link to this article.