Skip to content

Commit feeb0ae

Browse files
committed
add PriorityRequest FeatureGate to make LeaseRevoke have higher priority to be applied under overload conditions
Signed-off-by: shenmu.wy <[email protected]>
1 parent 3531079 commit feeb0ae

File tree

5 files changed

+97
-5
lines changed

5 files changed

+97
-5
lines changed

server/etcdserver/server_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1167,7 +1167,7 @@ func TestPublishV3(t *testing.T) {
11671167
lgMu: new(sync.RWMutex),
11681168
lg: lg,
11691169
readych: make(chan struct{}),
1170-
Cfg: config.ServerConfig{Logger: lg, TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries, MaxRequestBytes: 1000},
1170+
Cfg: config.ServerConfig{Logger: lg, TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries, MaxRequestBytes: 1000, ServerFeatureGate: features.NewDefaultServerFeatureGate("test", lg)},
11711171
memberID: 1,
11721172
r: *newRaftNode(raftNodeConfig{lg: lg, Node: n}),
11731173
attributes: membership.Attributes{Name: "node1", ClientURLs: []string{"http://a", "http://b"}},
@@ -1209,7 +1209,7 @@ func TestPublishV3Stopped(t *testing.T) {
12091209
srv := &EtcdServer{
12101210
lgMu: new(sync.RWMutex),
12111211
lg: zaptest.NewLogger(t),
1212-
Cfg: config.ServerConfig{Logger: zaptest.NewLogger(t), TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
1212+
Cfg: config.ServerConfig{Logger: zaptest.NewLogger(t), TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries, ServerFeatureGate: features.NewDefaultServerFeatureGate("test", nil)},
12131213
r: *r,
12141214
cluster: &membership.RaftCluster{},
12151215
w: mockwait.NewNop(),
@@ -1237,7 +1237,7 @@ func TestPublishV3Retry(t *testing.T) {
12371237
lgMu: new(sync.RWMutex),
12381238
lg: lg,
12391239
readych: make(chan struct{}),
1240-
Cfg: config.ServerConfig{Logger: lg, TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries, MaxRequestBytes: 1000},
1240+
Cfg: config.ServerConfig{Logger: lg, TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries, MaxRequestBytes: 1000, ServerFeatureGate: features.NewDefaultServerFeatureGate("test", lg)},
12411241
memberID: 1,
12421242
r: *newRaftNode(raftNodeConfig{lg: lg, Node: n}),
12431243
w: mockwait.NewNop(),
@@ -1288,7 +1288,7 @@ func TestUpdateVersionV3(t *testing.T) {
12881288
lgMu: new(sync.RWMutex),
12891289
lg: zaptest.NewLogger(t),
12901290
memberID: 1,
1291-
Cfg: config.ServerConfig{Logger: lg, TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries, MaxRequestBytes: 1000},
1291+
Cfg: config.ServerConfig{Logger: lg, TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries, MaxRequestBytes: 1000, ServerFeatureGate: features.NewDefaultServerFeatureGate("test", lg)},
12921292
r: *newRaftNode(raftNodeConfig{lg: zaptest.NewLogger(t), Node: n}),
12931293
attributes: membership.Attributes{Name: "node1", ClientURLs: []string{"http://node1.com"}},
12941294
cluster: &membership.RaftCluster{},

server/etcdserver/util.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"fmt"
1919
"time"
2020

21+
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
2122
"go.etcd.io/etcd/client/pkg/v3/types"
2223
"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
2324
"go.etcd.io/etcd/server/v3/etcdserver/api/rafthttp"
@@ -42,6 +43,25 @@ func isConnectedFullySince(transport rafthttp.Transporter, since time.Time, self
4243
return numConnectedSince(transport, since, self, members) == len(members)
4344
}
4445

46+
// exceedsRequestLimit checks if the committed index is too far ahead of
47+
// the applied index. Priority requests are allowed up to 10% extra gap.
48+
func exceedsRequestLimit(ai, ci uint64, r *pb.InternalRaftRequest, enablePriority bool) bool {
49+
if ci <= ai+maxGapBetweenApplyAndCommitIndex {
50+
return false
51+
}
52+
// allow up to 10% extra gap for priority requests
53+
if enablePriority && ci <= ai+maxGapBetweenApplyAndCommitIndex*110/100 {
54+
if isPriorityRequest(r) {
55+
return false
56+
}
57+
}
58+
return true
59+
}
60+
61+
func isPriorityRequest(r *pb.InternalRaftRequest) bool {
62+
return r != nil && r.LeaseRevoke != nil
63+
}
64+
4565
// numConnectedSince counts how many members are connected to the local member
4666
// since the given time.
4767
func numConnectedSince(transport rafthttp.Transporter, since time.Time, self types.ID, members []*membership.Member) int {

server/etcdserver/util_test.go

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,10 @@ import (
1919
"testing"
2020
"time"
2121

22+
"github.com/stretchr/testify/assert"
2223
"go.uber.org/zap/zaptest"
2324

25+
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
2426
"go.etcd.io/etcd/client/pkg/v3/types"
2527
"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
2628
"go.etcd.io/etcd/server/v3/etcdserver/api/rafthttp"
@@ -110,3 +112,66 @@ type testStringerFunc func() string
110112
func (s testStringerFunc) String() string {
111113
return s()
112114
}
115+
116+
func TestExceedsRequestLimit(t *testing.T) {
117+
tests := []struct {
118+
name string
119+
ci uint64
120+
ai uint64
121+
expectedResult bool
122+
req *pb.InternalRaftRequest
123+
enablePriority bool
124+
}{
125+
{
126+
ci: 1 + maxGapBetweenApplyAndCommitIndex,
127+
ai: 1,
128+
expectedResult: false,
129+
req: nil,
130+
name: "Test nil InternalRaftRequest",
131+
},
132+
{
133+
ci: 1 + maxGapBetweenApplyAndCommitIndex,
134+
ai: 1,
135+
expectedResult: false,
136+
req: &pb.InternalRaftRequest{},
137+
name: "Test non-critical request and gap is not larger than maxGapBetweenApplyAndCommitIndex",
138+
},
139+
{
140+
ci: 1 + maxGapBetweenApplyAndCommitIndex + 1,
141+
ai: 1,
142+
expectedResult: true,
143+
req: &pb.InternalRaftRequest{},
144+
name: "Test non-critical request and gap is larger than maxGapBetweenApplyAndCommitIndex",
145+
},
146+
{
147+
ci: 1 + maxGapBetweenApplyAndCommitIndex + 1,
148+
ai: 1,
149+
expectedResult: false,
150+
req: &pb.InternalRaftRequest{LeaseRevoke: &pb.LeaseRevokeRequest{}},
151+
name: "Test critical request and gap is larger than maxGapBetweenApplyAndCommitIndex with priority check",
152+
enablePriority: true,
153+
},
154+
{
155+
ci: 1 + maxGapBetweenApplyAndCommitIndex + 1,
156+
ai: 1,
157+
expectedResult: true,
158+
req: &pb.InternalRaftRequest{LeaseRevoke: &pb.LeaseRevokeRequest{}},
159+
name: "Test critical request and gap is larger than maxGapBetweenApplyAndCommitIndex without priority check",
160+
enablePriority: false,
161+
},
162+
{
163+
ci: 1 + maxGapBetweenApplyAndCommitIndex + 1 + maxGapBetweenApplyAndCommitIndex/10,
164+
ai: 1,
165+
expectedResult: true,
166+
req: &pb.InternalRaftRequest{LeaseRevoke: &pb.LeaseRevokeRequest{}},
167+
name: "Test critical request and gap is larger than 110% maxGapBetweenApplyAndCommitIndex with priority check",
168+
enablePriority: true,
169+
},
170+
}
171+
172+
for _, tc := range tests {
173+
t.Run(tc.name, func(t *testing.T) {
174+
assert.Equal(t, tc.expectedResult, exceedsRequestLimit(tc.ai, tc.ci, tc.req, tc.enablePriority))
175+
})
176+
}
177+
}

server/etcdserver/v3_server.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -803,7 +803,8 @@ func (s *EtcdServer) doSerialize(ctx context.Context, chk func(*auth.AuthInfo) e
803803
func (s *EtcdServer) processInternalRaftRequestOnce(ctx context.Context, r pb.InternalRaftRequest) (*apply2.Result, error) {
804804
ai := s.getAppliedIndex()
805805
ci := s.getCommittedIndex()
806-
if ci > ai+maxGapBetweenApplyAndCommitIndex {
806+
807+
if exceedsRequestLimit(ai, ci, &r, s.FeatureEnabled(features.PriorityRequest)) {
807808
return nil, errors.ErrTooManyRequests
808809
}
809810

server/features/etcd_features.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,11 @@ const (
7474
// alpha: v3.6
7575
// main PR: https://github.com/etcd-io/etcd/pull/17661
7676
SetMemberLocalAddr featuregate.Feature = "SetMemberLocalAddr"
77+
// PriorityRequest enables certain kinds of requests(e.g. LreaeRevoke) having higher priority to be applied under overload conditions.
78+
// owner: @silentred
79+
// alpha: v3.7
80+
// main PR: https://github.com/etcd-io/etcd/pull/20492
81+
PriorityRequest featuregate.Feature = "PriorityRequest"
7782
)
7883

7984
var DefaultEtcdServerFeatureGates = map[featuregate.Feature]featuregate.FeatureSpec{
@@ -84,6 +89,7 @@ var DefaultEtcdServerFeatureGates = map[featuregate.Feature]featuregate.FeatureS
8489
LeaseCheckpoint: {Default: false, PreRelease: featuregate.Alpha},
8590
LeaseCheckpointPersist: {Default: false, PreRelease: featuregate.Alpha},
8691
SetMemberLocalAddr: {Default: false, PreRelease: featuregate.Alpha},
92+
PriorityRequest: {Default: false, PreRelease: featuregate.Alpha},
8793
}
8894

8995
func NewDefaultServerFeatureGate(name string, lg *zap.Logger) featuregate.FeatureGate {

0 commit comments

Comments
 (0)