Skip to content

Commit

Permalink
client: Added new session option to set timeout for session creation
Browse files Browse the repository at this point in the history
When communicating via gRPC, etcd sets the gRPC option WaitForReady = true
to minimize client request error responses due to transient failures. But
because of this option, the creation of new sessions hang without errors
when all nodes could not be contacted by the client. This new session option
allows client to receive an error if the nodes cannot be contacted after the
specified amount of time.

Signed-off-by: AngstyDuck <[email protected]>
  • Loading branch information
AngstyDuck committed Aug 9, 2023
1 parent def3494 commit ffe5ef5
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 4 deletions.
31 changes: 27 additions & 4 deletions client/v3/concurrency/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,21 @@ type Session struct {
// NewSession gets the leased session for a client.
func NewSession(client *v3.Client, opts ...SessionOption) (*Session, error) {
lg := client.GetLogger()
ops := &sessionOptions{ttl: defaultSessionTTL, ctx: client.Ctx()}
ops := &sessionOptions{
ttl: defaultSessionTTL,
ctx: client.Ctx(),
sessionCreationTimeout: 0,
}
for _, opt := range opts {
opt(ops, lg)
}

var cancel context.CancelFunc
if ops.sessionCreationTimeout > 0 {
clientDeadline := time.Now().Add(time.Duration(ops.sessionCreationTimeout) * time.Millisecond)
ops.ctx, cancel = context.WithDeadline(ops.ctx, clientDeadline)
}

id := ops.leaseID
if id == v3.NoLease {
resp, err := client.Grant(ops.ctx, int64(ops.ttl))
Expand Down Expand Up @@ -115,9 +125,10 @@ func (s *Session) Close() error {
}

type sessionOptions struct {
ttl int
leaseID v3.LeaseID
ctx context.Context
ttl int
leaseID v3.LeaseID
ctx context.Context
sessionCreationTimeout int
}

// SessionOption configures Session.
Expand All @@ -135,6 +146,18 @@ func WithTTL(ttl int) SessionOption {
}
}

// WithSessionCreationTimeout configures the timeout for creating a new session
// in milliseconds. If timeout is <= 0, no timeout will be used.
func WithSessionCreationTimeout(timeout int) SessionOption {
return func(so *sessionOptions, lg *zap.Logger) {
if timeout > 0 {
so.sessionCreationTimeout = timeout
} else {
lg.Warn("WithSessionCreationTimeout(): timeout should be > 0, preserving current timeout", zap.Int64("current-session-timeout", int64(so.sessionCreationTimeout)))
}
}
}

// WithLease specifies the existing leaseID to be used for the session.
// This is useful in process restart scenario, for example, to reclaim
// leadership from an election prior to restart.
Expand Down
35 changes: 35 additions & 0 deletions tests/integration/clientv3/concurrency/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@ import (
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/concurrency"
integration2 "go.etcd.io/etcd/tests/v3/framework/integration"
clientv3test "go.etcd.io/etcd/tests/v3/integration/clientv3"
)

func TestSessionOptions(t *testing.T) {
Expand Down Expand Up @@ -111,3 +113,36 @@ func TestSessionCtx(t *testing.T) {
}
assert.Equal(t, childCtx.Err(), context.Canceled)
}

// TestSessionCreationTimeout checks that the option WithSessionCreationTimeout
// sets a timeout for the creation of new sessions
func TestSessionCreationTimeout(t *testing.T) {
integration2.BeforeTest(t)

// create new cluster
clus := integration2.NewCluster(t, &integration2.ClusterConfig{
Size: 3,
})

eps := []string{clus.Members[0].GRPCURL(), clus.Members[1].GRPCURL(), clus.Members[2].GRPCURL()}
lead := clus.WaitLeader(t)

// create new client
cli, err := integration2.NewClient(t, clientv3.Config{Endpoints: []string{eps[lead]}})
if err != nil {
t.Fatal(err)
}
defer cli.Close()

// wait for eps[lead] to be pinned
clientv3test.MustWaitPinReady(t, cli)

// add all eps to list, so that when the original pined one fails
// the client can switch to other available eps
cli.SetEndpoints(eps...)

clus.Terminate(t)

_, err = concurrency.NewSession(cli, concurrency.WithSessionCreationTimeout(50))
require.Error(t, err)
}

0 comments on commit ffe5ef5

Please sign in to comment.