Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 95e61b3

Browse files
committed May 12, 2024
raft: consolidate all append message sending
This PR consolidates all decision-making about sending append messages into a single maybeSendAppend method. Previously, the behaviour depended on the sendIfEmpty flag which was set/unset depending on the context in which the method is called. This is unnecessary because the Progress struct contains enough information about the leader->follower flow state, so maybeSendAppend can be made stand-alone. In follow-up PRs, the consolidated maybeSendAppend method will be used to implement a more flexible message flow control. Epic: CRDB-37515 Release note: none
1 parent 59b261a commit 95e61b3

8 files changed

+128
-118
lines changed
 

‎pkg/raft/doc.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -314,7 +314,7 @@ stale log entries:
314314
rafthttp package.
315315
316316
'MsgApp' contains log entries to replicate. A leader calls bcastAppend,
317-
which calls sendAppend, which sends soon-to-be-replicated logs in 'MsgApp'
317+
which calls maybeSendAppend, which sends soon-to-be-replicated logs in 'MsgApp'
318318
type. When 'MsgApp' is passed to candidate's Step method, candidate reverts
319319
back to follower, because it indicates that there is a valid leader sending
320320
'MsgApp' messages. Candidate and follower respond to this message in
@@ -352,8 +352,8 @@ stale log entries:
352352
353353
'MsgSnap' requests to install a snapshot message. When a node has just
354354
become a leader or the leader receives 'MsgProp' message, it calls
355-
'bcastAppend' method, which then calls 'sendAppend' method to each
356-
follower. In 'sendAppend', if a leader fails to get term or entries,
355+
'bcastAppend' method, which then calls 'maybeSendAppend' method to each
356+
follower. In 'maybeSendAppend', if a leader fails to get term or entries,
357357
the leader requests snapshot by sending 'MsgSnap' type message.
358358
359359
'MsgSnapStatus' tells the result of snapshot install message. When a
@@ -375,7 +375,7 @@ stale log entries:
375375
'MsgHeartbeatResp' is a response to 'MsgHeartbeat'. When 'MsgHeartbeatResp'
376376
is passed to leader's Step method, the leader knows which follower
377377
responded. And only when the leader's last committed index is greater than
378-
follower's Match index, the leader runs 'sendAppend` method.
378+
follower's Match index, the leader runs 'maybeSendAppend' method.
379379
380380
'MsgUnreachable' tells that request(message) wasn't delivered. When
381381
'MsgUnreachable' is passed to leader's Step method, the leader discovers

‎pkg/raft/raft.go

+46-86
Original file line numberDiff line numberDiff line change
@@ -547,24 +547,24 @@ func (r *raft) send(m pb.Message) {
547547
}
548548
}
549549

550-
// sendAppend sends an append RPC with new entries (if any) and the
551-
// current commit index to the given peer.
552-
func (r *raft) sendAppend(to uint64) {
553-
r.maybeSendAppend(to, true)
554-
}
555-
556-
// maybeSendAppend sends an append RPC with new entries to the given peer,
557-
// if necessary. Returns true if a message was sent. The sendIfEmpty
558-
// argument controls whether messages with no entries will be sent
559-
// ("empty" messages are useful to convey updated Commit indexes, but
560-
// are undesirable when we're sending multiple messages in a batch).
550+
// maybeSendAppend sends an append RPC with log entries (if any) that are not
551+
// yet known to be replicated in the given peer's log, as well as the current
552+
// commit index. Usually it sends a MsgApp message, but in some cases (e.g. the
553+
// log has been compacted) it can send a MsgSnap.
554+
//
555+
// In some cases, the MsgApp message can have zero entries and yet be sent.
556+
// When the follower log is not fully up-to-date, we must send a MsgApp
557+
// periodically so that eventually the flow is either accepted or rejected. Not
558+
// doing so can result in a replication stall when a MsgApp is dropped.
561559
//
562-
// TODO(pav-kv): make invocation of maybeSendAppend stateless. The Progress
563-
// struct contains all the state necessary for deciding whether to send a
564-
// message.
565-
func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool {
560+
// Returns true if a message was sent, or false otherwise. A message is not sent
561+
// if the follower log and commit index are up-to-date, the flow is paused (for
562+
// reasons like in-flight limits), or the message could not be constructed.
563+
func (r *raft) maybeSendAppend(to uint64) bool {
566564
pr := r.trk.Progress[to]
567-
if pr.IsPaused() {
565+
566+
last, commit := r.raftLog.lastIndex(), r.raftLog.committed
567+
if !pr.ShouldSendMsgApp(last, commit) {
568568
return false
569569
}
570570

@@ -576,35 +576,25 @@ func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool {
576576
return r.maybeSendSnapshot(to, pr)
577577
}
578578

579-
var ents []pb.Entry
580-
// In a throttled StateReplicate only send empty MsgApp, to ensure progress.
581-
// Otherwise, if we had a full Inflights and all inflight messages were in
582-
// fact dropped, replication to that follower would stall. Instead, an empty
583-
// MsgApp will eventually reach the follower (heartbeats responses prompt the
584-
// leader to send an append), allowing it to be acked or rejected, both of
585-
// which will clear out Inflights.
586-
if pr.State != tracker.StateReplicate || !pr.Inflights.Full() {
587-
ents, err = r.raftLog.entries(pr.Next, r.maxMsgSize)
588-
}
589-
if len(ents) == 0 && !sendIfEmpty {
590-
return false
591-
}
592-
// TODO(pav-kv): move this check up to where err is returned.
593-
if err != nil { // send a snapshot if we failed to get the entries
594-
return r.maybeSendSnapshot(to, pr)
579+
var entries []pb.Entry
580+
if pr.CanSendEntries(last) {
581+
if entries, err = r.raftLog.entries(pr.Next, r.maxMsgSize); err != nil {
582+
// Send a snapshot if we failed to get the entries.
583+
return r.maybeSendSnapshot(to, pr)
584+
}
595585
}
596586

597-
// Send the actual MsgApp otherwise, and update the progress accordingly.
587+
// Send the MsgApp, and update the progress accordingly.
598588
r.send(pb.Message{
599589
To: to,
600590
Type: pb.MsgApp,
601591
Index: prevIndex,
602592
LogTerm: prevTerm,
603-
Entries: ents,
604-
Commit: r.raftLog.committed,
593+
Entries: entries,
594+
Commit: commit,
605595
})
606-
pr.SentEntries(len(ents), uint64(payloadsSize(ents)))
607-
pr.SentCommit(r.raftLog.committed)
596+
pr.SentEntries(len(entries), uint64(payloadsSize(entries)))
597+
pr.SentCommit(commit)
608598
return true
609599
}
610600

@@ -662,7 +652,7 @@ func (r *raft) bcastAppend() {
662652
if id == r.id {
663653
return
664654
}
665-
r.sendAppend(id)
655+
r.maybeSendAppend(id)
666656
})
667657
}
668658

@@ -1414,7 +1404,7 @@ func stepLeader(r *raft, m pb.Message) error {
14141404
if pr.State == tracker.StateReplicate {
14151405
pr.BecomeProbe()
14161406
}
1417-
r.sendAppend(m.From)
1407+
r.maybeSendAppend(m.From)
14181408
}
14191409
} else {
14201410
// We want to update our tracking if the response updates our
@@ -1450,21 +1440,13 @@ func stepLeader(r *raft, m pb.Message) error {
14501440

14511441
if r.maybeCommit() {
14521442
r.bcastAppend()
1453-
} else if r.id != m.From && pr.CanBumpCommit(r.raftLog.committed) {
1454-
// This node may be missing the latest commit index, so send it.
1455-
// NB: this is not strictly necessary because the periodic heartbeat
1456-
// messages deliver commit indices too. However, a message sent now
1457-
// may arrive earlier than the next heartbeat fires.
1458-
r.sendAppend(m.From)
14591443
}
1460-
// We've updated flow control information above, which may
1461-
// allow us to send multiple (size-limited) in-flight messages
1462-
// at once (such as when transitioning from probe to
1463-
// replicate, or when freeTo() covers multiple messages). If
1464-
// we have more entries to send, send as many messages as we
1465-
// can (without sending empty messages for the commit index)
1444+
// We've updated flow control information above, which may allow us to
1445+
// send multiple (size-limited) in-flight messages at once (such as when
1446+
// transitioning from probe to replicate, or when freeTo() covers
1447+
// multiple messages). Send as many messages as we can.
14661448
if r.id != m.From {
1467-
for r.maybeSendAppend(m.From, false /* sendIfEmpty */) {
1449+
for r.maybeSendAppend(m.From) {
14681450
}
14691451
}
14701452
// Transfer leadership is in progress.
@@ -1476,24 +1458,8 @@ func stepLeader(r *raft, m pb.Message) error {
14761458
}
14771459
case pb.MsgHeartbeatResp:
14781460
pr.RecentActive = true
1479-
pr.MsgAppFlowPaused = false
1480-
1481-
// NB: if the follower is paused (full Inflights), this will still send an
1482-
// empty append, allowing it to recover from situations in which all the
1483-
// messages that filled up Inflights in the first place were dropped. Note
1484-
// also that the outgoing heartbeat already communicated the commit index.
1485-
//
1486-
// If the follower is fully caught up but also in StateProbe (as can happen
1487-
// if ReportUnreachable was called), we also want to send an append (it will
1488-
// be empty) to allow the follower to transition back to StateReplicate once
1489-
// it responds.
1490-
//
1491-
// Note that StateSnapshot typically satisfies pr.Match < lastIndex, but
1492-
// `pr.Paused()` is always true for StateSnapshot, so sendAppend is a
1493-
// no-op.
1494-
if pr.Match < r.raftLog.lastIndex() || pr.State == tracker.StateProbe {
1495-
r.sendAppend(m.From)
1496-
}
1461+
pr.PauseMsgAppProbes(false)
1462+
r.maybeSendAppend(m.From)
14971463

14981464
case pb.MsgSnapStatus:
14991465
if pr.State != tracker.StateSnapshot {
@@ -1549,7 +1515,8 @@ func stepLeader(r *raft, m pb.Message) error {
15491515
r.sendTimeoutNow(leadTransferee)
15501516
r.logger.Infof("%x sends MsgTimeoutNow to %x immediately as %x already has up-to-date log", r.id, leadTransferee, leadTransferee)
15511517
} else {
1552-
r.sendAppend(leadTransferee)
1518+
pr.PauseMsgAppProbes(false)
1519+
r.maybeSendAppend(leadTransferee)
15531520
}
15541521
}
15551522
return nil
@@ -1880,21 +1847,14 @@ func (r *raft) switchToConfig(cfg tracker.Config, trk tracker.ProgressMap) pb.Co
18801847
return cs
18811848
}
18821849

1883-
if r.maybeCommit() {
1884-
// If the configuration change means that more entries are committed now,
1885-
// broadcast/append to everyone in the updated config.
1886-
r.bcastAppend()
1887-
} else {
1888-
// Otherwise, still probe the newly added replicas; there's no reason to
1889-
// let them wait out a heartbeat interval (or the next incoming
1890-
// proposal).
1891-
r.trk.Visit(func(id uint64, pr *tracker.Progress) {
1892-
if id == r.id {
1893-
return
1894-
}
1895-
r.maybeSendAppend(id, false /* sendIfEmpty */)
1896-
})
1897-
}
1850+
r.maybeCommit()
1851+
// If the configuration change means that more entries are committed now,
1852+
// broadcast/append to everyone in the updated config.
1853+
//
1854+
// Otherwise, still probe the newly added replicas; there's no reason to let
1855+
// them wait out a heartbeat interval (or the next incoming proposal).
1856+
r.bcastAppend()
1857+
18981858
// If the leadTransferee was removed or demoted, abort the leadership transfer.
18991859
if _, tOK := r.trk.Config.Voters.IDs()[r.leadTransferee]; !tOK && r.leadTransferee != 0 {
19001860
r.abortLeaderTransfer()

‎pkg/raft/raft_test.go

+7-7
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ func TestProgressResumeByHeartbeatResp(t *testing.T) {
134134
assert.False(t, r.trk.Progress[2].MsgAppFlowPaused)
135135
r.trk.Progress[2].MsgAppFlowPaused = true
136136
r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeatResp})
137-
assert.False(t, r.trk.Progress[2].MsgAppFlowPaused)
137+
assert.True(t, r.trk.Progress[2].MsgAppFlowPaused)
138138
}
139139

140140
func TestProgressPaused(t *testing.T) {
@@ -2062,7 +2062,7 @@ func TestSendAppendForProgressProbe(t *testing.T) {
20622062
// loop. After that, the follower is paused until a heartbeat response is
20632063
// received.
20642064
mustAppendEntry(r, pb.Entry{Data: []byte("somedata")})
2065-
r.sendAppend(2)
2065+
r.maybeSendAppend(2)
20662066
msg := r.readMessages()
20672067
assert.Len(t, msg, 1)
20682068
assert.Zero(t, msg[0].Index)
@@ -2071,7 +2071,7 @@ func TestSendAppendForProgressProbe(t *testing.T) {
20712071
assert.True(t, r.trk.Progress[2].MsgAppFlowPaused)
20722072
for j := 0; j < 10; j++ {
20732073
mustAppendEntry(r, pb.Entry{Data: []byte("somedata")})
2074-
r.sendAppend(2)
2074+
r.maybeSendAppend(2)
20752075
assert.Empty(t, r.readMessages())
20762076
}
20772077

@@ -2104,7 +2104,7 @@ func TestSendAppendForProgressReplicate(t *testing.T) {
21042104

21052105
for i := 0; i < 10; i++ {
21062106
mustAppendEntry(r, pb.Entry{Data: []byte("somedata")})
2107-
r.sendAppend(2)
2107+
r.maybeSendAppend(2)
21082108
msgs := r.readMessages()
21092109
assert.Len(t, msgs, 1, "#%d", i)
21102110
}
@@ -2119,7 +2119,7 @@ func TestSendAppendForProgressSnapshot(t *testing.T) {
21192119

21202120
for i := 0; i < 10; i++ {
21212121
mustAppendEntry(r, pb.Entry{Data: []byte("somedata")})
2122-
r.sendAppend(2)
2122+
r.maybeSendAppend(2)
21232123
msgs := r.readMessages()
21242124
assert.Empty(t, msgs, "#%d", i)
21252125
}
@@ -3639,10 +3639,10 @@ func TestLogReplicationWithReorderedMessage(t *testing.T) {
36393639

36403640
// r1 sends 2 MsgApp messages to r2.
36413641
mustAppendEntry(r1, pb.Entry{Data: []byte("somedata")})
3642-
r1.sendAppend(2)
3642+
r1.maybeSendAppend(2)
36433643
req1 := expectOneMessage(t, r1)
36443644
mustAppendEntry(r1, pb.Entry{Data: []byte("somedata")})
3645-
r1.sendAppend(2)
3645+
r1.maybeSendAppend(2)
36463646
req2 := expectOneMessage(t, r1)
36473647

36483648
// r2 receives the second MsgApp first due to reordering.

‎pkg/raft/testdata/replicate_pause.txt

+3
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,9 @@ deliver-msgs drop=3
7676
dropped: 1->3 MsgApp Term:1 Log:1/11 Commit:11 Entries:[1/12 EntryNormal "prop_1_12"]
7777
dropped: 1->3 MsgApp Term:1 Log:1/12 Commit:11 Entries:[1/13 EntryNormal "prop_1_13"]
7878
dropped: 1->3 MsgApp Term:1 Log:1/13 Commit:11 Entries:[1/14 EntryNormal "prop_1_14"]
79+
dropped: 1->3 MsgApp Term:1 Log:1/14 Commit:12
80+
dropped: 1->3 MsgApp Term:1 Log:1/14 Commit:13
81+
dropped: 1->3 MsgApp Term:1 Log:1/14 Commit:14
7982

8083

8184
# Repeat committing 3 entries.

‎pkg/raft/testdata/slow_follower_after_compaction.txt

+2
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,8 @@ deliver-msgs drop=3
8888
----
8989
dropped: 1->3 MsgApp Term:1 Log:1/14 Commit:14 Entries:[1/15 EntryNormal "prop_1_15"]
9090
dropped: 1->3 MsgApp Term:1 Log:1/15 Commit:14 Entries:[1/16 EntryNormal "prop_1_16"]
91+
dropped: 1->3 MsgApp Term:1 Log:1/16 Commit:15
92+
dropped: 1->3 MsgApp Term:1 Log:1/16 Commit:16
9193

9294
# Truncate the leader's log beyond node 3 log size.
9395
compact 1 17

‎pkg/raft/tracker/inflights.go

+2
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ type Inflights struct {
3232
count int // number of inflight messages in the buffer
3333
bytes uint64 // number of inflight bytes
3434

35+
// TODO(pav-kv): do not store the limits here, pass them to methods. For flow
36+
// control, we need to support dynamic limits.
3537
size int // the max number of inflight messages
3638
maxBytes uint64 // the max total byte size of inflight messages
3739

0 commit comments

Comments
 (0)