Skip to content

Commit

Permalink
client: added log monitor to test retrying mechanism when creating se…
Browse files Browse the repository at this point in the history
…ssions

Signed-off-by: AngstyDuck <[email protected]>
  • Loading branch information
AngstyDuck committed Sep 3, 2023
1 parent 600ffc3 commit d86cbc5
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 52 deletions.
22 changes: 22 additions & 0 deletions tests/framework/integration/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

grpc_logsettable "github.com/grpc-ecosystem/go-grpc-middleware/logging/settable"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"go.uber.org/zap/zapgrpc"
"go.uber.org/zap/zaptest"
Expand All @@ -30,6 +31,7 @@ import (
"go.etcd.io/etcd/client/pkg/v3/verify"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/server/v3/embed"
"go.etcd.io/etcd/tests/v3/framework/testutils"
)

var grpc_logger grpc_logsettable.SettableLoggerV2
Expand Down Expand Up @@ -131,6 +133,26 @@ func BeforeTest(t testutil.TB, opts ...TestOption) {
os.Chdir(t.TempDir())
}

func ClientGRPCLoggerObserver(t testutil.TB) *testutils.LogObserver {
level := zapcore.InfoLevel

obCore, logOb := testutils.NewLogObserver(level)

options := zaptest.WrapOptions(
zap.WrapCore(func(oldCore zapcore.Core) zapcore.Core {
return zapcore.NewTee(oldCore, obCore)
}),
)

grpc_logger.Set(
zapgrpc.NewLogger(
zaptest.NewLogger(t, zaptest.Level(level), options).
Named("grpc-observer"),
),
)
return logOb
}

func assertInTestContext(t testutil.TB) {
if !insideTestContext {
t.Errorf("the function can be called only in the test context. Was integration.BeforeTest() called ?")
Expand Down
63 changes: 11 additions & 52 deletions tests/integration/clientv3/concurrency/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,69 +119,28 @@ func TestCreationTimeout(t *testing.T) {
integration2.BeforeTest(t)

// create new cluster
clus := integration2.NewCluster(t, &integration2.ClusterConfig{
Size: 3,
})

eps := []string{clus.Members[0].GRPCURL(), clus.Members[1].GRPCURL(), clus.Members[2].GRPCURL()}
lead := clus.WaitLeader(t)
clus := integration2.NewCluster(t, &integration2.ClusterConfig{Size: 1})

// create new client
cli, err := integration2.NewClient(t, clientv3.Config{Endpoints: []string{eps[lead]}})
cli, err := integration2.NewClient(t, clientv3.Config{Endpoints: []string{clus.Members[0].GRPCURL()}})
if err != nil {
clus.Terminate(t)
t.Fatal(err)
}
defer cli.Close()

// wait for eps[lead] to be pinned
// ensure the connection is established.
clientv3test.MustWaitPinReady(t, cli)

// add all eps to list, so that when the original pined one fails
// the client can switch to other available eps
cli.SetEndpoints(eps...)

// terminating the cluster
clus.Terminate(t)

_, err = concurrency.NewSession(cli, concurrency.WithCreationTimeout(50 * time.Millisecond))
assert.Equal(t, err, context.DeadlineExceeded)
}

// TestTimeoutDoesntAffectSubsequentConnections checks that the option WithCreationTimeout
// does not set the deadline of the context object that is used for subsequent connections to the server.
// This means after successful session creation, if servers are unavailable, requests are supposed to be
// blocked waiting for the cluster to recover
func TestTimeoutDoesntAffectSubsequentConnections(t *testing.T) {
integration2.BeforeTest(t)

// create new cluster
clus := integration2.NewCluster(t, &integration2.ClusterConfig{
Size: 3,
})
defer clus.Terminate(t)

eps := []string{clus.Members[0].GRPCURL(), clus.Members[1].GRPCURL(), clus.Members[2].GRPCURL()}
lead := clus.WaitLeader(t)
// override the grpc logger
logOb := integration2.ClientGRPCLoggerObserver(t)

// create new client
cli, err := integration2.NewClient(t, clientv3.Config{Endpoints: []string{eps[lead]}})
if err != nil {
t.Fatal(err)
}
defer cli.Close()

// wait for eps[lead] to be pinned
clientv3test.MustWaitPinReady(t, cli)

// add all eps to list, so that when the original pined one fails
// the client can switch to other available eps
cli.SetEndpoints(eps...)

s, err := concurrency.NewSession(cli, concurrency.WithCreationTimeout(50 * time.Millisecond))
if err != nil {
t.Fatal(err)
}
defer s.Close()
_, err = concurrency.NewSession(cli, concurrency.WithCreationTimeout(3000 * time.Millisecond))
assert.Equal(t, err, context.DeadlineExceeded)

_, deadlineIsSet := s.Ctx().Deadline()
assert.Equal(t, deadlineIsSet, false)
_, err = logOb.Expect(context.Background(), "Subchannel Connectivity change to TRANSIENT_FAILURE", 3)
assert.Nil(t, err)
}

0 comments on commit d86cbc5

Please sign in to comment.