Distributed System - MIT 6.824 Raft Lab 3

Lab 3 builds a key/value service on top of Raft.

Lab 3A

Understanding the high-level logic

The key/value server stores key/value pairs. Raft is used to replicate the server to improve availability. Normally the service runs 3 to 5 replicas, which rely on Raft to obtain strong consistency. Recall that Raft has one leader and the rest are followers. To obtain strong consistency, all client requests must go through the leader only.

What data the KV server needs to maintain

Logically, a map from key to value is required.
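
For reference, a rough sketch of the server state, consistent with the fields used in the code throughout this post (the exact struct layout is only an approximation):

    type KVServer struct {
        mu      sync.Mutex
        me      int
        rf      *raft.Raft
        applyCh chan raft.ApplyMsg
        dead    int32

        maxraftstate int             // snapshot once the raft state grows this large (Lab 3B)
        persister    *raft.Persister // used to read the raft state size

        kvData        Datastore       // the key/value store itself
        requestRecord Datastore       // clientId -> largest sequenceId seen (duplicate table)
        opChannel     map[int]chan Op // per-log-index channels to wake up waiting RPC handlers

        lastAppliedIndex int
        lastAppliedTerm  int
    }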

How does the KV server communicate with Raft

The KV server receives a request (Get, Put, or Append) and sends it to Raft for agreement via raft.Start(). Raft rejects the request if it is not the leader. The leader saves the request (command) to its log and sends it to the followers for agreement. Once the leader gets agreement and commits the entry, it sends an apply message through a Go channel to the KV server, and the KV server then replies to the client. At the same time, followers also send committed log entries to their associated servers so that every KV server has the same copy. See the sequence diagram below.

KV Server sequence

How does the client communicate with the KV server

The client doesn’t know which server is the leader, so it sends the request to all servers, and each server decides whether to reject or accept it. The client keeps resending the request until it receives an OK reply, which means its request was accepted and it has found the leader. The leader might change later, but the client can still cache the current leaderId for subsequent requests.

The client might resend the same request to the server before it receives an OK reply, so the server needs a mechanism to detect duplicate requests. The following approach is used in this implementation:

  • Each client maintains a unique clientId and a sequenceId for its requests. Each request has exactly one sequenceId, and sequenceIds are monotonically increasing (see the client-side sketch after this list).
  • Each KV server maintains a map of request records, where the key is the clientId and the value is the latest sequenceId. In the Get and PutAppend RPCs, the server checks the incoming request; if it has already seen the same clientId and sequenceId in the map, it returns immediately.
  • The KV server also needs to check for duplicates before it applies the operation to the kv store: if the request is new, update the request record and apply it; otherwise ignore it. This is necessary because a duplicate request could also have been sent to Raft.
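
To make this concrete, here is a minimal client-side sketch of the scheme above. The Clerk layout and the extra ClientId/SequenceId argument fields are assumptions based on the description, not exact code:

    type Clerk struct {
        servers    []*labrpc.ClientEnd
        clientId   int64 // unique per client
        sequenceId int64 // monotonically increasing per request
        leaderId   int   // last server that answered OK
    }

    func (ck *Clerk) PutAppend(key string, value string, op string) {
        ck.sequenceId++
        args := PutAppendArgs{Key: key, Value: value, Op: op,
            ClientId: ck.clientId, SequenceId: ck.sequenceId}

        // keep retrying, starting from the cached leader, until some server replies OK
        for server := ck.leaderId; ; server = (server + 1) % len(ck.servers) {
            reply := PutAppendReply{}
            ok := ck.servers[server].Call("KVServer.PutAppend", &args, &reply)
            if ok && reply.Err == OK {
                ck.leaderId = server // remember the leader for the next request
                return
            }
            // wrong leader, timeout, or lost RPC: try the next server
        }
    }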

Duplicate request check

requestRecord maintains a map from clientId to the maximum sequenceId seen from that client. If the current seqId is greater than the existing sequenceId, the server has never seen this request before. Note that, to maintain a uniform interface, the map’s keys and values are both strings, so a conversion between int64 and string is performed.

    func (kv *KVServer) isDuplicateRequest(clientId int64, seqId int64) bool {
        // look up the largest sequenceId recorded for this client
        clientSeq, ok := kv.requestRecord.Get(strconv.FormatInt(clientId, 10))
        maxSeq, _ := strconv.ParseInt(clientSeq, 10, 64)
        // a request is a duplicate if this sequenceId (or a later one) has already been seen
        if ok == OK && maxSeq >= seqId {
            return true
        }
        return false
    }
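
The Datastore type is not shown; it is used both for kvData and for requestRecord (see the snapshot code in Lab 3B). A minimal sketch, assuming the lab's usual error codes (OK, ErrNoKey) and ignoring any internal locking:

    type Datastore struct {
        Data map[string]string
    }

    func (ds *Datastore) Get(key string) (string, Err) {
        if value, ok := ds.Data[key]; ok {
            return value, OK
        }
        return "", ErrNoKey
    }

    func (ds *Datastore) Put(key string, value string) {
        ds.Data[key] = value
    }

    func (ds *Datastore) Append(key string, value string) {
        ds.Data[key] += value
    }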

What information should be exchanged between KV Server and Raft

Because all operations have to go through the leader, the followers never see anything from clients directly; the only thing a follower sees is the log from the leader. That means all information the leader applies needs to be in the log so that the followers can apply it too. Therefore,

  • Key and value are a must.
  • Operation type (Get, Put, Append).
  • ClientId and sequenceId, so that followers can detect duplicates and record the request. The followers also need to update the duplicate table (request record), because if a follower is elected leader later, it must have all the information the previous leader had.

Below is the Op struct that is sent to Raft:

    type Op struct {
        OpId     int64  // the client's sequenceId for this request
        ClientId int64  // which client issued the request
        OpType   string // GET, PUT, or APPEND
        Key      string
        Value    string
        Index    int // log index returned by raft.Start()
        Term     int // term returned by raft.Start()
    }

The server packs the message and sends it to Raft in the Get and PutAppend RPCs:

	op := Op{OpId: args.SequenceId, OpType: GET, Key: args.Key, ClientId: args.ClientId}

	var isLeader bool
	op.Index, op.Term, isLeader = kv.rf.Start(op)
	if !isLeader {
		reply.Err = ErrWrongLeader
		return
	}

How does the KV server update its state

A background goroutine constantly checks for messages from Raft. Raft sends messages to the server through applyCh; the server pulls messages off applyCh and handles them.

    func (kv *KVServer) serverStateThread() {

        for !kv.killed() {

            select {
            case applyMsg := <-kv.applyCh:
                kv.mu.Lock()
                kv.handleStateMessage(applyMsg)
                kv.mu.Unlock()
            }
        }
    }

How to handle apply message from Raft

An apply message includes the Index, Term, and the Op that the server previously sent to Raft. The first step is to convert the message back to an Op struct, then check whether this Op has already been applied on this server. If not, apply it (Put, Append, or Get).

How does the backend thread send the result to the Get and PutAppend RPCs so they can return to the client

This is an asynchronous operation: the Get/PutAppend RPC sends the command to Raft and waits for an answer from the backend routine. A channel can be used for the RPC to wait for information from the backend routine. How do we ensure that the received Op is the one the RPC previously sent? Two things ensure that: 1. index the created channel by the log index returned from Raft; 2. compare the submitted Op with the received Op. Note that during a leader transition, the RPC might receive a different Op from another leader at the same log index.

The function below gets a channel for each index and waits for the backend routine to fill the channel. If there is no response within 1 second, assume this server lost leadership or is down.

    func (kv *KVServer) getAppliedOperation(index int) Op {

        kv.mu.Lock()
        ch := kv.getOpChannel(index)
        kv.mu.Unlock()
        op := Op{}
        select {
        case appliedOp := <-ch:
            op = appliedOp

        case <-time.After(1000 * time.Millisecond):
            // timed out: assume this server lost leadership or is down; the
            // zero-valued Op will fail the comparison and the RPC replies ErrWrongLeader
        }

        kv.mu.Lock()
        kv.removeOpChannel(index)
        kv.mu.Unlock()

        return op
    }
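
The getOpChannel and removeOpChannel helpers are not shown above; a minimal sketch, assuming opChannel is a map[int]chan Op guarded by kv.mu (the opChannel field appears in sendOpSignal below):

    // getOpChannel returns the channel for a log index, creating it if needed.
    // The caller must hold kv.mu.
    func (kv *KVServer) getOpChannel(index int) chan Op {
        ch, ok := kv.opChannel[index]
        if !ok {
            // buffered with capacity 1 so the apply loop never blocks on a slow RPC handler
            ch = make(chan Op, 1)
            kv.opChannel[index] = ch
        }
        return ch
    }

    // removeOpChannel deletes the channel once the RPC handler is done with it.
    // The caller must hold kv.mu.
    func (kv *KVServer) removeOpChannel(index int) {
        delete(kv.opChannel, index)
    }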

When the apply message has been applied successfully, send the Op to the channel:

    func (kv *KVServer) sendOpSignal(op Op) {

        ch, ok := kv.opChannel[op.Index]
        if ok {
            if len(ch) > 0 { // if the channel already holds a value, consume it so the send below never blocks
                <-ch
            }
            ch <- op
        }
    }

    func (kv *KVServer) handleStateMessage(applyMsg raft.ApplyMsg) {

        op, ok := applyMsg.Command.(Op)

        if ok {
            op.Index = applyMsg.CommandIndex
            op.Term = applyMsg.CommandTerm
            // apply the command only if it has not been applied before
            if !kv.isDuplicateRequest(op.ClientId, op.OpId) {
                kv.updateServiceState(&op)
            }
            // wake up the RPC handler waiting on this log index, if any
            kv.sendOpSignal(op)
        }
    }

    func (kv *KVServer) updateServiceState(op *Op) {

        switch op.OpType {
        case PUT:
            kv.kvData.Put(op.Key, op.Value)
        case APPEND:
            kv.kvData.Append(op.Key, op.Value)
        }

        // record the latest sequenceId for this client, plus the latest applied index/term
        kv.requestRecord.Put(strconv.FormatInt(op.ClientId, 10), strconv.FormatInt(op.OpId, 10))
        kv.lastAppliedIndex = op.Index
        kv.lastAppliedTerm = op.Term
    }

Get, PutAppend response to client

Get the applied Op, compare it with the Op that was submitted, and reply to the client.

 	if !isLeader {
		reply.Err = ErrWrongLeader
		return
	}
	//DPrintf("Server %d PutAppend request from Client %d, op: %v", kv.me, args.ClientId, op)

	appliedOp := kv.getAppliedOperation(op.Index)

	if equalOperation(op, appliedOp) {
		reply.Err = OK
	} else {
		reply.Err = ErrWrongLeader
	}
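
The equalOperation helper is not shown; a minimal sketch, assuming it is enough to compare the identifying fields of the submitted and applied Op (a different leader could have committed another client's command at the same log index):

    func equalOperation(a Op, b Op) bool {
        // same client and same sequenceId means the applied entry is our request
        return a.ClientId == b.ClientId &&
            a.OpId == b.OpId &&
            a.OpType == b.OpType &&
            a.Key == b.Key
    }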

Lab 3B

Lab 3B adds snapshots of the KV server, so that Raft logs can be truncated to save storage space. The basic logic is: when the KV server detects that the Raft state size is equal to or greater than maxraftstate, it takes a snapshot and sends it to its Raft instance for storage. Each KV server takes and maintains its snapshot independently, i.e., the leader normally does not send snapshots to followers. However, when a follower is too far behind and needs log entries from before the leader's snapshot point, the leader must send its snapshot directly to the follower, because the leader no longer has the logs the follower requires. The picture below shows the relationship between clients, KV servers, and Raft.

KV Server Model

Take and install snapshot in server

This is simple on the server side: check the Raft state size, encode the state, and send it to Raft. Installing a snapshot on the server side is just the reverse of taking one. Note that the server needs an additional piece of state, lastAppliedIndex, to track the latest applied state.

    func (kv *KVServer) takeSnapshot(lastAppliedIndex int, lastAppliedTerm int) {

        // only snapshot when enabled and the raft state has grown to the threshold
        if kv.maxraftstate == -1 || kv.persister.RaftStateSize() < kv.maxraftstate {
            return
        }

        // encode the service state: duplicate table, kv store, and last applied index
        w := new(bytes.Buffer)
        e := labgob.NewEncoder(w)
        e.Encode(kv.requestRecord)
        e.Encode(kv.kvData)
        e.Encode(lastAppliedIndex)
        snapshot := w.Bytes()

        // hand the snapshot to raft asynchronously so the apply loop is not blocked
        go kv.rf.Snapshot(snapshot, lastAppliedIndex, lastAppliedTerm)

    }

    // install snapshot to server
    func (kv *KVServer) installSnapshot(data []byte) {

        if data == nil || len(data) < 1 { // bootstrap without any state?
            return
        }
        r := bytes.NewBuffer(data)
        d := labgob.NewDecoder(r)

        reqMap, kvDat := Datastore{}, Datastore{}
        d.Decode(&reqMap)
        d.Decode(&kvDat)
        var lastAppliedId int
        d.Decode(&lastAppliedId)
        d.Decode(&lastAppliedTm)
        kv.lastAppliedIndex = lastAppliedId
        kv.requestRecord = reqMap
        kv.kvData = kvDat
    }

When should the snapshot be taken?

In my first implementation, I used a dedicated snapshot thread that constantly checked the Raft state size in a background goroutine. It worked, but snapshotting was not uniform, i.e., sometimes the Raft state grew very large before a snapshot was taken. I then changed the logic to take a snapshot on every state change, i.e., whenever the server receives a message from Raft.

What should Raft do when the server asks it to snapshot?

Raft needs to truncate its log up to the lastAppliedIndex given by the KV server, then save the snapshot and persist. Raft might accept two snapshots: one from its own KV server, the other from the leader. This concurrency requires Raft to check whether a more recent snapshot has already been installed every time it tries to install one. A new state variable, lastSnapshotIndex, also needs to be added to Raft.

    func (rf *Raft) Snapshot(snapshot []byte, lastIndex int, lastTerm int) {
        rf.mu.Lock()
        defer rf.mu.Unlock()

        // a more recent snapshot has already been installed; ignore this one
        if lastIndex < rf.lastSnapshotIndex {
            return
        }

        rf.lastSnapshotIndex = lastIndex
        rf.lastSnapshotTerm = lastTerm
        rf.saveStateAndSnapshot(snapshot, lastIndex, lastTerm)
    }
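
saveStateAndSnapshot is not shown. A rough sketch of what it has to do, assuming a LogEntry type with an Index field and an encodeRaftState() helper (both hypothetical names); the lab's Persister does provide SaveStateAndSnapshot to persist both pieces atomically:

    func (rf *Raft) saveStateAndSnapshot(snapshot []byte, lastIndex int, lastTerm int) {
        // drop every log entry that the snapshot already covers
        trimmed := make([]LogEntry, 0) // LogEntry is a hypothetical name here
        for _, entry := range rf.log {
            if entry.Index > lastIndex {
                trimmed = append(trimmed, entry)
            }
        }
        rf.log = trimmed

        // persist the truncated raft state together with the snapshot
        rf.persister.SaveStateAndSnapshot(rf.encodeRaftState(), snapshot)
    }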

Follower installs snapshot from Leader

Similarly to the AppendEntries RPC, the follower needs to check its term against the leader's term. If the follower's lastApplied is more recent than the leader's snapshot, the follower should discard this snapshot, because that means its KV server state is more recent. Otherwise, the follower truncates its log, sends the snapshot to its service, and updates its lastApplied and commitId. Raft sends the snapshot to the server through applyCh:

    func (rf *Raft) sendSnapshotToServices(snapshot []byte, lastIncludedId int, lastIncludedTerm int) {

        msg := ApplyMsg{}
        msg.LastIncludedIndex = lastIncludedId
        msg.LastIncludedTerm = lastIncludedTerm
        msg.Snapshot = snapshot
        msg.CommandValid = false // marks this message as a snapshot rather than a command
        rf.applyCh <- msg
    }
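
The follower-side InstallSnapshot RPC handler is not shown. A sketch of the checks described above, assuming an InstallSnapshotArgs struct with the fields from the Raft paper (Term, LastIncludedIndex, LastIncludedTerm, Data) and the usual currentTerm/lastApplied/commitId state (these names are assumptions):

    func (rf *Raft) InstallSnapshot(args *InstallSnapshotArgs, reply *InstallSnapshotReply) {
        rf.mu.Lock()
        defer rf.mu.Unlock()

        reply.Term = rf.currentTerm
        if args.Term < rf.currentTerm {
            return // stale leader, ignore
        }

        // the follower's applied state is already newer than this snapshot; discard it
        if args.LastIncludedIndex <= rf.lastApplied {
            return
        }

        rf.lastSnapshotIndex = args.LastIncludedIndex
        rf.lastSnapshotTerm = args.LastIncludedTerm
        rf.saveStateAndSnapshot(args.Data, args.LastIncludedIndex, args.LastIncludedTerm)

        rf.lastApplied = args.LastIncludedIndex
        if rf.commitId < args.LastIncludedIndex {
            rf.commitId = args.LastIncludedIndex
        }

        // deliver to the service asynchronously so we don't block on applyCh while holding rf.mu
        go rf.sendSnapshotToServices(args.Data, args.LastIncludedIndex, args.LastIncludedTerm)
    }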

What does the KV server do when it receives a snapshot from Raft?

Make sure the snapshot is more recent than the server's current state, then install it into the KV store. A field in the ApplyMsg (CommandValid) is used to distinguish an Op from a snapshot.

    func (kv *KVServer) updateServiceSnapshot(applyMsg raft.ApplyMsg) {

        // an old snapshot should not be installed
        if applyMsg.LastIncludedIndex < kv.lastAppliedIndex {
            return
        }
        kv.installSnapshot(applyMsg.Snapshot)
        kv.lastAppliedIndex = applyMsg.LastIncludedIndex
    }

    // monitor and receive consensus agreement state
    func (kv *KVServer) serverStateThread() {

        for !kv.killed() {

            select {
            case applyMsg := <-kv.applyCh:
                kv.mu.Lock()
                if !applyMsg.CommandValid {
                    kv.updateServiceSnapshot(applyMsg)
                } else {
                    kv.handleStateMessage(applyMsg)
                    kv.takeSnapshot(applyMsg.CommandIndex, applyMsg.CommandTerm)
                }
                kv.mu.Unlock()
            }
        }
    }

When should the leader send a snapshot to a follower

When the leader has already snapshotted the log entries the follower requires, i.e., when nextId[server] <= lastSnapshotIndex. This check can be added to the leader's replication routine, replacing log-entry sending with snapshot sending when necessary.

    func (rf *Raft) leaderOperation() {

        for server := range rf.peers {
            if server == rf.me {
                continue
            }
            if rf.nextId[server] <= rf.lastSnapshotIndex {
                go rf.sendSnapshot(server)
            } else {
                go rf.sendLogEntries(server)
            }
        }
        rf.setNextHeartBeatTime()
    }

sendSnapshot and sendLogEntries are similar: pack the arguments, send the RPC, and if it succeeds, update nextId and matchId.
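
For completeness, a sketch of sendSnapshot under the same assumed InstallSnapshotArgs/Reply structs as above; labrpc's ClientEnd.Call and the Persister's ReadSnapshot are the lab's standard APIs:

    func (rf *Raft) sendSnapshot(server int) {
        rf.mu.Lock()
        args := InstallSnapshotArgs{
            Term:              rf.currentTerm,
            LastIncludedIndex: rf.lastSnapshotIndex,
            LastIncludedTerm:  rf.lastSnapshotTerm,
            Data:              rf.persister.ReadSnapshot(),
        }
        rf.mu.Unlock()

        reply := InstallSnapshotReply{}
        if !rf.peers[server].Call("Raft.InstallSnapshot", &args, &reply) {
            return // RPC lost; it will be retried on the next heartbeat
        }

        rf.mu.Lock()
        defer rf.mu.Unlock()
        if reply.Term > rf.currentTerm {
            return // we are stale; stepping down is handled elsewhere
        }
        // the follower now has everything up to the snapshot point
        rf.nextId[server] = args.LastIncludedIndex + 1
        rf.matchId[server] = args.LastIncludedIndex
    }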

With that, the snapshot functionality is complete. Here is the test result, run 200 times.

KV Server Test

A few notes during the implementation:

  1. Double-check that every mu.Lock() has a matching Unlock() on every return path. This took me a lot of time to debug.
  2. To speed up agreement, perform a leader operation after every Start() so the leader sends its logs to the followers immediately.
  3. Checking for a more recent state before installing a snapshot is very important.
Written on April 22, 2021