Skip to content

Commit 91981c3

Browse files
committed
tracker: rename the paused probes flow field
Signed-off-by: Pavel Kalinnikov <[email protected]>
1 parent 1ce78f5 commit 91981c3

File tree

5 files changed

+40
-36
lines changed

5 files changed

+40
-36
lines changed

raft.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -1563,7 +1563,7 @@ func stepLeader(r *raft, m pb.Message) error {
15631563
// If snapshot finish, wait for the MsgAppResp from the remote node before sending
15641564
// out the next MsgApp.
15651565
// If snapshot failure, wait for a heartbeat interval before next try
1566-
pr.MsgAppFlowPaused = true
1566+
pr.PauseMsgAppProbes(true)
15671567
case pb.MsgUnreachable:
15681568
// During optimistic replication, if the remote becomes unreachable,
15691569
// there is huge probability that a MsgApp is lost.

raft_snap_test.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -83,8 +83,8 @@ func TestSnapshotFailure(t *testing.T) {
8383
if sm.trk.Progress[2].Next != 1 {
8484
t.Fatalf("Next = %d, want 1", sm.trk.Progress[2].Next)
8585
}
86-
if !sm.trk.Progress[2].MsgAppFlowPaused {
87-
t.Errorf("MsgAppFlowPaused = %v, want true", sm.trk.Progress[2].MsgAppFlowPaused)
86+
if !sm.trk.Progress[2].MsgAppProbesPaused {
87+
t.Errorf("msgAppProbesPaused = %v, want true", sm.trk.Progress[2].MsgAppProbesPaused)
8888
}
8989
}
9090

@@ -106,8 +106,8 @@ func TestSnapshotSucceed(t *testing.T) {
106106
if sm.trk.Progress[2].Next != 12 {
107107
t.Fatalf("Next = %d, want 12", sm.trk.Progress[2].Next)
108108
}
109-
if !sm.trk.Progress[2].MsgAppFlowPaused {
110-
t.Errorf("MsgAppFlowPaused = %v, want true", sm.trk.Progress[2].MsgAppFlowPaused)
109+
if !sm.trk.Progress[2].MsgAppProbesPaused {
110+
t.Errorf("MsgAppProbesPaused = %v, want true", sm.trk.Progress[2].MsgAppProbesPaused)
111111
}
112112
}
113113

raft_test.go

+8-8
Original file line numberDiff line numberDiff line change
@@ -123,16 +123,16 @@ func TestProgressResumeByHeartbeatResp(t *testing.T) {
123123
r.becomeCandidate()
124124
r.becomeLeader()
125125

126-
r.trk.Progress[2].MsgAppFlowPaused = true
126+
r.trk.Progress[2].PauseMsgAppProbes(true)
127127

128128
r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgBeat})
129-
assert.True(t, r.trk.Progress[2].MsgAppFlowPaused)
129+
assert.True(t, r.trk.Progress[2].MsgAppProbesPaused)
130130

131131
r.trk.Progress[2].BecomeReplicate()
132-
assert.False(t, r.trk.Progress[2].MsgAppFlowPaused)
133-
r.trk.Progress[2].MsgAppFlowPaused = true
132+
assert.False(t, r.trk.Progress[2].MsgAppProbesPaused)
133+
r.trk.Progress[2].PauseMsgAppProbes(true)
134134
r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeatResp})
135-
assert.True(t, r.trk.Progress[2].MsgAppFlowPaused)
135+
assert.True(t, r.trk.Progress[2].MsgAppProbesPaused)
136136
}
137137

138138
func TestProgressPaused(t *testing.T) {
@@ -2305,7 +2305,7 @@ func TestSendAppendForProgressProbe(t *testing.T) {
23052305
assert.Zero(t, msg[0].Index)
23062306
}
23072307

2308-
assert.True(t, r.trk.Progress[2].MsgAppFlowPaused)
2308+
assert.True(t, r.trk.Progress[2].MsgAppProbesPaused)
23092309
for j := 0; j < 10; j++ {
23102310
mustAppendEntry(r, pb.Entry{Data: []byte("somedata")})
23112311
r.maybeSendAppend(2)
@@ -2316,7 +2316,7 @@ func TestSendAppendForProgressProbe(t *testing.T) {
23162316
for j := 0; j < r.heartbeatTimeout; j++ {
23172317
r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgBeat})
23182318
}
2319-
assert.True(t, r.trk.Progress[2].MsgAppFlowPaused)
2319+
assert.True(t, r.trk.Progress[2].MsgAppProbesPaused)
23202320

23212321
// consume the heartbeat
23222322
msg := r.readMessages()
@@ -2329,7 +2329,7 @@ func TestSendAppendForProgressProbe(t *testing.T) {
23292329
msg := r.readMessages()
23302330
assert.Len(t, msg, 1)
23312331
assert.Zero(t, msg[0].Index)
2332-
assert.True(t, r.trk.Progress[2].MsgAppFlowPaused)
2332+
assert.True(t, r.trk.Progress[2].MsgAppProbesPaused)
23332333
}
23342334

23352335
func TestSendAppendForProgressReplicate(t *testing.T) {

tracker/progress.go

+18-13
Original file line numberDiff line numberDiff line change
@@ -93,13 +93,13 @@ type Progress struct {
9393
// This is always true on the leader.
9494
RecentActive bool
9595

96-
// MsgAppFlowPaused is used when the MsgApp flow to a node is throttled. This
97-
// happens in StateProbe, or StateReplicate with saturated Inflights. In both
98-
// cases, we need to continue sending MsgApp once in a while to guarantee
99-
// progress, but we only do so when MsgAppFlowPaused is false (it is reset on
100-
// receiving a heartbeat response), to not overflow the receiver. See
101-
// IsPaused().
102-
MsgAppFlowPaused bool
96+
// MsgAppProbesPaused set to true prevents sending "probe" MsgApp messages to
97+
// this follower. Used in StateProbe, or StateReplicate when all entries are
98+
// in-flight or the in-flight volume exceeds limits. See ShouldSendMsgApp().
99+
//
100+
// TODO(pav-kv): unexport this field. It is used by a few tests, but should be
101+
// replaced by PauseMsgAppProbes() and ShouldSendMsgApp().
102+
MsgAppProbesPaused bool
103103

104104
// Inflights is a sliding window for the inflight messages.
105105
// Each inflight message contains one or more log entries.
@@ -119,7 +119,7 @@ type Progress struct {
119119
IsLearner bool
120120
}
121121

122-
// ResetState moves the Progress into the specified State, resetting MsgAppFlowPaused,
122+
// ResetState moves the Progress into the specified State, resetting MsgAppProbesPaused,
123123
// PendingSnapshot, and Inflights.
124124
func (pr *Progress) ResetState(state StateType) {
125125
pr.PauseMsgAppProbes(false)
@@ -176,7 +176,7 @@ func (pr *Progress) SentEntries(entries int, bytes uint64) {
176176
// PauseMsgAppProbes pauses or unpauses empty MsgApp messages flow, depending on
177177
// the passed-in bool.
178178
func (pr *Progress) PauseMsgAppProbes(pause bool) {
179-
pr.MsgAppFlowPaused = pause
179+
pr.MsgAppProbesPaused = pause
180180
}
181181

182182
// CanSendEntries returns true if the flow control state allows sending at least
@@ -261,12 +261,17 @@ func (pr *Progress) MaybeDecrTo(rejected, matchHint uint64) bool {
261261
// operation, this is false. A throttled node will be contacted less frequently
262262
// until it has reached a state in which it's able to accept a steady stream of
263263
// log entries again.
264+
//
265+
// TODO(pav-kv): this method is deprecated, remove it. It is still used in tests
266+
// and String(), find a way to avoid this. The problem is that the actual flow
267+
// control state depends on the log size and commit index, which are not part of
268+
// this Progress struct - they are passed-in to methods like ShouldSendMsgApp().
264269
func (pr *Progress) IsPaused() bool {
265270
switch pr.State {
266271
case StateProbe:
267-
return pr.MsgAppFlowPaused
272+
return pr.MsgAppProbesPaused
268273
case StateReplicate:
269-
return pr.MsgAppFlowPaused && pr.Inflights.Full()
274+
return pr.MsgAppProbesPaused && pr.Inflights.Full()
270275
case StateSnapshot:
271276
return true
272277
default:
@@ -296,10 +301,10 @@ func (pr *Progress) IsPaused() bool {
296301
func (pr *Progress) ShouldSendMsgApp(last, commit uint64) bool {
297302
switch pr.State {
298303
case StateProbe:
299-
return !pr.MsgAppFlowPaused
304+
return !pr.MsgAppProbesPaused
300305
case StateReplicate:
301306
return pr.CanBumpCommit(commit) ||
302-
pr.Match < last && (!pr.MsgAppFlowPaused || pr.CanSendEntries(last))
307+
pr.Match < last && (!pr.MsgAppProbesPaused || pr.CanSendEntries(last))
303308
case StateSnapshot:
304309
return false
305310
default:

tracker/progress_test.go

+9-10
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ func TestProgressString(t *testing.T) {
2929
State: StateSnapshot,
3030
PendingSnapshot: 123,
3131
RecentActive: false,
32-
MsgAppFlowPaused: true,
3332
IsLearner: true,
3433
Inflights: ins,
3534
}
@@ -53,29 +52,29 @@ func TestProgressIsPaused(t *testing.T) {
5352
}
5453
for i, tt := range tests {
5554
p := &Progress{
56-
State: tt.state,
57-
MsgAppFlowPaused: tt.paused,
58-
Inflights: NewInflights(256, 0),
55+
State: tt.state,
56+
MsgAppProbesPaused: tt.paused,
57+
Inflights: NewInflights(256, 0),
5958
}
6059
assert.Equal(t, tt.w, p.IsPaused(), i)
6160
}
6261
}
6362

64-
// TestProgressResume ensures that MaybeDecrTo resets MsgAppFlowPaused, and
63+
// TestProgressResume ensures that MaybeDecrTo resets MsgAppProbesPaused, and
6564
// MaybeUpdate does not.
6665
//
6766
// TODO(pav-kv): there is little sense in testing these micro-behaviours in the
6867
// struct. We should test the visible behaviour instead.
6968
func TestProgressResume(t *testing.T) {
7069
p := &Progress{
71-
Next: 2,
72-
MsgAppFlowPaused: true,
70+
Next: 2,
71+
MsgAppProbesPaused: true,
7372
}
7473
p.MaybeDecrTo(1, 1)
75-
assert.False(t, p.MsgAppFlowPaused)
76-
p.MsgAppFlowPaused = true
74+
assert.False(t, p.MsgAppProbesPaused)
75+
p.MsgAppProbesPaused = true
7776
p.MaybeUpdate(2)
78-
assert.True(t, p.MsgAppFlowPaused)
77+
assert.True(t, p.MsgAppProbesPaused)
7978
}
8079

8180
func TestProgressBecomeProbe(t *testing.T) {

0 commit comments

Comments
 (0)