Skip to content

Commit 3640661

Browse files
committed
raft: consolidate all append message sending
This PR consolidates all decision-making about sending append messages into a single maybeSendAppend method. Previously, the behaviour depended on the sendIfEmpty flag, which was set/unset depending on the context in which the method is called. This is unnecessary because the Progress struct contains enough information about the leader->follower flow state, so maybeSendAppend can be made stand-alone.

In follow-up PRs, the consolidated maybeSendAppend method will be used to implement a more flexible message flow control.

Epic: CRDB-37515
Release note: none
1 parent 46b6fab commit 3640661

8 files changed

+148
-115
lines changed

pkg/raft/doc.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -314,7 +314,7 @@ stale log entries:
314314
rafthttp package.
315315
316316
'MsgApp' contains log entries to replicate. A leader calls bcastAppend,
317-
which calls sendAppend, which sends soon-to-be-replicated logs in 'MsgApp'
317+
which calls maybeSendAppend, which sends soon-to-be-replicated logs in 'MsgApp'
318318
type. When 'MsgApp' is passed to candidate's Step method, candidate reverts
319319
back to follower, because it indicates that there is a valid leader sending
320320
'MsgApp' messages. Candidate and follower respond to this message in
@@ -352,8 +352,8 @@ stale log entries:
352352
353353
'MsgSnap' requests to install a snapshot message. When a node has just
354354
become a leader or the leader receives 'MsgProp' message, it calls
355-
'bcastAppend' method, which then calls 'sendAppend' method to each
356-
follower. In 'sendAppend', if a leader fails to get term or entries,
355+
'bcastAppend' method, which then calls 'maybeSendAppend' method to each
356+
follower. In 'maybeSendAppend', if a leader fails to get term or entries,
357357
the leader requests a snapshot by sending a 'MsgSnap' type message.
358358
359359
'MsgSnapStatus' tells the result of snapshot install message. When a
@@ -375,7 +375,7 @@ stale log entries:
375375
'MsgHeartbeatResp' is a response to 'MsgHeartbeat'. When 'MsgHeartbeatResp'
376376
is passed to leader's Step method, the leader knows which follower
377377
responded. And only when the leader's last committed index is greater than
378-
follower's Match index, the leader runs 'sendAppend` method.
378+
follower's Match index, the leader runs 'maybeSendAppend' method.
379379
380380
'MsgUnreachable' tells that request(message) wasn't delivered. When
381381
'MsgUnreachable' is passed to leader's Step method, the leader discovers

pkg/raft/raft.go

+45-85
Original file line numberDiff line numberDiff line change
@@ -579,24 +579,24 @@ func (r *raft) send(m pb.Message) {
579579
}
580580
}
581581

582-
// sendAppend sends an append RPC with new entries (if any) and the
583-
// current commit index to the given peer.
584-
func (r *raft) sendAppend(to uint64) {
585-
r.maybeSendAppend(to, true)
586-
}
587-
588-
// maybeSendAppend sends an append RPC with new entries to the given peer,
589-
// if necessary. Returns true if a message was sent. The sendIfEmpty
590-
// argument controls whether messages with no entries will be sent
591-
// ("empty" messages are useful to convey updated Commit indexes, but
592-
// are undesirable when we're sending multiple messages in a batch).
582+
// maybeSendAppend sends an append RPC with log entries (if any) that are not
583+
// yet known to be replicated in the given peer's log, as well as the current
584+
// commit index. Usually it sends a MsgApp message, but in some cases (e.g. the
585+
// log has been compacted) it can send a MsgSnap.
586+
//
587+
// In some cases, the MsgApp message can have zero entries, and yet be sent.
588+
// When the follower log is not fully up-to-date, we must send a MsgApp
589+
// periodically so that eventually the flow is either accepted or rejected. Not
590+
// doing so can result in replication stall, in cases when a MsgApp is dropped.
593591
//
594-
// TODO(pav-kv): make invocation of maybeSendAppend stateless. The Progress
595-
// struct contains all the state necessary for deciding whether to send a
596-
// message.
597-
func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool {
592+
// Returns true if a message was sent, or false otherwise. A message is not sent
593+
// if the follower log and commit index are up-to-date, the flow is paused (for
594+
// reasons like in-flight limits), or the message could not be constructed.
595+
func (r *raft) maybeSendAppend(to uint64) bool {
598596
pr := r.trk.Progress[to]
599-
if pr.IsPaused() {
597+
598+
last, commit := r.raftLog.lastIndex(), r.raftLog.committed
599+
if !pr.ShouldSendMsgApp(last, commit) {
600600
return false
601601
}
602602

@@ -608,36 +608,26 @@ func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool {
608608
return r.maybeSendSnapshot(to, pr)
609609
}
610610

611-
var ents []pb.Entry
612-
// In a throttled StateReplicate only send empty MsgApp, to ensure progress.
613-
// Otherwise, if we had a full Inflights and all inflight messages were in
614-
// fact dropped, replication to that follower would stall. Instead, an empty
615-
// MsgApp will eventually reach the follower (heartbeats responses prompt the
616-
// leader to send an append), allowing it to be acked or rejected, both of
617-
// which will clear out Inflights.
618-
if pr.State != tracker.StateReplicate || !pr.Inflights.Full() {
619-
ents, err = r.raftLog.entries(pr.Next, r.maxMsgSize)
620-
}
621-
if len(ents) == 0 && !sendIfEmpty {
622-
return false
623-
}
624-
// TODO(pav-kv): move this check up to where err is returned.
625-
if err != nil { // send a snapshot if we failed to get the entries
626-
return r.maybeSendSnapshot(to, pr)
611+
var entries []pb.Entry
612+
if pr.CanSendEntries(last) {
613+
if entries, err = r.raftLog.entries(pr.Next, r.maxMsgSize); err != nil {
614+
// Send a snapshot if we failed to get the entries.
615+
return r.maybeSendSnapshot(to, pr)
616+
}
627617
}
628618

629-
// Send the actual MsgApp otherwise, and update the progress accordingly.
619+
// Send the MsgApp, and update the progress accordingly.
630620
r.send(pb.Message{
631621
To: to,
632622
Type: pb.MsgApp,
633623
Index: prevIndex,
634624
LogTerm: prevTerm,
635-
Entries: ents,
636-
Commit: r.raftLog.committed,
625+
Entries: entries,
626+
Commit: commit,
637627
Match: pr.Match,
638628
})
639-
pr.SentEntries(len(ents), uint64(payloadsSize(ents)))
640-
pr.SentCommit(r.raftLog.committed)
629+
pr.SentEntries(len(entries), uint64(payloadsSize(entries)))
630+
pr.SentCommit(commit)
641631
return true
642632
}
643633

@@ -696,7 +686,7 @@ func (r *raft) bcastAppend() {
696686
if id == r.id {
697687
return
698688
}
699-
r.sendAppend(id)
689+
r.maybeSendAppend(id)
700690
})
701691
}
702692

@@ -1450,7 +1440,7 @@ func stepLeader(r *raft, m pb.Message) error {
14501440
if pr.State == tracker.StateReplicate {
14511441
pr.BecomeProbe()
14521442
}
1453-
r.sendAppend(m.From)
1443+
r.maybeSendAppend(m.From)
14541444
}
14551445
} else {
14561446
// We want to update our tracking if the response updates our
@@ -1486,21 +1476,13 @@ func stepLeader(r *raft, m pb.Message) error {
14861476

14871477
if r.maybeCommit() {
14881478
r.bcastAppend()
1489-
} else if r.id != m.From && pr.CanBumpCommit(r.raftLog.committed) {
1490-
// This node may be missing the latest commit index, so send it.
1491-
// NB: this is not strictly necessary because the periodic heartbeat
1492-
// messages deliver commit indices too. However, a message sent now
1493-
// may arrive earlier than the next heartbeat fires.
1494-
r.sendAppend(m.From)
14951479
}
1496-
// We've updated flow control information above, which may
1497-
// allow us to send multiple (size-limited) in-flight messages
1498-
// at once (such as when transitioning from probe to
1499-
// replicate, or when freeTo() covers multiple messages). If
1500-
// we have more entries to send, send as many messages as we
1501-
// can (without sending empty messages for the commit index)
1480+
// We've updated flow control information above, which may allow us to
1481+
// send multiple (size-limited) in-flight messages at once (such as when
1482+
// transitioning from probe to replicate, or when freeTo() covers
1483+
// multiple messages). Send as many messages as we can.
15021484
if r.id != m.From {
1503-
for r.maybeSendAppend(m.From, false /* sendIfEmpty */) {
1485+
for r.maybeSendAppend(m.From) {
15041486
}
15051487
}
15061488
// Transfer leadership is in progress.
@@ -1513,23 +1495,7 @@ func stepLeader(r *raft, m pb.Message) error {
15131495
case pb.MsgHeartbeatResp:
15141496
pr.RecentActive = true
15151497
pr.MsgAppFlowPaused = false
1516-
1517-
// NB: if the follower is paused (full Inflights), this will still send an
1518-
// empty append, allowing it to recover from situations in which all the
1519-
// messages that filled up Inflights in the first place were dropped. Note
1520-
// also that the outgoing heartbeat already communicated the commit index.
1521-
//
1522-
// If the follower is fully caught up but also in StateProbe (as can happen
1523-
// if ReportUnreachable was called), we also want to send an append (it will
1524-
// be empty) to allow the follower to transition back to StateReplicate once
1525-
// it responds.
1526-
//
1527-
// Note that StateSnapshot typically satisfies pr.Match < lastIndex, but
1528-
// `pr.Paused()` is always true for StateSnapshot, so sendAppend is a
1529-
// no-op.
1530-
if pr.Match < r.raftLog.lastIndex() || pr.State == tracker.StateProbe {
1531-
r.sendAppend(m.From)
1532-
}
1498+
r.maybeSendAppend(m.From)
15331499

15341500
case pb.MsgSnapStatus:
15351501
if pr.State != tracker.StateSnapshot {
@@ -1585,7 +1551,8 @@ func stepLeader(r *raft, m pb.Message) error {
15851551
r.sendTimeoutNow(leadTransferee)
15861552
r.logger.Infof("%x sends MsgTimeoutNow to %x immediately as %x already has up-to-date log", r.id, leadTransferee, leadTransferee)
15871553
} else {
1588-
r.sendAppend(leadTransferee)
1554+
pr.MsgAppFlowPaused = false
1555+
r.maybeSendAppend(leadTransferee)
15891556
}
15901557
}
15911558
return nil
@@ -1957,21 +1924,14 @@ func (r *raft) switchToConfig(cfg tracker.Config, trk tracker.ProgressMap) pb.Co
19571924
return cs
19581925
}
19591926

1960-
if r.maybeCommit() {
1961-
// If the configuration change means that more entries are committed now,
1962-
// broadcast/append to everyone in the updated config.
1963-
r.bcastAppend()
1964-
} else {
1965-
// Otherwise, still probe the newly added replicas; there's no reason to
1966-
// let them wait out a heartbeat interval (or the next incoming
1967-
// proposal).
1968-
r.trk.Visit(func(id uint64, pr *tracker.Progress) {
1969-
if id == r.id {
1970-
return
1971-
}
1972-
r.maybeSendAppend(id, false /* sendIfEmpty */)
1973-
})
1974-
}
1927+
r.maybeCommit()
1928+
// If the configuration change means that more entries are committed now,
1929+
// broadcast/append to everyone in the updated config.
1930+
//
1931+
// Otherwise, still probe the newly added replicas; there's no reason to let
1932+
// them wait out a heartbeat interval (or the next incoming proposal).
1933+
r.bcastAppend()
1934+
19751935
// If the leadTransferee was removed or demoted, abort the leadership transfer.
19761936
if _, tOK := r.trk.Config.Voters.IDs()[r.leadTransferee]; !tOK && r.leadTransferee != 0 {
19771937
r.abortLeaderTransfer()

pkg/raft/raft_test.go

+7-7
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ func TestProgressResumeByHeartbeatResp(t *testing.T) {
134134
assert.False(t, r.trk.Progress[2].MsgAppFlowPaused)
135135
r.trk.Progress[2].MsgAppFlowPaused = true
136136
r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeatResp})
137-
assert.False(t, r.trk.Progress[2].MsgAppFlowPaused)
137+
assert.True(t, r.trk.Progress[2].MsgAppFlowPaused)
138138
}
139139

140140
func TestProgressPaused(t *testing.T) {
@@ -2070,7 +2070,7 @@ func TestSendAppendForProgressProbe(t *testing.T) {
20702070
// loop. After that, the follower is paused until a heartbeat response is
20712071
// received.
20722072
mustAppendEntry(r, pb.Entry{Data: []byte("somedata")})
2073-
r.sendAppend(2)
2073+
r.maybeSendAppend(2)
20742074
msg := r.readMessages()
20752075
assert.Len(t, msg, 1)
20762076
assert.Zero(t, msg[0].Index)
@@ -2079,7 +2079,7 @@ func TestSendAppendForProgressProbe(t *testing.T) {
20792079
assert.True(t, r.trk.Progress[2].MsgAppFlowPaused)
20802080
for j := 0; j < 10; j++ {
20812081
mustAppendEntry(r, pb.Entry{Data: []byte("somedata")})
2082-
r.sendAppend(2)
2082+
r.maybeSendAppend(2)
20832083
assert.Empty(t, r.readMessages())
20842084
}
20852085

@@ -2112,7 +2112,7 @@ func TestSendAppendForProgressReplicate(t *testing.T) {
21122112

21132113
for i := 0; i < 10; i++ {
21142114
mustAppendEntry(r, pb.Entry{Data: []byte("somedata")})
2115-
r.sendAppend(2)
2115+
r.maybeSendAppend(2)
21162116
msgs := r.readMessages()
21172117
assert.Len(t, msgs, 1, "#%d", i)
21182118
}
@@ -2127,7 +2127,7 @@ func TestSendAppendForProgressSnapshot(t *testing.T) {
21272127

21282128
for i := 0; i < 10; i++ {
21292129
mustAppendEntry(r, pb.Entry{Data: []byte("somedata")})
2130-
r.sendAppend(2)
2130+
r.maybeSendAppend(2)
21312131
msgs := r.readMessages()
21322132
assert.Empty(t, msgs, "#%d", i)
21332133
}
@@ -3647,10 +3647,10 @@ func TestLogReplicationWithReorderedMessage(t *testing.T) {
36473647

36483648
// r1 sends 2 MsgApp messages to r2.
36493649
mustAppendEntry(r1, pb.Entry{Data: []byte("somedata")})
3650-
r1.sendAppend(2)
3650+
r1.maybeSendAppend(2)
36513651
req1 := expectOneMessage(t, r1)
36523652
mustAppendEntry(r1, pb.Entry{Data: []byte("somedata")})
3653-
r1.sendAppend(2)
3653+
r1.maybeSendAppend(2)
36543654
req2 := expectOneMessage(t, r1)
36553655

36563656
// r2 receives the second MsgApp first due to reordering.

pkg/raft/testdata/replicate_pause.txt

+3
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,9 @@ deliver-msgs drop=3
7676
dropped: 1->3 MsgApp Term:1 Log:1/11 Commit:11 Entries:[1/12 EntryNormal "prop_1_12"]
7777
dropped: 1->3 MsgApp Term:1 Log:1/12 Commit:11 Entries:[1/13 EntryNormal "prop_1_13"]
7878
dropped: 1->3 MsgApp Term:1 Log:1/13 Commit:11 Entries:[1/14 EntryNormal "prop_1_14"]
79+
dropped: 1->3 MsgApp Term:1 Log:1/14 Commit:12
80+
dropped: 1->3 MsgApp Term:1 Log:1/14 Commit:13
81+
dropped: 1->3 MsgApp Term:1 Log:1/14 Commit:14
7982

8083

8184
# Repeat committing 3 entries.

pkg/raft/testdata/slow_follower_after_compaction.txt

+2
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,8 @@ deliver-msgs drop=3
8888
----
8989
dropped: 1->3 MsgApp Term:1 Log:1/14 Commit:14 Entries:[1/15 EntryNormal "prop_1_15"]
9090
dropped: 1->3 MsgApp Term:1 Log:1/15 Commit:14 Entries:[1/16 EntryNormal "prop_1_16"]
91+
dropped: 1->3 MsgApp Term:1 Log:1/16 Commit:15
92+
dropped: 1->3 MsgApp Term:1 Log:1/16 Commit:16
9193

9294
# Truncate the leader's log beyond node 3 log size.
9395
compact 1 17

pkg/raft/tracker/inflights.go

+5
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
// This code has been modified from its original form by Cockroach Labs, Inc.
2+
// All modifications are Copyright 2024 Cockroach Labs, Inc.
3+
//
14
// Copyright 2019 The etcd Authors
25
//
36
// Licensed under the Apache License, Version 2.0 (the "License");
@@ -32,6 +35,8 @@ type Inflights struct {
3235
count int // number of inflight messages in the buffer
3336
bytes uint64 // number of inflight bytes
3437

38+
// TODO(pav-kv): do not store the limits here, pass them to methods. For flow
39+
// control, we need to support dynamic limits.
3540
size int // the max number of inflight messages
3641
maxBytes uint64 // the max total byte size of inflight messages
3742

0 commit comments

Comments
 (0)