Part 3A: Key/value service without snapshots
Lab 3 builds an application on top of the Raft implementation: a fault-tolerant key/value store supporting the basic Put, Get, and Append operations.
Related code
Client
Note that to keep client requests idempotent, each request needs a unique ID. GenCommandId() derives a request ID for every client request from the current timestamp.
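GenCommandId() itself is not listed here; a minimal sketch, assuming the ID really is just the current timestamp, might look like the following (time.Now().UnixNano() is my choice, not necessarily the original code):
// Sketch (assumption): derive a per-request ID from the current timestamp.
// Nanosecond resolution makes collisions unlikely in the lab setting,
// though not impossible across concurrent clients.
func GenCommandId() int64 {
    return time.Now().UnixNano()
}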
Get
//
// fetch the current value for a key.
// returns "" if the key does not exist.
// keeps trying forever in the face of all other errors.
//
// you can send an RPC with code like this:
// ok := ck.servers[i].Call("KVServer.Get", &args, &reply)
//
// the types of args and reply (including whether they are pointers)
// must match the declared types of the RPC handler function's
// arguments. and reply must be passed as a pointer.
//
func (ck *Clerk) Get(key string) string {
    // You will have to modify this function.
    commandId := GenCommandId()
    args := &GetArgs{
        Key:       key,
        CommandId: commandId,
    }
    reply := new(GetReply)
    // start from a random server and walk through the list until a leader answers
    serverId := int(nrand()) % len(ck.servers)
    for {
        if ck.servers[serverId].Call("KVServer.Get", args, reply) {
            DPrintf("reply from server%d %v", serverId, reply)
            switch reply.Err {
            case OK:
                return reply.Value
            case ErrNoKey:
                return ""
            case ErrWrongLeader:
                serverId = (serverId + 1) % len(ck.servers)
            case CommandOverwritten:
                // the log slot was overwritten; retry the same server
                continue
            }
        } else {
            // RPC failed (e.g. network partition); try the next server
            serverId = (serverId + 1) % len(ck.servers)
        }
    }
}
Put&Append
//
// shared by Put and Append.
//
// you can send an RPC with code like this:
// ok := ck.servers[i].Call("KVServer.PutAppend", &args, &reply)
//
// the types of args and reply (including whether they are pointers)
// must match the declared types of the RPC handler function's
// arguments. and reply must be passed as a pointer.
//
func (ck *Clerk) PutAppend(key string, value string, op string) {
    // You will have to modify this function.
    commandId := GenCommandId()
    args := &PutAppendArgs{
        Key:       key,
        Value:     value,
        Op:        op,
        CommandId: commandId,
    }
    reply := new(PutAppendReply)
    // start from a random server and walk through the list until a leader answers
    serverId := int(nrand()) % len(ck.servers)
    for {
        if ck.servers[serverId].Call("KVServer.PutAppend", args, reply) {
            switch reply.Err {
            case OK:
                return
            case ErrNoKey:
                return
            case ErrWrongLeader:
                serverId = (serverId + 1) % len(ck.servers)
            case ErrRepeatedCommand:
                // the server has already applied this command; nothing to do
                return
            case CommandOverwritten:
                // the log slot was overwritten; retry the same server
                continue
            }
        } else {
            // RPC failed (e.g. network partition); try the next server
            serverId = (serverId + 1) % len(ck.servers)
        }
    }
}
Server
Most of the work in this part is on the server side; clients interact with it through the Get() and PutAppend() RPCs. The overall flow is: the server takes a request from a client, replicates the command to the other nodes in the cluster, and once the command has been successfully committed it executes the request and replies to the client. The tricky part is knowing when replication has finished. My approach is to block the RPC handler on a condition variable, and to add a nowCommand field that stores the latest command delivered by the underlying Raft layer through applyCh; each time a command arrives on applyCh, the waiting RPC handlers are woken up and compare the command's index against their own, and only if the indexes match does the handler carry on with the rest of its logic.
KVServer structure
type Op struct {
    Index     int
    CommandId int64
    Command   interface{}
}

type KVServer struct {
    mu      sync.Mutex
    cmdMu   sync.Mutex // serializes RPC handlers to keep operations linearizable
    me      int
    rf      *raft.Raft
    applyCh chan raft.ApplyMsg
    dead    int32 // set by Kill()

    maxraftstate int // snapshot if log grows this big

    // Your definitions here.
    data       map[string]string
    cond       *sync.Cond
    cmd        map[int64]bool // records applied CommandIds to keep requests idempotent
    nowCommand Op             // the most recent command applied from applyCh
}
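StartKVServer() is not shown in the post; here is a minimal sketch of how these fields could be initialized, assuming cond is bound to &kv.mu (which is consistent with the way the handlers below use kv.cond.L and kv.mu for the same data):
// Sketch (assumption): possible initialization of the fields above.
// Binding cond to &kv.mu matches how the RPC handlers and the applier
// share a single lock. GetArgs/PutAppendArgs are registered with labgob
// because they are passed to rf.Start() directly.
func StartKVServer(servers []*labrpc.ClientEnd, me int, persister *raft.Persister, maxraftstate int) *KVServer {
    labgob.Register(GetArgs{})
    labgob.Register(PutAppendArgs{})

    kv := new(KVServer)
    kv.me = me
    kv.maxraftstate = maxraftstate
    kv.applyCh = make(chan raft.ApplyMsg)
    kv.rf = raft.Make(servers, me, persister, kv.applyCh)
    kv.data = make(map[string]string)
    kv.cmd = make(map[int64]bool)
    kv.cond = sync.NewCond(&kv.mu)

    go kv.applier()
    return kv
}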
applier
As before, the server runs an applier() goroutine that fetches and applies commits from the underlying Raft layer.
func (kv *KVServer) applier() {
    for !kv.killed() {
        msg := <-kv.applyCh
        kv.handleMsg(msg)
    }
}
func (kv *KVServer) handleMsg(msg raft.ApplyMsg) {
    if msg.CommandValid {
        kv.mu.Lock()
        switch msg.Command.(type) {
        case GetArgs:
            cmd := msg.Command.(GetArgs)
            kv.cmd[cmd.CommandId] = true
            kv.nowCommand = Op{
                CommandId: cmd.CommandId,
                Index:     msg.CommandIndex,
                Command:   cmd,
            }
        case PutAppendArgs:
            cmd := msg.Command.(PutAppendArgs)
            kv.nowCommand = Op{
                CommandId: cmd.CommandId,
                Index:     msg.CommandIndex,
                Command:   cmd,
            }
            kv.doCommand(cmd)
        default:
            // e.g. the blank no-op entry appended by a new leader
            kv.nowCommand = Op{
                CommandId: -1,
                Index:     msg.CommandIndex,
                Command:   -1,
            }
        }
        kv.condMakeSnapshot()
        kv.cond.Broadcast()
        kv.mu.Unlock()
    } else {
        // snapshot message from Raft (relevant for Part 3B)
        kv.cond.Broadcast()
        kv.cmdMu.Lock()
        if kv.rf.CondInstallSnapshot(msg.SnapshotTerm, msg.SnapshotIndex, msg.Snapshot) {
            kv.mu.Lock()
            kv.ingestSnap(msg.Snapshot)
            kv.mu.Unlock()
        }
        kv.cmdMu.Unlock()
    }
}
func (kv *KVServer) doCommand(cmd PutAppendArgs) {
    // skip commands that have already been applied (duplicate detection)
    if _, ok := kv.cmd[cmd.CommandId]; ok {
        return
    }
    kv.cmd[cmd.CommandId] = true
    switch cmd.Op {
    case "Put":
        kv.data[cmd.Key] = cmd.Value
    case "Append":
        if v, ok := kv.data[cmd.Key]; ok {
            var s strings.Builder
            s.WriteString(v)
            s.WriteString(cmd.Value)
            kv.data[cmd.Key] = s.String()
        } else {
            kv.data[cmd.Key] = cmd.Value
        }
    }
}
Get RPC
To keep requests idempotent here, I took the blunt approach of locking the whole RPC handler with cmdMu.
func (kv *KVServer) Get(args *GetArgs, reply *GetReply) {
    // Your code here.
    kv.cmdMu.Lock()
    defer kv.cmdMu.Unlock()
    var index int
    var isLeader bool
    reply.Err = OK
    index, _, isLeader = kv.rf.Start(*args)
    if !isLeader {
        reply.Err = ErrWrongLeader
        return
    }
    kv.cond.L.Lock()
    for kv.nowCommand.Index < index {
        kv.cond.Wait()
        if _, isLeader := kv.rf.GetState(); !isLeader {
            reply.Err = ErrWrongLeader
            kv.cond.L.Unlock()
            return
        }
    }
    // make sure the log entry at this index was not overwritten
    if index != kv.nowCommand.Index || !isCommandEqual(*args, kv.nowCommand.Command) {
        kv.cond.L.Unlock()
        reply.Err = CommandOverwritten
        return
    }
    if value, ok := kv.data[args.Key]; ok {
        reply.Value = value
    } else {
        reply.Err = ErrNoKey
    }
    kv.cond.L.Unlock()
}
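isCommandEqual() is used above but not listed; a minimal sketch, assuming it simply compares CommandIds, could look like this:
// Sketch (assumption): treat two commands as the same request when they
// carry the same CommandId; that is enough to detect that the log slot
// at this index now holds a different client's command.
func isCommandEqual(args interface{}, cmd interface{}) bool {
    getId := func(v interface{}) int64 {
        switch c := v.(type) {
        case GetArgs:
            return c.CommandId
        case PutAppendArgs:
            return c.CommandId
        default:
            return -1
        }
    }
    id := getId(args)
    return id != -1 && id == getId(cmd)
}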
PutAppend RPC
func (kv *KVServer) PutAppend(args *PutAppendArgs, reply *PutAppendReply) {
    // Your code here.
    kv.cmdMu.Lock()
    defer kv.cmdMu.Unlock()
    var index int
    var isLeader bool
    reply.Err = OK
    kv.cond.L.Lock()
    if _, ok := kv.cmd[args.CommandId]; ok {
        reply.Err = ErrRepeatedCommand
        kv.cond.L.Unlock()
        return
    }
    kv.cond.L.Unlock()
    index, _, isLeader = kv.rf.Start(*args)
    if !isLeader {
        reply.Err = ErrWrongLeader
        return
    }
    kv.cond.L.Lock()
    for kv.nowCommand.Index < index {
        kv.cond.Wait()
        if _, isLeader := kv.rf.GetState(); !isLeader {
            reply.Err = ErrWrongLeader
            kv.cond.L.Unlock()
            return
        }
    }
    // make sure the log entry at this index was not overwritten
    if index == kv.nowCommand.Index && isCommandEqual(*args, kv.nowCommand.Command) {
        kv.cond.L.Unlock()
        return
    } else {
        reply.Err = CommandOverwritten
        kv.cond.L.Unlock()
        return
    }
}
Notes
As mentioned earlier, in Raft a leader does not commit an entry when the term of the latest replicable entry differs from its currentTerm. That rule is correct in itself, but in Lab 3 (test case: Test: partitions, one client) it can leave the system stuck in an awkward stall: shortly after a new leader is elected, a majority of the nodes go down, while the older entries (whose logTerm is less than currentTerm) have not yet been committed and applied; because no new entry can be replicated, those old entries can never be applied. The paper gives the following hint for this situation:
To find out, it needs to commit an entry from its term. Raft handles this by having each leader commit a blank no-op entry into the log at the start of its term.
So startElection() in the original Raft code needs a small change: when a node wins a majority of the votes and becomes leader, it appends and replicates a blank no-op entry, which lets the older uncommitted entries be committed and applied to the state machine.
if rf.currentTerm == req.Term && resp.VoteForMe {
    voteCnt++
    if voteCnt > len(rf.peers)/2 && rf.state != leader {
        rf.state = leader
        // append a blank no-op entry at the start of the new leader's term
        index, _ := rf.getLastLogInfo()
        index += 1
        newEntry := Entry{
            Idx:     index,
            Term:    rf.currentTerm,
            Command: 0,
        }
        rf.log = append(rf.log, newEntry)
        rf.persist()
        rf.initialize()
        rf.broadCast()
        rf.heartbeatTimeout.Reset(getHeartbeatTimeout())
    }
}
Test results
Single run
50 runs
Update (2022.07.14)
Using a condition variable to tell the server that replication has finished can occasionally deadlock: the usual expectation is Wait() first and then Signal()/Broadcast(), but in practice the Broadcast() can fire before the handler reaches Wait(), in which case the server blocks forever and never returns a reply. This never showed up in 3A, but the last test of 3B hits it with some probability. The fix is to use a channel instead of a condition variable to notify the server of the replication result, which requires a fairly large rewrite of the code; I'll leave that hole to be filled later.
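For reference, here is a rough sketch of the channel-based alternative described above (the notifier type, the notify()/waitFor() helpers, and the 500ms timeout are my own names and choices, not the final code): the applier sends the applied Op to a per-index buffered channel, and the RPC handler waits on that channel with a timeout so it can never be blocked forever.
// Sketch (assumption): per-index notification channels instead of a shared
// condition variable. The applier calls notify() after applying an entry;
// the RPC handler calls waitFor() and gives up after a timeout, letting
// the client retry.
type notifier struct {
    mu    sync.Mutex
    chans map[int]chan Op // log index -> channel the waiting handler reads from
}

func (n *notifier) notify(index int, op Op) {
    n.mu.Lock()
    ch, ok := n.chans[index]
    n.mu.Unlock()
    if ok {
        ch <- op // capacity 1, so the applier never blocks here
    }
}

func (n *notifier) waitFor(index int) (Op, bool) {
    n.mu.Lock()
    ch := make(chan Op, 1)
    n.chans[index] = ch
    n.mu.Unlock()
    defer func() {
        n.mu.Lock()
        delete(n.chans, index)
        n.mu.Unlock()
    }()
    select {
    case op := <-ch:
        return op, true
    case <-time.After(500 * time.Millisecond):
        return Op{}, false // timed out; reply with an error and let the client retry
    }
}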