Skip to content

Commit 400b4b1

Browse files
committedMay 28, 2024·
tracker: rename MsgAppFlowPaused
Epic: none Release note: none
1 parent 3640661 commit 400b4b1

8 files changed

+58
-58
lines changed
 

‎pkg/kv/kvserver/flow_control_replica.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ func (rf *replicaFlowControl) getBehindFollowers() map[roachpb.ReplicaID]struct{
8585
// time for it to catch up and then later return those tokens to us.
8686
// This is I3a again; do it as part of #95563.
8787
_ = progress.RecentActive
88-
_ = progress.MsgAppFlowPaused
88+
_ = progress.MsgAppProbesPaused
8989
_ = progress.Match
9090
})
9191
return behindFollowers

‎pkg/kv/kvserver/flow_control_replica_integration_test.go

+7-7
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ import (
5252
// follows: progress=(replid@match:<state>:<active>:<paused>,...).
5353
// <state> is one of {probe,replicate,snapshot}, <active> is
5454
// {active,!inactive}, and <paused> is {paused,!paused}. The latter controls
55-
// MsgAppFlowPaused in the raft library, not the CRDB-level follower
55+
// MsgAppProbesPaused in the raft library, not the CRDB-level follower
5656
// pausing.
5757
//
5858
// B. For the raft transport, we can specify the set of replica IDs we're
@@ -169,12 +169,12 @@ func TestFlowControlReplicaIntegration(t *testing.T) {
169169
paused := parts[3] == "paused"
170170

171171
progress[replID] = tracker.Progress{
172-
Match: uint64(index),
173-
State: state,
174-
RecentActive: active,
175-
MsgAppFlowPaused: paused,
176-
Inflights: tracker.NewInflights(1, 0), // avoid NPE
177-
IsLearner: false,
172+
Match: uint64(index),
173+
State: state,
174+
RecentActive: active,
175+
MsgAppProbesPaused: paused,
176+
Inflights: tracker.NewInflights(1, 0), // avoid NPE
177+
IsLearner: false,
178178
}
179179

180180
case "descriptor", "paused", "inactive":

‎pkg/kv/kvserver/split_delay_helper_test.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -139,10 +139,10 @@ func TestSplitDelayToAvoidSnapshot(t *testing.T) {
139139
st := statusWithState(raft.StateLeader)
140140
st.Progress = map[uint64]tracker.Progress{
141141
2: {
142-
State: state,
143-
RecentActive: true,
144-
MsgAppFlowPaused: true, // Unifies string output below.
145-
Inflights: &tracker.Inflights{},
142+
State: state,
143+
RecentActive: true,
144+
MsgAppProbesPaused: true, // Unifies string output below.
145+
Inflights: &tracker.Inflights{},
146146
},
147147
// Healthy follower just for kicks.
148148
3: {State: tracker.StateReplicate},

‎pkg/raft/raft.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -1494,7 +1494,7 @@ func stepLeader(r *raft, m pb.Message) error {
14941494
}
14951495
case pb.MsgHeartbeatResp:
14961496
pr.RecentActive = true
1497-
pr.MsgAppFlowPaused = false
1497+
pr.MsgAppProbesPaused = false
14981498
r.maybeSendAppend(m.From)
14991499

15001500
case pb.MsgSnapStatus:
@@ -1514,7 +1514,7 @@ func stepLeader(r *raft, m pb.Message) error {
15141514
// If snapshot finish, wait for the MsgAppResp from the remote node before sending
15151515
// out the next MsgApp.
15161516
// If snapshot failure, wait for a heartbeat interval before next try
1517-
pr.MsgAppFlowPaused = true
1517+
pr.MsgAppProbesPaused = true
15181518
case pb.MsgUnreachable:
15191519
// During optimistic replication, if the remote becomes unreachable,
15201520
// there is huge probability that a MsgApp is lost.
@@ -1551,7 +1551,7 @@ func stepLeader(r *raft, m pb.Message) error {
15511551
r.sendTimeoutNow(leadTransferee)
15521552
r.logger.Infof("%x sends MsgTimeoutNow to %x immediately as %x already has up-to-date log", r.id, leadTransferee, leadTransferee)
15531553
} else {
1554-
pr.MsgAppFlowPaused = false
1554+
pr.MsgAppProbesPaused = false
15551555
r.maybeSendAppend(leadTransferee)
15561556
}
15571557
}

‎pkg/raft/raft_snap_test.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -86,8 +86,8 @@ func TestSnapshotFailure(t *testing.T) {
8686
if sm.trk.Progress[2].Next != 1 {
8787
t.Fatalf("Next = %d, want 1", sm.trk.Progress[2].Next)
8888
}
89-
if !sm.trk.Progress[2].MsgAppFlowPaused {
90-
t.Errorf("MsgAppFlowPaused = %v, want true", sm.trk.Progress[2].MsgAppFlowPaused)
89+
if !sm.trk.Progress[2].MsgAppProbesPaused {
90+
t.Errorf("MsgAppProbesPaused = %v, want true", sm.trk.Progress[2].MsgAppProbesPaused)
9191
}
9292
}
9393

@@ -109,8 +109,8 @@ func TestSnapshotSucceed(t *testing.T) {
109109
if sm.trk.Progress[2].Next != 12 {
110110
t.Fatalf("Next = %d, want 12", sm.trk.Progress[2].Next)
111111
}
112-
if !sm.trk.Progress[2].MsgAppFlowPaused {
113-
t.Errorf("MsgAppFlowPaused = %v, want true", sm.trk.Progress[2].MsgAppFlowPaused)
112+
if !sm.trk.Progress[2].MsgAppProbesPaused {
113+
t.Errorf("MsgAppProbesPaused = %v, want true", sm.trk.Progress[2].MsgAppProbesPaused)
114114
}
115115
}
116116

‎pkg/raft/raft_test.go

+8-8
Original file line numberDiff line numberDiff line change
@@ -125,16 +125,16 @@ func TestProgressResumeByHeartbeatResp(t *testing.T) {
125125
r.becomeCandidate()
126126
r.becomeLeader()
127127

128-
r.trk.Progress[2].MsgAppFlowPaused = true
128+
r.trk.Progress[2].MsgAppProbesPaused = true
129129

130130
r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgBeat})
131-
assert.True(t, r.trk.Progress[2].MsgAppFlowPaused)
131+
assert.True(t, r.trk.Progress[2].MsgAppProbesPaused)
132132

133133
r.trk.Progress[2].BecomeReplicate()
134-
assert.False(t, r.trk.Progress[2].MsgAppFlowPaused)
135-
r.trk.Progress[2].MsgAppFlowPaused = true
134+
assert.False(t, r.trk.Progress[2].MsgAppProbesPaused)
135+
r.trk.Progress[2].MsgAppProbesPaused = true
136136
r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeatResp})
137-
assert.True(t, r.trk.Progress[2].MsgAppFlowPaused)
137+
assert.True(t, r.trk.Progress[2].MsgAppProbesPaused)
138138
}
139139

140140
func TestProgressPaused(t *testing.T) {
@@ -2076,7 +2076,7 @@ func TestSendAppendForProgressProbe(t *testing.T) {
20762076
assert.Zero(t, msg[0].Index)
20772077
}
20782078

2079-
assert.True(t, r.trk.Progress[2].MsgAppFlowPaused)
2079+
assert.True(t, r.trk.Progress[2].MsgAppProbesPaused)
20802080
for j := 0; j < 10; j++ {
20812081
mustAppendEntry(r, pb.Entry{Data: []byte("somedata")})
20822082
r.maybeSendAppend(2)
@@ -2087,7 +2087,7 @@ func TestSendAppendForProgressProbe(t *testing.T) {
20872087
for j := 0; j < r.heartbeatTimeout; j++ {
20882088
r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgBeat})
20892089
}
2090-
assert.True(t, r.trk.Progress[2].MsgAppFlowPaused)
2090+
assert.True(t, r.trk.Progress[2].MsgAppProbesPaused)
20912091

20922092
// consume the heartbeat
20932093
msg := r.readMessages()
@@ -2100,7 +2100,7 @@ func TestSendAppendForProgressProbe(t *testing.T) {
21002100
msg := r.readMessages()
21012101
assert.Len(t, msg, 1)
21022102
assert.Zero(t, msg[0].Index)
2103-
assert.True(t, r.trk.Progress[2].MsgAppFlowPaused)
2103+
assert.True(t, r.trk.Progress[2].MsgAppProbesPaused)
21042104
}
21052105

21062106
func TestSendAppendForProgressReplicate(t *testing.T) {

‎pkg/raft/tracker/progress.go

+14-14
Original file line numberDiff line numberDiff line change
@@ -96,13 +96,13 @@ type Progress struct {
9696
// This is always true on the leader.
9797
RecentActive bool
9898

99-
// MsgAppFlowPaused is used when the MsgApp flow to a node is throttled. This
99+
// MsgAppProbesPaused is used when the MsgApp flow to a node is throttled. This
100100
// happens in StateProbe, or StateReplicate with saturated Inflights. In both
101101
// cases, we need to continue sending MsgApp once in a while to guarantee
102-
// progress, but we only do so when MsgAppFlowPaused is false (it is reset on
102+
// progress, but we only do so when MsgAppProbesPaused is false (it is reset on
103103
// receiving a heartbeat response), to not overflow the receiver. See
104-
// IsPaused().
105-
MsgAppFlowPaused bool
104+
// IsPaused() and ShouldSendMsgApp().
105+
MsgAppProbesPaused bool
106106

107107
// Inflights is a sliding window for the inflight messages.
108108
// Each inflight message contains one or more log entries.
@@ -122,10 +122,10 @@ type Progress struct {
122122
IsLearner bool
123123
}
124124

125-
// ResetState moves the Progress into the specified State, resetting MsgAppFlowPaused,
125+
// ResetState moves the Progress into the specified State, resetting MsgAppProbesPaused,
126126
// PendingSnapshot, and Inflights.
127127
func (pr *Progress) ResetState(state StateType) {
128-
pr.MsgAppFlowPaused = false
128+
pr.MsgAppProbesPaused = false
129129
pr.PendingSnapshot = 0
130130
pr.State = state
131131
pr.Inflights.reset()
@@ -173,7 +173,7 @@ func (pr *Progress) SentEntries(entries int, bytes uint64) {
173173
pr.Next += uint64(entries)
174174
pr.Inflights.Add(pr.Next-1, bytes)
175175
}
176-
pr.MsgAppFlowPaused = true
176+
pr.MsgAppProbesPaused = true
177177
}
178178

179179
// CanSendEntries returns true if the flow control state allows sending at least
@@ -248,7 +248,7 @@ func (pr *Progress) MaybeDecrTo(rejected, matchHint uint64) bool {
248248
pr.Next = max(min(rejected, matchHint+1), pr.Match+1)
249249
// Regress the sentCommit since it unlikely has been applied.
250250
pr.sentCommit = min(pr.sentCommit, pr.Next-1)
251-
pr.MsgAppFlowPaused = false
251+
pr.MsgAppProbesPaused = false
252252
return true
253253
}
254254

@@ -264,9 +264,9 @@ func (pr *Progress) MaybeDecrTo(rejected, matchHint uint64) bool {
264264
func (pr *Progress) IsPaused() bool {
265265
switch pr.State {
266266
case StateProbe:
267-
return pr.MsgAppFlowPaused
267+
return pr.MsgAppProbesPaused
268268
case StateReplicate:
269-
return pr.MsgAppFlowPaused && pr.Inflights.Full()
269+
return pr.MsgAppProbesPaused && pr.Inflights.Full()
270270
case StateSnapshot:
271271
return true
272272
default:
@@ -296,7 +296,7 @@ func (pr *Progress) IsPaused() bool {
296296
func (pr *Progress) ShouldSendMsgApp(last, commit uint64) bool {
297297
switch pr.State {
298298
case StateProbe:
299-
return !pr.MsgAppFlowPaused
299+
return !pr.MsgAppProbesPaused
300300

301301
case StateReplicate:
302302
// If the in-flight limits are not saturated, and there are pending entries
@@ -309,9 +309,9 @@ func (pr *Progress) ShouldSendMsgApp(last, commit uint64) bool {
309309
// follower eventually will reply with an ack or reject.
310310
//
311311
// If the follower's log is outdated, and we haven't recently sent a MsgApp
312-
// (according to the MsgAppFlowPaused flag), send one now. This is going to
313-
// be an empty "probe" MsgApp.
314-
if pr.Match < last && !pr.MsgAppFlowPaused {
312+
// (according to the MsgAppProbesPaused flag), send one now. This is going
313+
// to be an empty "probe" MsgApp.
314+
if pr.Match < last && !pr.MsgAppProbesPaused {
315315
return true
316316
}
317317
// Send an empty MsgApp containing the latest commit index if:

‎pkg/raft/tracker/progress_test.go

+17-17
Original file line numberDiff line numberDiff line change
@@ -27,14 +27,14 @@ func TestProgressString(t *testing.T) {
2727
ins := NewInflights(1, 0)
2828
ins.Add(123, 1)
2929
pr := &Progress{
30-
Match: 1,
31-
Next: 2,
32-
State: StateSnapshot,
33-
PendingSnapshot: 123,
34-
RecentActive: false,
35-
MsgAppFlowPaused: true,
36-
IsLearner: true,
37-
Inflights: ins,
30+
Match: 1,
31+
Next: 2,
32+
State: StateSnapshot,
33+
PendingSnapshot: 123,
34+
RecentActive: false,
35+
MsgAppProbesPaused: true,
36+
IsLearner: true,
37+
Inflights: ins,
3838
}
3939
const exp = `StateSnapshot match=1 next=2 learner paused pendingSnap=123 inactive inflight=1[full]`
4040
assert.Equal(t, exp, pr.String())
@@ -56,29 +56,29 @@ func TestProgressIsPaused(t *testing.T) {
5656
}
5757
for i, tt := range tests {
5858
p := &Progress{
59-
State: tt.state,
60-
MsgAppFlowPaused: tt.paused,
61-
Inflights: NewInflights(256, 0),
59+
State: tt.state,
60+
MsgAppProbesPaused: tt.paused,
61+
Inflights: NewInflights(256, 0),
6262
}
6363
assert.Equal(t, tt.w, p.IsPaused(), i)
6464
}
6565
}
6666

67-
// TestProgressResume ensures that MaybeDecrTo resets MsgAppFlowPaused, and
67+
// TestProgressResume ensures that MaybeDecrTo resets MsgAppProbesPaused, and
6868
// MaybeUpdate does not.
6969
//
7070
// TODO(pav-kv): there is little sense in testing these micro-behaviours in the
7171
// struct. We should test the visible behaviour instead.
7272
func TestProgressResume(t *testing.T) {
7373
p := &Progress{
74-
Next: 2,
75-
MsgAppFlowPaused: true,
74+
Next: 2,
75+
MsgAppProbesPaused: true,
7676
}
7777
p.MaybeDecrTo(1, 1)
78-
assert.False(t, p.MsgAppFlowPaused)
79-
p.MsgAppFlowPaused = true
78+
assert.False(t, p.MsgAppProbesPaused)
79+
p.MsgAppProbesPaused = true
8080
p.MaybeUpdate(2)
81-
assert.True(t, p.MsgAppFlowPaused)
81+
assert.True(t, p.MsgAppProbesPaused)
8282
}
8383

8484
func TestProgressBecomeProbe(t *testing.T) {

0 commit comments

Comments
 (0)
Please sign in to comment.