Part 2A: leader election
头图来源:夕暮れの坂道-しゅろく/shurock-pixiv
Raft简介
Raft是一种分布式共识算法,算法中每个节点均有三个状态:leader、follower和candidate。
- leader:leader用来处理所有来自于客户端的请求,客户端并不用关心与之通信的节点是不是leader,只需要向集群中发送请求,如果是follower接收到该请求则会将这个请求转发给leader处理。当leader挂了以后,将通过新一轮选举产生新的leader。
- follower:follower是被动的,它们不处理来自客户端的请求,仅对来自leader和candidate的请求作响应。
- candidate:follower在隔一段时间后(electionTimeou)未收到leader发送的包,则认为leader已经挂掉了,于是改follower将状态转变为candidate并将任期加一同时发起新一轮的选举。
该算法可两大部分:leader election和log replication。其中leader election就是本次所要实现的部分。
作业代码(部分)
Raft结构
const (
leader State = iota
follower
candidate
)
//
// A Go object implementing a single Raft peer.
//
type Raft struct {
mu sync.Mutex // Lock to protect shared access to this peer's state
peers []*labrpc.ClientEnd // RPC end points of all peers
persister *Persister // Object to hold this peer's persisted state
me int // this peer's index into peers[]
dead int32 // set by Kill()
// Your data here (2A, 2B, 2C).
// Look at the paper's Figure 2 for a description of what
// state a Raft server must maintain.
currentTerm int
votedFor int
state State
electionTimeout *time.Timer
heartbeatTimeout *time.Timer
}
在A部分中需要有以下属性:
- currentTerm:表示当前任期,从0开始计算。每个任期有且仅有一个leader
- votedFor:表示节点给谁投票,-1表示还未投票
- state:当前节点状态
- electionTimeout:节点在electionTimout这段时间里未收到任何来自leader的包则开始新一轮的选举
- heartbeatTimeout:leader节点每隔heartbeatTimeout时间发送一次心跳包electionTimeout是个随机值,以确保不会有大量节点同时成为candidate。范围一般为150~300ms,且electionTimeout>> broadcast time(广播时间),防止选举过程还未结束其他节点就成为candidate的情况
请求投票
func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
// Your code here (2A, 2B).
rf.mu.Lock()
defer rf.mu.Unlock()
if args.Term < rf.currentTerm||(args.Term==rf.currentTerm&&rf.votedFor!=-1&&rf.votedFor!=args.PeerId) {
if rf.votedFor!=-1{
//fmt.Printf("Node %d reject voting for Node %d (has voted for Node %d)\n",rf.me,args.PeerId,rf.votedFor)
}
reply.VoteForMe = false
reply.Term=rf.currentTerm
return
}
//投票
if args.Term>rf.currentTerm{
rf.state=follower
rf.currentTerm=args.Term
rf.votedFor=args.PeerId
}
rf.electionTimeout.Reset(randElectionTimeout())
reply.Term=args.Term
reply.VoteForMe=true
}
请求投票RPC的实现,没什么特别的,就是要注意一些条件的判断(任期、是否投过票等),以及在投过票之后需要重置electionTimeout。
触发选举
func (rf *Raft) ticker() {
for rf.killed() == false {
// Your code here to check if a leader election should
// be started and to randomize sleeping time using
// time.Sleep().
select {
case <-rf.electionTimeout.C:
rf.mu.Lock()
if rf.state!=leader{
rf.startElection()
}
rf.mu.Unlock()
}
time.Sleep(10*time.Millisecond)
}
}
当electionTimeout结束后,follower将自身任期加1并转化状态为candidate,发起新的一轮选举。
选举过程
func (rf *Raft) startElection() {
voteCnt := 1 //候选者会先投票给自己
rf.votedFor = rf.me
rf.persist() //将当前状态持久化
rf.state = candidate //修改状态为候选者
rf.currentTerm += 1
rf.electionTimeout.Reset(randElectionTimeout())
req := rf.newVoteRequest()
for peerIdx,_:=range rf.peers{
if peerIdx==rf.me{
continue
}
go func(peerIdx int){
resp:=new(RequestVoteReply)
if rf.sendRequestVote(peerIdx,req,resp){
rf.mu.Lock()
defer rf.mu.Unlock()
//fmt.Printf("{Node %d} Response received from Node %d (Term:%d)\n",rf.me,peerIdx,rf.currentTerm)
//再次判断term和state
if rf.currentTerm==req.Term&&rf.state==candidate{
if resp.VoteForMe{
voteCnt++
if voteCnt>len(rf.peers)/2{
//fmt.Printf("{Node %d} Receives votes from a majority of the servers and become a leader (Term:%d)\n",rf.me,rf.currentTerm)
rf.state=leader
rf.broadCast()
rf.heartbeatTimeout.Reset(getHeartbeatTimeout())
}
}else if resp.Term>rf.currentTerm{
//返回的信息中Term大于请求时的Term,说明有新的leader已经产生
//fmt.Printf("{Node %d} Find a new leader {Node %d} (Term:%d)\n",rf.me,peerIdx,resp.Term)
rf.covertTofollower(resp.Term)
rf.electionTimeout.Reset(randElectionTimeout())
}
}
}
}(peerIdx)
}
}
candidate向系统中每个节点发送选举请求,其中判断条件rf.currentTerm==req.Term
的用意就是如果选举过程中candidate的electionTimeout再次耗尽又发起一次选举(虽然是很少发生但确实会存在这种情况),此时旧term发起的选举就应作废转而执行最新的一轮选举。
结果测试
A部分总共有三个测试点
1.初始化选举
该测试点测试初始化后是否会选举出一个leader,并对term进行检查。同时在网络不出现崩溃的情况下经过几轮选举时间(election timeout)后term是否会发生变化。要完美通过该测试点,就要保证在系统初始化后选举出一个leader,且term应该大于等于1,同时如果网络没出现崩溃leader应每隔一段时间(heartbeatTimeout)发送心跳包以确保没有新的选举产生。
警告提示网络没崩溃但是又有选举产生了(任期发生了变化)
要解决这个问题需要每隔heartbeatTimeout发送一次心跳包,A部分仅需发送一个不带任何信息的AppendEntry即可
type AppendEntriesRequest struct{
Term int
Leader int
}
type AppendEntriesResponse struct{
Term int
Success bool
}
func(rf *Raft)NewHeartbeat()*AppendEntriesRequest{
return &AppendEntriesRequest{
Term: rf.currentTerm,
Leader: rf.me,
}
}
func(rf *Raft)AppendEntries(args *AppendEntriesRequest,reply *AppendEntriesResponse){
rf.mu.Lock()
defer rf.mu.Unlock()
if rf.currentTerm>args.Term{
reply.Term=rf.currentTerm
reply.Success=false
return
}else if rf.currentTerm<args.Term{
rf.coverTofollower(args.Term)
}
rf.electionTimeout.Reset(randElectionTimeout())
reply.Term=rf.currentTerm
reply.Success=true
}
func(rf *Raft)broadCast(){
for peerIdx:=range rf.peers{
if peerIdx==rf.me{
continue
}
go func(peerIdx int){
req:=rf.NewHeartbeat()
resp:=new(AppendEntriesResponse)
ok:=rf.sendAppendEntries(peerIdx,req,resp)
rf.mu.Lock()
defer rf.mu.Unlock()
if rf.state!=leader{
return
}
if ok&&!resp.Success&&rf.currentTerm<resp.Term{
rf.covertTofollower(resp.Term)
rf.electionTimeout.Reset(randElectionTimeout())
}
}(peerIdx)
}
}
然后在前面的ticker()
中添加如下代码
case <-rf.heartbeatTimeout.C:
rf.mu.Lock()
if rf.state == leader {
rf.broadCast()
rf.heartbeatTimeout.Reset(getHeartbeatTimeout())
}
rf.mu.Unlock()
checkOneLeader()
会遍历地检查每个term中在线的节点,如果有个term中出现leader数量大于1的情况或在执行过程均没有leader产生则测试不通过;checkTerms()
则检查每个节点的term,如果出现节点term不一致的情况则测试不通过
2.再选举
该测试点会在选举出一名leader之后将leader从系统节点中强制下线,之后又重新上线,这种情况下系统应保证leader下线后重新选举出一名新的leader,每轮term有且仅有一名leader。
3.多次选举
该测试点循环十次,其中每一次循环会随机无差别地下线三个节点(被下线的节点可能是follower,或者leader),下线节点后调用chackOneLeader()
检查,之后再上线之前被踢下线的三个节点再调用checkOneLeader()
检查。
注意
在leader election阶段有几个要点值得注意:
- 关于Reset electionTimeout的时机:1.当节点收到来自当前leader的AppendEntries请求时(如果发送AppendEntries的节点已经不是当前term的leader时则不执行Reset);2.当节点发起election时(当electionTimeout耗尽时不管一个candidtate发起的election是否结束都需要再次重新再发起一次election,这样是为了防止RPC因网络延迟或丢失导致选举失败系统从而整个因此停滞);3.当一个节点给其他节点投票时。
- Term在Raft中有绝对的话语权,当一个RPC的请求或返回参数中的Term大于节点的term时,节点将currentTerm设为更大的Term,同时将状态转换为follower。
多并发编程的测试不能过了就算过了,有时候可能是侥幸,要多测试几次。测试时加上-race
查看竞争情况并进行调试。
参考
- 论文:https://pages.cs.wisc.edu/~remzi/Classes/739/Spring2004/Papers/raft.pdf
- MIT6.824 lab2(2022):https://pdos.csail.mit.edu/6.824/labs/lab-raft.html
- 作业指引:https://thesquareplanet.com/blog/students-guide-to-raft/
- 思路参考:https://www.cnblogs.com/pxlsdz/p/15557635.html