Distributed System - MIT 6.824 Raft Lab 2

Lab 2A

Lab 2A asks you to implement a reliable election process: the system must elect a leader even in the presence of network failures. The basic logic follows Figure 2 from the paper.

Raft Election

For now, we add the following states:

	logs        Log
	currentTerm int
	state       ServerState
	commitId    int
	lastApplied int
	nextId      []int
	matchId     []int

The log could be a plain slice, but it is better to encapsulate it in a struct and define operations on it for later use. Log includes:

type Entry struct {
	Command interface{}
	Term    int
	Index   int
}

type Log struct {
	entries    []Entry
}
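
Later code calls several helpers on Log (and getLastLogId on Raft) that are not shown. Below are minimal sketches of what they might look like, assuming index 0 holds a sentinel entry so that log indices map directly to slice positions (no snapshot offset):

func (rf *Raft) getLastLogId() int {
	return rf.logs.entries[len(rf.logs.entries)-1].Index
}

func (l *Log) getLastLogTerm() int {
	return l.entries[len(l.entries)-1].Term
}

func (l *Log) getLogTerm(index int) int {
	return l.entries[index].Term
}

// nextIndex returns the index the next appended entry will receive.
func (l *Log) nextIndex() int {
	return l.entries[len(l.entries)-1].Index + 1
}

func (l *Log) appendEntry(e Entry) {
	l.entries = append(l.entries, e)
}

func (l *Log) getEntry(index int) Entry {
	return l.entries[index]
}

// getEntries returns a copy of entries[from..to] inclusive; an empty
// result is exactly the heartbeat case described later.
func (l *Log) getEntries(from, to int) []Entry {
	if from > to {
		return nil
	}
	return append([]Entry(nil), l.entries[from:to+1]...)
}

// removeEntriesAfterIndex truncates the log so that `index` is the last entry.
func (l *Log) removeEntriesAfterIndex(index int) {
	l.entries = l.entries[:index+1]
}

// replicateEntries appends the given entries at the tail.
func (l *Log) replicateEntries(entries []Entry) {
	l.entries = append(l.entries, entries...)
}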

The Raft instance needs to detect state changes and perform the corresponding actions: a Follower or Candidate sends vote requests after its election timeout expires, while a Leader sends heartbeats/log entries every heartbeat interval. The way I did it is to maintain two variables per Raft instance, startElectionAt and nextHeartbeatTime, both of type time.Time. Whenever the state changes, reset these timers to the next deadline. The next question is: which state changes should reset these timers?

For Follower

When a Leader or Candidate steps down to Follower, reset startElectionAt. Every time the Follower receives a valid message (e.g., a heartbeat) from the current leader, reset startElectionAt.

For Candidate

Once a follower starts an election, it converts to Candidate and resets startElectionAt.

For Leader

Once elected Leader, send a heartbeat and reset nextHeartbeatTime. After each heartbeat is sent, reset nextHeartbeatTime again.
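
Putting the pieces together, the relevant fields of the Raft struct look roughly like this. This is a sketch: the field names are the identifiers used throughout this post, but the exact layout and the comments are my reconstruction (assumes the standard sync, time, and labrpc imports):

type Raft struct {
	mu        sync.Mutex          // guards all fields below
	peers     []*labrpc.ClientEnd // RPC endpoints of all servers
	persister *Persister          // holds this server's persisted state
	me        int                 // this server's index into peers[]
	dead      int32               // set by Kill()

	state       ServerState
	currentTerm int
	votedFor    int
	logs        Log

	commitId          int
	lastApplied       int
	nextId            []int
	matchId           []int
	lastSnapshotIndex int

	hasVote         []bool // which peers granted a vote this term
	requestVoteDone bool   // whether this candidate already sent vote requests

	startElectionAt   time.Time // Follower/Candidate: deadline to start an election
	nextHeartbeatTime time.Time // Leader: deadline to send the next heartbeat
	stateSignal       Signal    // wakes the state thread on state changes

	applyCh chan ApplyMsg // delivers committed entries to the service
}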

Following this logic, I structured the background raft state thread as:

func (rf *Raft) raftStateThread2() {

	stateChanged := rf.stateSignal.Subscribe()

	waitUntil := time.Now().Add(1 * time.Millisecond)
	for !rf.killed() {
		select {
		// when state changed, wait time is updated for all server state
		case <-stateChanged:
			rf.mu.Lock()
			switch rf.state {
			case FOLLOWER:
				waitUntil = rf.startElectionAt
			case CANDIDATE:
				if !rf.requestVoteDone {
					rf.startNewElection()
				} else {
					waitUntil = rf.startElectionAt
				}
			case LEADER:
				waitUntil = rf.nextHeartbeatTime
			}
			rf.mu.Unlock()

		// when time is out, either election time or heartbeat time, perform server state function
		case <-time.After(time.Until(waitUntil)):

			rf.mu.Lock()
			switch rf.state {
			case FOLLOWER:
				rf.startNewElection()
				waitUntil = rf.startElectionAt
			case CANDIDATE:
				rf.startNewElection()
				waitUntil = rf.startElectionAt
			case LEADER:
				rf.sendLogEntries()
				rf.setNextHeartBeatTime()
				waitUntil = rf.nextHeartbeatTime
			}
			rf.mu.Unlock()
		}
	}
}

The stateChanged variable is a channel that receives a signal whenever a state change resets the timers. Wherever a timer needs to be reset, send a signal on this channel. If no signal arrives before the deadline, the select times out and the server performs the corresponding action, e.g., startNewElection() or sendLogEntries().
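
The stateSignal type itself is not shown in this post; one possible implementation, a small broadcast helper over the standard sync package (an assumption on my part, not necessarily what the repo does), is:

// Signal broadcasts wake-ups to subscribers. NotifyAll does a non-blocking
// send to every subscriber, so notifying never blocks the caller; each
// subscriber channel has a buffer of one so a pending wake-up is never lost.
type Signal struct {
	mu   sync.Mutex
	subs []chan int32
}

func (s *Signal) Subscribe() <-chan int32 {
	s.mu.Lock()
	defer s.mu.Unlock()
	ch := make(chan int32, 1)
	s.subs = append(s.subs, ch)
	return ch
}

func (s *Signal) NotifyAll(v int32) {
	s.mu.Lock()
	defer s.mu.Unlock()
	for _, ch := range s.subs {
		select {
		case ch <- v:
		default: // subscriber already has a pending signal
		}
	}
}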

Where does the startElectionAt timer need to be reset?

  • AppendEntries succeeds;
  • A Candidate/Leader calls stepDownToFollower;
  • After the raft starts a new election.

Where does the nextHeartbeatTime timer need to be reset?

  • After sending out log entries/heartbeats.

Define the following functions to reset the timers, start a new election, and change state:

func (rf *Raft) setElectionTimer() {
	// randomize the deadline into [ElectionTimeout, 2*ElectionTimeout) ms to
	// reduce split votes; seed math/rand once at startup rather than per call
	n := rand.Intn(ElectionTimeout)
	rf.startElectionAt = time.Now().Add(time.Duration(ElectionTimeout+n) * time.Millisecond)
}

func (rf *Raft) setNextHeartBeatTime() {
	rf.nextHeartbeatTime = time.Now().Add(time.Duration(BroadcastTime) * time.Millisecond)
}

func (rf *Raft) startNewElection() {
	rf.currentTerm++
	rf.votedFor = rf.me
	for i := 0; i < len(rf.hasVote); i++ {
		rf.hasVote[i] = false
	}
	rf.hasVote[rf.me] = true
	rf.state = CANDIDATE

	DPrintf("Candidate %d Start new election with Term %d", rf.me, rf.currentTerm)

	rf.setElectionTimer()
	rf.sendToVote()
	rf.requestVoteDone = true
	rf.persist()

	go rf.stateSignal.NotifyAll(atomic.LoadInt32(&rf.dead))
}

func (rf *Raft) stepDownToFollower(Term int) {
	if Term > rf.currentTerm {
		rf.votedFor = -1
		rf.requestVoteDone = false
		rf.currentTerm = Term
	}
	if rf.state != FOLLOWER {
		DPrintf("Server %d become Follower at Term %d", rf.me, rf.currentTerm)
		rf.state = FOLLOWER
	}
	rf.setElectionTimer()
	go rf.stateSignal.NotifyAll(atomic.LoadInt32(&rf.dead))
	rf.persist()
}

func (rf *Raft) becomeLeader() {
	rf.state = LEADER
	for i := 0; i < len(rf.nextId); i++ {
		rf.nextId[i] = rf.getLastLogId() + 1
	}
	rf.matchId[rf.me] = rf.getLastLogId()
	rf.sendLogEntries()
	rf.setNextHeartBeatTime()
	go rf.stateSignal.NotifyAll(atomic.LoadInt32(&rf.dead))
	rf.persist()
}
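
These functions reference two constants, ElectionTimeout and BroadcastTime, that are not shown above. Plausible values, an assumption on my part chosen to satisfy the tester (elections must complete within five seconds, and heartbeats are limited to roughly ten per second), would be:

const (
	// base election timeout in ms; setElectionTimer randomizes the
	// actual deadline into [ElectionTimeout, 2*ElectionTimeout)
	ElectionTimeout = 300
	// interval between leader heartbeats in ms
	BroadcastTime = 100
)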

The next step is deciding when these functions should be called. They are mainly called from AppendEntries and RequestVote and from the handlers for their replies. Following Figure 2 in the paper, the RequestVote RPC is as follows: compare the log and the term, and if both checks pass, vote for the candidate.

func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
	// Your code here (2A, 2B).

	rf.mu.Lock()
	defer rf.mu.Unlock()

	// log compare
	logIsOk := args.LastLogTerm > rf.logs.getLastLogTerm() || (rf.logs.getLastLogTerm() == args.LastLogTerm && args.LastLogId >= rf.getLastLogId())

	// term compare
	if args.CandidateTerm > rf.currentTerm {
		rf.stepDownToFollower(args.CandidateTerm)
	}

	if args.CandidateTerm == rf.currentTerm {
		if logIsOk && rf.votedFor == -1 {
			rf.votedFor = args.CandidateId
			rf.stepDownToFollower(args.CandidateTerm)
		}
	}

	reply.Term = rf.currentTerm
	reply.VoteGranted = (args.CandidateTerm == rf.currentTerm && rf.votedFor == args.CandidateId)
}
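
For reference, here are the RPC argument and reply types implied by the handler above; the field names come straight from the code, while the comments are my reading of them:

type RequestVoteArgs struct {
	CandidateTerm int // candidate's term
	CandidateId   int // candidate requesting the vote
	LastLogId     int // index of the candidate's last log entry
	LastLogTerm   int // term of the candidate's last log entry
}

type RequestVoteReply struct {
	Term        int  // currentTerm, for the candidate to update itself
	VoteGranted bool // true means the candidate received this vote
}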

startNewElection calls sendToVote() and handles the vote results. Following Figure 2, sendToVote() is as follows. The Raft instance maintains an array hasVote[]; whenever a server replies, check the array to see whether the candidate has reached a vote quorum, and if so, call becomeLeader().

func (rf *Raft) sendToVote() {

	if rf.state != CANDIDATE { return }

	args := RequestVoteArgs{
		CandidateTerm: rf.currentTerm,
		CandidateId:   rf.me,
		LastLogId:     rf.getLastLogId(),
		LastLogTerm:   rf.logs.getLastLogTerm(),
	}

	rf.setElectionTimer()
	for server := range rf.peers {

		if server == rf.me { continue }

		go func(server int) {
			var reply RequestVoteReply
			ok := rf.sendRequestVote(server, &args, &reply)
			if !ok { return }

			rf.mu.Lock()
			defer rf.mu.Unlock()

			if rf.state != CANDIDATE { return }
			if reply.Term > rf.currentTerm {
				rf.stepDownToFollower(reply.Term)
				return
			}
			rf.hasVote[server] = reply.VoteGranted
			if rf.voteQuorum() {
				rf.becomeLeader()
			}
		}(server)
	}
}
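
The voteQuorum() helper is not shown in the post; a minimal sketch, counting granted votes against a strict majority of the peers:

// voteQuorum reports whether a strict majority of peers (including this
// server, which always votes for itself) granted their vote.
func (rf *Raft) voteQuorum() bool {
	count := 0
	for _, granted := range rf.hasVote {
		if granted {
			count++
		}
	}
	return count > len(rf.peers)/2
}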

Lab 2B

This lab adds log entries to the Raft implementation. First, think about what Raft should do when a new entry arrives: increment the index and append the entry to the log. If the current server is not the leader, return false. Because the leader tracks each server's replication progress in the matchId array, the leader should also update its own matchId entry. The code follows:

func (rf *Raft) Start(command interface{}) (int, int, bool) {
	rf.mu.Lock()
	defer rf.mu.Unlock()
	index := -1
	term := -1
	isLeader := (rf.state == LEADER)

	if !isLeader {
		return index, term, false
	}
	term = rf.currentTerm
	index = rf.logs.nextIndex()
	rf.logs.appendEntry(Entry{Command: command, Term: term, Index: index})
	rf.matchId[rf.me] = rf.getLastLogId()

	return index, term, isLeader
}

How to replicate log entries to the Followers?

The logic is relatively simple: pack entries from nextId through the end of the leader's log. If nextId is past the end of the leader's log, send an empty entries slice to the follower; that is the heartbeat. This unifies heartbeat and log-replication behavior in one function. If AppendEntries replies with a mismatch, decrement nextId and send the entries again. I used this method in my first implementation and couldn't pass a few of the tests. Revisiting the lecture video, it turned out this one-step backoff is too slow.

Following the lecture's advice, I added two fields to AppendEntryReply: XTerm and XIndex. XTerm is the conflicting term at PrevLogId; XIndex is the index just before the first entry whose term is XTerm. If PrevLogId is beyond the follower's lastLogId, then XIndex = lastLogId. When the reply returns, the leader's nextId becomes XIndex + 1.

When the leader gets a reply from a server, it updates its state according to the term and, on success, updates nextId and matchId. On failure, it uses XTerm and XIndex to update nextId: find the last index in the leader's log (up to XIndex) whose term is at most XTerm, and set nextId to that index plus one. The basic idea: if XTerm is the conflicting term, back up past that term on whichever side, leader or follower, yields the smaller index. For example, if the follower's terms at indices 1-5 are [1,1,2,2,2] and the leader's are [1,1,1,4,4], a probe at PrevLogId 5 yields XTerm = 2 and XIndex = 2, and the leader resumes from nextId = 3 in a single round trip instead of decrementing one index at a time.

I tried linear search for both lookups; it was slow, so I switched to binary search, which sped things up a bit. The AppendEntries RPC and sendLogEntries are as follows:

func (rf *Raft) AppendEntries(args *AppendEntryArgs, reply *AppendEntryReply) {

	rf.mu.Lock()
	defer rf.mu.Unlock()

	reply.Term = rf.currentTerm
	reply.Success = false

	reply.XTerm = -1  // conflict term
	reply.XIndex = -1 // conflict index

	if args.Term < rf.currentTerm { return }
	if args.Term > rf.currentTerm {
		reply.Term = args.Term
	}

	rf.stepDownToFollower(args.Term)
	if args.PrevLogId > rf.getLastLogId() {
		reply.XIndex = rf.getLastLogId()
		return
	}

	if args.PrevLogId >= rf.lastSnapshotIndex && args.PrevLogId <= rf.getLastLogId() && rf.logs.getLogTerm(args.PrevLogId) != args.PrevLogTerm {
		reply.XTerm = rf.logs.getLogTerm(args.PrevLogId)
		leftId := rf.lastSnapshotIndex
		rightId := args.PrevLogId
		for leftId <= rightId {
			midId := leftId + (rightId-leftId)/2
			if rf.logs.getLogTerm(midId) >= reply.XTerm {
				rightId = midId - 1
			} else {
				leftId = midId + 1
			}
		}
		reply.XIndex = rightId
		return
	}

	reply.Success = true

	for i := 0; i < len(args.Entries); i++ {
		index := args.Entries[i].Index
		term := args.Entries[i].Term

		if index <= rf.getLastLogId() && term == rf.logs.getLogTerm(index) {
			continue
		} else {
			rf.logs.removeEntriesAfterIndex(index - 1)
			rf.logs.replicateEntries(args.Entries[i:])
			break
		}
	}
	rf.advanceCommitIndex(args.LeaderCommitId)

	rf.setElectionTimer()
	go rf.stateSignal.NotifyAll(atomic.LoadInt32(&rf.dead))

}
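
The corresponding argument and reply types, reconstructed from the fields the handler uses (a sketch; the comments are mine):

type AppendEntryArgs struct {
	Term           int     // leader's term
	LeaderId       int     // so followers can identify the leader
	PrevLogId      int     // index of the entry immediately before Entries
	PrevLogTerm    int     // term of that entry
	Entries        []Entry // empty for a heartbeat
	LeaderCommitId int     // leader's commitId
}

type AppendEntryReply struct {
	Term    int  // currentTerm, for the leader to update itself
	Success bool // true if follower matched PrevLogId/PrevLogTerm
	XTerm   int  // conflicting term at PrevLogId, or -1
	XIndex  int  // index just before the first entry with term XTerm
}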

func (rf *Raft) sendLogEntries() {

	DPrintf("Leader %d Send Logs to Servers, next ID: %v, match Id: %v, Last Snapshot Index: %d", rf.me, rf.nextId, rf.matchId, rf.lastSnapshotIndex)

	for server := range rf.peers {
		if server == rf.me { continue }

		go func(server int) {
			rf.mu.Lock()
			if rf.state != LEADER {
				rf.mu.Unlock()
				return
			}

			args := AppendEntryArgs{
				Term:        rf.currentTerm,
				LeaderId:    rf.me,
				PrevLogId:   rf.nextId[server] - 1,
				PrevLogTerm: rf.logs.getLogTerm(rf.nextId[server] - 1),
			}
			args.Entries = rf.logs.getEntries(rf.nextId[server], rf.getLastLogId())
			args.LeaderCommitId = rf.commitId
			rf.mu.Unlock()

			var reply AppendEntryReply
			ok := rf.sendAppendEntry(server, &args, &reply)

			rf.mu.Lock()
			defer rf.mu.Unlock()

			if !ok || rf.state != LEADER || rf.currentTerm != args.Term { return }

			if reply.Term > rf.currentTerm {
				rf.stepDownToFollower(reply.Term)
				return
			}
			if reply.Success {
				rf.matchId[server] = args.PrevLogId + len(args.Entries)
				rf.nextId[server] = rf.matchId[server] + 1
				newCommitId := rf.matchQuorum()
				rf.advanceCommitIndex(newCommitId)
				return
			}
			nextId := reply.XIndex + 1
			if reply.XTerm != -1 {
				leftId := rf.lastSnapshotIndex
				rightId := reply.XIndex
				for leftId <= rightId {
					midId := leftId + (rightId-leftId)/2
					if rf.logs.getLogTerm(midId) <= reply.XTerm {
						leftId = midId + 1
					} else {
						rightId = midId - 1
					}
				}
				nextId = leftId
			}
			rf.nextId[server] = nextId

		}(server)
	}
}
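
sendLogEntries relies on matchQuorum(), which is also not shown. A minimal sketch, assuming the standard sort package: sort a copy of matchId and take the median, which is the highest index known to be replicated on a majority.

// matchQuorum returns the largest index replicated on a strict majority:
// after sorting ascending, at least half the servers (rounded up) have
// matchId >= ids[(len(ids)-1)/2].
func (rf *Raft) matchQuorum() int {
	ids := make([]int, len(rf.matchId))
	copy(ids, rf.matchId)
	sort.Ints(ids)
	return ids[(len(ids)-1)/2]
}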

What does the leader need to do after it sends entries to the followers?

If a majority of the servers have an entry, the leader should commit it. If a follower receives a leaderCommitId greater than its own commitId, the follower should update its commitId. After commitId is updated, apply the newly committed entries to the state machine. I first tried a separate background goroutine that constantly compared commitId and lastApplied and applied entries whenever commitId was ahead; it worked but complicated the process. A simpler way is to apply to the state machine whenever commitId is updated. For the Leader, commitId is advanced by counting matchId after sendLogEntries; for a Follower, commitId is advanced from leaderCommitId in the AppendEntries RPC. The two main functions are:

func (rf *Raft) advanceCommitIndex(newCommitId int) {

	// never commit past the end of our own log (clamp before indexing into it)
	if newCommitId > rf.getLastLogId() {
		newCommitId = rf.getLastLogId()
	}
	if rf.commitId >= newCommitId {
		return
	}
	// only commit entries from the current term (Figure 8 in the paper)
	if rf.logs.getLogTerm(newCommitId) != rf.currentTerm {
		return
	}
	rf.commitId = newCommitId
	rf.applyStateMachine()
}

func (rf *Raft) applyStateMachine() {

	for rf.commitId > rf.lastApplied && rf.lastApplied < rf.getLastLogId() {
		rf.lastApplied++
		entry := rf.logs.getEntry(rf.lastApplied)
		msg := ApplyMsg{CommandValid: true, Command: entry.Command, CommandIndex: entry.Index, CommandTerm: entry.Term}
		rf.applyCh <- msg
	}
}

Lab 2C

Lab 2C adds the persister to the server so that a server can still serve after a reboot. currentTerm, votedFor, and logs need to be persisted. Coding persist() and readPersist() is relatively simple and straightforward; just note that temporary variables need to be created when decoding from the persister.
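
A minimal sketch of the pair, assuming the lab's labgob encoder and the Persister handed to Make(). Note that labgob only serializes exported struct fields, so this sketch encodes the entry slice directly instead of the Log wrapper; both functions must be called while holding rf.mu.

func (rf *Raft) persist() {
	w := new(bytes.Buffer)
	e := labgob.NewEncoder(w)
	e.Encode(rf.currentTerm)
	e.Encode(rf.votedFor)
	e.Encode(rf.logs.entries)
	rf.persister.SaveRaftState(w.Bytes())
}

func (rf *Raft) readPersist(data []byte) {
	if data == nil || len(data) < 1 {
		return // no persisted state, e.g., first boot
	}
	r := bytes.NewBuffer(data)
	d := labgob.NewDecoder(r)
	// decode into temporaries first; only install them if decoding succeeds
	var currentTerm, votedFor int
	var entries []Entry
	if d.Decode(&currentTerm) != nil || d.Decode(&votedFor) != nil || d.Decode(&entries) != nil {
		return
	}
	rf.currentTerm = currentTerm
	rf.votedFor = votedFor
	rf.logs.entries = entries
}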

When should the server persist?

In my first version, I used a background goroutine to check the state every 100 milliseconds and persist. Again, this wasted time and resources. Since only currentTerm, votedFor, and logs are persisted, persist() needs to be called only when one of them changes. Just insert rf.persist() wherever these states change.

This concludes Lab 2. Complete code: github

Written on April 21, 2021