From cd6fd758f625fa2215ed63462465835ee2985432 Mon Sep 17 00:00:00 2001 From: AngstyDuck Date: Tue, 8 Aug 2023 22:45:46 +0800 Subject: [PATCH] client: Added new session option to set timeout for session creation When communicating via gRPC, etcd sets the gRPC option WaitForReady = true to minimize client request error responses due to transient failures. But because of this option, the creation of new sessions hangs without returning an error when none of the nodes can be contacted by the client. This new session option allows the client to receive an error if the nodes cannot be contacted within the specified amount of time. Signed-off-by: AngstyDuck --- client/v3/concurrency/session.go | 37 +++++++++++++++++-- .../clientv3/concurrency/session_test.go | 35 ++++++++++++++++++ 2 files changed, 68 insertions(+), 4 deletions(-) diff --git a/client/v3/concurrency/session.go b/client/v3/concurrency/session.go index 2275e96c972c..c7fd27f85549 100644 --- a/client/v3/concurrency/session.go +++ b/client/v3/concurrency/session.go @@ -40,11 +40,21 @@ type Session struct { // NewSession gets the leased session for a client. 
func NewSession(client *v3.Client, opts ...SessionOption) (*Session, error) { lg := client.GetLogger() - ops := &sessionOptions{ttl: defaultSessionTTL, ctx: client.Ctx()} + ops := &sessionOptions{ + ttl: defaultSessionTTL, + ctx: client.Ctx(), + sessionCreationTimeout: 0, + } for _, opt := range opts { opt(ops, lg) } + var deadlineCancel context.CancelFunc + if ops.sessionCreationTimeout > 0 { + clientDeadline := time.Now().Add(time.Duration(ops.sessionCreationTimeout) * time.Millisecond) + ops.ctx, deadlineCancel = context.WithDeadline(ops.ctx, clientDeadline) + } + id := ops.leaseID if id == v3.NoLease { resp, err := client.Grant(ops.ctx, int64(ops.ttl)) @@ -58,6 +68,9 @@ func NewSession(client *v3.Client, opts ...SessionOption) (*Session, error) { keepAlive, err := client.KeepAlive(ctx, id) if err != nil || keepAlive == nil { cancel() + if deadlineCancel != nil { + deadlineCancel() + } return nil, err } @@ -69,6 +82,9 @@ func NewSession(client *v3.Client, opts ...SessionOption) (*Session, error) { defer func() { close(donec) cancel() + if deadlineCancel != nil { + deadlineCancel() + } }() for range keepAlive { // eat messages until keep alive channel closes @@ -115,9 +131,10 @@ func (s *Session) Close() error { } type sessionOptions struct { - ttl int - leaseID v3.LeaseID - ctx context.Context + ttl int + leaseID v3.LeaseID + ctx context.Context + sessionCreationTimeout int } // SessionOption configures Session. @@ -135,6 +152,18 @@ func WithTTL(ttl int) SessionOption { } } +// WithSessionCreationTimeout configures the timeout for creating a new session +// in milliseconds. If timeout is <= 0, no timeout will be used. 
+func WithSessionCreationTimeout(timeout int) SessionOption { + return func(so *sessionOptions, lg *zap.Logger) { + if timeout > 0 { + so.sessionCreationTimeout = timeout + } else { + lg.Warn("WithSessionCreationTimeout(): timeout should be > 0, preserving current timeout", zap.Int64("current-session-timeout", int64(so.sessionCreationTimeout))) + } + } +} + // WithLease specifies the existing leaseID to be used for the session. // This is useful in process restart scenario, for example, to reclaim // leadership from an election prior to restart. diff --git a/tests/integration/clientv3/concurrency/session_test.go b/tests/integration/clientv3/concurrency/session_test.go index b17991179751..f72f229372d2 100644 --- a/tests/integration/clientv3/concurrency/session_test.go +++ b/tests/integration/clientv3/concurrency/session_test.go @@ -20,9 +20,11 @@ import ( "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/client/v3/concurrency" integration2 "go.etcd.io/etcd/tests/v3/framework/integration" + clientv3test "go.etcd.io/etcd/tests/v3/integration/clientv3" ) func TestSessionOptions(t *testing.T) { @@ -111,3 +113,36 @@ func TestSessionCtx(t *testing.T) { } assert.Equal(t, childCtx.Err(), context.Canceled) } + +// TestSessionCreationTimeout checks that the option WithSessionCreationTimeout +// sets a timeout for the creation of new sessions +func TestSessionCreationTimeout(t *testing.T) { + integration2.BeforeTest(t) + + // create new cluster + clus := integration2.NewCluster(t, &integration2.ClusterConfig{ + Size: 3, + }) + + eps := []string{clus.Members[0].GRPCURL(), clus.Members[1].GRPCURL(), clus.Members[2].GRPCURL()} + lead := clus.WaitLeader(t) + + // create new client + cli, err := integration2.NewClient(t, clientv3.Config{Endpoints: []string{eps[lead]}}) + if err != nil { + t.Fatal(err) + } + defer cli.Close() + + // wait for eps[lead] to be pinned + 
clientv3test.MustWaitPinReady(t, cli) + + // add all eps to list, so that when the original pinned one fails + // the client can switch to other available eps + cli.SetEndpoints(eps...) + + clus.Terminate(t) + + _, err = concurrency.NewSession(cli, concurrency.WithSessionCreationTimeout(50)) + require.Error(t, err) +}