Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 41 additions & 16 deletions internal/xds/xdsclient/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ type OptionsForTesting struct {
// MetricsRecorder is the metrics recorder the xDS Client will use. If
// unspecified, uses a no-op MetricsRecorder.
MetricsRecorder estats.MetricsRecorder

// Config is the xDS bootstrap configuration that will be used to initialize
// the client. If unset, the client will use the config provided by env
// variables.
Config *bootstrap.Config
}

// NewPool creates a new xDS client pool with the given bootstrap config.
Expand All @@ -91,6 +96,17 @@ func NewPool(config *bootstrap.Config) *Pool {
}
}

// NewClientWithConfig returns an xDS client with the given name from the pool. If the
// client doesn't already exist, it creates a new xDS client and adds it to the
// pool.
//
// The second return value represents a close function which the caller is
// expected to invoke once they are done using the client. It is safe for the
// caller to invoke this close function multiple times.
func (p *Pool) NewClientWithConfig(name string, metricsRecorder estats.MetricsRecorder, config *bootstrap.Config) (XDSClient, func(), error) {
return p.newRefCounted(name, metricsRecorder, defaultWatchExpiryTimeout, config)
}

// NewClient returns an xDS client with the given name from the pool. If the
// client doesn't already exist, it creates a new xDS client and adds it to the
// pool.
Expand All @@ -99,7 +115,7 @@ func NewPool(config *bootstrap.Config) *Pool {
// expected to invoke once they are done using the client. It is safe for the
// caller to invoke this close function multiple times.
func (p *Pool) NewClient(name string, metricsRecorder estats.MetricsRecorder) (XDSClient, func(), error) {
return p.newRefCounted(name, metricsRecorder, defaultWatchExpiryTimeout)
return p.newRefCounted(name, metricsRecorder, defaultWatchExpiryTimeout, nil)
}

// NewClientForTesting returns an xDS client configured with the provided
Expand All @@ -126,7 +142,7 @@ func (p *Pool) NewClientForTesting(opts OptionsForTesting) (XDSClient, func(), e
if opts.MetricsRecorder == nil {
opts.MetricsRecorder = istats.NewMetricsRecorderList(nil)
}
c, cancel, err := p.newRefCounted(opts.Name, opts.MetricsRecorder, opts.WatchExpiryTimeout)
c, cancel, err := p.newRefCounted(opts.Name, opts.MetricsRecorder, opts.WatchExpiryTimeout, opts.Config)
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -251,30 +267,39 @@ func (p *Pool) clientRefCountedClose(name string) {
// newRefCounted creates a new reference counted xDS client implementation for
// name, if one does not exist already. If an xDS client for the given name
// exists, it gets a reference to it and returns it.
func (p *Pool) newRefCounted(name string, metricsRecorder estats.MetricsRecorder, watchExpiryTimeout time.Duration) (*clientImpl, func(), error) {
func (p *Pool) newRefCounted(name string, metricsRecorder estats.MetricsRecorder, watchExpiryTimeout time.Duration, bConfig *bootstrap.Config) (*clientImpl, func(), error) {
p.mu.Lock()
defer p.mu.Unlock()

config, err := p.getConfiguration()
if err != nil {
return nil, nil, fmt.Errorf("xds: failed to read xDS bootstrap config from env vars: %v", err)
if c := p.clients[name]; c != nil {
c.incrRef()
return c, sync.OnceFunc(func() { p.clientRefCountedClose(name) }), nil
}

if config == nil {
// If the environment variables are not set, then fallback bootstrap
// configuration should be set before attempting to create an xDS client,
// else xDS client creation will fail.
config = p.fallbackConfig
var (
config *bootstrap.Config
err error
)

if bConfig != nil {
config = bConfig
} else {
config, err = p.getConfiguration()
if err != nil {
return nil, nil, fmt.Errorf("xds: failed to read xDS bootstrap config from env vars: %v", err)
}
if config == nil {
// If the environment variables are not set, then fallback bootstrap
// configuration should be set before attempting to create an xDS client,
// else xDS client creation will fail.
config = p.fallbackConfig
}
}

if config == nil {
return nil, nil, fmt.Errorf("failed to read xDS bootstrap config from env vars: bootstrap environment variables (%q or %q) not defined and fallback config not set", envconfig.XDSBootstrapFileNameEnv, envconfig.XDSBootstrapFileContentEnv)
}

if c := p.clients[name]; c != nil {
c.incrRef()
return c, sync.OnceFunc(func() { p.clientRefCountedClose(name) }), nil
}

c, err := newClientImpl(config, metricsRecorder, name, watchExpiryTimeout)
if err != nil {
return nil, nil, err
Expand Down
23 changes: 21 additions & 2 deletions xds/googledirectpath/googlec2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,16 @@ func getXdsServerURI() string {
return fmt.Sprintf("dns:///directpath-pa.%s", universeDomain)
}

type c2pResolverWrapper struct {
resolver.Resolver
cancel func()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add a trailing comment to this line saying this cancel func is used to release the ref to the xDS client that we created in Build

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I intended for the comment to be here, where the field is defined and not where it is used in Close.

}

func (r *c2pResolverWrapper) Close() {
r.Resolver.Close()
r.cancel()
}

type c2pResolverBuilder struct{}

func (c2pResolverBuilder) Build(t resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
Expand Down Expand Up @@ -161,7 +171,6 @@ func (c2pResolverBuilder) Build(t resolver.Target, cc resolver.ClientConn, opts
if err != nil {
return nil, fmt.Errorf("failed to parse bootstrap contents: %s, %v", string(cfgJSON), err)
}
xdsClientPool.SetFallbackBootstrapConfig(config)

t = resolver.Target{
URL: url.URL{
Expand All @@ -170,7 +179,17 @@ func (c2pResolverBuilder) Build(t resolver.Target, cc resolver.ClientConn, opts
Path: t.URL.Path,
},
}
return resolver.Get(xdsName).Build(t, cc, opts)
_, cancel, err := xdsClientPool.NewClientWithConfig(t.String(), opts.MetricsRecorder, config)
if err != nil {
return nil, fmt.Errorf("failed to create xds client: %v", err)
}

r, err := resolver.Get(xdsName).Build(t, cc, opts)
if err != nil {
cancel()
return nil, err
}
return &c2pResolverWrapper{Resolver: r, cancel: cancel}, nil
}

func (b c2pResolverBuilder) Scheme() string {
Expand Down
59 changes: 42 additions & 17 deletions xds/googledirectpath/googlec2p_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package googledirectpath
import (
"context"
"encoding/json"
"net/url"
"strconv"
"strings"
"testing"
Expand Down Expand Up @@ -155,9 +156,9 @@ func (s) TestBuildWithBootstrapEnvSet(t *testing.T) {
}
defer r.Close()

// Build should return xDS, not DNS.
if r != testXDSResolver {
t.Fatalf("Build() returned %#v, want xds resolver", r)
// Build should return wrapped xDS resolver, not DNS.
if r, ok := r.(*c2pResolverWrapper); !ok || r.Resolver != testXDSResolver {
t.Fatalf("Build() returned %#v, want c2pResolverWrapper", r)
}
})
}
Expand Down Expand Up @@ -314,18 +315,26 @@ func (s) TestBuildXDS(t *testing.T) {
defer func() { getIPv6Capable = oldGetIPv6Capability }()

// Build the google-c2p resolver.
r, err := builder.Build(resolver.Target{}, nil, resolver.BuildOptions{})
target := resolver.Target{URL: url.URL{Scheme: c2pScheme, Path: "test-path"}}
r, err := builder.Build(target, nil, resolver.BuildOptions{})
if err != nil {
t.Fatalf("failed to build resolver: %v", err)
}
defer r.Close()

// Build should return xDS, not DNS.
if r != testXDSResolver {
t.Fatalf("Build() returned %#v, want xds resolver", r)
// Build should return wrapped xDS resolver, not DNS.
if r, ok := r.(*c2pResolverWrapper); !ok || r.Resolver != testXDSResolver {
t.Fatalf("Build() returned %#v, want c2pResolverWrapper", r)
}

xdsTarget := resolver.Target{URL: url.URL{Scheme: xdsName, Host: c2pAuthority, Path: target.URL.Path}}
client, close, err := xdsClientPool.GetClientForTesting(xdsTarget.String())
if err != nil {
t.Fatalf("Failed to get xds client: %v", err)
}
defer close()

gotConfig := xdsClientPool.BootstrapConfigForTesting()
gotConfig := client.BootstrapConfig()
if gotConfig == nil {
t.Fatalf("Failed to get bootstrap config: %v", err)
}
Expand Down Expand Up @@ -415,18 +424,26 @@ func (s) TestSetUniverseDomainNonDefault(t *testing.T) {
defer func() { xdsClientPool = oldXdsClientPool }()

// Build the google-c2p resolver.
r, err := builder.Build(resolver.Target{}, nil, resolver.BuildOptions{})
target := resolver.Target{URL: url.URL{Scheme: c2pScheme, Path: "test-path"}}
r, err := builder.Build(target, nil, resolver.BuildOptions{})
if err != nil {
t.Fatalf("failed to build resolver: %v", err)
}
defer r.Close()

// Build should return xDS, not DNS.
if r != testXDSResolver {
t.Fatalf("Build() returned %#v, want xds resolver", r)
// Build should return wrapped xDS resolver, not DNS.
if r, ok := r.(*c2pResolverWrapper); !ok || r.Resolver != testXDSResolver {
t.Fatalf("Build() returned %#v, want c2pResolverWrapper", r)
}

gotConfig := xdsClientPool.BootstrapConfigForTesting()
xdsTarget := resolver.Target{URL: url.URL{Scheme: xdsName, Host: c2pAuthority, Path: target.URL.Path}}
client, close, err := xdsClientPool.GetClientForTesting(xdsTarget.String())
if err != nil {
t.Fatalf("Failed to get xds client: %v", err)
}
defer close()

gotConfig := client.BootstrapConfig()
if gotConfig == nil {
t.Fatalf("Failed to get bootstrap config: %v", err)
}
Expand Down Expand Up @@ -484,18 +501,26 @@ func (s) TestDefaultUniverseDomain(t *testing.T) {
defer func() { xdsClientPool = oldXdsClientPool }()

// Build the google-c2p resolver.
r, err := builder.Build(resolver.Target{}, nil, resolver.BuildOptions{})
target := resolver.Target{URL: url.URL{Scheme: c2pScheme, Path: "test-path"}}
r, err := builder.Build(target, nil, resolver.BuildOptions{})
if err != nil {
t.Fatalf("failed to build resolver: %v", err)
}
defer r.Close()

// Build should return xDS, not DNS.
if r != testXDSResolver {
t.Fatalf("Build() returned %#v, want xds resolver", r)
if r, ok := r.(*c2pResolverWrapper); !ok || r.Resolver != testXDSResolver {
t.Fatalf("Build() returned %#v, want c2pResolverWrapper", r)
}

xdsTarget := resolver.Target{URL: url.URL{Scheme: xdsName, Host: c2pAuthority, Path: target.URL.Path}}
client, close, err := xdsClientPool.GetClientForTesting(xdsTarget.String())
if err != nil {
t.Fatalf("Failed to get xds client: %v", err)
}
defer close()

gotConfig := xdsClientPool.BootstrapConfigForTesting()
gotConfig := client.BootstrapConfig()
if gotConfig == nil {
t.Fatalf("Failed to get bootstrap config: %v", err)
}
Expand Down
Loading