Skip to content

Commit

Permalink
client: Added new session option to set timeout for session creation
Browse files Browse the repository at this point in the history
When communicating via gRPC, etcd sets the gRPC option WaitForReady = true
to minimize client request error responses due to transient failures. But
because of this option, the creation of new sessions hang without errors
when all nodes could not be contacted by the client. This new session option
allows client to receive an error if the nodes cannot be contacted after the
specified amount of time.

Signed-off-by: AngstyDuck <[email protected]>
  • Loading branch information
AngstyDuck committed Aug 13, 2023
1 parent def3494 commit 77ec301
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 5 deletions.
34 changes: 29 additions & 5 deletions client/v3/concurrency/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,25 @@ type Session struct {
// NewSession gets the leased session for a client.
func NewSession(client *v3.Client, opts ...SessionOption) (*Session, error) {
lg := client.GetLogger()
ops := &sessionOptions{ttl: defaultSessionTTL, ctx: client.Ctx()}
ops := &sessionOptions{
ttl: defaultSessionTTL,
ctx: client.Ctx(),
sessionCreationTimeout: 0,
}
for _, opt := range opts {
opt(ops, lg)
}

var cancel context.CancelFunc
sessionCreationCtx := ops.ctx
if ops.sessionCreationTimeout > 0 {
clientDeadline := time.Now().Add(time.Duration(ops.sessionCreationTimeout) * time.Millisecond)
sessionCreationCtx, cancel = context.WithDeadline(ops.ctx, clientDeadline)
}

id := ops.leaseID
if id == v3.NoLease {
resp, err := client.Grant(ops.ctx, int64(ops.ttl))
resp, err := client.Grant(sessionCreationCtx, int64(ops.ttl))
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -115,9 +126,10 @@ func (s *Session) Close() error {
}

type sessionOptions struct {
ttl int
leaseID v3.LeaseID
ctx context.Context
ttl int
leaseID v3.LeaseID
ctx context.Context
sessionCreationTimeout int
}

// SessionOption configures Session.
Expand All @@ -135,6 +147,18 @@ func WithTTL(ttl int) SessionOption {
}
}

// WithSessionCreationTimeout configures the timeout for creating a new session
// in milliseconds. If timeout is <= 0, no timeout will be used.
func WithSessionCreationTimeout(timeout int) SessionOption {
return func(so *sessionOptions, lg *zap.Logger) {
if timeout > 0 {
so.sessionCreationTimeout = timeout
} else {
lg.Warn("WithSessionCreationTimeout(): timeout should be > 0, preserving current timeout", zap.Int64("current-session-timeout", int64(so.sessionCreationTimeout)))
}
}
}

// WithLease specifies the existing leaseID to be used for the session.
// This is useful in process restart scenario, for example, to reclaim
// leadership from an election prior to restart.
Expand Down
80 changes: 80 additions & 0 deletions tests/integration/clientv3/concurrency/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/concurrency"
integration2 "go.etcd.io/etcd/tests/v3/framework/integration"
clientv3test "go.etcd.io/etcd/tests/v3/integration/clientv3"
)

func TestSessionOptions(t *testing.T) {
Expand Down Expand Up @@ -111,3 +112,82 @@ func TestSessionCtx(t *testing.T) {
}
assert.Equal(t, childCtx.Err(), context.Canceled)
}

// TestSessionCreationTimeout checks that the option WithSessionCreationTimeout
// sets a timeout for the creation of new sessions
func TestSessionCreationTimeout(t *testing.T) {
integration2.BeforeTest(t)

// create new cluster
clus := integration2.NewCluster(t, &integration2.ClusterConfig{
Size: 3,
})

eps := []string{clus.Members[0].GRPCURL(), clus.Members[1].GRPCURL(), clus.Members[2].GRPCURL()}
lead := clus.WaitLeader(t)

// create new client
cli, err := integration2.NewClient(t, clientv3.Config{Endpoints: []string{eps[lead]}})
if err != nil {
t.Fatal(err)
}
defer cli.Close()

// wait for eps[lead] to be pinned
clientv3test.MustWaitPinReady(t, cli)

// add all eps to list, so that when the original pined one fails
// the client can switch to other available eps
cli.SetEndpoints(eps...)

clus.Terminate(t)

_, err = concurrency.NewSession(cli, concurrency.WithSessionCreationTimeout(50))
assert.Equal(t, err, context.DeadlineExceeded)
}

// TestTimeoutDoesntAffectSubsequentConnections checks that the option WithSessionCreationTimeout
// does not set the timeout of subsequent connections to the server. This means after successful
// session creation, if servers are unavailable, requests would be blocked waiting for the
// cluster to recover
func TestTimeoutDoesntAffectSubsequentConnections(t *testing.T) {
integration2.BeforeTest(t)

// create new cluster
clus := integration2.NewCluster(t, &integration2.ClusterConfig{
Size: 3,
})

eps := []string{clus.Members[0].GRPCURL(), clus.Members[1].GRPCURL(), clus.Members[2].GRPCURL()}
lead := clus.WaitLeader(t)

// create new client
cli, err := integration2.NewClient(t, clientv3.Config{Endpoints: []string{eps[lead]}})
if err != nil {
t.Fatal(err)
}
defer cli.Close()

// wait for eps[lead] to be pinned
clientv3test.MustWaitPinReady(t, cli)

// add all eps to list, so that when the original pined one fails
// the client can switch to other available eps
cli.SetEndpoints(eps...)

s, err := concurrency.NewSession(cli, concurrency.WithSessionCreationTimeout(50))
if err != nil {
t.Fatal(err)
}
defer s.Close()

clus.Terminate(t)
key := "sample_key"
value := "sample_value"
_, err = cli.Put(s.Ctx(), key, value, clientv3.WithLease(s.Lease()))

// assert that the request is cancelled because the test case timed out
// from the infinite hanging of the request, instead of the request
// itself timing out
assert.Equal(t, err, context.Canceled)
}

0 comments on commit 77ec301

Please sign in to comment.