Part 3A: Key/value service without snapshots

Lab 3 implements an application layered on top of the Raft algorithm: a fault-tolerant key/value store that supports the basic Put, Get, and Append operations.

Related code

Client

Note that to keep client requests idempotent, each request carries a unique ID. GenCommandId() generates this request ID from the current timestamp.
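
GenCommandId() itself isn't listed in this post; a minimal sketch consistent with the description above (assuming client.go imports the time package) could look like this:

// GenCommandId derives a request ID from the current timestamp, as
// described above. Sketch only: nanosecond timestamps can in principle
// collide across clients, so mixing in a random component such as the
// lab's nrand() would make collisions less likely.
func GenCommandId() int64 {
    return time.Now().UnixNano()
}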

Get

//
// fetch the current value for a key.
// returns "" if the key does not exist.
// keeps trying forever in the face of all other errors.
//
// you can send an RPC with code like this:
// ok := ck.servers[i].Call("KVServer.Get", &args, &reply)
//
// the types of args and reply (including whether they are pointers)
// must match the declared types of the RPC handler function's
// arguments. and reply must be passed as a pointer.
//
func (ck *Clerk) Get(key string) string {
    // You will have to modify this function.
    commandId := GenCommandId()
    args := &GetArgs{
        Key:       key,
        CommandId: commandId,
    }
    reply := new(GetReply)
    serverId := int(nrand()) % len(ck.servers)
    for {
        if ck.servers[serverId].Call("KVServer.Get", args, reply) {
            DPrintf("reply from server%d %v",serverId,reply)
            switch reply.Err {
            case OK:
                return reply.Value
            case ErrNoKey:
                return ""
            case ErrWrongLeader:
                serverId = (serverId + 1) % len(ck.servers)
            case CommandOverwritten:
                continue
            }
        } else {
            serverId = (serverId + 1) % len(ck.servers)
        }
    }
}

Put & Append

//
// shared by Put and Append.
//
// you can send an RPC with code like this:
// ok := ck.servers[i].Call("KVServer.PutAppend", &args, &reply)
//
// the types of args and reply (including whether they are pointers)
// must match the declared types of the RPC handler function's
// arguments. and reply must be passed as a pointer.
//
func (ck *Clerk) PutAppend(key string, value string, op string) {
    // You will have to modify this function.
    commandId := GenCommandId()
    args := &PutAppendArgs{
        Key:       key,
        Value:     value,
        Op:        op,
        CommandId: commandId,
    }
    reply := new(PutAppendReply)
    serverId := int(nrand()) % len(ck.servers)

    for {
        if ck.servers[serverId].Call("KVServer.PutAppend", args, reply) {
            switch reply.Err {
            case OK:
                return
            case ErrNoKey:
                return
            case ErrWrongLeader:
                serverId = (serverId + 1) % len(ck.servers)
            case ErrRepeatedCommand:
                return
            case CommandOverwritten:
                continue
            }
        } else {
            serverId = (serverId + 1) % len(ck.servers)
        }
    }

}

Server

The heart of this lab is the server. Clients interact with it through the Get() and PutAppend() RPCs. The overall flow is: the server receives a client request, replicates the command to every node in the cluster through Raft, and once replication finishes and the entry is committed, the server applies the request and replies to the client. The tricky part is knowing when a request has finished replicating. My approach is to block the RPC handler on a condition variable and add a nowCommand field that stores the latest command delivered by the underlying Raft layer on applyCh. Every time a command arrives on applyCh, the waiting RPC handlers are woken up and compare the command's index with their own; only if the indices match does the handler carry on with the rest of its logic.

KVServer struct

type Op struct {
    Index     int         // Raft log index the command was appended at
    CommandId int64       // unique ID generated by the client
    Command   interface{} // the original GetArgs / PutAppendArgs
}

type KVServer struct {
    mu      sync.Mutex
    cmdMu   sync.Mutex // serializes the RPC handlers to keep requests linearizable
    me      int
    rf      *raft.Raft
    applyCh chan raft.ApplyMsg
    dead    int32 // set by Kill()

    maxraftstate int // snapshot if log grows this big

    // Your definitions here.
    data map[string]string // the key/value store itself

    cond *sync.Cond // signaled by the applier whenever a command is applied

    cmd map[int64]bool // CommandIds that have already been applied, for idempotency

    nowCommand Op // the most recent command delivered on applyCh
}
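
For context, here is a sketch of how StartKVServer might wire these fields together under this design (an assumption, not the exact code of this post): the condition variable wraps kv.mu, and GetArgs/PutAppendArgs are registered with labgob because they travel through Raft as interface{} commands.

func StartKVServer(servers []*labrpc.ClientEnd, me int, persister *raft.Persister, maxraftstate int) *KVServer {
    // The applier type-switches on GetArgs / PutAppendArgs pulled out of
    // ApplyMsg.Command, so both concrete types must be registered with labgob.
    labgob.Register(GetArgs{})
    labgob.Register(PutAppendArgs{})

    kv := new(KVServer)
    kv.me = me
    kv.maxraftstate = maxraftstate

    kv.applyCh = make(chan raft.ApplyMsg)
    kv.rf = raft.Make(servers, me, persister, kv.applyCh)

    kv.data = make(map[string]string)
    kv.cmd = make(map[int64]bool)
    // The handlers wait on the condition variable while the applier broadcasts
    // under kv.mu, so the condition variable wraps that same mutex.
    kv.cond = sync.NewCond(&kv.mu)

    go kv.applier()
    return kv
}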

applier

Likewise, the server runs an applier() that fetches committed entries from the underlying Raft layer and applies them.

func (kv *KVServer) applier() {
    for !kv.killed() {
        msg := <-kv.applyCh
        kv.handleMsg(msg)
    }
}

func (kv *KVServer) handleMsg(msg raft.ApplyMsg) {
    if msg.CommandValid {
        kv.mu.Lock()
        switch cmd := msg.Command.(type) {
        case GetArgs:
            kv.cmd[cmd.CommandId] = true
            kv.nowCommand = Op{
                CommandId: cmd.CommandId,
                Index:     msg.CommandIndex,
                Command:   cmd,
            }
        case PutAppendArgs:
            kv.nowCommand = Op{
                CommandId: cmd.CommandId,
                Index:     msg.CommandIndex,
                Command:   cmd,
            }
            kv.doCommand(cmd)
        default:
            // e.g. the leader's no-op entry: nothing to apply, just record the index
            kv.nowCommand = Op{
                CommandId: -1,
                Index:     msg.CommandIndex,
                Command:   -1,
            }
        }
        kv.condMakeSnapshot()
        kv.cond.Broadcast()
        kv.mu.Unlock()
    } else {
        // snapshot message from Raft (only relevant for Part 3B)
        kv.cond.Broadcast()
        kv.cmdMu.Lock()
        if kv.rf.CondInstallSnapshot(msg.SnapshotTerm, msg.SnapshotIndex, msg.Snapshot) {
            kv.mu.Lock()
            kv.ingestSnap(msg.Snapshot)
            kv.mu.Unlock()
        }
        kv.cmdMu.Unlock()
    }
}
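
handleMsg already calls condMakeSnapshot() and ingestSnap(), which really belong to Part 3B; for 3A, where maxraftstate is -1 and snapshots are disabled, they can be stubbed out roughly like this (a sketch, not the eventual 3B code):

// condMakeSnapshot: in 3A maxraftstate is -1, so no snapshot is ever taken.
// Called by handleMsg with kv.mu held.
func (kv *KVServer) condMakeSnapshot() {
    if kv.maxraftstate == -1 {
        return
    }
    // Part 3B: take a snapshot once the Raft state grows past maxraftstate.
}

// ingestSnap: nothing to restore in 3A; filled in for Part 3B.
func (kv *KVServer) ingestSnap(snapshot []byte) {
    // Part 3B: decode kv.data and kv.cmd from the snapshot bytes.
}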

func (kv *KVServer) doCommand(cmd PutAppendArgs) {
    if _, ok := kv.cmd[cmd.CommandId]; ok {
        return
    }
    kv.cmd[cmd.CommandId] = true
    switch cmd.Op {
    case "Put":
        kv.data[cmd.Key] = cmd.Value
    case "Append":
        if v, ok := kv.data[cmd.Key]; ok {
            var s strings.Builder
            s.WriteString(v)
            s.WriteString(cmd.Value)
            kv.data[cmd.Key] = s.String()
        } else {
            kv.data[cmd.Key] = cmd.Value
        }
    }
}

Get RPC

To keep requests idempotent, the RPC handlers are simply serialized with a lock (cmdMu), which is blunt but works.

func (kv *KVServer) Get(args *GetArgs, reply *GetReply) {
    // Your code here.
    kv.cmdMu.Lock()
    defer kv.cmdMu.Unlock()
    var index int
    var isLeader bool
    reply.Err = OK
    index, _, isLeader = kv.rf.Start(*args)
    if !isLeader {
        reply.Err = ErrWrongLeader
        return
    }
    kv.cond.L.Lock()
    for kv.nowCommand.Index < index {
        kv.cond.Wait()
        if _, isLeader := kv.rf.GetState(); !isLeader {
            reply.Err = ErrWrongLeader
            kv.cond.L.Unlock()
            return
        }
    }
    // make sure the entry at this index was not overwritten by a different leader's log
    if index != kv.nowCommand.Index || !isCommandEqual(*args, kv.nowCommand.Command) {
        kv.cond.L.Unlock()
        reply.Err = CommandOverwritten
        return
    }

    if value, ok := kv.data[args.Key]; ok {
        reply.Value = value
    } else {
        reply.Err = ErrNoKey
    }
    kv.mu.Unlock() // cond.L wraps kv.mu, so this releases the lock taken via cond.L.Lock()
}
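
isCommandEqual() is used by both handlers but not listed here; what it has to do is confirm that the command applied at that index really is this handler's request. A sketch based on the structs above (the actual implementation may differ):

// isCommandEqual reports whether the command applied at nowCommand.Index
// is the same request this handler submitted, by comparing CommandIds.
func isCommandEqual(args interface{}, applied interface{}) bool {
    switch a := args.(type) {
    case GetArgs:
        if c, ok := applied.(GetArgs); ok {
            return a.CommandId == c.CommandId
        }
    case PutAppendArgs:
        if c, ok := applied.(PutAppendArgs); ok {
            return a.CommandId == c.CommandId
        }
    }
    return false
}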

PutAppend RPC

func (kv *KVServer) PutAppend(args *PutAppendArgs, reply *PutAppendReply) {
    // Your code here.
    kv.cmdMu.Lock()
    defer kv.cmdMu.Unlock()
    var index int
    var isLeader bool
    reply.Err = OK
    kv.cond.L.Lock()
    if _, ok := kv.cmd[args.CommandId]; ok {
        reply.Err = ErrRepeatedCommand
        kv.cond.L.Unlock()
        return
    }
    kv.cond.L.Unlock()
    index, _, isLeader = kv.rf.Start(*args)
    if !isLeader {
        reply.Err = ErrWrongLeader
        return
    }
    kv.cond.L.Lock()
    for kv.nowCommand.Index < index {
        kv.cond.Wait()
        if _, isLeader := kv.rf.GetState(); !isLeader {
            reply.Err = ErrWrongLeader
            kv.cond.L.Unlock()
            return
        }
    }
    // make sure the entry at this index was not overwritten by a different leader's log
    if index == kv.nowCommand.Index && isCommandEqual(*args, kv.nowCommand.Command) {
        kv.cond.L.Unlock()
        return
    } else {
        reply.Err = CommandOverwritten
        kv.cond.L.Unlock()
        return
    }
}

Notes

As mentioned in the earlier Raft posts, a leader does not directly commit a log entry whose term differs from its currentTerm. That rule is correct in itself, but in Lab 3 (test case: Test: partitions, one client) it can leave the system stalled: shortly after a new leader is elected, most of the nodes go down, and old entries (whose logTerm is smaller than currentTerm) have not yet been committed and applied; because no entry from the new term can finish replicating, those old entries can never be applied either. The paper gives the following hint for exactly this situation:

To find out, it needs to commit an entry from its term. Raft handles this by having each leader commit a blank no-op entry into the log at the start of its term.

So startElection() from the original Raft implementation needs a small change: when a node wins a majority of the votes and becomes leader, it appends and replicates a no-op entry, which allows the uncommitted entries from earlier terms to be committed and applied to the state machine.

if rf.currentTerm == req.Term && resp.VoteForMe {
    voteCnt++
    if voteCnt > len(rf.peers)/2 && rf.state != leader {
        rf.state = leader

        // append a no-op entry for the new term so that older entries can be committed
        index, _ := rf.getLastLogInfo()
        index += 1
        newEntry := Entry{
            Idx:     index,
            Term:    rf.currentTerm,
            Command: 0,
        }
        rf.log = append(rf.log, newEntry)
        rf.persist()
        rf.initialize()
        rf.broadCast()
        rf.heartbeatTimeout.Reset(getHeartbeatTimeout())
    }
}

Test results

Single run:
time go test -run 3A
50 runs:
bash multiTest.sh 3A 50

Update (2022-07-14)

Using a condition variable to tell the server that replication has finished can occasionally deadlock: one normally assumes Wait() happens before the signal, but in practice the broadcast can also arrive before the Wait(), in which case the handler blocks forever and the server never replies. This never showed up in the 3A tests, but the last test of 3B can occasionally hit it. The fix is to notify the server of the apply result through a channel instead of a condition variable, which requires fairly large changes to the code; I'll leave that hole to be filled later.
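
For reference, the channel-based pattern mentioned above usually looks roughly like the sketch below: the handler registers a per-index channel before waiting and the applier pushes the applied command into it. The notifyCh field, the 500ms timeout, and the helper names are assumptions for illustration, not code from this lab; it also assumes the time package is imported.

// Hypothetical extra field on KVServer for this pattern:
//     notifyCh map[int]chan Op

// RPC handler side: called after rf.Start returns index.
func (kv *KVServer) waitForApply(index int) (Op, bool) {
    kv.mu.Lock()
    ch := make(chan Op, 1) // buffered so the applier never blocks on send
    kv.notifyCh[index] = ch
    kv.mu.Unlock()

    select {
    case op := <-ch:
        return op, true
    case <-time.After(500 * time.Millisecond): // timeout value is arbitrary
        // leadership was probably lost; clean up and let the client retry elsewhere
        kv.mu.Lock()
        delete(kv.notifyCh, index)
        kv.mu.Unlock()
        return Op{}, false
    }
}

// Applier side: called with kv.mu held, right after the command at
// msg.CommandIndex has been applied to the state machine.
func (kv *KVServer) notify(index int, op Op) {
    if ch, ok := kv.notifyCh[index]; ok {
        ch <- op
        delete(kv.notifyCh, index)
    }
}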
