Skip to content

Commit

Permalink
chore: finish template
Browse files Browse the repository at this point in the history
  • Loading branch information
justin0u0 committed May 31, 2022
1 parent d35f062 commit f37d439
Showing 1 changed file with 97 additions and 166 deletions.
263 changes: 97 additions & 166 deletions raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ type Raft struct {
config *Config
logger *zap.Logger

// lastHeartbeat stores the last time of a valid RPC received from the leader
lastHeartbeat time.Time

// rpcCh stores incoming RPCs
Expand Down Expand Up @@ -58,117 +59,90 @@ func NewRaft(id uint32, peers map[uint32]Peer, persister Persister, config *Conf
// RPC handlers

func (r *Raft) applyCommand(req *pb.ApplyCommandRequest) (*pb.ApplyCommandResponse, error) {
if r.state != Leader {
return nil, errNotLeader
}
// TODO: (B.1)* - if not leader, reject client operation and returns `errNotLeader`

lastLogId, _ := r.getLastLog()
e := &pb.Entry{Id: lastLogId + 1, Term: r.currentTerm, Data: req.GetData()}
r.appendLogs([]*pb.Entry{e})
// TODO: (B.1)* - create a new log entry, append to the local entries
// Hint:
// - use `getLastLog` to get the last log ID
// - use `appendLogs` to append new log

return &pb.ApplyCommandResponse{Entry: e}, nil
// TODO: (B.1)* - return the new log entry
return nil, nil
}

func (r *Raft) appendEntries(req *pb.AppendEntriesRequest) (*pb.AppendEntriesResponse, error) {
if req.GetTerm() < r.currentTerm {
r.logger.Info("reject append entries since current term is older")

return &pb.AppendEntriesResponse{Term: r.currentTerm, Success: false}, nil
}
// TODO: (A.1) - reply false if term < currentTerm
// Log: r.logger.Info("reject append entries since current term is older")

r.lastHeartbeat = time.Now()
// TODO: (A.2)* - reset the `lastHeartbeat`
// Description: start from the current line, the current request is a valid RPC

// increase term if receive a newer one
if req.GetTerm() > r.currentTerm {
r.toFollower(req.GetTerm())
r.logger.Info("increase term since receive a newer one", zap.Uint64("term", r.currentTerm))
}
// TODO: (A.3) - if RPC request or response contains term T > currentTerm: set currentTerm = T, convert to follower
// Hint: use `toFollower` to convert to follower
// Log: r.logger.Info("increase term since receive a newer one", zap.Uint64("term", r.currentTerm))

if req.GetTerm() == r.currentTerm && r.state != Follower {
r.toFollower(req.GetTerm())
r.logger.Info("receive request from leader, fallback to follower", zap.Uint64("term", r.currentTerm))
}
// TODO: (A.4) - if AppendEntries RPC received from new leader: convert to follower
// Log: r.logger.Info("receive request from leader, fallback to follower", zap.Uint64("term", r.currentTerm))

// verify the last log entry
prevLogId := req.GetPrevLogId()
prevLogTerm := req.GetPrevLogTerm()
if prevLogId != 0 && prevLogTerm != 0 {
log := r.getLog(prevLogId)

if prevLogTerm != log.GetTerm() {
r.logger.Info("the given previous log from leader is missing or mismatched",
zap.Uint64("prevLogId", prevLogId),
zap.Uint64("prevLogTerm", prevLogTerm),
zap.Uint64("logTerm", log.GetTerm()))

return &pb.AppendEntriesResponse{Term: r.currentTerm, Success: false}, nil
}
// TODO: (B.2) - reply false if log doesn’t contain an entry at prevLogIndex whose term matches prevLogTerm
// Hint: use `getLog` to get log with ID equals to prevLogId
// Log: r.logger.Info("the given previous log from leader is missing or mismatched", zap.Uint64("prevLogId", prevLogId), zap.Uint64("prevLogTerm", prevLogTerm), zap.Uint64("logTerm", log.GetTerm()))
}

if len(req.GetEntries()) != 0 {
// delete entries after previous log
r.deleteLogs(prevLogId)

// append new entries
r.appendLogs(req.GetEntries())

r.logger.Info("receive and append new entries",
zap.Int("newEntries", len(req.GetEntries())),
zap.Int("numberOfEntries", len(r.logs)),
)
// TODO: (B.3) - if an existing entry conflicts with a new one (same index but different terms), delete the existing entry and all that follow it
// TODO: (B.4) - append any new entries not already in the log
// Hint: use `deleteLogs` follows by `appendLogs`
// Log: r.logger.Info("receive and append new entries", zap.Int("newEntries", len(req.GetEntries())), zap.Int("numberOfEntries", len(r.logs)))
}

if req.GetLeaderCommitId() > r.commitIndex {
lastLogId, _ := r.getLastLog()
if req.GetLeaderCommitId() < lastLogId {
r.setCommitIndex(req.GetLeaderCommitId())
} else {
r.setCommitIndex(lastLogId)
}

r.logger.Info("update commit index from leader", zap.Uint64("commitIndex", r.commitIndex))
go r.applyLogs(r.applyCh)
}
// TODO: (B.5) - if leaderCommit > commitIndex, set commitIndex = min(leaderCommit, index of last new entry)
// Hint: use `getLastLog` to get the index of last new entry
// Hint: use `applyLogs` to apply(commit) new logs in background
// Log: r.logger.Info("update commit index from leader", zap.Uint64("commitIndex", r.commitIndex))

return &pb.AppendEntriesResponse{Term: r.currentTerm, Success: true}, nil
}

func (r *Raft) requestVote(req *pb.RequestVoteRequest) (*pb.RequestVoteResponse, error) {
// reject if current term is older
if req.GetTerm() < r.currentTerm {
r.logger.Info("reject request vote since current term is older")
// TODO: (A.5) - reply false if term < currentTerm
// Log: r.logger.Info("reject request vote since current term is older")

return &pb.RequestVoteResponse{Term: r.currentTerm, VoteGranted: false}, nil
}
// TODO: (A.6) - if RPC request or response contains term T > currentTerm: set currentTerm = T, convert to follower
// Hint: use `toFollower` to convert to follower
// Log: r.logger.Info("increase term since receive a newer one", zap.Uint64("term", r.currentTerm))

// increase term if receive a newer one
if req.GetTerm() > r.currentTerm {
r.toFollower(req.GetTerm())
r.logger.Info("increase term since receive a newer one", zap.Uint64("term", r.currentTerm))
}
if false {
// TODO: (A.7) - if votedFor is null or candidateId, and candidate’s log is at least as up-to-date as receiver’s log, grant vote
// Hint: (fix the condition) if already vote for another candidate, reply false

// reject if already vote for another candidate
if r.votedFor != 0 && r.votedFor != req.GetCandidateId() {
r.logger.Info("reject since already vote for another candidate",
zap.Uint64("term", r.currentTerm),
zap.Uint32("votedFor", r.votedFor))

return &pb.RequestVoteResponse{Term: r.currentTerm, VoteGranted: false}, nil
}

lastLogId, lastLogTerm := r.getLastLog()

// reject if last log entry is more up-to-date
if lastLogTerm > req.GetLastLogTerm() || (lastLogTerm == req.GetLastLogTerm() && lastLogId > req.GetLastLogId()) {
if false {
// TODO: (A.7) - if votedFor is null or candidateId, and candidate’s log is at least as up-to-date as receiver’s log, grant vote
// Hint: (fix the condition) if the local last entry is more up-to-date than the candidate's last entry, reply false
// Hint: use `getLastLog` to get the last log entry
r.logger.Info("reject since last entry is more up-to-date")

return &pb.RequestVoteResponse{Term: r.currentTerm, VoteGranted: false}, nil
}

// TODO: (A.7) - if votedFor is null or candidateId, and candidate’s log is at least as up-to-date as receiver’s log, grant vote
// Hint: now vote should be granted, use `voteFor` to set votedFor
r.voteFor(req.GetCandidateId(), false)
r.lastHeartbeat = time.Now()
r.logger.Info("vote for another candidate", zap.Uint32("votedFor", r.votedFor))

// TODO: (A.8)* - reset the `lastHeartbeat`
// Description: start from the current line, the current request is a valid RPC

return &pb.RequestVoteResponse{Term: r.currentTerm, VoteGranted: true}, nil
}

Expand Down Expand Up @@ -234,7 +208,8 @@ func (r *Raft) runFollower(ctx context.Context) {
}

func (r *Raft) handleFollowerHeartbeatTimeout() {
r.toCandidate()
// TODO: (A.9) - if election timeout elapses without receiving AppendEntries RPC from current leader or granting vote to candidate: convert to candidate
// Hint: use `toCandidate` to convert to candidate

r.logger.Info("heartbeat timeout, change state from follower to candidate")
}
Expand Down Expand Up @@ -285,57 +260,49 @@ func (r *Raft) runCandidate(ctx context.Context) {
}

func (r *Raft) voteForSelf(grantedVotes *int) {
r.voteFor(r.id, true)
(*grantedVotes)++
// TODO: (A.10) increment currentTerm
// TODO: (A.10) vote for self
// Hint: use `voteFor` to vote for self

r.logger.Info("vote for self", zap.Uint64("term", r.currentTerm))
}

func (r *Raft) broadcastRequestVote(ctx context.Context, voteCh chan *voteResult) {
r.logger.Info("broadcast request vote", zap.Uint64("term", r.currentTerm))

lastLogId, lastLogTerm := r.getLastLog()

req := &pb.RequestVoteRequest{
Term: r.currentTerm,
CandidateId: r.id,
LastLogId: lastLogId,
LastLogTerm: lastLogTerm,
// TODO: (A.11) - send RequestVote RPCs to all other servers (set all fields of `req`)
// Hint: use `getLastLog` to get the last log entry
}

// TODO: (A.11) - send RequestVote RPCs to all other servers (modify the code to send `RequestVote` RPCs in parallel)
for peerId, peer := range r.peers {
peerId := peerId
peer := peer

go func() {
resp, err := peer.RequestVote(ctx, req)
if err != nil {
r.logger.Error("fail to send RequestVote RPC", zap.Error(err), zap.Uint32("peer", peerId))
return
}
resp, err := peer.RequestVote(ctx, req)
if err != nil {
r.logger.Error("fail to send RequestVote RPC", zap.Error(err), zap.Uint32("peer", peerId))
return
}

voteCh <- &voteResult{RequestVoteResponse: resp, peerId: peerId}
}()
voteCh <- &voteResult{RequestVoteResponse: resp, peerId: peerId}
}
}

func (r *Raft) handleVoteResult(vote *voteResult, grantedVotes *int, votesNeeded int) {
if vote.GetTerm() > r.currentTerm {
r.toFollower(vote.GetTerm())
r.logger.Info("receive new term on RequestVote response, fallback to follower", zap.Uint32("peer", vote.peerId))

return
}
// TODO: (A.12) - if RPC request or response contains term T > currentTerm: set currentTerm = T, convert to follower
// Hint: use `toFollower` to convert to follower
// Log: r.logger.Info("receive new term on RequestVote response, fallback to follower", zap.Uint32("peer", vote.peerId))

if vote.VoteGranted {
(*grantedVotes)++
r.logger.Info("vote granted", zap.Uint32("peer", vote.peerId), zap.Int("grantedVote", (*grantedVotes)))
}

if (*grantedVotes) >= votesNeeded {
r.toLeader()
r.logger.Info("election won", zap.Int("grantedVote", (*grantedVotes)), zap.Uint64("term", r.currentTerm))
}
// TODO: (A.13) - if votes received from majority of servers: become leader
// Log: r.logger.Info("election won", zap.Int("grantedVote", (*grantedVotes)), zap.Uint64("term", r.currentTerm))
// Hint: use `toLeader` to convert to leader
}

// leader related
Expand Down Expand Up @@ -384,96 +351,60 @@ func (r *Raft) broadcastAppendEntries(ctx context.Context, appendEntriesResultCh
peerId := peerId
peer := peer

prevLog := r.getLog(r.nextIndex[peerId] - 1)
entries := r.getLogs(r.nextIndex[peerId])

req := &pb.AppendEntriesRequest{
Term: r.currentTerm,
LeaderId: r.id,
LeaderCommitId: r.commitIndex,
Entries: entries,
// TODO: (A.14) - send initial empty AppendEntries RPCs (heartbeat) to each server; repeat during idle periods to prevent election timeouts
// Hint: set `req` with the correct fields (entries, prevLogId, prevLogTerm can be ignored for heartbeat)
// TODO: (B.6) - send AppendEntries RPC with log entries starting at nextIndex
// Hint: set `req` with the correct fields (entries, prevLogId and prevLogTerm MUST be set)
// Hint: use `getLog` to get specific log, `getLogs` to get all logs after and include the specific log Id
// Log: r.logger.Debug("send append entries", zap.Uint32("peer", peerId), zap.Any("request", req), zap.Int("entries", len(entries)))
req := &pb.AppendEntriesRequest{}

// TODO: (A.14) & (B.6)
// Hint: modify the code to send `AppendEntries` RPCs in parallel
resp, err := peer.AppendEntries(ctx, req)
if err != nil {
r.logger.Error("fail to send AppendEntries RPC", zap.Error(err), zap.Uint32("peer", peerId))
// connection issue, should not be handled
return
}

if prevLog != nil {
req.PrevLogId = prevLog.GetId()
req.PrevLogTerm = prevLog.GetTerm()
appendEntriesResultCh <- &appendEntriesResult{
AppendEntriesResponse: resp,
req: req,
peerId: peerId,
}

// r.logger.Debug("send append entries", zap.Uint32("peer", peerId), zap.Any("request", req), zap.Int("entries", len(entries)))

go func() {
resp, err := peer.AppendEntries(ctx, req)
if err != nil {
r.logger.Error("fail to send AppendEntries RPC", zap.Error(err), zap.Uint32("peer", peerId))
// connection issue, should not be handled
return
}

appendEntriesResultCh <- &appendEntriesResult{
AppendEntriesResponse: resp,
req: req,
peerId: peerId,
}
}()
}
}

func (r *Raft) handleAppendEntriesResult(result *appendEntriesResult) {
peerId := result.peerId
logger := r.logger.With(zap.Uint32("peer", peerId))

if result.GetTerm() > r.currentTerm {
r.toFollower(result.GetTerm())
logger.Info("receive new term on AppendEntries response, fallback to follower")

return
}
// TODO: (A.15) - if RPC request or response contains term T > currentTerm: set currentTerm = T, convert to follower
// Hint: use `toFollower` to convert to follower
// Log: r.logger.Info("receive new term on AppendEntries response, fallback to follower", zap.Uint32("peer", result.peerId))

entries := result.req.GetEntries()

if !result.GetSuccess() {
// if failed, decrease `nextIndex` and retry
nextIndex := r.nextIndex[peerId] - 1
matchIndex := r.matchIndex[peerId]
r.setNextAndMatchIndex(peerId, nextIndex, matchIndex)

logger.Info("append entries failed, decrease next index",
zap.Uint64("nextIndex", nextIndex),
zap.Uint64("matchIndex", matchIndex))
// TODO: (B.7) - if AppendEntries fails because of log inconsistency: decrement nextIndex and retry
// Hint: use `setNextAndMatchIndex` to decrement nextIndex
// Log: logger.Info("append entries failed, decrease next index", zap.Uint64("nextIndex", nextIndex), zap.Uint64("matchIndex", matchIndex))
} else if len(entries) != 0 {
// if successful and with log entries, update `matchIndex` and `nextIndex` for the follower
matchIndex := entries[len(entries)-1].GetId()
nextIndex := matchIndex + 1
r.setNextAndMatchIndex(peerId, nextIndex, matchIndex)

logger.Info("append entries successfully, set next index and match index",
zap.Uint64("nextIndex", nextIndex),
zap.Uint64("matchIndex", matchIndex))
// TODO: (B.8) - if successful: update nextIndex and matchIndex for follower
// Hint: use `setNextAndMatchIndex` to update nextIndex and matchIndex
// Log: logger.Info("append entries successfully, set next index and match index", zap.Uint32("peer", result.peerId), zap.Uint64("nextIndex", nextIndex), zap.Uint64("matchIndex", matchIndex))
}

replicasNeeded := (len(r.peers)+1)/2 + 1

logs := r.getLogs(r.commitIndex + 1)
for i := len(logs) - 1; i >= 0; i-- {
log := logs[i]
if log.GetTerm() != r.currentTerm {
continue
}
// TODO: (B.9) if there exiss an N such that N > commitIndex, a majority of matchIndex[i] >= N, and log[N].term == currentTerm: set commitIndex = N
// Hint: find if such N exists
// Hint: if such N exists, use `setCommitIndex` to set commit index
// Hint: if such N exists, use `applyLogs` to apply logs

replicas := 1
for peerId := range r.peers {
if r.matchIndex[peerId] >= log.GetId() {
replicas++
}
}

if replicas >= replicasNeeded {
r.setCommitIndex(log.GetId())
r.logger.Info("found new logs committed, apply new logs", zap.Uint64("commitIndex", r.commitIndex))

go r.applyLogs(r.applyCh)

break
}
}
}

0 comments on commit f37d439

Please sign in to comment.