diff --git a/tests/common/e2e_test.go b/tests/common/e2e_test.go index eeba55702811..31955acbcc58 100644 --- a/tests/common/e2e_test.go +++ b/tests/common/e2e_test.go @@ -54,12 +54,41 @@ func minimalE2eEnabled() bool { func e2eClusterTestCases() []testCase { minimalTestCases := []testCase{ { - name: "NoTLS", - config: config.ClusterConfig{ClusterSize: 1}, + name: "NoTLS", + config: config.ClusterConfig{ + ClusterSize: 1, + ClusterContext: &e2e.ClusterContext{ + ClientHTTPSeparate: false, + }, + }, + }, + { + name: "NoTLS SeparateHTTPPort", + config: config.ClusterConfig{ + ClusterSize: 1, + ClusterContext: &e2e.ClusterContext{ + ClientHTTPSeparate: true, + }, + }, }, { - name: "PeerTLS and ClientTLS", - config: config.ClusterConfig{ClusterSize: 3, PeerTLS: config.ManualTLS, ClientTLS: config.ManualTLS}, + name: "PeerTLS and ClientTLS", + config: config.ClusterConfig{ + ClusterSize: 3, + PeerTLS: config.ManualTLS, + ClientTLS: config.ManualTLS, + }, + }, + { + name: "PeerTLS and ClientTLS SeparateHTTPPort", + config: config.ClusterConfig{ + ClusterSize: 3, + PeerTLS: config.ManualTLS, + ClientTLS: config.ManualTLS, + ClusterContext: &e2e.ClusterContext{ + ClientHTTPSeparate: true, + }, + }, }, } diff --git a/tests/common/watch_test.go b/tests/common/watch_test.go index 4e853df921c3..1c1275657439 100644 --- a/tests/common/watch_test.go +++ b/tests/common/watch_test.go @@ -16,14 +16,20 @@ package common import ( "context" + "fmt" + "strings" "testing" "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "golang.org/x/sync/errgroup" "go.etcd.io/etcd/tests/v3/framework/config" + "go.etcd.io/etcd/tests/v3/framework/e2e" "go.etcd.io/etcd/tests/v3/framework/testutils" + + e2e_utils "go.etcd.io/etcd/tests/v3/e2e" ) func TestWatch(t *testing.T) { @@ -85,6 +91,7 @@ func TestWatch(t *testing.T) { wCancel() require.NoErrorf(t, err, "failed to get key-values from watch channel %s", err) } + fmt.Println("kvs: ", kvs) wCancel() assert.Equal(t, tt.wanted, kvs) @@ -93,3 +100,94 @@ func TestWatch(t *testing.T) { }) } } + +func TestWatchDelayForPeriodicProgressNotification(t *testing.T) { + testRunner.BeforeTest(t) + watchResponsePeriod := 100 * time.Millisecond + watchTestDuration := 5 * time.Second + dbSizeBytes := 5 * 1000 * 1000 + + for _, tc := range clusterTestCases() { + t.Run(tc.name, func(t *testing.T) { + ctx, cancel := context.WithTimeout(t.Context(), 20*time.Second) + defer cancel() + + cfg := tc.config + if cfg.ClusterContext == nil { + cfg.ClusterContext = &e2e.ClusterContext{} + } + cfg.ClusterContext.(*e2e.ClusterContext).ServerWatchProgressNotifyInterval = watchResponsePeriod + + clus := testRunner.NewCluster(ctx, t, config.WithClusterConfig(cfg)) + defer clus.Close() + + cc := testutils.MustClient(clus.Client()) + + wCtx, cancel := context.WithTimeout(t.Context(), watchTestDuration) + defer cancel() + require.NoError(t, e2e_utils.FillEtcdWithData(ctx, cc, dbSizeBytes)) + + g := errgroup.Group{} + + wch := cc.Watch(wCtx, "fake-key", config.WatchOptions{ProgressNotify: true}) + require.NotNil(t, wch) + + e2e_utils.ContinuouslyExecuteGetAll(wCtx, t, &g, cc) + + e2e_utils.ValidateWatchDelay(t, wch, 150*time.Millisecond) + require.NoError(t, g.Wait()) + }) + } +} + +func TestWatchDelayForEvent(t *testing.T) { + e2e.BeforeTest(t) + watchResponsePeriod := 100 * time.Millisecond + watchTestDuration := 5 * time.Second + dbSizeBytes := 5 * 1000 * 1000 + + for _, tc := range clusterTestCases() { + t.Run(tc.name, func(t *testing.T) { + ctx, cancel := context.WithTimeout(t.Context(), 20*time.Second) + defer cancel() + + cfg := tc.config + if cfg.ClusterContext == nil { + cfg.ClusterContext = &e2e.ClusterContext{} + } + cfg.ClusterContext.(*e2e.ClusterContext).ServerWatchProgressNotifyInterval = watchResponsePeriod + + clus := testRunner.NewCluster(ctx, t, config.WithClusterConfig(cfg)) + defer clus.Close() + + cc := testutils.MustClient(clus.Client()) + + wCtx, cancel := context.WithTimeout(t.Context(), watchTestDuration) + defer cancel() + require.NoError(t, e2e_utils.FillEtcdWithData(ctx, cc, dbSizeBytes)) + + g := errgroup.Group{} + g.Go(func() error { + i := 0 + for { + err := cc.Put(ctx, "key", fmt.Sprintf("%d", i), config.PutOptions{}) + if err != nil { + if strings.Contains(err.Error(), "context deadline exceeded") { + return nil + } + return err + } + time.Sleep(watchResponsePeriod) + } + }) + + wch := cc.Watch(wCtx, "key", config.WatchOptions{}) + require.NotNil(t, wch) + + e2e_utils.ContinuouslyExecuteGetAll(wCtx, t, &g, cc) + + e2e_utils.ValidateWatchDelay(t, wch, 150*time.Millisecond) + require.NoError(t, g.Wait()) + }) + } +} diff --git a/tests/e2e/utils.go b/tests/e2e/utils.go index 9eb7e0ec2c30..b5544d560931 100644 --- a/tests/e2e/utils.go +++ b/tests/e2e/utils.go @@ -38,8 +38,10 @@ import ( "go.etcd.io/etcd/client/pkg/v3/transport" clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/pkg/v3/stringutil" + "go.etcd.io/etcd/tests/v3/framework/config" "go.etcd.io/etcd/tests/v3/framework/e2e" "go.etcd.io/etcd/tests/v3/framework/integration" + "go.etcd.io/etcd/tests/v3/framework/interfaces" ) func newClient(t *testing.T, entpoints []string, cfg e2e.ClientConfig) *clientv3.Client { @@ -87,7 +89,7 @@ func tlsInfo(tb testing.TB, cfg e2e.ClientConfig) (*transport.TLSInfo, error) { } } -func fillEtcdWithData(ctx context.Context, c *clientv3.Client, dbSize int) error { +func FillEtcdWithData(ctx context.Context, c interfaces.Client, dbSize int) error { g := errgroup.Group{} concurrency := 10 keyCount := 100 @@ -97,8 +99,7 @@ func fillEtcdWithData(ctx context.Context, c *clientv3.Client, dbSize int) error i := i g.Go(func() error { for j := 0; j < keysPerRoutine; j++ { - _, err := c.Put(ctx, fmt.Sprintf("%d", i*keysPerRoutine+j), stringutil.RandString(uint(valueSize))) - if err != nil { + if err := c.Put(ctx, fmt.Sprintf("%d", i*keysPerRoutine+j), stringutil.RandString(uint(valueSize)), config.PutOptions{}); err != nil { return err } } diff --git a/tests/e2e/watch.go b/tests/e2e/watch.go new file mode 100644 index 000000000000..719cb9fc224b --- /dev/null +++ b/tests/e2e/watch.go @@ -0,0 +1,103 @@ +// Copyright 2023 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// These tests are performance sensitive, addition of cluster proxy makes them unstable. +//go:build !cluster_proxy + +package e2e + +import ( + "context" + "strings" + "sync" + "testing" + "time" + + "golang.org/x/sync/errgroup" + + clientv3 "go.etcd.io/etcd/client/v3" + "go.etcd.io/etcd/tests/v3/framework/config" + "go.etcd.io/etcd/tests/v3/framework/interfaces" +) + +const ( + watchResponsePeriod = 100 * time.Millisecond + watchTestDuration = 5 * time.Second + readLoadConcurrency = 10 +) + +func ValidateWatchDelay(t *testing.T, watch clientv3.WatchChan, maxWatchDelay time.Duration) { + start := time.Now() + var maxDelay time.Duration + for range watch { + sinceLast := time.Since(start) + if sinceLast > watchResponsePeriod+maxWatchDelay { + t.Errorf("Unexpected watch response delayed over allowed threshold %s, delay: %s", maxWatchDelay, sinceLast-watchResponsePeriod) + } else { + t.Logf("Got watch response, since last: %s", sinceLast) + } + if sinceLast > maxDelay { + maxDelay = sinceLast + } + start = time.Now() + } + sinceLast := time.Since(start) + if sinceLast > maxDelay && sinceLast > watchResponsePeriod+maxWatchDelay { + t.Errorf("Unexpected watch response delayed over allowed threshold %s, delay: unknown", maxWatchDelay) + t.Errorf("Test finished while in middle of delayed response, measured delay: %s", sinceLast-watchResponsePeriod) + t.Logf("Please increase the test duration to measure delay") + } else { + t.Logf("Max delay: %s", maxDelay-watchResponsePeriod) + } +} + +func ContinuouslyExecuteGetAll(ctx context.Context, t *testing.T, g *errgroup.Group, c interfaces.Client) { + mux := sync.RWMutex{} + size := 0 + for i := 0; i < readLoadConcurrency; i++ { + g.Go(func() error { + for { + resp, err := c.Get(ctx, "", config.GetOptions{Prefix: true}) + if err != nil { + if strings.Contains(err.Error(), "context deadline exceeded") { + return nil + } + return err + } + respSize := 0 + for _, kv := range resp.Kvs { + respSize += kv.Size() + } + mux.Lock() + size += respSize + mux.Unlock() + } + }) + } + g.Go(func() error { + lastSize := size + for range time.Tick(time.Second) { + select { + case <-ctx.Done(): + return nil + default: + } + mux.RLock() + t.Logf("Generating read load around %.1f MB/s", float64(size-lastSize)/1000/1000) + lastSize = size + mux.RUnlock() + } + return nil + }) +} diff --git a/tests/e2e/watch_test.go b/tests/e2e/watch_test.go index c628a0cc888c..1c4df7c552b0 100644 --- a/tests/e2e/watch_test.go +++ b/tests/e2e/watch_test.go @@ -33,15 +33,10 @@ import ( "go.etcd.io/etcd/api/v3/mvccpb" v3rpc "go.etcd.io/etcd/api/v3/v3rpc/rpctypes" clientv3 "go.etcd.io/etcd/client/v3" + "go.etcd.io/etcd/pkg/v3/stringutil" "go.etcd.io/etcd/tests/v3/framework/e2e" ) -const ( - watchResponsePeriod = 100 * time.Millisecond - watchTestDuration = 5 * time.Second - readLoadConcurrency = 10 -) - type testCase struct { name string client e2e.ClientConfig @@ -562,3 +557,23 @@ func TestResumeCompactionOnTombstone(t *testing.T) { t.Fatal("timed out getting watch response") } } + +func fillEtcdWithData(ctx context.Context, c *clientv3.Client, dbSize int) error { + g := errgroup.Group{} + concurrency := 10 + keyCount := 100 + keysPerRoutine := keyCount / concurrency + valueSize := dbSize / keyCount + for i := 0; i < concurrency; i++ { + i := i + g.Go(func() error { + for j := 0; j < keysPerRoutine; j++ { + if _, err := c.Put(ctx, fmt.Sprintf("%d", i*keysPerRoutine+j), stringutil.RandString(uint(valueSize))); err != nil { + return err + } + } + return nil + }) + } + return g.Wait() +} diff --git a/tests/framework/config/client.go b/tests/framework/config/client.go index ac82bd547925..2e18eac9fece 100644 --- a/tests/framework/config/client.go +++ b/tests/framework/config/client.go @@ -73,7 +73,8 @@ type UserAddOptions struct { } type WatchOptions struct { - Prefix bool - Revision int64 - RangeEnd string + Prefix bool + Revision int64 + RangeEnd string + ProgressNotify bool } diff --git a/tests/framework/e2e/config.go b/tests/framework/e2e/config.go index 4e0405468624..12b0bd76d898 100644 --- a/tests/framework/e2e/config.go +++ b/tests/framework/e2e/config.go @@ -17,6 +17,7 @@ package e2e import ( "fmt" "strings" + "time" "github.com/coreos/go-semver/semver" @@ -40,9 +41,12 @@ func (cv ClusterVersion) String() string { } type ClusterContext struct { - Version ClusterVersion - EnvVars map[string]string - UseUnix bool + Version ClusterVersion + EnvVars map[string]string + UseUnix bool + BasePort int + ClientHTTPSeparate bool + ServerWatchProgressNotifyInterval time.Duration } var experimentalFlags = map[string]struct{}{ diff --git a/tests/framework/e2e/e2e.go b/tests/framework/e2e/e2e.go index 7ab52006beac..6b2e4e9975cc 100644 --- a/tests/framework/e2e/e2e.go +++ b/tests/framework/e2e/e2e.go @@ -66,6 +66,10 @@ func (e e2eRunner) NewCluster(ctx context.Context, tb testing.TB, opts ...config if ctx.UseUnix { e2eConfig.BaseClientScheme = "unix" } + e2eConfig.ClientHTTPSeparate = ctx.ClientHTTPSeparate + if ctx.ServerWatchProgressNotifyInterval != 0 { + e2eConfig.ServerConfig.WatchProgressNotifyInterval = ctx.ServerWatchProgressNotifyInterval + } } switch cfg.ClientTLS { diff --git a/tests/framework/e2e/etcdctl.go b/tests/framework/e2e/etcdctl.go index c86fdb871eb0..58e69bef89fa 100644 --- a/tests/framework/e2e/etcdctl.go +++ b/tests/framework/e2e/etcdctl.go @@ -713,6 +713,10 @@ func (ctl *EtcdctlV3) Watch(ctx context.Context, key string, opts config.WatchOp if opts.Revision != 0 { args = append(args, "--rev", fmt.Sprint(opts.Revision)) } + if opts.ProgressNotify { + args = append(args, "--progress-notify") + } + proc, err := SpawnCmd(args, nil) if err != nil { return nil @@ -735,7 +739,7 @@ func (ctl *EtcdctlV3) Watch(ctx context.Context, key string, opts config.WatchOp close(ch) return } - if len(resp.Events) > 0 { + if len(resp.Events) > 0 || opts.ProgressNotify { ch <- resp } }