Skip to content

Commit f64d156

Browse files
committed
raft: advance commit index safely
This change makes the commit index advancement in handleHeartbeat safe. Previously, a follower would attempt to update the commit index to whichever was sent in the MsgHeartbeat message. Out-of-bound indices would crash the node. It is always safe to advance a commit index if the follower's log is "in sync" with the leader, i.e. when its log is guaranteed to be a prefix of the leader's log. This is always true if the term of last entry in the log matches the leader team, otherwise this guarantee is established when the first MsgApp append message from the leader succeeds. At the moment, the leader will never send a commit index that exceeds the follower's log size. However, this may change in future. This change is a defence-in-depth. Signed-off-by: Pavel Kalinnikov <[email protected]>
1 parent 026484c commit f64d156

File tree

2 files changed

+31
-6
lines changed

2 files changed

+31
-6
lines changed

raft.go

+18-1
Original file line numberDiff line numberDiff line change
@@ -375,6 +375,12 @@ type raft struct {
375375

376376
// the leader id
377377
lead uint64
378+
// logSynced is true if this node's log is guaranteed to be a prefix of the
379+
// leader's log at this term. Always true for the leader. Always false for a
380+
// candidate. For a follower, this is true if the last entry term matches the
381+
// leader term, otherwise becomes true when the first MsgApp append from the
382+
// leader succeeds.
383+
logSynced bool
378384
// leadTransferee is id of the leader transfer target when its value is not zero.
379385
// Follow the procedure defined in raft thesis 3.10.
380386
leadTransferee uint64
@@ -763,6 +769,7 @@ func (r *raft) reset(term uint64) {
763769
r.Vote = None
764770
}
765771
r.lead = None
772+
r.logSynced = false
766773

767774
r.electionElapsed = 0
768775
r.heartbeatElapsed = 0
@@ -866,6 +873,10 @@ func (r *raft) becomeFollower(term uint64, lead uint64) {
866873
r.reset(term)
867874
r.tick = r.tickElection
868875
r.lead = lead
876+
// If the last entry term matches the leader term, the log is guaranteed to be
877+
// a prefix of the leader's log. Otherwise, we will establish this guarantee
878+
// later, on the first successful MsgApp.
879+
r.logSynced = r.raftLog.lastTerm() == term
869880
r.state = StateFollower
870881
r.logger.Infof("%x became follower at term %d", r.id, r.Term)
871882
}
@@ -908,6 +919,7 @@ func (r *raft) becomeLeader() {
908919
r.reset(r.Term)
909920
r.tick = r.tickHeartbeat
910921
r.lead = r.id
922+
r.logSynced = true // the leader's log is in sync with itself
911923
r.state = StateLeader
912924
// Followers enter replicate mode when they've been successfully probed
913925
// (perhaps after having received a snapshot as a result). The leader is
@@ -1735,6 +1747,7 @@ func (r *raft) handleAppendEntries(m pb.Message) {
17351747
return
17361748
}
17371749
if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); ok {
1750+
r.logSynced = true // from now on, the log is a prefix of the leader's log
17381751
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex})
17391752
return
17401753
}
@@ -1770,7 +1783,11 @@ func (r *raft) handleAppendEntries(m pb.Message) {
17701783
}
17711784

17721785
func (r *raft) handleHeartbeat(m pb.Message) {
1773-
r.raftLog.commitTo(m.Commit)
1786+
// It is only safe to advance the commit index if our log is a prefix of the
1787+
// leader's log. Otherwise, entries at this index may mismatch.
1788+
if r.logSynced {
1789+
r.raftLog.commitTo(min(m.Commit, r.raftLog.lastIndex()))
1790+
}
17741791
r.send(pb.Message{To: m.From, Type: pb.MsgHeartbeatResp, Context: m.Context})
17751792
}
17761793

raft_test.go

+13-5
Original file line numberDiff line numberDiff line change
@@ -1332,16 +1332,24 @@ func TestHandleMsgApp(t *testing.T) {
13321332
func TestHandleHeartbeat(t *testing.T) {
13331333
commit := uint64(2)
13341334
tests := []struct {
1335-
m pb.Message
1336-
wCommit uint64
1335+
m pb.Message
1336+
lastTerm uint64
1337+
wCommit uint64
13371338
}{
1338-
{pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeat, Term: 2, Commit: commit + 1}, commit + 1},
1339-
{pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeat, Term: 2, Commit: commit - 1}, commit}, // do not decrease commit
1339+
{pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeat, Term: 2, Commit: commit + 1}, 2, commit + 1},
1340+
{pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeat, Term: 2, Commit: commit - 1}, 2, commit}, // do not decrease commit
1341+
{pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeat, Term: 2, Commit: commit - 1}, 1, commit},
1342+
1343+
// Do not increase the commit index if the log is not guaranteed to be a
1344+
// prefix of the leader's log.
1345+
{pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeat, Term: 2, Commit: commit + 1}, 1, commit},
1346+
// Do not increase the commit index beyond our log size.
1347+
{pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeat, Term: 2, Commit: commit + 10}, 2, commit + 1},
13401348
}
13411349

13421350
for i, tt := range tests {
13431351
storage := newTestMemoryStorage(withPeers(1, 2))
1344-
storage.Append([]pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}, {Index: 3, Term: 3}})
1352+
storage.Append([]pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 1}, {Index: 3, Term: tt.lastTerm}})
13451353
sm := newTestRaft(1, 5, 1, storage)
13461354
sm.becomeFollower(2, 2)
13471355
sm.raftLog.commitTo(commit)

0 commit comments

Comments
 (0)