Skip to content

Commit

Permalink
client: moved test cases to cluster
Browse files Browse the repository at this point in the history
Signed-off-by: AngstyDuck <[email protected]>
  • Loading branch information
AngstyDuck committed Sep 12, 2023
1 parent 14732df commit b3f356b
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 83 deletions.
83 changes: 0 additions & 83 deletions tests/integration/clientv3/concurrency/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/concurrency"
integration2 "go.etcd.io/etcd/tests/v3/framework/integration"
clientv3test "go.etcd.io/etcd/tests/v3/integration/clientv3"
)

func TestSessionOptions(t *testing.T) {
Expand Down Expand Up @@ -112,85 +111,3 @@ func TestSessionCtx(t *testing.T) {
}
assert.Equal(t, childCtx.Err(), context.Canceled)
}

// TestCreationTimeout checks that the option WithCreationTimeout
// sets a timeout for the creation of new sessions
func TestCreationTimeout(t *testing.T) {
// create new cluster
clus := integration2.NewCluster(t, &integration2.ClusterConfig{Size: 1})

// create new client
cli, err := integration2.NewClient(t, clientv3.Config{Endpoints: []string{clus.Members[0].GRPCURL()}})
if err != nil {
clus.Terminate(t)
t.Fatal(err)
}
defer cli.Close()

// ensure the connection is established.
clientv3test.MustWaitPinReady(t, cli)

// terminating the cluster
clus.Terminate(t)

// override the grpc logger
logOb := integration2.ClientGRPCLoggerObserver(t)

_, err = concurrency.NewSession(cli, concurrency.WithCreationTimeout(3000*time.Millisecond))
assert.Equal(t, err, context.DeadlineExceeded)

_, err = logOb.Expect(context.Background(), "Subchannel Connectivity change to TRANSIENT_FAILURE", 3)
assert.Nil(t, err)
}

// TestTimeoutDoesntAffectSubsequentConnections checks that the option WithCreationTimeout
// is only used when Session is created
func TestTimeoutDoesntAffectSubsequentConnections(t *testing.T) {
// create new cluster
clus := integration2.NewCluster(t, &integration2.ClusterConfig{Size: 1})
defer clus.Terminate(t)
clus.Members[0].KeepDataDirTerminate = true

// create new client
cli, err := integration2.NewClient(t, clientv3.Config{Endpoints: []string{clus.Members[0].GRPCURL()}})
if err != nil {
clus.Terminate(t)
t.Fatal(err)
}
defer cli.Close()

// ensure the connection is established.
clientv3test.MustWaitPinReady(t, cli)

s, err := concurrency.NewSession(cli, concurrency.WithCreationTimeout(1*time.Second))

// terminating the cluster
clus.Members[0].Terminate(t)

errorc := make(chan error)
defer close(errorc)
go func() {
_, err := cli.Put(s.Ctx(), "sample_key", "sample_value", clientv3.WithLease(s.Lease()))
errorc <- err
}()

select {
case err := <-errorc:
t.Fatalf("Operation put should be blocked forever when the server is unreachable: %v", err)
// if Put operation is blocked beyond the timeout specified using WithCreationTimeout,
// that timeout is not used by the Put operation
case <-time.After(2 * time.Second):
}

// restarting and ensuring that the Put operation will eventually succeed
clus.Members[0].Restart(t)
clus.Members[0].WaitOK(t)
select {
case err := <-errorc:
if err != nil {
t.Errorf("Put failed: %v", err)
}
case <-time.After(2 * time.Second):
t.Fatal("Put function hung even after restarting cluster")
}
}
92 changes: 92 additions & 0 deletions tests/integration/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,14 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"

clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/concurrency"
"go.etcd.io/etcd/server/v3/etcdserver"
"go.etcd.io/etcd/tests/v3/framework/config"
"go.etcd.io/etcd/tests/v3/framework/integration"
clientv3test "go.etcd.io/etcd/tests/v3/integration/clientv3"
)

func init() {
Expand Down Expand Up @@ -518,3 +522,91 @@ func TestSpeedyTerminate(t *testing.T) {
case <-donec:
}
}

// TestCreationTimeout checks that the option WithCreationTimeout
// sets a timeout for the creation of new sessions in case the cluster
// shuts down
func TestCreationTimeout(t *testing.T) {
integration.BeforeTest(t)

// create new cluster
clus := integration.NewCluster(t, &integration.ClusterConfig{Size: 1})
defer clus.Terminate(t)

// create new client
cli, err := integration.NewClient(t, clientv3.Config{Endpoints: []string{clus.Members[0].GRPCURL()}})
if err != nil {
clus.Terminate(t)
t.Fatal(err)
}
defer cli.Close()

// ensure the connection is established.
clientv3test.MustWaitPinReady(t, cli)

// terminating the cluster
clus.Terminate(t)

// override the grpc logger
logOb := integration.ClientGRPCLoggerObserver(t)

_, err = concurrency.NewSession(cli, concurrency.WithCreationTimeout(3000*time.Millisecond))
assert.Equal(t, err, context.DeadlineExceeded)

_, err = logOb.Expect(context.Background(), "Subchannel Connectivity change to TRANSIENT_FAILURE", 3)
assert.Nil(t, err)
}

// TestTimeoutDoesntAffectSubsequentConnections checks that the option WithCreationTimeout
// is only used when Session is created
func TestTimeoutDoesntAffectSubsequentConnections(t *testing.T) {
integration.BeforeTest(t)

// create new cluster
clus := integration.NewCluster(t, &integration.ClusterConfig{Size: 1})
defer clus.Terminate(t)
clus.Members[0].KeepDataDirTerminate = true

// create new client
cli, err := integration.NewClient(t, clientv3.Config{Endpoints: []string{clus.Members[0].GRPCURL()}})
if err != nil {
clus.Terminate(t)
t.Fatal(err)
}
defer cli.Close()

// ensure the connection is established.
clientv3test.MustWaitPinReady(t, cli)

s, err := concurrency.NewSession(cli, concurrency.WithCreationTimeout(1*time.Second))

// terminating the cluster
clus.Members[0].Terminate(t)

errorc := make(chan error)
defer close(errorc)
go func() {
_, err := cli.Put(s.Ctx(), "sample_key", "sample_value", clientv3.WithLease(s.Lease()))
errorc <- err
}()

select {
case err := <-errorc:
t.Fatalf("Operation put should be blocked forever when the server is unreachable: %v", err)
// if Put operation is blocked beyond the timeout specified using WithCreationTimeout,
// that timeout is not used by the Put operation
case <-time.After(2 * time.Second):
}

// restarting and ensuring that the Put operation will eventually succeed
clus.Members[0].Restart(t)
clus.Members[0].WaitOK(t)
select {
case err := <-errorc:
if err != nil {
t.Errorf("Put failed: %v", err)
}
case <-time.After(2 * time.Second):
t.Fatal("Put function hung even after restarting cluster")
}
}

0 comments on commit b3f356b

Please sign in to comment.