Part 2C: persistence
Part 2D: log compaction

Part C of lab2 requires relatively little new code and its logic is simple, but the tests are stricter: any bugs left over from Parts A and B are easily exposed by the 2C tests, so the main purpose of Part C is really a more thorough test of the A and B logic. Since Part C is short, Parts C and D are covered together in this post.

Header image source: 夕暮れの坂道 by しゅろく/shurock on pixiv


Part 2C: persistence

We need to persist the Raft state (currentTerm, votedFor, log[]) by implementing two interfaces: persist() and readPersist().

Assignment code (excerpts)

Persisting the state

func (rf *Raft) persist() {
    w := new(bytes.Buffer)

    // Encode the persistent state from Figure 2: currentTerm, votedFor, log.
    e := labgob.NewEncoder(w)
    e.Encode(rf.currentTerm)
    e.Encode(rf.votedFor)
    e.Encode(rf.log)

    data := w.Bytes()
    // Save the snapshot (if any) together with the Raft state so the two
    // stay consistent on disk.
    if len(rf.snapShot.data) == 0 {
        rf.persister.SaveStateAndSnapshot(data, nil)
    } else {
        rf.persister.SaveStateAndSnapshot(data, rf.snapShot.data)
    }
}

Reading the persisted state

func (rf *Raft) readPersist(data []byte) {
    if data == nil || len(data) < 1 { // bootstrap without any state?
        return
    }

    rf.mu.Lock()
    defer rf.mu.Unlock()
    r := bytes.NewBuffer(data)
    d := labgob.NewDecoder(r)
    var currentTerm int
    var votedFor int
    var logs []Entry
    d.Decode(&currentTerm)
    d.Decode(&votedFor)
    d.Decode(&logs)
    rf.currentTerm = currentTerm
    rf.votedFor = votedFor
    rf.log = logs
}

Then add one line in Make():

rf.readPersist(persister.ReadRaftState())

Notes

The key point of Part C is when to call persist(): call it at every place where currentTerm, votedFor, or log[] changes, as in the sketch below.
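
For example, inside the RequestVote handler the calls sit right after each state change. This is a sketch only: field names such as CandidateId/VoteGranted are assumptions, and the candidate-log up-to-date check is omitted.

if args.Term > rf.currentTerm {
    rf.covertToFollower(args.Term)
    rf.persist() // currentTerm (and votedFor) changed
}
if rf.votedFor == -1 || rf.votedFor == args.CandidateId {
    rf.votedFor = args.CandidateId
    rf.persist() // votedFor changed
    reply.VoteGranted = true
}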

Test results

Single run:
$ time go test -run 2C
100 runs:
$ bash multiTest.sh 2C 100

Part 2D: log compaction

This part implements log compaction (snapshots). In a real deployment Raft produces a large number of log entries, and without compaction the ever-growing log would quickly fill up storage, so the entries that have already been applied to the state machine are saved as a snapshot. The assignment declares two interfaces for you to implement, CondInstallSnapshot and Snapshot. CondInstallSnapshot is not required for now; simply returning true is enough (see the sketch after the figure below). The focus of lab2 is the implementation of Snapshot. The assignment also provides the figure below, which helps in understanding the interactions between servers, between a server and its client, and the synchronization logic among servers.
[Figure: interaction diagram from the lab handout]
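
Since CondInstallSnapshot only needs to return true here, its whole body can be as small as this (signature as in the lab skeleton, to the best of my knowledge):

func (rf *Raft) CondInstallSnapshot(lastIncludedTerm int, lastIncludedIndex int, snapshot []byte) bool {
    // Not required for this write-up: unconditionally accept the snapshot.
    return true
}
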
This part requires fairly extensive changes to the earlier code: because the portion of the log captured by a snapshot gets cut off, Go slice indices can no longer be used directly as log indices. Hint 2 of the assignment gives a pointer:

A good place to start is to modify your code so that it is able to store just the part of the log starting at some index X. Initially you can set X to zero and run the 2B/2C tests. Then make Snapshot(index) discard the log before index, and set X equal to index. If all goes well you should now pass the first 2D test.

Following the hint, my approach is to add a firstLogIdx field to Raft that records the index of the first entry currently stored in the log. Every time the log is trimmed, firstLogIdx is updated; wherever the code previously indexed the log with a Go slice index, subtract firstLogIdx from the log index instead, for example with small helpers like the ones sketched below.
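
For illustration, the translation can be wrapped in small helpers so the offset arithmetic lives in one place (hypothetical names, not from the post):

// toSliceIdx converts a global log index into an index into rf.log.
func (rf *Raft) toSliceIdx(logIdx int) int {
    return logIdx - rf.firstLogIdx
}

// entryAt returns the entry stored at a global log index.
func (rf *Raft) entryAt(logIdx int) Entry {
    return rf.log[logIdx-rf.firstLogIdx]
}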

Assignment code (excerpts)

AppendEntries

The piece that needs the most substantial changes is the original AppendEntries RPC handler:

func (rf *Raft) AppendEntries(args *AppendEntriesRequest, reply *AppendEntriesResponse) {
    rf.mu.Lock()
    defer rf.mu.Unlock()

    reply.Success = false
    if rf.currentTerm > args.Term {
        reply.Term = rf.currentTerm
        return
    }
    rf.electionTimeout.Reset(randElectionTimeout())
    if rf.currentTerm < args.Term {
        rf.covertToFollower(args.Term)
        rf.votedFor = args.Leader
        rf.persist()
    }
    reply.Term = rf.currentTerm
    lastLogIdx, _ := rf.getLastLogInfo()
    if args.PrevLogIdx != -1 {
        // Consistency check with a fast-backup hint: if our log is too short,
        // or the term at PrevLogIdx conflicts, tell the leader where to retry.
        if args.PrevLogIdx > lastLogIdx {
            reply.ConflictIdx, reply.ConflictTerm = lastLogIdx+1, -1
            return
        } else if args.PrevLogIdx >= rf.firstLogIdx && rf.log[args.PrevLogIdx-rf.firstLogIdx].Term != args.PrevLogTerm {
            reply.ConflictTerm = rf.log[args.PrevLogIdx-rf.firstLogIdx].Term
            reply.ConflictIdx = rf.firstIdxByTerm(reply.ConflictTerm)
            return
        }
    }
    // If PrevLogIdx falls inside our snapshot, skip the leading entries that
    // the snapshot already covers and start matching from firstLogIdx.
    index := 0
    if args.PrevLogIdx < rf.firstLogIdx {
        for index < len(args.Entries) {
            if args.Entries[index].Idx == rf.firstLogIdx {
                break
            }
            index++
        }
    }
    // Skip entries already present with matching terms, then truncate any
    // conflicting suffix and append the remaining new entries.
    for index < len(args.Entries) && args.PrevLogIdx+index+1-rf.firstLogIdx < len(rf.log) {
        if rf.log[args.PrevLogIdx+index+1-rf.firstLogIdx].Term != args.Entries[index].Term {
            break
        }
        index++
    }
    args.Entries = args.Entries[index:]

    if len(args.Entries) > 0 {
        rf.log = append(rf.log[:args.PrevLogIdx+index+1-rf.firstLogIdx], args.Entries...)
        rf.persist()
    }

    // If leaderCommit > commitIndex, set commitIndex = min(leaderCommit, index of last new entry)
    if args.LeaderCommitIdx > rf.commitIdx {
        lastLogIdx, _ := rf.getLastLogInfo()
        rf.commitIdx = min(args.LeaderCommitIdx, lastLogIdx)
        rf.applyCond.Signal()
    }
    reply.Success = true

}
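
firstIdxByTerm(), used for the conflict hint above, is not shown in the post; a minimal sketch over the trimmed log could be:

func (rf *Raft) firstIdxByTerm(term int) int {
    // Return the index of the first stored entry with the given term;
    // the term is taken from rf.log, so it should normally be found.
    for i := range rf.log {
        if rf.log[i].Term == term {
            return rf.log[i].Idx
        }
    }
    return rf.firstLogIdx
}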

Define the SnapShot struct

type SnapShot struct {
    data             []byte
    lastIncludedIdx  int
    lastIncludedTerm int
}

Add two fields to the Raft struct:

    firstLogIdx int      // log index of the first entry currently kept in rf.log
    snapShot    SnapShot // the most recent snapshot

Snapshot

Snapshot() is an interface already declared in the assignment that you need to implement. Each time the tester has read a set number of commands from applyCh, it calls Snapshot() to create a snapshot for that server.

func (rf *Raft) Snapshot(index int, snapshot []byte) {
    rf.mu.Lock()
    defer rf.mu.Unlock()
    // Ignore a snapshot that is older than what has already been trimmed
    // (e.g. one installed via InstallSnapShot in the meantime).
    if index < rf.firstLogIdx {
        return
    }
    if len(snapshot) > 0 {
        rf.snapShot = SnapShot{
            lastIncludedIdx:  index,
            lastIncludedTerm: rf.log[index-rf.firstLogIdx].Term,
            data:             snapshot,
        }
        // Drop everything up to and including index, then advance firstLogIdx.
        rf.log = rf.log[index+1-rf.firstLogIdx:]
        rf.firstLogIdx = index + 1
        rf.persist()
    }
}
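
One caveat: re-slicing rf.log as above keeps the discarded prefix alive through the slice's backing array. If memory use matters, a copying variant releases it (hypothetical helper, not from the post; assumes rf.mu is held):

func (rf *Raft) trimLogTo(index int) {
    // Copy the retained suffix into a fresh slice so the old backing array
    // (and the trimmed entries) can be garbage collected.
    suffix := rf.log[index+1-rf.firstLogIdx:]
    retained := make([]Entry, len(suffix))
    copy(retained, suffix)
    rf.log = retained
    rf.firstLogIdx = index + 1
}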

Reading the snapshot

Similarly, every time a Raft instance is created via Make(), if a snapshot was saved previously it needs to be read back first:

func (rf *Raft) readSnapShot(data []byte) {
    if data == nil || len(data) < 1 { // bootstrap without any state?
        return
    }

    rf.mu.Lock()
    defer rf.mu.Unlock()
    r := bytes.NewBuffer(data)
    d := labgob.NewDecoder(r)
    var i int
    var logs []interface{}
    d.Decode(&i) // the snapshot's last included index
    d.Decode(&logs)
    rf.snapShot = SnapShot{
        lastIncludedIdx: i,
        data:            data,
    }
    // Everything up to i is already covered by the snapshot, so commitIdx,
    // lastApplied and firstLogIdx all move past it.
    rf.commitIdx = i
    rf.lastApplied = i
    rf.firstLogIdx = i + 1
}

Note that besides filling in snapShot, the previously defined lastApplied and commitIdx also need to be updated here.
Add a line in Make():

rf.readSnapShot(persister.ReadSnapshot())
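
For reference, the restore sequence at the end of Make() then looks roughly like this (a sketch; the rest of the initialization is omitted):

// Restore the persisted Raft state first, then the snapshot, so that
// commitIdx, lastApplied and firstLogIdx end up consistent with it.
rf.readPersist(persister.ReadRaftState())
rf.readSnapShot(persister.ReadSnapshot())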

Implementing the InstallSnapshot RPC

Figure 13 in the paper gives the general logic of the InstallSnapshot RPC.
[Figure 13 from the Raft paper: InstallSnapshot RPC]
The assignment does not require sending the snapshot in chunks, so the offset and done fields are not needed:

type InsRequest struct {
    Term             int
    Leader           int
    LastIncludedIdx  int
    LastIncludedTerm int

    Data []byte
}

type InsResponse struct {
    Term int
}

func (rf *Raft) InstallSnapShot(args *InsRequest, reply *InsResponse) {
    rf.mu.Lock()
    reply.Term = rf.currentTerm
    if rf.currentTerm > args.Term {
        rf.mu.Unlock()
        return
    }
    rf.electionTimeout.Reset(randElectionTimeout())
    if rf.currentTerm < args.Term {
        rf.covertToFollower(args.Term)
        rf.persist()
    }
    if !rf.hasSnapShot() || rf.hasSnapShot() && rf.snapShot.lastIncludedIdx < args.LastIncludedIdx {
        //save snapshot
        rf.snapShot = SnapShot{
            lastIncludedIdx:  args.LastIncludedIdx,
            lastIncludedTerm: args.LastIncludedTerm,

            data: args.Data,
        }

        // Keep the entries after LastIncludedIdx if they are consistent with
        // the snapshot; otherwise discard the entire log.
        if args.LastIncludedIdx-rf.firstLogIdx < len(rf.log) && rf.log[args.LastIncludedIdx-rf.firstLogIdx].Term == args.LastIncludedTerm {
            rf.log = rf.log[args.LastIncludedIdx+1-rf.firstLogIdx:]
        } else {
            rf.log = make([]Entry, 0)
        }
        rf.commitIdx = args.LastIncludedIdx
        rf.firstLogIdx = args.LastIncludedIdx + 1
        rf.persist()
        // Build the ApplyMsg while still holding the lock, then release the
        // lock before sending so the channel send cannot block other handlers.
        msg := ApplyMsg{
            SnapshotValid: true,
            Snapshot:      rf.snapShot.data,
            SnapshotIndex: rf.snapShot.lastIncludedIdx,
            SnapshotTerm:  rf.snapShot.lastIncludedTerm,
        }
        rf.mu.Unlock()

        rf.applech <- msg
        rf.mu.Lock()
        rf.lastApplied = rf.snapShot.lastIncludedIdx
        rf.mu.Unlock()
    } else {
        rf.mu.Unlock()
        return
    }
}
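
hasSnapShot() is not shown in the post either; a one-liner consistent with the len(rf.snapShot.data) check used in persist() would be:

func (rf *Raft) hasSnapShot() bool {
    return len(rf.snapShot.data) > 0
}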

func (rf *Raft) callInstallSnapShot(peerIdx int, req *InsRequest, resp *InsResponse) {
    if rf.sendInstallSnapshot(peerIdx, req, resp) {
        rf.mu.Lock()
        defer rf.mu.Unlock()
        if rf.currentTerm < resp.Term {
            rf.covertToFollower(resp.Term)
            rf.electionTimeout.Reset(randElectionTimeout())
            rf.persist()
            return
        }
        // Drop stale replies to requests sent in an earlier term.
        if rf.currentTerm != req.Term {
            return
        }
        rf.nextIdx[peerIdx] = req.LastIncludedIdx + 1
    }
}
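
sendInstallSnapshot() is the usual thin labrpc stub; a sketch following the same pattern as the lab's sendRequestVote (the method name string matches the handler above):

func (rf *Raft) sendInstallSnapshot(server int, args *InsRequest, reply *InsResponse) bool {
    return rf.peers[server].Call("Raft.InstallSnapShot", args, reply)
}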

So when does the leader send a snapshot to a follower?

Paper, page 12: Although servers normally take snapshots independently, the leader must occasionally send snapshots to followers that lag behind. This happens when the leader has already discarded the next log entry that it needs to send to a follower
Hint: Next: have the leader send an InstallSnapshot RPC if it doesn't have the log entries required to bring a follower up to date.

Combining the paper and the assignment hint, there are two conditions: 1. the next log entry the leader needs to send to the follower (i.e. nextIdx) no longer exists in the leader's log; 2. the leader temporarily has no new log entries to replicate to the follower. The original broadcast() can be adjusted along these lines:

func (rf *Raft) broadCast() {
    rf.updateCommitIdx()
    rf.electionTimeout.Reset(randElectionTimeout())
    for peerIdx := range rf.peers {
        if peerIdx == rf.me {
            continue
        }
        lastLogIdx, _ := rf.getLastLogInfo()
        // Send normal AppendEntries while nextIdx is still past the snapshot
        // or there are uncommitted entries to replicate; otherwise fall back
        // to InstallSnapshot.
        if rf.snapShot.lastIncludedIdx < rf.nextIdx[peerIdx] || lastLogIdx > rf.commitIdx {
            req := rf.genAppendEntriesRequest(peerIdx)
            resp := new(AppendEntriesResponse)
            go rf.callAppendEntries(peerIdx, req, resp)
        } else {
            req := rf.genInsRequest()
            resp := new(InsResponse)
            go rf.callInstallSnapShot(peerIdx, req, resp)
        }
    }
}

To keep the code tidy, I wrapped the original log-replication code into a few small helpers; sketches of two of them follow.
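
The post does not show getLastLogInfo() and genInsRequest(), which the code above relies on. Minimal sketches consistent with the fields defined earlier might look like this (assumptions, not the original helpers; both expect rf.mu to be held):

func (rf *Raft) getLastLogInfo() (int, int) {
    // Fall back to the snapshot boundary when the log has been fully trimmed.
    if len(rf.log) == 0 {
        return rf.firstLogIdx - 1, rf.snapShot.lastIncludedTerm
    }
    last := rf.log[len(rf.log)-1]
    return last.Idx, last.Term
}

func (rf *Raft) genInsRequest() *InsRequest {
    // Build an InstallSnapshot request straight from the stored snapshot.
    return &InsRequest{
        Term:             rf.currentTerm,
        Leader:           rf.me,
        LastIncludedIdx:  rf.snapShot.lastIncludedIdx,
        LastIncludedTerm: rf.snapShot.lastIncludedTerm,
        Data:             rf.snapShot.data,
    }
}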

Test results

Single run:
$ time go test -run 2D
50 runs (each round takes 4 to 5 minutes, so I only ran 50 to save time):
$ bash multiTest.sh 2D 50
