Part 2B: log replication

This part gave me headaches for well over two weeks. After fixing one bug, test cases that used to pass would fail again; debugging those surfaced yet another batch of new problems... lots of printing logs and reading logs. The process was mostly painful, but I also feel like I came out of it a changed person (in every sense).

Header image source: 夕暮れの坂道 - しゅろく/shurock - pixiv


Log Replication

The main work in this part is to implement the replication, synchronization, and appending of log entries in Raft.

Assignment code (excerpts)

Election restriction

// Leader election restriction. If the logs have last entries
// with different terms, then the log with the later term is
// more up-to-date. If the logs end with the same term, then
// whichever log is longer is more up-to-date.
func (rf *Raft) isLogUpToDate(term, idx int) bool {
    lastlog := rf.log[len(rf.log)-1]
    return term > lastlog.Term || (term == lastlog.Term && idx >= lastlog.Idx)
}

The following code needs to be added to RequestVote() from the previous part:

if args.Term < rf.currentTerm || (args.Term == rf.currentTerm && rf.votedFor != -1 && rf.votedFor != args.PeerId) || !rf.isLogUpToDate(args.LastLogTerm, args.LastLogIdx) {
    reply.VoteForMe = false
    reply.Term = rf.currentTerm
    return
}

AppendEntries

func (rf *Raft) AppendEntries(args *AppendEntriesRequest, reply *AppendEntriesResponse) {
    rf.mu.Lock()
    defer rf.mu.Unlock()
    reply.Success = false
    // Reply false if term < currentTerm (§5.1)
    if rf.currentTerm > args.Term {
        reply.Term = rf.currentTerm
        return
    }
    // a valid AppendEntries from the current leader resets the election timer
    rf.electionTimeout.Reset(randElectionTimeout())
    rf.covertToFollower(args.Term)
    reply.Term = rf.currentTerm
    // reply false if the log has no entry at PrevLogIdx whose term matches
    // PrevLogTerm (§5.3); fill ConflictIdx/ConflictTerm for fast backtracking
    if args.PrevLogIdx >= len(rf.log) {
        reply.ConflictIdx, reply.ConflictTerm = len(rf.log), -1
        return
    } else if rf.log[args.PrevLogIdx].Term != args.PrevLogTerm {
        reply.ConflictTerm = rf.log[args.PrevLogIdx].Term
        reply.ConflictIdx = rf.firstIdxByTerm(reply.ConflictTerm)
        return
    }

    // skip entries that already match, then truncate and append the rest (§5.3)
    index := 0
    for index < len(args.Entries) && args.PrevLogIdx+index+1 < len(rf.log) {
        if rf.log[args.PrevLogIdx+index+1].Term != args.Entries[index].Term {
            break
        }
        index++
    }
    args.Entries = args.Entries[index:]

    if len(args.Entries) > 0 {
        rf.log = append(rf.log[:args.PrevLogIdx+index+1], args.Entries...)
    }

    // If leaderCommit > commitIndex, set commitIndex = min(leaderCommit, index of last new entry)
    if args.LeaderCommitIdx > rf.commitIdx {
        rf.commitIdx = min(args.LeaderCommitIdx, rf.log[len(rf.log)-1].Idx)
        rf.applyCond.Signal()
    }
    reply.Success = true
}

The AppendEntries RPC implementation is the difficult part of this section. Its request/response parameters and the basic handling logic are described in detail in the paper; see the figure below.
(Figure: the AppendEntries RPC specification from the Raft paper)
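For reference, here is a rough sketch of the request/response types used in the code above, with field names inferred from the snippets in this post (the exact definitions in the actual code may differ):

// Sketch only: field names follow the snippets in this post.
type LogEntry struct {
    Idx     int         // index of this entry in the log
    Term    int         // term in which the entry was created
    Command interface{} // client command
}

type AppendEntriesRequest struct {
    Term            int        // leader's term
    Leader          int        // leader's id
    PrevLogIdx      int        // index of the entry immediately preceding the new ones
    PrevLogTerm     int        // term of the entry at PrevLogIdx
    Entries         []LogEntry // entries to store (empty for a pure heartbeat)
    LeaderCommitIdx int        // leader's commitIndex
}

type AppendEntriesResponse struct {
    Term         int  // follower's currentTerm, for the leader to update itself
    Success      bool // true if the follower matched PrevLogIdx/PrevLogTerm
    ConflictIdx  int  // used for accelerated log backtracking (see the notes below)
    ConflictTerm int
}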

Log synchronization

func (rf *Raft) broadCast() {
    rf.electionTimeout.Reset(randElectionTimeout())
    rf.updateCommitIdx()
    for peerIdx := range rf.peers {
        if peerIdx == rf.me {
            continue
        }
        req := rf.genAppendEntriesRequest(peerIdx)
        resp := new(AppendEntriesResponse)
        go func(peerIdx int, req *AppendEntriesRequest, resp *AppendEntriesResponse) {
            if rf.sendAppendEntries(peerIdx, req, resp) {
                rf.mu.Lock()
                // only handle the response if the term has not changed since the request was sent
                if req.Term == rf.currentTerm {
                    if rf.currentTerm < resp.Term {
                        // a newer term exists: step down to follower
                        rf.covertToFollower(resp.Term)
                        rf.electionTimeout.Reset(randElectionTimeout())
                        rf.mu.Unlock()
                        return
                    }
                    if resp.Success {
                        rf.matchIdx[peerIdx] = req.PrevLogIdx + len(req.Entries)
                        rf.nextIdx[peerIdx] = rf.matchIdx[peerIdx] + 1
                    } else {
                        // accelerated backtracking: jump nextIdx based on ConflictTerm/ConflictIdx
                        nextIdx := rf.lastIdxByTerm(resp.ConflictTerm)
                        if nextIdx != -1 {
                            rf.nextIdx[peerIdx] = nextIdx + 1
                        } else {
                            rf.nextIdx[peerIdx] = resp.ConflictIdx
                        }
                    }
                }
                rf.mu.Unlock()
            }
        }(peerIdx, req, resp)
    }
}

If a node's log needs to be synchronized, the entries to replicate are carried in the heartbeats sent by the leader. Note that the follower must execute AppendEntries() regardless of whether the heartbeat contains log entries.

func (rf *Raft) genAppendEntriesRequest(peerIdx int) *AppendEntriesRequest {
    req := &AppendEntriesRequest{
        Term:            rf.currentTerm,
        Leader:          rf.me,
        LeaderCommitIdx: rf.commitIdx,
    }
    if len(rf.log)-1 >= rf.nextIdx[peerIdx] {
        req.Entries = rf.log[rf.nextIdx[peerIdx]:]
    }
    prevIdx := rf.nextIdx[peerIdx] - 1
    req.PrevLogIdx, req.PrevLogTerm = rf.log[prevIdx].Idx, rf.log[prevIdx].Term
    return req
}

As you can see, when the leader finds that its last log index is greater than or equal to the next index it should send to the follower (len(rf.log)-1 >= rf.nextIdx[peerIdx]), it loads the entries from rf.nextIdx[peerIdx] onward into the heartbeat.
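For context, here is a minimal sketch of the loop that could drive broadCast() on the leader. The actual ticker is not shown in this post, and names such as rf.state, Leader, and heartbeatInterval are assumptions, not code from the post:

// Hypothetical driver loop (requires the "time" package). On the leader it
// calls broadCast() periodically; an empty Entries slice acts as a heartbeat.
func (rf *Raft) heartbeatLoop() {
    for rf.killed() == false {
        rf.mu.Lock()
        if rf.state == Leader {
            rf.broadCast()
        }
        rf.mu.Unlock()
        time.Sleep(heartbeatInterval) // e.g. ~100ms, well below the election timeout
    }
}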

Applying log entries

func (rf *Raft) applier() {
    for rf.killed() == false {
        rf.applyCond.L.Lock()
        // block until there are committed entries that have not been applied yet
        for rf.lastApplied >= rf.commitIdx {
            rf.applyCond.Wait()
        }

        entries := rf.log[rf.lastApplied+1 : rf.commitIdx+1]

        if len(entries) > 0 {
            for _, entry := range entries {
                rf.applech <- ApplyMsg{
                    CommandValid: true,
                    Command:      entry.Command,
                    CommandIndex: entry.Idx,
                }
            }
            rf.lastApplied = rf.commitIdx
        }
        rf.applyCond.L.Unlock()
    }
}

applier() implements log application. A condition variable is needed here: while lastApplied is greater than or equal to commitIndex, applier() blocks; when a node updates commitIndex, it calls Signal() to wake applier() up so it can apply the newly committed entries.
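For completeness, a sketch of how applyCond might be wired up in Make(); this is an assumption about code not shown in the post, using the standard lab skeleton signature:

// Sketch of the relevant part of Make(); only the applyCond wiring is shown.
// Assumption: rf.mu is the same mutex used by AppendEntries() and broadCast(),
// so applyCond.L.Lock() in applier() takes rf.mu.
func Make(peers []*labrpc.ClientEnd, me int, persister *Persister, applyCh chan ApplyMsg) *Raft {
    rf := &Raft{}
    // ... other initialization ...
    rf.applech = applyCh
    rf.applyCond = sync.NewCond(&rf.mu) // applier() waits on this; Signal() wakes it when commitIdx advances
    go rf.applier()
    return rf
}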

Test cases

1. basic agreement
The most basic agreement test: send one command and check that it is replicated.
2. RPC byte count
Measures the size of the RPC messages to check that they do not carry too much unnecessary content.
3. agreement after follower reconnects
Starts three nodes, sends one command and checks agreement, then disconnects a follower and sends more commands. The remaining two nodes should still commit and apply these entries, and when the disconnected follower reconnects it should catch up on the previously committed entries.
4. no agreement if too many followers disconnect
Checks that the leader does not commit entries after too many nodes (more than half of the cluster) have disconnected.
5. concurrent Start()s
Simulates multiple clients sending commands to the leader concurrently, and checks that the entries are committed correctly and in order.
6. rejoin of partitioned leader
Checks that logs still synchronize correctly when the old leader rejoins after the cluster has elected a new leader.
7. leader backs up quickly over incorrect follower logs
Simulates a follower that has been down for a long time and has missed many entries, and checks that its log catches up promptly after it recovers. This tests how the leader updates NextIndex when PrevLogIndex conflicts so that synchronization stays efficient; if you simply decrement NextIndex by one at a time, this test will very likely fail.
8. RPC counts aren't too high
Checks that log synchronization does not send too many unnecessary RPCs.

Notes

1. The relationship between MatchIndex and NextIndex

nextIndex is a guess as to what prefix the leader shares with a given follower. It is generally quite optimistic (we share everything), and is moved backwards only on negative responses. For example, when a leader has just been elected, nextIndex is set to be the index at the end of the log. In a way, nextIndex is used for performance – you only need to send these things to this peer.

matchIndex is used for safety. It is a conservative measurement of what prefix of the log the leader shares with a given follower. matchIndex cannot ever be set to a value that is too high, as this may cause the commitIndex to be moved too far forward. This is why matchIndex is initialized to -1 (i.e., we agree on no prefix), and only updated when a follower positively acknowledges an AppendEntries RPC.
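Concretely, this is why both arrays are reinitialized at the moment a node becomes leader. Below is a sketch; becomeLeader is a hypothetical helper not shown in this post, and since the post's log keeps a dummy entry at index 0, matchIdx can start at 0 rather than -1:

// Sketch only: becomeLeader is a hypothetical helper; the post does not show
// how these arrays are initialized.
func (rf *Raft) becomeLeader() {
    lastIdx := rf.log[len(rf.log)-1].Idx
    for peer := range rf.peers {
        rf.nextIdx[peer] = lastIdx + 1 // optimistic: assume the follower already has everything
        rf.matchIdx[peer] = 0          // conservative: nothing is known to be replicated yet
    }
}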

2. Committing vs. applying log entries
First, be clear about the difference between committing and applying a log entry. When a client sends a command to the leader, the leader wraps the command in a log entry and appends it to its own log[]. This step is not a commit; the entry may only be committed once more than half of the followers have appended it to their own log[]. The paper states:

If there exists an N such that N > commitIndex, a majority
of matchIndex[i] ≥ N, and log[N].term == currentTerm:
set commitIndex = N

This rule must be implemented correctly to update commitIndex. As for why log[N].term == currentTerm is required, I believe it is to avoid the situation shown in Figure 8 of the paper.
(Figure 8 from the Raft paper)
As shown in the figure above, S1 (currentTerm: 2) crashes before it has replicated the entry at index 2 (term 2) to a majority of nodes. S5 is then elected leader, receives a new entry at index 2 (term 3), and crashes as well. After S5 crashes, S1 is re-elected leader (currentTerm: 4). If S1 then replicates the index-2 entry (term 2) to a majority of nodes (all but S5) and there were no log[N].term == currentTerm check, it would commit that entry. This is very dangerous: if S1 later crashes and S5 recovers, by the logic of isLogUpToDate() S5 can still be elected leader, which means S5 would replicate its own index-2 entry (term 3) to the other nodes and commit it. At that point the index-2 entry (term 2) already committed on the other nodes would be overwritten, and the system would have inconsistent commits.

When committing, the leader must also check that the entry's term equals its own term; the leader only directly commits entries from its own term.
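broadCast() above calls rf.updateCommitIdx(), which was not shown earlier. Here is a sketch of what that helper could look like under the rule quoted above (assuming it is called with rf.mu already held, as in broadCast()):

// Sketch of the commit rule from the paper: find the largest N > commitIdx
// such that a majority of matchIdx[i] >= N and log[N].Term == currentTerm.
func (rf *Raft) updateCommitIdx() {
    for n := rf.log[len(rf.log)-1].Idx; n > rf.commitIdx; n-- {
        // only entries from the leader's own term may be committed directly
        if rf.log[n].Term != rf.currentTerm {
            continue
        }
        count := 1 // the leader itself
        for peer := range rf.peers {
            if peer != rf.me && rf.matchIdx[peer] >= n {
                count++
            }
        }
        if count > len(rf.peers)/2 {
            rf.commitIdx = n
            rf.applyCond.Signal() // let applier() apply the newly committed entries
            break
        }
    }
}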

3. Term checks
As mentioned in Part 2A, the leader only processes an RPC response if the term at the time the request was sent equals the current term. This is easy to overlook; the point is that the leader never acts on stale RPC responses.
4. Performance tuning
As mentioned under test case 7, when the PrevLogIndex sent by the leader is not present in the follower's log, or the follower's entry at PrevLogIndex conflicts with it, the leader needs to roll back NextIndex. Decrementing NextIndex by one on every conflict may work, but it is nowhere near efficient. The TA's blog offers a better approach:

If a follower does not have prevLogIndex in its log, it should return with conflictIndex = len(log) and conflictTerm = None.

If a follower does have prevLogIndex in its log, but the term does not match, it should return conflictTerm = log[prevLogIndex].Term, and then search its log for the first index whose entry has term equal to conflictTerm.

Upon receiving a conflict response, the leader should first search its log for conflictTerm. If it finds an entry in its log with that term, it should set nextIndex to be the one beyond the index of the last entry in that term in its log.

If it does not find an entry with that term, it should set nextIndex = conflictIndex.
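The helpers firstIdxByTerm() (used by the follower in AppendEntries()) and lastIdxByTerm() (used by the leader in broadCast()) were not shown earlier either. A linear-scan sketch consistent with how they are called above might look like this:

// firstIdxByTerm returns the index of the first entry with the given term;
// the follower uses it to fill ConflictIdx. The fallback return value should
// not be hit when the term is known to exist in the log.
func (rf *Raft) firstIdxByTerm(term int) int {
    for i := 1; i < len(rf.log); i++ {
        if rf.log[i].Term == term {
            return i
        }
    }
    return len(rf.log)
}

// lastIdxByTerm returns the index of the last entry with the given term, or -1
// if the leader has no such entry (including ConflictTerm == -1, which the
// follower sends when its log is simply too short).
func (rf *Raft) lastIdxByTerm(term int) int {
    for i := len(rf.log) - 1; i >= 1; i-- {
        if rf.log[i].Term == term {
            return i
        }
    }
    return -1
}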

Results

(Figure: test output)

Self-check

The total real time should not exceed one minute, and the total CPU time should not exceed five seconds. For the specific things to check, see the assignment hints:

If your solution uses much more than a minute of real time for the 2B tests, or much more than 5 seconds of CPU time, you may run into trouble later on. Look for time spent sleeping or waiting for RPC timeouts, loops that run without sleeping or waiting for conditions or channel messages, or large numbers of RPCs sent.

Repeated testing

Passing the tests in this lab once is not enough, because Raft involves a great deal of nondeterminism. To make sure the code is robust and to make it easy to run the tests over and over, I wrote a multi-run test script (it's been a few months since I last wrote a shell script and I'm already rusty, orz). After a few rounds of testing I found that test 7 kept failing occasionally (roughly 4 out of 100 runs).

Test script

#!/usr/bin/env bash
failed_times=0
total_time=0

for i in $(seq 1 $2)
do
    echo "---TEST(${i}/$2)---"

    startTime_s=$(date +%s)
    output=$(go test -run $1 | grep -B 1 -iE 'FAIL|error')
    endTime_s=$(date +%s)
    perTime=$(( endTime_s - startTime_s ))
    if [[ "${output}" != "" ]]
    then
        failed_times=$(( failed_times + 1 ))
        echo "${output}"
    else
        echo "PASS(time costs:${perTime}s)"
    fi
    total_time=$(( perTime + total_time ))
done

if [ 0 -eq ${failed_times} ]
then
    echo "---PASS ALL---"
else
    echo "---FAIL SOME TESTS(${failed_times})---"
fi

echo "Total time:${total_time}s"

Usage example

# run the 2B tests 100 times
$ bash multiTest.sh 2B 100

Summary

Through this lab I gained some experience debugging concurrent programs, and a new appreciation for managing critical sections around shared state (when locking is actually necessary, how to choose the lock granularity so resources aren't wasted, which data may be touched by other goroutines, and so on). Another lesson: don't just keep writing code; go back and reread the paper. Sometimes rereading the paper to check whether you missed an important detail beats hours of debugging, and it deepens your understanding at the same time.

Update: 2022.06.12

As mentioned earlier, a few runs out of every 100 would fail test 7. This time, after repeated rounds of adding and reading logs, I finally found that the RequestVote() logic from Part 2A was wrong and kept the cluster from electing a leader: after receiving a RequestVote RPC, returning immediately whenever the vote is rejected is not correct; if the vote is rejected but the request's Term is greater than the current Term, the node still has to convert to follower. I therefore changed the earlier implementation as shown below, and now all runs pass.

    if args.Term < rf.currentTerm || (args.Term == rf.currentTerm && rf.votedFor != -1 && rf.votedFor != args.PeerId) || !rf.isLogUpToDate(args.LastLogTerm, args.LastLogIdx) {
        // even though the candidate's log is out of date, if its Term is greater
        // than our own we still need to convert to follower
        if args.Term > rf.currentTerm {
            rf.covertToFollower(args.Term)
            rf.electionTimeout.Reset(randElectionTimeout())
        }
        return
    }

Test results

References

Related blog posts
