Distributed System - MIT 6.824 Raft Lab 2
Lab 2A
Lab 2A requires implementing a reliable election process that ensures the system elects a leader even in the presence of network failures. The basic logic follows Figure 2 from the paper.
For now, we add the following state (commitId, nextId, and matchId correspond to Figure 2's commitIndex, nextIndex, and matchIndex):
    logs        Log         // the replicated log
    currentTerm int         // latest term this server has seen
    state       ServerState // FOLLOWER, CANDIDATE, or LEADER
    commitId    int         // highest log index known to be committed
    lastApplied int         // highest log index applied to the state machine
    nextId      []int       // per server: index of the next log entry to send
    matchId     []int       // per server: highest log index known to be replicated
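Combined with the fields used later in this post, the full Raft struct plausibly looks like the sketch below; mu, peers, persister, me, and dead come from the lab's starter code, and the rest are inferred from how they are used in the snippets that follow:
type Raft struct {
    mu        sync.Mutex          // protects all fields below
    peers     []*labrpc.ClientEnd // RPC endpoints of all peers (starter code)
    persister *Persister          // holds persisted state (starter code)
    me        int                 // this peer's index into peers[] (starter code)
    dead      int32               // set by Kill() (starter code)

    // Figure 2 state, as listed above
    logs        Log
    currentTerm int
    state       ServerState
    commitId    int
    lastApplied int
    nextId      []int
    matchId     []int

    // election bookkeeping used below
    votedFor        int
    hasVote         []bool
    requestVoteDone bool

    // timers and signaling (the Signal type is sketched after the state thread)
    startElectionAt   time.Time
    nextHeartbeatTime time.Time
    stateSignal       Signal
    applyCh           chan ApplyMsg

    lastSnapshotIndex int // snapshot boundary (from Lab 2D; appears in the 2B code below)
}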
Log could be a plain slice, but it is better to encapsulate it and define operations on the struct for later use. Log includes:
type Entry struct {
    Command interface{}
    Term    int
    Index   int
}
type Log struct {
    entries []Entry
}
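The rest of this post calls several helper methods on Log (plus getLastLogId on Raft) that are not shown here. Below is a minimal sketch of plausible implementations, assuming entries[0] is a sentinel {Term: 0, Index: 0} and log indices equal slice indices with no snapshot offset (Lab 2D would change this):
// Minimal sketch of the Log helpers used below. No bounds checks;
// callers are expected to keep indices in range.
func makeEmptyLog() Log {
    return Log{entries: []Entry{{Term: 0, Index: 0}}}
}

func (l *Log) getLastLogTerm() int {
    return l.entries[len(l.entries)-1].Term
}

func (l *Log) nextIndex() int {
    return l.entries[len(l.entries)-1].Index + 1
}

func (l *Log) getLogTerm(index int) int {
    return l.entries[index].Term
}

func (l *Log) getEntry(index int) Entry {
    return l.entries[index]
}

// getEntries returns a copy of entries in [from, to], inclusive;
// empty when from > to (the heartbeat case).
func (l *Log) getEntries(from, to int) []Entry {
    return append([]Entry{}, l.entries[from:to+1]...)
}

func (l *Log) appendEntry(e Entry) {
    l.entries = append(l.entries, e)
}

// removeEntriesAfterIndex truncates so that index is the last remaining entry.
func (l *Log) removeEntriesAfterIndex(index int) {
    l.entries = l.entries[:index+1]
}

// replicateEntries appends entries received from the leader.
func (l *Log) replicateEntries(entries []Entry) {
    l.entries = append(l.entries, entries...)
}

// getLastLogId on Raft just delegates to the log.
func (rf *Raft) getLastLogId() int {
    return rf.logs.nextIndex() - 1
}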
The Raft instance needs to detect state changes and perform the corresponding actions: a Follower or Candidate sends vote requests after its election timeout, and a Leader sends heartbeats/log entries every heartbeat interval. The way I did it is to maintain two variables per Raft instance, startElectionAt and nextHeartbeatTime, both time.Time values. Whenever the state changes, reset these timers to the next deadline. The next question is: which state changes should reset these timers?
For Follower
- When a Leader or Candidate steps down to Follower, reset startElectionAt.
- Every time it receives a valid message (e.g., a heartbeat) from the current leader, reset startElectionAt.
For Candidate
- Once a Follower starts an election, it converts to Candidate and resets startElectionAt.
For Leader
- Once elected Leader, send a heartbeat and reset nextHeartbeatTime.
- After each heartbeat/log-entry batch is sent, reset nextHeartbeatTime.
Following this logic, I structured the background raft state thread as:
func (rf *Raft) raftStateThread2() {
    stateChanged := rf.stateSignal.Subscribe()
    waitUntil := time.Now().Add(1 * time.Millisecond)
    for !rf.killed() {
        select {
        // when the state changes, the wait deadline is updated for the new state
        case <-stateChanged:
            rf.mu.Lock()
            switch rf.state {
            case FOLLOWER:
                waitUntil = rf.startElectionAt
            case CANDIDATE:
                if !rf.requestVoteDone {
                    rf.startNewElection()
                } else {
                    waitUntil = rf.startElectionAt
                }
            case LEADER:
                waitUntil = rf.nextHeartbeatTime
            }
            rf.mu.Unlock()
        // on timeout (election or heartbeat deadline), perform the state's action
        case <-time.After(time.Until(waitUntil)):
            rf.mu.Lock()
            switch rf.state {
            case FOLLOWER:
                rf.startNewElection()
                waitUntil = rf.startElectionAt
            case CANDIDATE:
                rf.startNewElection()
                waitUntil = rf.startElectionAt
            case LEADER:
                rf.sendLogEntries()
                rf.setNextHeartBeatTime()
                waitUntil = rf.nextHeartbeatTime
            }
            rf.mu.Unlock()
        }
    }
}
The stateChanged variable is a channel that receives a signal whenever a state change resets the timers; wherever a timer needs to be reset, send a signal to this channel. If no signal arrives before the deadline, the select times out and performs the appropriate action, e.g., startNewElection() or sendLogEntries().
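The post does not show stateSignal itself; below is a hypothetical sketch of such a broadcast helper (the Signal type and its fields are my assumptions), where NotifyAll wakes every subscriber without ever blocking the sender:
import "sync"

// Hypothetical sketch of the stateSignal helper: a broadcast-style signal.
// Subscribe returns a buffered channel; NotifyAll never blocks the caller.
type Signal struct {
    mu   sync.Mutex
    subs []chan int32
}

func (s *Signal) Subscribe() <-chan int32 {
    s.mu.Lock()
    defer s.mu.Unlock()
    ch := make(chan int32, 1) // buffer of 1: a single pending wake-up is enough
    s.subs = append(s.subs, ch)
    return ch
}

func (s *Signal) NotifyAll(v int32) {
    s.mu.Lock()
    defer s.mu.Unlock()
    for _, ch := range s.subs {
        select {
        case ch <- v: // deliver if the subscriber has no pending signal
        default: // a notification is already pending; dropping is safe
        }
    }
}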
Where does the startElectionAt timer need to be reset?
- AppendEntries succeeds;
- a Candidate/Leader calls stepDownToFollower;
- after the raft starts a new election.
Where does the nextHeartbeatTime timer need to be reset?
- after sending out log entries/heartbeats.
Define the following functions to reset the timers, start a new election, and handle state changes:
func (rf *Raft) setElectionTimer() {
    rand.Seed(time.Now().UnixNano()) // seeding once at startup would also suffice
    n := rand.Intn(ElectionTimeout)
    // randomized timeout in [ElectionTimeout, 2*ElectionTimeout) milliseconds
    rf.startElectionAt = time.Now().Add(time.Duration(ElectionTimeout+n) * time.Millisecond)
}
func (rf *Raft) setNextHeartBeatTime() {
    rf.nextHeartbeatTime = time.Now().Add(time.Duration(BroadcastTime) * time.Millisecond)
}
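ElectionTimeout and BroadcastTime are constants not shown in the post. Hypothetical values consistent with the tester's limits (heartbeats at most ~10 per second, elections settling within ~5 seconds) might look like:
const (
    ElectionTimeout = 300 // ms; effective timeout falls in [300, 600) ms
    BroadcastTime   = 100 // ms between heartbeats
)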
func (rf *Raft) startNewElection() {
    rf.currentTerm++
    rf.votedFor = rf.me
    for i := 0; i < len(rf.hasVote); i++ {
        rf.hasVote[i] = false
    }
    rf.hasVote[rf.me] = true
    rf.state = CANDIDATE
    DPrintf("Candidate %d Start new election with Term %d", rf.me, rf.currentTerm)
    rf.setElectionTimer()
    rf.sendToVote()
    rf.requestVoteDone = true
    rf.persist()
    go rf.stateSignal.NotifyAll(atomic.LoadInt32(&rf.dead))
}
func (rf *Raft) stepDownToFollower(Term int) {
    if Term > rf.currentTerm {
        rf.votedFor = -1
        rf.requestVoteDone = false
        rf.currentTerm = Term
    }
    if rf.state != FOLLOWER {
        DPrintf("Server %d become Follower at Term %d", rf.me, rf.currentTerm)
        rf.state = FOLLOWER
    }
    rf.setElectionTimer()
    go rf.stateSignal.NotifyAll(atomic.LoadInt32(&rf.dead))
    rf.persist()
}
func (rf *Raft) becomeLeader() {
    rf.state = LEADER
    for i := 0; i < len(rf.nextId); i++ {
        rf.nextId[i] = rf.getLastLogId() + 1
    }
    rf.matchId[rf.me] = rf.getLastLogId()
    rf.sendLogEntries()
    rf.setNextHeartBeatTime()
    go rf.stateSignal.NotifyAll(atomic.LoadInt32(&rf.dead))
    rf.persist()
}
The next step is deciding when these functions should be called. They are mainly called from AppendEntries, RequestVote, and the handlers for their responses. Following Figure 2 in the paper, the RequestVote RPC is as follows: compare the logs and the terms, and if both check out, vote for the candidate.
func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
    // Your code here (2A, 2B).
    rf.mu.Lock()
    defer rf.mu.Unlock()
    // log comparison
    logIsOk := args.LastLogTerm > rf.logs.getLastLogTerm() || (rf.logs.getLastLogTerm() == args.LastLogTerm && args.LastLogId >= rf.getLastLogId())
    // term comparison
    if args.CandidateTerm > rf.currentTerm {
        rf.stepDownToFollower(args.CandidateTerm)
    }
    if args.CandidateTerm == rf.currentTerm {
        if logIsOk && rf.votedFor == -1 {
            rf.votedFor = args.CandidateId
            rf.stepDownToFollower(args.CandidateTerm)
        }
    }
    reply.Term = rf.currentTerm
    reply.VoteGranted = (args.CandidateTerm == rf.currentTerm && rf.votedFor == args.CandidateId)
}
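For reference, the RequestVote argument and reply types, reconstructed from the fields the code above uses (the layout and comments are my assumptions):
type RequestVoteArgs struct {
    CandidateTerm int // candidate's term
    CandidateId   int // candidate requesting the vote
    LastLogId     int // index of candidate's last log entry
    LastLogTerm   int // term of candidate's last log entry
}

type RequestVoteReply struct {
    Term        int  // currentTerm, for the candidate to update itself
    VoteGranted bool // true means the candidate received the vote
}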
startNewElection calls sendToVote() and handles the vote results. Following Figure 2, sendToVote() is as follows. Raft maintains an array hasVote[], and whenever a server replies, it checks the array to see whether the current server has reached a vote quorum; if yes, it calls becomeLeader().
func (rf *Raft) sendToVote() {
    if rf.state != CANDIDATE {
        return
    }
    args := RequestVoteArgs{
        CandidateTerm: rf.currentTerm,
        CandidateId:   rf.me,
        LastLogId:     rf.getLastLogId(),
        LastLogTerm:   rf.logs.getLastLogTerm(),
    }
    rf.setElectionTimer()
    for server := range rf.peers {
        if server == rf.me {
            continue
        }
        go func(server int) {
            var reply RequestVoteReply
            ok := rf.sendRequestVote(server, &args, &reply)
            if !ok {
                return
            }
            rf.mu.Lock()
            defer rf.mu.Unlock()
            if rf.state != CANDIDATE {
                return
            }
            if reply.Term > rf.currentTerm {
                rf.stepDownToFollower(reply.Term)
                return
            }
            rf.hasVote[server] = reply.VoteGranted
            if rf.voteQuorum() {
                rf.becomeLeader()
            }
        }(server)
    }
}
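voteQuorum is not shown in the post; a minimal sketch, assuming hasVote[rf.me] was already set to true in startNewElection:
// Sketch: count granted votes and check for a strict majority.
func (rf *Raft) voteQuorum() bool {
    count := 0
    for _, granted := range rf.hasVote {
        if granted {
            count++
        }
    }
    return count > len(rf.peers)/2
}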
Lab 2B
This lab adds log entries to Raft. First, consider what Raft should do when a new entry arrives: increment the index and append the entry to the logs. If the current Raft is not the leader, return false. Because the leader tracks replication progress (and computes commit indices) through the matchId array, it should update its own matchId entry as well. The code:
func (rf *Raft) Start(command interface{}) (int, int, bool) {
    rf.mu.Lock()
    defer rf.mu.Unlock()
    index := -1
    term := -1
    isLeader := (rf.state == LEADER)
    if !isLeader {
        return index, term, false
    }
    term = rf.currentTerm
    index = rf.logs.nextIndex()
    rf.logs.appendEntry(Entry{Command: command, Term: term, Index: index})
    rf.matchId[rf.me] = rf.getLastLogId()
    return index, term, isLeader
}
How are log entries replicated to the followers?
The logic is relatively simple: pack entries from nextId to the end of the leader's log. If nextId is past the end of the leader's log, send an empty entry list to the follower; that is the heartbeat. This unifies heartbeats and log replication into one function. If AppendEntries replies with a mismatch, decrement nextId and send the log entries again. I used this method in my first implementation and couldn't pass a few of the tests. Revisiting the lecture video, it turned out this one-step backoff is too slow.
Following the lecture's advice, I added two additional fields to AppendEntryReply: XTerm and XIndex. XTerm is the conflicting term at PrevLogId, and XIndex is the index just before the first entry whose term is XTerm. If PrevLogId is beyond the follower's lastLogId, then XIndex = lastLogId. When the reply returns, the leader's nextId becomes XIndex + 1. For example, if the follower's log terms at indices 1-4 are [1, 1, 4, 4] and the leader sends PrevLogId = 4 with PrevLogTerm = 5, the follower replies XTerm = 4 and XIndex = 2, the index before the first term-4 entry.
When the leader gets a reply from a server, it updates its state according to the term; on success it updates nextId and matchId, otherwise it uses XTerm and XIndex to update nextId: search its own log up to XIndex for the last entry whose term is at most XTerm, and set nextId to that index plus 1. The basic idea: when XTerm is the conflicting term, back up past that term on both the leader's and the follower's side, taking whichever index is smaller.
I tried linear search for both of these searches; it was slow, so I switched to binary search, which sped things up a bit. The AppendEntries RPC and sendLogEntries are as follows, preceded by a sketch of the RPC types.
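These types are reconstructed from the fields the code below uses; the names match the code, while the layout and comments are my assumptions:
type AppendEntryArgs struct {
    Term           int     // leader's term
    LeaderId       int     // so the follower can learn the leader
    PrevLogId      int     // index of the entry immediately preceding the new ones
    PrevLogTerm    int     // term of the entry at PrevLogId
    Entries        []Entry // entries to store (empty for heartbeat)
    LeaderCommitId int     // leader's commitId
}

type AppendEntryReply struct {
    Term    int  // currentTerm, for the leader to update itself
    Success bool // true if the follower matched PrevLogId/PrevLogTerm
    XTerm   int  // conflicting term at PrevLogId (-1 if none)
    XIndex  int  // index just before the first entry with term XTerm
}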
func (rf *Raft) AppendEntries(args *AppendEntryArgs, reply *AppendEntryReply) {
    rf.mu.Lock()
    defer rf.mu.Unlock()
    reply.Term = rf.currentTerm
    reply.Success = false
    reply.XTerm = -1  // conflicting term
    reply.XIndex = -1 // conflicting index
    if args.Term < rf.currentTerm {
        return
    }
    if args.Term > rf.currentTerm {
        reply.Term = args.Term
    }
    rf.stepDownToFollower(args.Term)
    // follower's log is too short: back up to the end of its log
    if args.PrevLogId > rf.getLastLogId() {
        reply.XIndex = rf.getLastLogId()
        return
    }
    // term conflict at PrevLogId: binary search for the index just before
    // the first entry whose term is XTerm
    if args.PrevLogId >= rf.lastSnapshotIndex && args.PrevLogId <= rf.getLastLogId() && rf.logs.getLogTerm(args.PrevLogId) != args.PrevLogTerm {
        reply.XTerm = rf.logs.getLogTerm(args.PrevLogId)
        leftId := rf.lastSnapshotIndex
        rightId := args.PrevLogId
        for leftId <= rightId {
            midId := leftId + (rightId-leftId)/2
            if rf.logs.getLogTerm(midId) >= reply.XTerm {
                rightId = midId - 1
            } else {
                leftId = midId + 1
            }
        }
        reply.XIndex = rightId
        return
    }
    reply.Success = true
    // append the new entries, truncating only on a real conflict
    for i := 0; i < len(args.Entries); i++ {
        index := args.Entries[i].Index
        term := args.Entries[i].Term
        if index <= rf.getLastLogId() && term == rf.logs.getLogTerm(index) {
            continue
        } else {
            rf.logs.removeEntriesAfterIndex(index - 1)
            rf.logs.replicateEntries(args.Entries[i:])
            break
        }
    }
    rf.advanceCommitIndex(args.LeaderCommitId)
    rf.setElectionTimer()
    go rf.stateSignal.NotifyAll(atomic.LoadInt32(&rf.dead))
}
func (rf *Raft) sendLogEntries() {
    DPrintf("Leader %d Send Logs to Servers, next ID: %v, match Id: %v, Last Snapshot Index: %d", rf.me, rf.nextId, rf.matchId, rf.lastSnapshotIndex)
    for server := range rf.peers {
        if server == rf.me {
            continue
        }
        go func(server int) {
            rf.mu.Lock()
            if rf.state != LEADER {
                rf.mu.Unlock()
                return
            }
            args := AppendEntryArgs{
                Term:        rf.currentTerm,
                LeaderId:    rf.me,
                PrevLogId:   rf.nextId[server] - 1,
                PrevLogTerm: rf.logs.getLogTerm(rf.nextId[server] - 1),
            }
            args.Entries = rf.logs.getEntries(rf.nextId[server], rf.getLastLogId())
            args.LeaderCommitId = rf.commitId
            rf.mu.Unlock()
            var reply AppendEntryReply
            ok := rf.sendAppendEntry(server, &args, &reply)
            rf.mu.Lock()
            defer rf.mu.Unlock()
            // drop failed or stale replies
            if !ok || rf.state != LEADER || rf.currentTerm != args.Term {
                return
            }
            if reply.Term > rf.currentTerm {
                rf.stepDownToFollower(reply.Term)
                return
            }
            if reply.Success {
                rf.matchId[server] = args.PrevLogId + len(args.Entries)
                rf.nextId[server] = rf.matchId[server] + 1
                newCommitId := rf.matchQuorum()
                rf.advanceCommitIndex(newCommitId)
                return
            }
            // mismatch: use XTerm/XIndex to back up nextId quickly
            nextId := reply.XIndex + 1
            if reply.XTerm != -1 {
                // find the first leader index (up to XIndex) whose term exceeds XTerm
                leftId := rf.lastSnapshotIndex
                rightId := reply.XIndex
                for leftId <= rightId {
                    midId := leftId + (rightId-leftId)/2
                    if rf.logs.getLogTerm(midId) <= reply.XTerm {
                        leftId = midId + 1
                    } else {
                        rightId = midId - 1
                    }
                }
                nextId = leftId
            }
            rf.nextId[server] = nextId
        }(server)
    }
}
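matchQuorum is not shown in the post; a minimal sketch, assuming matchId[rf.me] is kept up to date as in Start and becomeLeader: find the highest index replicated on a majority by taking the median of a sorted copy of matchId.
import "sort"

// Sketch: with n servers sorted ascending, the element at position (n-1)/2
// is an index that a strict majority of servers have replicated.
func (rf *Raft) matchQuorum() int {
    ids := make([]int, len(rf.matchId))
    copy(ids, rf.matchId)
    sort.Ints(ids)
    return ids[(len(ids)-1)/2]
}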
What does the leader need to do after it sends entries to the followers?
If a majority of the followers have an entry, the leader should commit it. If a follower receives a leaderCommitId greater than its own commitId, it should update its commitId. After commitId is updated, apply entries to the state machine. I first tried another background goroutine that constantly compared commitId and lastApplied and applied entries whenever commitId was ahead; it worked but complicated the process. A simpler way is to apply to the state machine whenever commitId is updated. For the leader, commitId is updated by counting matchId after sendLogEntries; for a follower, commitId is updated from leaderCommitId in the AppendEntries RPC. The two main functions are:
func (rf *Raft) advanceCommitIndex(newCommitId int) {
    // only advance commit for entries from the current term (cf. Figure 8 in the paper)
    if rf.logs.getLogTerm(newCommitId) != rf.currentTerm {
        return
    }
    if rf.commitId >= newCommitId {
        return
    }
    if newCommitId > rf.getLastLogId() {
        newCommitId = rf.getLastLogId()
    }
    rf.commitId = newCommitId
    rf.applyStateMachine()
}
func (rf *Raft) applyStateMachine() {
    for rf.commitId > rf.lastApplied && rf.lastApplied < rf.getLastLogId() {
        rf.lastApplied++
        entry := rf.logs.getEntry(rf.lastApplied)
        msg := ApplyMsg{CommandValid: true, Command: entry.Command, CommandIndex: entry.Index, CommandTerm: entry.Term}
        rf.applyCh <- msg
    }
}
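For reference, ApplyMsg here is the lab skeleton's type with a CommandTerm field added (a sketch, reconstructed from the fields used above):
type ApplyMsg struct {
    CommandValid bool        // true if this message carries a newly committed entry
    Command      interface{} // the committed command
    CommandIndex int         // the command's log index
    CommandTerm  int         // the command's term (added to the skeleton's type)
}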
Lab 2C
Lab 2C adds a persister to the server, so that a server can resume service after a reboot. currentTerm, votedFor, and logs need to be persisted. Coding persist() and readPersist() is relatively simple and straightforward; just note that temporary variables need to be created when decoding from the persister.
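A sketch along the lines of the lab skeleton's commented-out example, assuming the standard Persister API and labgob encoding; note that labgob only serializes exported fields, so Log would need exported fields or custom encoding, and the import path varies by lab year:
import (
    "bytes"

    "6.824/labgob" // path depends on the lab year
)

func (rf *Raft) persist() {
    w := new(bytes.Buffer)
    e := labgob.NewEncoder(w)
    e.Encode(rf.currentTerm)
    e.Encode(rf.votedFor)
    e.Encode(rf.logs)
    rf.persister.SaveRaftState(w.Bytes())
}

func (rf *Raft) readPersist(data []byte) {
    if data == nil || len(data) < 1 {
        return // fresh start: keep defaults
    }
    r := bytes.NewBuffer(data)
    d := labgob.NewDecoder(r)
    // decode into temporaries first, so a decode error cannot leave rf half-updated
    var currentTerm, votedFor int
    var logs Log
    if d.Decode(&currentTerm) != nil || d.Decode(&votedFor) != nil || d.Decode(&logs) != nil {
        return
    }
    rf.currentTerm = currentTerm
    rf.votedFor = votedFor
    rf.logs = logs
}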
When should the server persist?
In my first version, I used a background goroutine to check the state every 100 milliseconds and persist. Again, it wasted some time and resources. Since only currentTerm, votedFor, and logs are persisted, persist() only needs to be called when one of these changes; just insert rf.persist() wherever they change.
This concludes Lab 2. Complete code: github