Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 40 additions & 16 deletions internal/xds/xdsclient/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ type Pool struct {
// config.
mu sync.Mutex
clients map[string]*clientImpl
fallbackConfig *bootstrap.Config
fallbackConfig *bootstrap.Config // TODO(i/8661): remove fallbackConfig.
// getConfiguration is a sync.OnceValues that attempts to read the bootstrap
// configuration from environment variables once.
getConfiguration func() (*bootstrap.Config, error)
Expand All @@ -73,6 +73,11 @@ type OptionsForTesting struct {
// MetricsRecorder is the metrics recorder the xDS Client will use. If
// unspecified, uses a no-op MetricsRecorder.
MetricsRecorder estats.MetricsRecorder

// Config is the xDS bootstrap configuration that will be used to initialize
// the client. If unset, the client will use the config provided by env
// variables.
Config *bootstrap.Config
}

// NewPool creates a new xDS client pool with the given bootstrap config.
Expand All @@ -91,6 +96,17 @@ func NewPool(config *bootstrap.Config) *Pool {
}
}

// NewClientWithConfig returns an xDS client with the given name from the pool. If the
// client doesn't already exist, it creates a new xDS client and adds it to the
// pool.
//
// The second return value represents a close function which the caller is
// expected to invoke once they are done using the client. It is safe for the
// caller to invoke this close function multiple times.
func (p *Pool) NewClientWithConfig(name string, metricsRecorder estats.MetricsRecorder, config *bootstrap.Config) (XDSClient, func(), error) {
return p.newRefCounted(name, metricsRecorder, defaultWatchExpiryTimeout, config)
}

// NewClient returns an xDS client with the given name from the pool. If the
// client doesn't already exist, it creates a new xDS client and adds it to the
// pool.
Expand All @@ -99,7 +115,7 @@ func NewPool(config *bootstrap.Config) *Pool {
// expected to invoke once they are done using the client. It is safe for the
// caller to invoke this close function multiple times.
func (p *Pool) NewClient(name string, metricsRecorder estats.MetricsRecorder) (XDSClient, func(), error) {
return p.newRefCounted(name, metricsRecorder, defaultWatchExpiryTimeout)
return p.newRefCounted(name, metricsRecorder, defaultWatchExpiryTimeout, nil)
}

// NewClientForTesting returns an xDS client configured with the provided
Expand All @@ -126,7 +142,7 @@ func (p *Pool) NewClientForTesting(opts OptionsForTesting) (XDSClient, func(), e
if opts.MetricsRecorder == nil {
opts.MetricsRecorder = istats.NewMetricsRecorderList(nil)
}
c, cancel, err := p.newRefCounted(opts.Name, opts.MetricsRecorder, opts.WatchExpiryTimeout)
c, cancel, err := p.newRefCounted(opts.Name, opts.MetricsRecorder, opts.WatchExpiryTimeout, opts.Config)
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -159,6 +175,7 @@ func (p *Pool) GetClientForTesting(name string) (XDSClient, func(), error) {
// SetFallbackBootstrapConfig is used to specify a bootstrap configuration
// that will be used as a fallback when the bootstrap environment variables
// are not defined.
// TODO(i/8661): remove SetFallbackBootstrapConfig function.
func (p *Pool) SetFallbackBootstrapConfig(config *bootstrap.Config) {
p.mu.Lock()
defer p.mu.Unlock()
Expand Down Expand Up @@ -198,6 +215,7 @@ func (p *Pool) BootstrapConfigForTesting() *bootstrap.Config {
if cfg != nil {
return cfg
}
// TODO(i/8661)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry if I wasn't clear in my previous comment. We don't need a TODO on every line where this field is referenced. Just one TODO where the field is defined should be good enough.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Made the changes.

return p.fallbackConfig
}

Expand All @@ -208,6 +226,7 @@ func (p *Pool) BootstrapConfigForTesting() *bootstrap.Config {
func (p *Pool) UnsetBootstrapConfigForTesting() {
p.mu.Lock()
defer p.mu.Unlock()
// TODO(i/8661)
p.fallbackConfig = nil
p.getConfiguration = sync.OnceValues(bootstrap.GetConfiguration)
}
Expand Down Expand Up @@ -251,30 +270,35 @@ func (p *Pool) clientRefCountedClose(name string) {
// newRefCounted creates a new reference counted xDS client implementation for
// name, if one does not exist already. If an xDS client for the given name
// exists, it gets a reference to it and returns it.
func (p *Pool) newRefCounted(name string, metricsRecorder estats.MetricsRecorder, watchExpiryTimeout time.Duration) (*clientImpl, func(), error) {
func (p *Pool) newRefCounted(name string, metricsRecorder estats.MetricsRecorder, watchExpiryTimeout time.Duration, bConfig *bootstrap.Config) (*clientImpl, func(), error) {
p.mu.Lock()
defer p.mu.Unlock()

config, err := p.getConfiguration()
if err != nil {
return nil, nil, fmt.Errorf("xds: failed to read xDS bootstrap config from env vars: %v", err)
if c := p.clients[name]; c != nil {
c.incrRef()
return c, sync.OnceFunc(func() { p.clientRefCountedClose(name) }), nil
}

config := bConfig
if config == nil {
// If the environment variables are not set, then fallback bootstrap
// configuration should be set before attempting to create an xDS client,
// else xDS client creation will fail.
config = p.fallbackConfig
var err error
config, err = p.getConfiguration()
if err != nil {
return nil, nil, fmt.Errorf("xds: failed to read xDS bootstrap config from env vars: %v", err)
}
if config == nil {
// TODO(i/8661)
// If the environment variables are not set, then fallback bootstrap
// configuration should be set before attempting to create an xDS client,
// else xDS client creation will fail.
config = p.fallbackConfig
}
}

if config == nil {
return nil, nil, fmt.Errorf("failed to read xDS bootstrap config from env vars: bootstrap environment variables (%q or %q) not defined and fallback config not set", envconfig.XDSBootstrapFileNameEnv, envconfig.XDSBootstrapFileContentEnv)
}

if c := p.clients[name]; c != nil {
c.incrRef()
return c, sync.OnceFunc(func() { p.clientRefCountedClose(name) }), nil
}

c, err := newClientImpl(config, metricsRecorder, name, watchExpiryTimeout)
if err != nil {
return nil, nil, err
Expand Down
30 changes: 28 additions & 2 deletions xds/googledirectpath/googlec2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,16 @@ func getXdsServerURI() string {
return fmt.Sprintf("dns:///directpath-pa.%s", universeDomain)
}

type c2pResolverWrapper struct {
resolver.Resolver
cancel func()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add a trailing comment to this line saying this cancel func is used to release the ref to the xDS client that we created in Build

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I intended for the comment to be here, where the field is defined and not where it is used in Close.

}

func (r *c2pResolverWrapper) Close() {
r.Resolver.Close()
r.cancel() // Release the reference to the xDS client that was created in Build().
}

type c2pResolverBuilder struct{}

func (c2pResolverBuilder) Build(t resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
Expand Down Expand Up @@ -161,7 +171,6 @@ func (c2pResolverBuilder) Build(t resolver.Target, cc resolver.ClientConn, opts
if err != nil {
return nil, fmt.Errorf("failed to parse bootstrap contents: %s, %v", string(cfgJSON), err)
}
xdsClientPool.SetFallbackBootstrapConfig(config)

t = resolver.Target{
URL: url.URL{
Expand All @@ -170,7 +179,24 @@ func (c2pResolverBuilder) Build(t resolver.Target, cc resolver.ClientConn, opts
Path: t.URL.Path,
},
}
return resolver.Get(xdsName).Build(t, cc, opts)

// Create a new xDS client for this target using the provided bootstrap
// configuration. This client is stored in the xdsclient pool’s internal
// cache, keeping it alive and associated with this resolver until Closed().
// While the c2p resolver itself does not directly use the client, creating
// it ensures that when the xDS resolver later requests a client for the
// same target, the existing instance will be reused.
_, cancel, err := xdsClientPool.NewClientWithConfig(t.String(), opts.MetricsRecorder, config)
if err != nil {
return nil, fmt.Errorf("failed to create xds client: %v", err)
}

r, err := resolver.Get(xdsName).Build(t, cc, opts)
if err != nil {
cancel()
return nil, err
}
return &c2pResolverWrapper{Resolver: r, cancel: cancel}, nil
}

func (b c2pResolverBuilder) Scheme() string {
Expand Down
Loading