Part 2C: persistence
Part 2D: log compaction
Part 2C of lab2 requires implementing relatively little, and the logic is straightforward, but the tests are stricter: bugs left over from Parts 2A and 2B are easily exposed by the 2C tests, so the main purpose of Part 2C is to exercise the 2A/2B logic more thoroughly. Since Part 2C is short, this post covers 2C and 2D together.
Header image source: 夕暮れの坂道 - しゅろく/shurock - pixiv
Part 2C: persistence
This part requires persisting the Raft state (currentTerm, votedFor, and log[]) by implementing the two interfaces persist() and readPersist().
Lab code (partial)
Persisting the state
// persist saves the Raft state that must survive a restart (currentTerm,
// votedFor and log) with the labgob encoder; callers hold rf.mu.
// (rf.snapShot is only added in Part 2D below; until then the state can be
// saved with rf.persister.SaveRaftState(data) alone.)
func (rf *Raft) persist() {
    w := new(bytes.Buffer)
    e := labgob.NewEncoder(w)
    e.Encode(rf.currentTerm)
    e.Encode(rf.votedFor)
    e.Encode(rf.log)
    data := w.Bytes()
    if len(rf.snapShot.data) == 0 {
        rf.persister.SaveStateAndSnapshot(data, nil)
    } else {
        rf.persister.SaveStateAndSnapshot(data, rf.snapShot.data)
    }
}
Reading the persisted state
// readPersist restores the previously persisted state after a restart.
func (rf *Raft) readPersist(data []byte) {
    if data == nil || len(data) < 1 { // bootstrap without any state?
        return
    }
    rf.mu.Lock()
    defer rf.mu.Unlock()
    r := bytes.NewBuffer(data)
    d := labgob.NewDecoder(r)
    var currentTerm int
    var votedFor int
    var logs []Entry
    d.Decode(&currentTerm)
    d.Decode(&votedFor)
    d.Decode(&logs)
    rf.currentTerm = currentTerm
    rf.votedFor = votedFor
    rf.log = logs
}
Then add one line to Make():
rf.readPersist(persister.ReadRaftState())
Notes
The key point of Part 2C is when to call the persistence interface persist(): call it at every place where currentTerm, votedFor, or log[] is modified, and do so before the change can be observed by other servers (i.e. before replying to an RPC or releasing the lock), as required by Figure 2 of the paper.
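As one concrete illustration of this pattern, here is a sketch of a Start() that follows the lab's (index, term, isLeader) return convention. The Command field on Entry and the isLeader() check are assumptions made for the example; the essential point is only that persist() immediately follows the mutation of rf.log while rf.mu is still held.
// Sketch only: Entry.Command and isLeader() are assumed names.
func (rf *Raft) Start(command interface{}) (int, int, bool) {
    rf.mu.Lock()
    defer rf.mu.Unlock()
    if !rf.isLeader() { // use whatever leadership check the code already has
        return -1, rf.currentTerm, false
    }
    lastLogIdx, _ := rf.getLastLogInfo()
    rf.log = append(rf.log, Entry{Idx: lastLogIdx + 1, Term: rf.currentTerm, Command: command})
    rf.persist() // log changed -> persist before releasing the lock
    // (triggering replication to the followers is omitted here)
    return lastLogIdx + 1, rf.currentTerm, true
}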
Test results
Single run
100 runs
Part 2D: log compaction
This part implements log compaction (the snapshot mechanism). In a real deployment Raft accumulates a large volume of log entries; without compaction the growing log would quickly exhaust storage, so the prefix of the log that has already been applied to the state machine is saved as a snapshot. The lab declares two interfaces for you to implement, CondInstallSnapshot and Snapshot; CondInstallSnapshot is not required for now and can simply return true, so the focus of this part is the implementation of Snapshot(). The lab handout also provides a diagram that helps clarify the interaction between servers, between servers and clients, and the snapshot-related synchronization between servers.
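Since CondInstallSnapshot is not needed here, a trivial placeholder with the skeleton's signature is enough:
// CondInstallSnapshot is not required for this lab; always let the service
// switch to the snapshot.
func (rf *Raft) CondInstallSnapshot(lastIncludedTerm int, lastIncludedIndex int, snapshot []byte) bool {
    return true
}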
This part requires fairly extensive changes to the earlier code: because the portion of the log covered by a snapshot is cut away, a Go slice index can no longer be used directly as a log index. Hint 2 of the lab points the way:
A good place to start is to modify your code so that it is able to store just the part of the log starting at some index X. Initially you can set X to zero and run the 2B/2C tests. Then make Snapshot(index) discard the log before index, and set X equal to index. If all goes well you should now pass the first 2D test.
Following the hint, my approach is to add a firstLogIdx field to the Raft struct that records the index of the first entry currently stored in the log. firstLogIdx is updated every time the log is truncated, and wherever the code previously indexed the log with a Go slice index, firstLogIdx is subtracted from the Raft index first.
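As a concrete illustration of this translation, a hypothetical helper might look like the sketch below (the code in this post mostly inlines the subtraction instead; entryAt is not a name the lab uses):
// entryAt returns the entry with absolute Raft index idx, assuming
// rf.firstLogIdx <= idx <= index of the last stored entry; callers hold rf.mu.
func (rf *Raft) entryAt(idx int) Entry {
    return rf.log[idx-rf.firstLogIdx]
}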
Lab code (partial)
AppendEntries
The piece that needs the most extensive changes is the original AppendEntries RPC handler:
func (rf *Raft) AppendEntries(args *AppendEntriesRequest, reply *AppendEntriesResponse) {
    rf.mu.Lock()
    defer rf.mu.Unlock()
    reply.Success = false
    if rf.currentTerm > args.Term {
        reply.Term = rf.currentTerm
        return
    }
    rf.electionTimeout.Reset(randElectionTimeout())
    if rf.currentTerm < args.Term {
        rf.covertToFollower(args.Term)
        rf.votedFor = args.Leader
        rf.persist()
    }
    reply.Term = rf.currentTerm
    lastLogIdx, _ := rf.getLastLogInfo()
    if args.PrevLogIdx != -1 {
        if args.PrevLogIdx > lastLogIdx {
            // Our log is too short: ask the leader to back up to our log end.
            reply.ConflictIdx, reply.ConflictTerm = lastLogIdx+1, -1
            return
        } else if args.PrevLogIdx >= rf.firstLogIdx && rf.log[args.PrevLogIdx-rf.firstLogIdx].Term != args.PrevLogTerm {
            // Term mismatch at PrevLogIdx: report the conflicting term and its first index.
            reply.ConflictTerm = rf.log[args.PrevLogIdx-rf.firstLogIdx].Term
            reply.ConflictIdx = rf.firstIdxByTerm(reply.ConflictTerm)
            return
        }
    }
    index := 0
    // If PrevLogIdx falls inside our snapshot, skip the entries the snapshot already covers.
    if args.PrevLogIdx < rf.firstLogIdx {
        for index < len(args.Entries) {
            if args.Entries[index].Idx == rf.firstLogIdx {
                break
            }
            index++
        }
    }
    // Skip entries we already store with a matching term.
    for index < len(args.Entries) && args.PrevLogIdx+index+1-rf.firstLogIdx < len(rf.log) {
        if rf.log[args.PrevLogIdx+index+1-rf.firstLogIdx].Term != args.Entries[index].Term {
            break
        }
        index++
    }
    args.Entries = args.Entries[index:]
    if len(args.Entries) > 0 {
        // Truncate at the first conflict and append the remaining new entries.
        rf.log = append(rf.log[:args.PrevLogIdx+index+1-rf.firstLogIdx], args.Entries...)
        rf.persist()
    }
    // If leaderCommit > commitIndex, set commitIndex = min(leaderCommit, index of last new entry).
    if args.LeaderCommitIdx > rf.commitIdx {
        lastLogIdx, _ := rf.getLastLogInfo()
        rf.commitIdx = min(args.LeaderCommitIdx, lastLogIdx)
        rf.applyCond.Signal()
    }
    reply.Success = true
}
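The handler above relies on a small helper, firstIdxByTerm(), that is not shown in this post; a minimal sketch, assuming entries are stored in increasing Idx order, could be:
// firstIdxByTerm returns the index of the first stored entry whose term equals
// term, or -1 if no such entry exists; callers hold rf.mu.
func (rf *Raft) firstIdxByTerm(term int) int {
    for _, e := range rf.log {
        if e.Term == term {
            return e.Idx
        }
    }
    return -1
}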
Defining a SnapShot struct
type SnapShot struct {
    data             []byte
    lastIncludedIdx  int
    lastIncludedTerm int
}
Add two fields to the Raft struct:
firstLogIdx int
snapShot    SnapShot
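Later code also calls a hasSnapShot() helper that is not shown; a minimal sketch, assuming "no snapshot" is represented by empty snapshot data (the same convention persist() uses above):
// hasSnapShot reports whether this server currently holds a snapshot.
func (rf *Raft) hasSnapShot() bool {
    return len(rf.snapShot.data) > 0
}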
Snapshot
Snapshot() is an interface already declared in the lab skeleton that you have to implement; each time the tester has read a fixed number of commands from applyCh, it calls Snapshot() to create a snapshot for that server.
// Snapshot is called by the service (the tester in 2D) once the commands up to
// and including index have been applied; snapshot is the serialized state.
func (rf *Raft) Snapshot(index int, snapshot []byte) {
    rf.mu.Lock()
    defer rf.mu.Unlock()
    if len(snapshot) == 0 || index < rf.firstLogIdx {
        // Nothing to save, or this prefix has already been compacted away.
        return
    }
    rf.snapShot = SnapShot{
        lastIncludedIdx:  index,
        lastIncludedTerm: rf.log[index-rf.firstLogIdx].Term,
        data:             snapshot,
    }
    // Discard everything up to and including index, then remember the new start.
    rf.log = rf.log[index+1-rf.firstLogIdx:]
    rf.firstLogIdx = index + 1
    rf.persist()
}
Reading the snapshot
Likewise, whenever a Raft instance is created in Make(), any previously saved snapshot has to be read back first:
// readSnapShot restores the snapshot on restart, recovering the last included
// index from the head of the snapshot data itself (this implementation does
// not persist it separately).
func (rf *Raft) readSnapShot(data []byte) {
    if data == nil || len(data) < 1 { // bootstrap without any snapshot?
        return
    }
    rf.mu.Lock()
    defer rf.mu.Unlock()
    r := bytes.NewBuffer(data)
    d := labgob.NewDecoder(r)
    var i int
    var logs []interface{}
    d.Decode(&i)
    d.Decode(&logs)
    rf.snapShot = SnapShot{
        lastIncludedIdx: i,
        data:            data,
    }
    rf.commitIdx = i
    rf.lastApplied = i
    rf.firstLogIdx = i + 1
}
The point to note here is that, besides restoring snapShot, lastApplied and commitIdx must also be advanced to the snapshot's last included index.
Also add one line to Make():
rf.readSnapShot(persister.ReadSnapshot())
Implementing the InstallSnapShot RPC
Figure 13 of the Raft paper gives the overall logic of the InstallSnapshot RPC. The lab does not require sending the snapshot in chunks, so the offset and done fields from the paper are not needed:
type InsRequest struct {
    Term             int
    Leader           int
    LastIncludedIdx  int
    LastIncludedTerm int
    Data             []byte
}
type InsResponse struct {
    Term int
}
func (rf *Raft) InstallSnapShot(args *InsRequest, reply *InsResponse) {
    rf.mu.Lock()
    reply.Term = rf.currentTerm
    if rf.currentTerm > args.Term {
        rf.mu.Unlock()
        return
    }
    rf.electionTimeout.Reset(randElectionTimeout())
    if rf.currentTerm < args.Term {
        rf.covertToFollower(args.Term)
        rf.persist()
    }
    if rf.hasSnapShot() && rf.snapShot.lastIncludedIdx >= args.LastIncludedIdx {
        // Stale snapshot: ours already covers at least as much of the log.
        rf.mu.Unlock()
        return
    }
    // Save the snapshot.
    rf.snapShot = SnapShot{
        lastIncludedIdx:  args.LastIncludedIdx,
        lastIncludedTerm: args.LastIncludedTerm,
        data:             args.Data,
    }
    if args.LastIncludedIdx-rf.firstLogIdx < len(rf.log) && rf.log[args.LastIncludedIdx-rf.firstLogIdx].Term == args.LastIncludedTerm {
        // Keep the entries that follow the snapshot.
        rf.log = rf.log[args.LastIncludedIdx+1-rf.firstLogIdx:]
    } else {
        rf.log = make([]Entry, 0)
    }
    rf.commitIdx = args.LastIncludedIdx
    rf.firstLogIdx = args.LastIncludedIdx + 1
    rf.persist()
    // Build the ApplyMsg while the lock is held, then release it before
    // blocking on the apply channel.
    msg := ApplyMsg{
        SnapshotValid: true,
        Snapshot:      rf.snapShot.data,
        SnapshotIndex: rf.snapShot.lastIncludedIdx,
        SnapshotTerm:  rf.snapShot.lastIncludedTerm,
    }
    rf.mu.Unlock()
    rf.applyCh <- msg
    rf.mu.Lock()
    rf.lastApplied = rf.snapShot.lastIncludedIdx
    rf.mu.Unlock()
}
func (rf *Raft) callInstallSnapShot(peerIdx int, req *InsRequest, resp *InsResponse) {
    if rf.sendInstallSnapshot(peerIdx, req, resp) {
        rf.mu.Lock()
        defer rf.mu.Unlock()
        if rf.currentTerm < resp.Term {
            // The follower knows a newer term: step down instead of updating nextIdx.
            rf.covertToFollower(resp.Term)
            rf.electionTimeout.Reset(randElectionTimeout())
            rf.persist()
            return
        }
        rf.nextIdx[peerIdx] = req.LastIncludedIdx + 1
    }
}
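sendInstallSnapshot() wraps the labrpc call in the same way the skeleton's sendRequestVote() does; a minimal sketch (the method string has to match the handler name, InstallSnapShot here):
// sendInstallSnapshot issues the RPC and reports whether a reply was received.
func (rf *Raft) sendInstallSnapshot(peerIdx int, req *InsRequest, resp *InsResponse) bool {
    return rf.peers[peerIdx].Call("Raft.InstallSnapShot", req, resp)
}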
So when should the leader send a snapshot to a follower?
Paper, page 12: Although servers normally take snapshots independently, the leader must occasionally send snapshots to followers that lag behind. This happens when the leader has already discarded the next log entry that it needs to send to a follower.
Hint: Next: have the leader send an InstallSnapshot RPC if it doesn't have the log entries required to bring a follower up to date.
Combining the paper and the lab hint, there are two situations: 1. the index of the next entry to send to the follower (nextIdx) no longer exists in the leader's log; 2. the leader currently has no new log entries that need to be replicated to the follower. The original broadcast() can be adjusted accordingly:
func (rf *Raft) broadCast() {
    rf.updateCommitIdx()
    rf.electionTimeout.Reset(randElectionTimeout())
    for peerIdx := range rf.peers {
        if peerIdx == rf.me {
            continue
        }
        lastLogIdx, _ := rf.getLastLogInfo()
        if rf.snapShot.lastIncludedIdx < rf.nextIdx[peerIdx] || lastLogIdx > rf.commitIdx {
            // Either the follower's nextIdx is still in the log, or there are
            // uncommitted entries to replicate: send AppendEntries.
            req := rf.genAppendEntriesRequest(peerIdx)
            resp := new(AppendEntriesResponse)
            go rf.callAppendEntries(peerIdx, req, resp)
        } else {
            // The follower lags behind the snapshot and there is nothing new
            // to append: send the snapshot instead.
            req := rf.genInsRequest()
            resp := new(InsResponse)
            go rf.callInstallSnapShot(peerIdx, req, resp)
        }
    }
}
To keep the code tidy, the original log-replication code has been wrapped into small helpers (genAppendEntriesRequest(), callAppendEntries(), genInsRequest(), and so on); a sketch of one of them follows.
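genInsRequest() is referenced in broadCast() above but not shown in the post; a minimal sketch, assuming it is called while rf.mu is held (broadCast() already accesses shared state):
// genInsRequest builds an InstallSnapShot request from the current snapshot.
func (rf *Raft) genInsRequest() *InsRequest {
    return &InsRequest{
        Term:             rf.currentTerm,
        Leader:           rf.me,
        LastIncludedIdx:  rf.snapShot.lastIncludedIdx,
        LastIncludedTerm: rf.snapShot.lastIncludedTerm,
        Data:             rf.snapShot.data,
    }
}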
Test results
Single run
50 runs (one round takes 4 to 5 minutes, so I lazily ran only 50)