Part 2A: leader election

头图来源:夕暮れの坂道-しゅろく/shurock-pixiv


Raft简介

Raft是一种分布式共识算法,算法中每个节点均有三个状态:leader、follower和candidate。

  • leader:leader用来处理所有来自于客户端的请求,客户端并不用关心与之通信的节点是不是leader,只需要向集群中发送请求,如果是follower接收到该请求则会将这个请求转发给leader处理。当leader挂了以后,将通过新一轮选举产生新的leader。
  • follower:follower是被动的,它们不处理来自客户端的请求,仅对来自leader和candidate的请求作响应。
  • candidate:follower在隔一段时间后(electionTimeou)未收到leader发送的包,则认为leader已经挂掉了,于是改follower将状态转变为candidate并将任期加一同时发起新一轮的选举。

该算法可两大部分:leader election和log replication。其中leader election就是本次所要实现的部分。

作业代码(部分)

Raft结构

const (
    leader State = iota
    follower
    candidate
)

//
// A Go object implementing a single Raft peer.
//
type Raft struct {
    mu        sync.Mutex          // Lock to protect shared access to this peer's state
    peers     []*labrpc.ClientEnd // RPC end points of all peers
    persister *Persister          // Object to hold this peer's persisted state
    me        int                 // this peer's index into peers[]
    dead      int32               // set by Kill()

    // Your data here (2A, 2B, 2C).
    // Look at the paper's Figure 2 for a description of what
    // state a Raft server must maintain.
    currentTerm int

    votedFor int

    state State 

    electionTimeout *time.Timer 

    heartbeatTimeout *time.Timer 
}

在A部分中需要有以下属性:

  • currentTerm:表示当前任期,从0开始计算。每个任期有且仅有一个leader
  • votedFor:表示节点给谁投票,-1表示还未投票
  • state:当前节点状态
  • electionTimeout:节点在electionTimout这段时间里未收到任何来自leader的包则开始新一轮的选举
  • heartbeatTimeout:leader节点每隔heartbeatTimeout时间发送一次心跳包
    electionTimeout是个随机值,以确保不会有大量节点同时成为candidate。范围一般为150~300ms,且electionTimeout>> broadcast time(广播时间),防止选举过程还未结束其他节点就成为candidate的情况

请求投票

func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
    // Your code here (2A, 2B).
    rf.mu.Lock()
    defer rf.mu.Unlock()
    if args.Term < rf.currentTerm||(args.Term==rf.currentTerm&&rf.votedFor!=-1&&rf.votedFor!=args.PeerId) {
        if rf.votedFor!=-1{
            //fmt.Printf("Node %d reject voting for Node %d (has voted for Node %d)\n",rf.me,args.PeerId,rf.votedFor)
        }
        reply.VoteForMe = false
        reply.Term=rf.currentTerm
        return
    }
    //投票
    if args.Term>rf.currentTerm{
        rf.state=follower
        rf.currentTerm=args.Term
        rf.votedFor=args.PeerId
    }
    rf.electionTimeout.Reset(randElectionTimeout())
    reply.Term=args.Term
    reply.VoteForMe=true
}

请求投票RPC的实现,没什么特别的,就是要注意一些条件的判断(任期、是否投过票等),以及在投过票之后需要重置electionTimeout。

触发选举

func (rf *Raft) ticker() {
    for rf.killed() == false {

        // Your code here to check if a leader election should
        // be started and to randomize sleeping time using
        // time.Sleep().
        select {
        case <-rf.electionTimeout.C:
            rf.mu.Lock()
            if rf.state!=leader{
                rf.startElection()
            }
            rf.mu.Unlock()
        }
        time.Sleep(10*time.Millisecond)
    }
}

当electionTimeout结束后,follower将自身任期加1并转化状态为candidate,发起新的一轮选举。

选举过程

func (rf *Raft) startElection() {
    voteCnt := 1 //候选者会先投票给自己
    rf.votedFor = rf.me
    rf.persist()         //将当前状态持久化
    rf.state = candidate //修改状态为候选者
    rf.currentTerm += 1
    rf.electionTimeout.Reset(randElectionTimeout())
    req := rf.newVoteRequest()
    for peerIdx,_:=range rf.peers{
        if peerIdx==rf.me{
            continue
        }
        go func(peerIdx int){
            resp:=new(RequestVoteReply)
            
            if rf.sendRequestVote(peerIdx,req,resp){
                rf.mu.Lock()
                defer rf.mu.Unlock()
                //fmt.Printf("{Node %d} Response received from Node %d (Term:%d)\n",rf.me,peerIdx,rf.currentTerm)
                //再次判断term和state
                if rf.currentTerm==req.Term&&rf.state==candidate{
                    if resp.VoteForMe{
                        voteCnt++
                        if voteCnt>len(rf.peers)/2{
                            //fmt.Printf("{Node %d} Receives votes from a majority of the servers and become a leader (Term:%d)\n",rf.me,rf.currentTerm)
                            rf.state=leader
                            rf.broadCast()
                                                        rf.heartbeatTimeout.Reset(getHeartbeatTimeout())
                        }
                    }else if resp.Term>rf.currentTerm{
                        //返回的信息中Term大于请求时的Term,说明有新的leader已经产生
                        //fmt.Printf("{Node %d} Find a new leader {Node %d} (Term:%d)\n",rf.me,peerIdx,resp.Term)
                        rf.covertTofollower(resp.Term)
                        rf.electionTimeout.Reset(randElectionTimeout())
                    }
                }
            }
        }(peerIdx)
    }
}

candidate向系统中每个节点发送选举请求,其中判断条件rf.currentTerm==req.Term的用意就是如果选举过程中candidate的electionTimeout再次耗尽又发起一次选举(虽然是很少发生但确实会存在这种情况),此时旧term发起的选举就应作废转而执行最新的一轮选举。

结果测试

A部分总共有三个测试点

1.初始化选举

该测试点测试初始化后是否会选举出一个leader,并对term进行检查。同时在网络不出现崩溃的情况下经过几轮选举时间(election timeout)后term是否会发生变化。要完美通过该测试点,就要保证在系统初始化后选举出一个leader,且term应该大于等于1,同时如果网络没出现崩溃leader应每隔一段时间(heartbeatTimeout)发送心跳包以确保没有新的选举产生。
warning
警告提示网络没崩溃但是又有选举产生了(任期发生了变化)
要解决这个问题需要每隔heartbeatTimeout发送一次心跳包,A部分仅需发送一个不带任何信息的AppendEntry即可

type AppendEntriesRequest struct{
    Term int
    Leader int
}

type AppendEntriesResponse struct{
    Term int
    Success bool
}

func(rf *Raft)NewHeartbeat()*AppendEntriesRequest{
    return &AppendEntriesRequest{
        Term: rf.currentTerm,
        Leader: rf.me,
    }
}

func(rf *Raft)AppendEntries(args *AppendEntriesRequest,reply *AppendEntriesResponse){
    rf.mu.Lock()
    defer rf.mu.Unlock()
    if rf.currentTerm>args.Term{
        reply.Term=rf.currentTerm
        reply.Success=false
        return
    }else if rf.currentTerm<args.Term{
        rf.coverTofollower(args.Term)
    }
    rf.electionTimeout.Reset(randElectionTimeout())
    reply.Term=rf.currentTerm
    reply.Success=true
}

func(rf *Raft)broadCast(){
    for peerIdx:=range rf.peers{
        if peerIdx==rf.me{
            continue
        }
        go func(peerIdx int){
            req:=rf.NewHeartbeat()
            resp:=new(AppendEntriesResponse)
            ok:=rf.sendAppendEntries(peerIdx,req,resp)
            rf.mu.Lock()
            defer rf.mu.Unlock()
            if rf.state!=leader{
                return
            }
            if ok&&!resp.Success&&rf.currentTerm<resp.Term{
                rf.covertTofollower(resp.Term)
                rf.electionTimeout.Reset(randElectionTimeout())
            }
        }(peerIdx)
    }
}

然后在前面的ticker()中添加如下代码

        case <-rf.heartbeatTimeout.C:
            rf.mu.Lock()
            if rf.state == leader {
                rf.broadCast()
                rf.heartbeatTimeout.Reset(getHeartbeatTimeout())
            }
            rf.mu.Unlock()

测试结果

  • checkOneLeader()会遍历地检查每个term中在线的节点,如果有个term中出现leader数量大于1的情况或在执行过程均没有leader产生则测试不通过;
  • checkTerms()则检查每个节点的term,如果出现节点term不一致的情况则测试不通过

2.再选举

该测试点会在选举出一名leader之后将leader从系统节点中强制下线,之后又重新上线,这种情况下系统应保证leader下线后重新选举出一名新的leader,每轮term有且仅有一名leader。

3.多次选举

该测试点循环十次,其中每一次循环会随机无差别地下线三个节点(被下线的节点可能是follower,或者leader),下线节点后调用chackOneLeader()检查,之后再上线之前被踢下线的三个节点再调用checkOneLeader()检查。

注意

在leader election阶段有几个要点值得注意:

  • 关于Reset electionTimeout的时机:1.当节点收到来自当前leader的AppendEntries请求时(如果发送AppendEntries的节点已经不是当前term的leader时则不执行Reset);2.当节点发起election时(当electionTimeout耗尽时不管一个candidtate发起的election是否结束都需要再次重新再发起一次election,这样是为了防止RPC因网络延迟或丢失导致选举失败系统从而整个因此停滞);3.当一个节点给其他节点投票时。
  • Term在Raft中有绝对的话语权,当一个RPC的请求或返回参数中的Term大于节点的term时,节点将currentTerm设为更大的Term,同时将状态转换为follower。
    多并发编程的测试不能过了就算过了,有时候可能是侥幸,要多测试几次。测试时加上-race查看竞争情况并进行调试。

参考

相关文章

Last modification:June 24th, 2022 at 12:44 pm
If you think my article is useful to you, please feel free to appreciate