From 288707e78ae7efb995394f7d87a99682abae8dd6 Mon Sep 17 00:00:00 2001 From: AngstyDuck Date: Wed, 13 Sep 2023 00:00:30 +0800 Subject: [PATCH] client: moved test cases to cluster Signed-off-by: AngstyDuck --- .../clientv3/concurrency/session_test.go | 83 ----------------- tests/integration/cluster_test.go | 91 +++++++++++++++++++ 2 files changed, 91 insertions(+), 83 deletions(-) diff --git a/tests/integration/clientv3/concurrency/session_test.go b/tests/integration/clientv3/concurrency/session_test.go index 8facbb2f4be7..b17991179751 100644 --- a/tests/integration/clientv3/concurrency/session_test.go +++ b/tests/integration/clientv3/concurrency/session_test.go @@ -23,7 +23,6 @@ import ( clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/client/v3/concurrency" integration2 "go.etcd.io/etcd/tests/v3/framework/integration" - clientv3test "go.etcd.io/etcd/tests/v3/integration/clientv3" ) func TestSessionOptions(t *testing.T) { @@ -112,85 +111,3 @@ func TestSessionCtx(t *testing.T) { } assert.Equal(t, childCtx.Err(), context.Canceled) } - -// TestCreationTimeout checks that the option WithCreationTimeout -// sets a timeout for the creation of new sessions -func TestCreationTimeout(t *testing.T) { - // create new cluster - clus := integration2.NewCluster(t, &integration2.ClusterConfig{Size: 1}) - - // create new client - cli, err := integration2.NewClient(t, clientv3.Config{Endpoints: []string{clus.Members[0].GRPCURL()}}) - if err != nil { - clus.Terminate(t) - t.Fatal(err) - } - defer cli.Close() - - // ensure the connection is established. - clientv3test.MustWaitPinReady(t, cli) - - // terminating the cluster - clus.Terminate(t) - - // override the grpc logger - logOb := integration2.ClientGRPCLoggerObserver(t) - - _, err = concurrency.NewSession(cli, concurrency.WithCreationTimeout(3000*time.Millisecond)) - assert.Equal(t, err, context.DeadlineExceeded) - - _, err = logOb.Expect(context.Background(), "Subchannel Connectivity change to TRANSIENT_FAILURE", 3) - assert.Nil(t, err) -} - -// TestTimeoutDoesntAffectSubsequentConnections checks that the option WithCreationTimeout -// is only used when Session is created -func TestTimeoutDoesntAffectSubsequentConnections(t *testing.T) { - // create new cluster - clus := integration2.NewCluster(t, &integration2.ClusterConfig{Size: 1}) - defer clus.Terminate(t) - clus.Members[0].KeepDataDirTerminate = true - - // create new client - cli, err := integration2.NewClient(t, clientv3.Config{Endpoints: []string{clus.Members[0].GRPCURL()}}) - if err != nil { - clus.Terminate(t) - t.Fatal(err) - } - defer cli.Close() - - // ensure the connection is established. - clientv3test.MustWaitPinReady(t, cli) - - s, err := concurrency.NewSession(cli, concurrency.WithCreationTimeout(1*time.Second)) - - // terminating the cluster - clus.Members[0].Terminate(t) - - errorc := make(chan error) - defer close(errorc) - go func() { - _, err := cli.Put(s.Ctx(), "sample_key", "sample_value", clientv3.WithLease(s.Lease())) - errorc <- err - }() - - select { - case err := <-errorc: - t.Fatalf("Operation put should be blocked forever when the server is unreachable: %v", err) - // if Put operation is blocked beyond the timeout specified using WithCreationTimeout, - // that timeout is not used by the Put operation - case <-time.After(2 * time.Second): - } - - // restarting and ensuring that the Put operation will eventually succeed - clus.Members[0].Restart(t) - clus.Members[0].WaitOK(t) - select { - case err := <-errorc: - if err != nil { - t.Errorf("Put failed: %v", err) - } - case <-time.After(2 * time.Second): - t.Fatal("Put function hung even after restarting cluster") - } -} diff --git a/tests/integration/cluster_test.go b/tests/integration/cluster_test.go index a05b662b8a17..a0e7cae5529f 100644 --- a/tests/integration/cluster_test.go +++ b/tests/integration/cluster_test.go @@ -25,10 +25,13 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" clientv3 "go.etcd.io/etcd/client/v3" + "go.etcd.io/etcd/client/v3/concurrency" "go.etcd.io/etcd/server/v3/etcdserver" "go.etcd.io/etcd/tests/v3/framework/config" "go.etcd.io/etcd/tests/v3/framework/integration" + clientv3test "go.etcd.io/etcd/tests/v3/integration/clientv3" ) func init() { @@ -518,3 +521,91 @@ func TestSpeedyTerminate(t *testing.T) { case <-donec: } } + +// TestCreationTimeout checks that the option WithCreationTimeout +// sets a timeout for the creation of new sessions in case the cluster +// shuts down +func TestCreationTimeout(t *testing.T) { + integration.BeforeTest(t) + + // create new cluster + clus := integration.NewCluster(t, &integration.ClusterConfig{Size: 1}) + defer clus.Terminate(t) + + // create new client + cli, err := integration.NewClient(t, clientv3.Config{Endpoints: []string{clus.Members[0].GRPCURL()}}) + if err != nil { + clus.Terminate(t) + t.Fatal(err) + } + defer cli.Close() + + // ensure the connection is established. + clientv3test.MustWaitPinReady(t, cli) + + // terminating the cluster + clus.Terminate(t) + + // override the grpc logger + logOb := integration.ClientGRPCLoggerObserver(t) + + _, err = concurrency.NewSession(cli, concurrency.WithCreationTimeout(3000*time.Millisecond)) + assert.Equal(t, err, context.DeadlineExceeded) + + _, err = logOb.Expect(context.Background(), "Subchannel Connectivity change to TRANSIENT_FAILURE", 3) + assert.Nil(t, err) +} + +// TestTimeoutDoesntAffectSubsequentConnections checks that the option WithCreationTimeout +// is only used when Session is created +func TestTimeoutDoesntAffectSubsequentConnections(t *testing.T) { + integration.BeforeTest(t) + + // create new cluster + clus := integration.NewCluster(t, &integration.ClusterConfig{Size: 1}) + defer clus.Terminate(t) + clus.Members[0].KeepDataDirTerminate = true + + // create new client + cli, err := integration.NewClient(t, clientv3.Config{Endpoints: []string{clus.Members[0].GRPCURL()}}) + if err != nil { + clus.Terminate(t) + t.Fatal(err) + } + defer cli.Close() + + // ensure the connection is established. + clientv3test.MustWaitPinReady(t, cli) + + s, err := concurrency.NewSession(cli, concurrency.WithCreationTimeout(1*time.Second)) + + // terminating the cluster + clus.Members[0].Terminate(t) + + errorc := make(chan error) + defer close(errorc) + go func() { + _, err := cli.Put(s.Ctx(), "sample_key", "sample_value", clientv3.WithLease(s.Lease())) + errorc <- err + }() + + select { + case err := <-errorc: + t.Fatalf("Operation put should be blocked forever when the server is unreachable: %v", err) + // if Put operation is blocked beyond the timeout specified using WithCreationTimeout, + // that timeout is not used by the Put operation + case <-time.After(2 * time.Second): + } + + // restarting and ensuring that the Put operation will eventually succeed + clus.Members[0].Restart(t) + clus.Members[0].WaitOK(t) + select { + case err := <-errorc: + if err != nil { + t.Errorf("Put failed: %v", err) + } + case <-time.After(2 * time.Second): + t.Fatal("Put function hung even after restarting cluster") + } +}