Skip to content

Commit 87ac09e

Browse files
committed
raft: advance commit index safely
This change makes the commit index advancement in handleHeartbeat safe. Previously, a follower would attempt to update the commit index to whatever value was sent in the MsgHeartbeat message. Out-of-bounds indices would crash the node. It is always safe to advance a commit index if the follower's log is "in sync" with the leader, i.e. when its log is guaranteed to be a prefix of the leader's log. This is always true if the term of the last entry in the log matches the leader term; otherwise, this guarantee is established when the first MsgApp append message from the leader succeeds. At the moment, the leader will never send a commit index that exceeds the follower's log size. However, this may change in the future. This change is a defence-in-depth measure. The newly added raftLog.leaderTerm field will be used for other safety checks in the future, for example to establish that overriding a suffix of entries in raftLog is safe. Signed-off-by: Pavel Kalinnikov <[email protected]>
1 parent 026484c commit 87ac09e

File tree

3 files changed

+70
-6
lines changed

3 files changed

+70
-6
lines changed

log.go

+43
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,35 @@ type raftLog struct {
2929
// they will be saved into storage.
3030
unstable unstable
3131

32+
// leaderTerm is the term of the leader with whom our log is "consistent". The
33+
// log is guaranteed to be a prefix of this term's leader log.
34+
//
35+
// The leaderTerm can be safely updated to `t` if:
36+
// 1. the last entry in the log has term `t`, or, more generally,
37+
// 2. the last successful append was sent by the leader `t`.
38+
//
39+
// This is due to the following safety property (see raft paper §5.3):
40+
//
41+
// Log Matching: if two logs contain an entry with the same index and term,
42+
// then the logs are identical in all entries up through the given index.
43+
//
44+
// We use (1) to initialize leaderTerm, and (2) to maintain it on updates.
45+
//
46+
// NB: (2) does not imply (1). If our log is behind the leader's log, the last
47+
// entry term can be below leaderTerm.
48+
//
49+
// NB: leaderTerm does not necessarily match this raft node's term. It only
50+
// does for the leader. For followers and candidates, when we first learn or
51+
// bump to a new term, we don't have a proof that our log is consistent with
52+
// the new term's leader (current or prospective). The new leader may override
53+
// any suffix of the log after the committed index. Only when the first append
54+
// from the new leader succeeds can we update leaderTerm.
55+
//
56+
// During normal operation, leaderTerm matches the node term though. During a
57+
// leader change, it briefly lags behind, and matches again when the first
58+
// append message succeeds.
59+
leaderTerm uint64
60+
3261
// committed is the highest log position that is known to be in
3362
// stable storage on a quorum of nodes.
3463
committed uint64
@@ -88,6 +117,11 @@ func newLogWithSize(storage Storage, logger Logger, maxApplyingEntsSize entryEnc
88117
if err != nil {
89118
panic(err) // TODO(bdarnell)
90119
}
120+
lastTerm, err := storage.Term(lastIndex)
121+
if err != nil {
122+
panic(err) // TODO(pav-kv)
123+
}
124+
log.leaderTerm = lastTerm
91125
log.unstable.offset = lastIndex + 1
92126
log.unstable.offsetInProgress = lastIndex + 1
93127
log.unstable.logger = logger
@@ -106,6 +140,15 @@ func (l *raftLog) String() string {
106140

107141
// maybeAppend returns (0, false) if the entries cannot be appended. Otherwise,
108142
// it returns (last index of new entries, true).
143+
//
144+
// TODO(pav-kv): pass in the term of the leader who sent this update. It is only
145+
// safe to handle this append if this term is >= l.leaderTerm. It is only safe
146+
// to override an uncommitted suffix of entries if term > l.leaderTerm.
147+
//
148+
// TODO(pav-kv): introduce a struct that consolidates the append metadata. The
149+
// (prevEntryIndex, prevEntryTerm, leaderTerm) tuple must always be carried
150+
// together, and safety of this append must be checked at the lowest layer here,
151+
// rather than up in raft.go.
109152
func (l *raftLog) maybeAppend(index, logTerm, committed uint64, ents ...pb.Entry) (lastnewi uint64, ok bool) {
110153
if !l.matchTerm(index, logTerm) {
111154
return 0, false

raft.go

+14-1
Original file line numberDiff line numberDiff line change
@@ -935,6 +935,8 @@ func (r *raft) becomeLeader() {
935935
// so the preceding log append does not count against the uncommitted log
936936
// quota of the new leader. In other words, after the call to appendEntry,
937937
// r.uncommittedSize is still 0.
938+
939+
r.raftLog.leaderTerm = r.Term // the leader's log is consistent with itself
938940
r.logger.Infof("%x became leader at term %d", r.id, r.Term)
939941
}
940942

@@ -1735,6 +1737,7 @@ func (r *raft) handleAppendEntries(m pb.Message) {
17351737
return
17361738
}
17371739
if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); ok {
1740+
r.raftLog.leaderTerm = m.Term // the log is now consistent with the leader
17381741
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex})
17391742
return
17401743
}
@@ -1770,7 +1773,16 @@ func (r *raft) handleAppendEntries(m pb.Message) {
17701773
}
17711774

17721775
func (r *raft) handleHeartbeat(m pb.Message) {
1773-
r.raftLog.commitTo(m.Commit)
1776+
// It is only safe to advance the commit index if our log is a prefix of the
1777+
// leader's log. Otherwise, entries at this index may mismatch.
1778+
//
1779+
// TODO(pav-kv): move this logic to r.raftLog, which is more appropriate for
1780+
// handling safety. The raftLog can use leaderTerm for other safety checks.
1781+
// For example, unstable.truncateAndAppend currently may override a suffix of
1782+
// the log unconditionally, but it can only be done if m.Term > leaderTerm.
1783+
if m.Term == r.raftLog.leaderTerm {
1784+
r.raftLog.commitTo(min(m.Commit, r.raftLog.lastIndex()))
1785+
}
17741786
r.send(pb.Message{To: m.From, Type: pb.MsgHeartbeatResp, Context: m.Context})
17751787
}
17761788

@@ -1785,6 +1797,7 @@ func (r *raft) handleSnapshot(m pb.Message) {
17851797
if r.restore(s) {
17861798
r.logger.Infof("%x [commit: %d] restored snapshot [index: %d, term: %d]",
17871799
r.id, r.raftLog.committed, sindex, sterm)
1800+
r.raftLog.leaderTerm = m.Term // the log is now consistent with the leader
17881801
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.lastIndex()})
17891802
} else {
17901803
r.logger.Infof("%x [commit: %d] ignored snapshot [index: %d, term: %d]",

raft_test.go

+13-5
Original file line numberDiff line numberDiff line change
@@ -1332,16 +1332,24 @@ func TestHandleMsgApp(t *testing.T) {
13321332
func TestHandleHeartbeat(t *testing.T) {
13331333
commit := uint64(2)
13341334
tests := []struct {
1335-
m pb.Message
1336-
wCommit uint64
1335+
m pb.Message
1336+
lastTerm uint64
1337+
wCommit uint64
13371338
}{
1338-
{pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeat, Term: 2, Commit: commit + 1}, commit + 1},
1339-
{pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeat, Term: 2, Commit: commit - 1}, commit}, // do not decrease commit
1339+
{pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeat, Term: 2, Commit: commit + 1}, 2, commit + 1},
1340+
{pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeat, Term: 2, Commit: commit - 1}, 2, commit}, // do not decrease commit
1341+
{pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeat, Term: 2, Commit: commit - 1}, 1, commit},
1342+
1343+
// Do not increase the commit index if the log is not guaranteed to be a
1344+
// prefix of the leader's log.
1345+
{pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeat, Term: 2, Commit: commit + 1}, 1, commit},
1346+
// Do not increase the commit index beyond our log size.
1347+
{pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeat, Term: 2, Commit: commit + 10}, 2, commit + 1},
13401348
}
13411349

13421350
for i, tt := range tests {
13431351
storage := newTestMemoryStorage(withPeers(1, 2))
1344-
storage.Append([]pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}, {Index: 3, Term: 3}})
1352+
storage.Append([]pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 1}, {Index: 3, Term: tt.lastTerm}})
13451353
sm := newTestRaft(1, 5, 1, storage)
13461354
sm.becomeFollower(2, 2)
13471355
sm.raftLog.commitTo(commit)

0 commit comments

Comments
 (0)