diff --git a/client/v3/concurrency/session.go b/client/v3/concurrency/session.go index 2275e96c972c..e95ea6388489 100644 --- a/client/v3/concurrency/session.go +++ b/client/v3/concurrency/session.go @@ -40,11 +40,21 @@ type Session struct { // NewSession gets the leased session for a client. func NewSession(client *v3.Client, opts ...SessionOption) (*Session, error) { lg := client.GetLogger() - ops := &sessionOptions{ttl: defaultSessionTTL, ctx: client.Ctx()} + ops := &sessionOptions{ + ttl: defaultSessionTTL, + ctx: client.Ctx(), + sessionCreationTimeout: 0, + } for _, opt := range opts { opt(ops, lg) } + var cancel context.CancelFunc + if ops.sessionCreationTimeout > 0 { + clientDeadline := time.Now().Add(time.Duration(ops.sessionCreationTimeout) * time.Millisecond) + ops.ctx, cancel = context.WithDeadline(ops.ctx, clientDeadline) + } + id := ops.leaseID if id == v3.NoLease { resp, err := client.Grant(ops.ctx, int64(ops.ttl)) @@ -115,9 +125,10 @@ func (s *Session) Close() error { } type sessionOptions struct { - ttl int - leaseID v3.LeaseID - ctx context.Context + ttl int + leaseID v3.LeaseID + ctx context.Context + sessionCreationTimeout int } // SessionOption configures Session. @@ -135,6 +146,18 @@ func WithTTL(ttl int) SessionOption { } } +// WithSessionCreationTimeout configures the timeout for creating a new session +// in milliseconds. If timeout is <= 0, no timeout will be used. +func WithSessionCreationTimeout(timeout int) SessionOption { + return func(so *sessionOptions, lg *zap.Logger) { + if timeout > 0 { + so.sessionCreationTimeout = timeout + } else { + lg.Warn("WithSessionCreationTimeout(): timeout should be > 0, preserving current timeout", zap.Int64("current-session-timeout", int64(so.sessionCreationTimeout))) + } + } +} + // WithLease specifies the existing leaseID to be used for the session. // This is useful in process restart scenario, for example, to reclaim // leadership from an election prior to restart. diff --git a/tests/integration/clientv3/concurrency/session_test.go b/tests/integration/clientv3/concurrency/session_test.go index b17991179751..f72f229372d2 100644 --- a/tests/integration/clientv3/concurrency/session_test.go +++ b/tests/integration/clientv3/concurrency/session_test.go @@ -20,9 +20,11 @@ import ( "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/client/v3/concurrency" integration2 "go.etcd.io/etcd/tests/v3/framework/integration" + clientv3test "go.etcd.io/etcd/tests/v3/integration/clientv3" ) func TestSessionOptions(t *testing.T) { @@ -111,3 +113,36 @@ func TestSessionCtx(t *testing.T) { } assert.Equal(t, childCtx.Err(), context.Canceled) } + +// TestSessionCreationTimeout checks that the option WithSessionCreationTimeout +// sets a timeout for the creation of new sessions +func TestSessionCreationTimeout(t *testing.T) { + integration2.BeforeTest(t) + + // create new cluster + clus := integration2.NewCluster(t, &integration2.ClusterConfig{ + Size: 3, + }) + + eps := []string{clus.Members[0].GRPCURL(), clus.Members[1].GRPCURL(), clus.Members[2].GRPCURL()} + lead := clus.WaitLeader(t) + + // create new client + cli, err := integration2.NewClient(t, clientv3.Config{Endpoints: []string{eps[lead]}}) + if err != nil { + t.Fatal(err) + } + defer cli.Close() + + // wait for eps[lead] to be pinned + clientv3test.MustWaitPinReady(t, cli) + + // add all eps to list, so that when the original pined one fails + // the client can switch to other available eps + cli.SetEndpoints(eps...) + + clus.Terminate(t) + + _, err = concurrency.NewSession(cli, concurrency.WithSessionCreationTimeout(50)) + require.Error(t, err) +}