Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions server/etcdserver/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1167,7 +1167,7 @@ func TestPublishV3(t *testing.T) {
lgMu: new(sync.RWMutex),
lg: lg,
readych: make(chan struct{}),
Cfg: config.ServerConfig{Logger: lg, TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries, MaxRequestBytes: 1000},
Cfg: config.ServerConfig{Logger: lg, TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries, MaxRequestBytes: 1000, ServerFeatureGate: features.NewDefaultServerFeatureGate("test", lg)},
memberID: 1,
r: *newRaftNode(raftNodeConfig{lg: lg, Node: n}),
attributes: membership.Attributes{Name: "node1", ClientURLs: []string{"http://a", "http://b"}},
Expand Down Expand Up @@ -1209,7 +1209,7 @@ func TestPublishV3Stopped(t *testing.T) {
srv := &EtcdServer{
lgMu: new(sync.RWMutex),
lg: zaptest.NewLogger(t),
Cfg: config.ServerConfig{Logger: zaptest.NewLogger(t), TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
Cfg: config.ServerConfig{Logger: zaptest.NewLogger(t), TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries, ServerFeatureGate: features.NewDefaultServerFeatureGate("test", nil)},
r: *r,
cluster: &membership.RaftCluster{},
w: mockwait.NewNop(),
Expand Down Expand Up @@ -1237,7 +1237,7 @@ func TestPublishV3Retry(t *testing.T) {
lgMu: new(sync.RWMutex),
lg: lg,
readych: make(chan struct{}),
Cfg: config.ServerConfig{Logger: lg, TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries, MaxRequestBytes: 1000},
Cfg: config.ServerConfig{Logger: lg, TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries, MaxRequestBytes: 1000, ServerFeatureGate: features.NewDefaultServerFeatureGate("test", lg)},
memberID: 1,
r: *newRaftNode(raftNodeConfig{lg: lg, Node: n}),
w: mockwait.NewNop(),
Expand Down Expand Up @@ -1288,7 +1288,7 @@ func TestUpdateVersionV3(t *testing.T) {
lgMu: new(sync.RWMutex),
lg: zaptest.NewLogger(t),
memberID: 1,
Cfg: config.ServerConfig{Logger: lg, TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries, MaxRequestBytes: 1000},
Cfg: config.ServerConfig{Logger: lg, TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries, MaxRequestBytes: 1000, ServerFeatureGate: features.NewDefaultServerFeatureGate("test", lg)},
r: *newRaftNode(raftNodeConfig{lg: zaptest.NewLogger(t), Node: n}),
attributes: membership.Attributes{Name: "node1", ClientURLs: []string{"http://node1.com"}},
cluster: &membership.RaftCluster{},
Expand Down
20 changes: 20 additions & 0 deletions server/etcdserver/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"fmt"
"time"

pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/client/pkg/v3/types"
"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
"go.etcd.io/etcd/server/v3/etcdserver/api/rafthttp"
Expand All @@ -42,6 +43,25 @@ func isConnectedFullySince(transport rafthttp.Transporter, since time.Time, self
return numConnectedSince(transport, since, self, members) == len(members)
}

// exceedsRequestLimit checks if the committed index is too far ahead of
// the applied index. Priority requests are allowed up to 10% extra gap.
func exceedsRequestLimit(ai, ci uint64, r *pb.InternalRaftRequest, enablePriority bool) bool {
if ci <= ai+maxGapBetweenApplyAndCommitIndex {
return false
}
// allow up to 100% extra gap for priority requests
if enablePriority && isPriorityRequest(r) {
if ci <= ai+maxGapBetweenApplyAndCommitIndex*200/100 {
return false
}
}
return true
}

func isPriorityRequest(r *pb.InternalRaftRequest) bool {
return r != nil && r.LeaseRevoke != nil
}

// numConnectedSince counts how many members are connected to the local member
// since the given time.
func numConnectedSince(transport rafthttp.Transporter, since time.Time, self types.ID, members []*membership.Member) int {
Expand Down
65 changes: 65 additions & 0 deletions server/etcdserver/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"go.uber.org/zap/zaptest"

pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/client/pkg/v3/types"
"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
"go.etcd.io/etcd/server/v3/etcdserver/api/rafthttp"
Expand Down Expand Up @@ -110,3 +112,66 @@ type testStringerFunc func() string
func (s testStringerFunc) String() string {
return s()
}

func TestExceedsRequestLimit(t *testing.T) {
tests := []struct {
name string
ci uint64
ai uint64
expectedResult bool
req *pb.InternalRaftRequest
enablePriority bool
}{
{
ci: 1 + maxGapBetweenApplyAndCommitIndex,
ai: 1,
expectedResult: false,
req: nil,
name: "Test nil InternalRaftRequest",
},
{
ci: 1 + maxGapBetweenApplyAndCommitIndex,
ai: 1,
expectedResult: false,
req: &pb.InternalRaftRequest{},
name: "Test non-critical request and gap is not larger than maxGapBetweenApplyAndCommitIndex",
},
{
ci: 1 + maxGapBetweenApplyAndCommitIndex + 1,
ai: 1,
expectedResult: true,
req: &pb.InternalRaftRequest{},
name: "Test non-critical request and gap is larger than maxGapBetweenApplyAndCommitIndex",
},
{
ci: 1 + maxGapBetweenApplyAndCommitIndex + 1,
ai: 1,
expectedResult: false,
req: &pb.InternalRaftRequest{LeaseRevoke: &pb.LeaseRevokeRequest{}},
name: "Test critical request and gap is larger than maxGapBetweenApplyAndCommitIndex with priority check",
enablePriority: true,
},
{
ci: 1 + maxGapBetweenApplyAndCommitIndex + 1,
ai: 1,
expectedResult: true,
req: &pb.InternalRaftRequest{LeaseRevoke: &pb.LeaseRevokeRequest{}},
name: "Test critical request and gap is larger than maxGapBetweenApplyAndCommitIndex without priority check",
enablePriority: false,
},
{
ci: 1 + 2*maxGapBetweenApplyAndCommitIndex + 1,
ai: 1,
expectedResult: true,
req: &pb.InternalRaftRequest{LeaseRevoke: &pb.LeaseRevokeRequest{}},
name: "Test critical request and gap is larger than 200% maxGapBetweenApplyAndCommitIndex with priority check",
enablePriority: true,
},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
assert.Equal(t, tc.expectedResult, exceedsRequestLimit(tc.ai, tc.ci, tc.req, tc.enablePriority))
})
}
}
3 changes: 2 additions & 1 deletion server/etcdserver/v3_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -817,7 +817,8 @@ func (s *EtcdServer) doSerialize(ctx context.Context, chk func(*auth.AuthInfo) e
func (s *EtcdServer) processInternalRaftRequestOnce(ctx context.Context, r pb.InternalRaftRequest) (*apply2.Result, error) {
ai := s.getAppliedIndex()
ci := s.getCommittedIndex()
if ci > ai+maxGapBetweenApplyAndCommitIndex {

if exceedsRequestLimit(ai, ci, &r, s.FeatureEnabled(features.PriorityRequest)) {
return nil, errors.ErrTooManyRequests
}

Expand Down
6 changes: 6 additions & 0 deletions server/features/etcd_features.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,11 @@ const (
// beta: v3.7
// main PR: https://github.com/etcd-io/etcd/pull/20589
FastLeaseKeepAlive featuregate.Feature = "FastLeaseKeepAlive"
// PriorityRequest enables certain kinds of requests(e.g. LreaeRevoke) having higher priority to be applied under overload conditions.
// owner: @silentred
// alpha: v3.7
// main PR: https://github.com/etcd-io/etcd/pull/20492
PriorityRequest featuregate.Feature = "PriorityRequest"
)

var DefaultEtcdServerFeatureGates = map[featuregate.Feature]featuregate.FeatureSpec{
Expand All @@ -90,6 +95,7 @@ var DefaultEtcdServerFeatureGates = map[featuregate.Feature]featuregate.FeatureS
LeaseCheckpointPersist: {Default: false, PreRelease: featuregate.Alpha},
SetMemberLocalAddr: {Default: false, PreRelease: featuregate.Alpha},
FastLeaseKeepAlive: {Default: true, PreRelease: featuregate.Beta},
PriorityRequest: {Default: false, PreRelease: featuregate.Alpha},
}

func NewDefaultServerFeatureGate(name string, lg *zap.Logger) featuregate.FeatureGate {
Expand Down