diff --git a/internal/xds/balancer/cdsbalancer/cdsbalancer.go b/internal/xds/balancer/cdsbalancer/cdsbalancer.go index 0ad77b85e9f1..a94fc664fa4a 100644 --- a/internal/xds/balancer/cdsbalancer/cdsbalancer.go +++ b/internal/xds/balancer/cdsbalancer/cdsbalancer.go @@ -448,7 +448,7 @@ func (b *cdsBalancer) annotateErrorWithNodeID(err error) error { // graph is resolved, generates child policy config and pushes it down. // // Only executed in the context of a serializer callback. -func (b *cdsBalancer) onClusterUpdate(name string, update xdsresource.ClusterUpdate) { +func (b *cdsBalancer) onClusterUpdate(name string, update *xdsresource.ClusterUpdate) { state := b.watchers[name] if state == nil { // We are currently not watching this cluster anymore. Return early. @@ -458,7 +458,7 @@ func (b *cdsBalancer) onClusterUpdate(name string, update xdsresource.ClusterUpd b.logger.Infof("Received Cluster resource: %s", pretty.ToJSON(update)) // Update the watchers map with the update for the cluster. - state.lastUpdate = &update + state.lastUpdate = update // For an aggregate cluster, always use the security configuration on the // root cluster. diff --git a/internal/xds/balancer/cdsbalancer/cluster_watcher.go b/internal/xds/balancer/cdsbalancer/cluster_watcher.go index dd702b125b32..355923964c36 100644 --- a/internal/xds/balancer/cdsbalancer/cluster_watcher.go +++ b/internal/xds/balancer/cdsbalancer/cluster_watcher.go @@ -32,8 +32,9 @@ type clusterWatcher struct { parent *cdsBalancer } -func (cw *clusterWatcher) ResourceChanged(u *xdsresource.ClusterResourceData, onDone func()) { - handleUpdate := func(context.Context) { cw.parent.onClusterUpdate(cw.name, u.Resource); onDone() } +func (cw *clusterWatcher) ResourceChanged(u *xdsresource.ClusterUpdate, onDone func()) { + handleUpdate := func(context.Context) { cw.parent.onClusterUpdate(cw.name, u); onDone() } + cw.parent.serializer.ScheduleOr(handleUpdate, onDone) } diff --git a/internal/xds/balancer/clusterresolver/resource_resolver_eds.go b/internal/xds/balancer/clusterresolver/resource_resolver_eds.go index 18b517f111d9..6dcdb898e5fb 100644 --- a/internal/xds/balancer/clusterresolver/resource_resolver_eds.go +++ b/internal/xds/balancer/clusterresolver/resource_resolver_eds.go @@ -76,14 +76,14 @@ func newEDSResolver(nameToWatch string, producer xdsresource.Producer, topLevelR } // ResourceChanged is invoked to report an update for the resource being watched. -func (er *edsDiscoveryMechanism) ResourceChanged(update *xdsresource.EndpointsResourceData, onDone func()) { +func (er *edsDiscoveryMechanism) ResourceChanged(update *xdsresource.EndpointsUpdate, onDone func()) { if er.stopped.HasFired() { onDone() return } er.mu.Lock() - er.update = &update.Resource + er.update = update er.mu.Unlock() er.topLevelResolver.onUpdate(onDone) diff --git a/internal/xds/resolver/watch_service.go b/internal/xds/resolver/watch_service.go index 43ff23b7ca4c..3fb122b001c0 100644 --- a/internal/xds/resolver/watch_service.go +++ b/internal/xds/resolver/watch_service.go @@ -68,9 +68,9 @@ func newRouteConfigWatcher(resourceName string, parent *xdsResolver) *routeConfi return rw } -func (r *routeConfigWatcher) ResourceChanged(u *xdsresource.RouteConfigResourceData, onDone func()) { +func (r *routeConfigWatcher) ResourceChanged(u *xdsresource.RouteConfigUpdate, onDone func()) { handleUpdate := func(context.Context) { - r.parent.onRouteConfigResourceUpdate(r.resourceName, u.Resource) + r.parent.onRouteConfigResourceUpdate(r.resourceName, u) onDone() } r.parent.serializer.ScheduleOr(handleUpdate, onDone) diff --git a/internal/xds/resolver/xds_resolver.go b/internal/xds/resolver/xds_resolver.go index 9180cd076899..4662e7b7396a 100644 --- a/internal/xds/resolver/xds_resolver.go +++ b/internal/xds/resolver/xds_resolver.go @@ -242,7 +242,7 @@ type xdsResolver struct { rdsResourceName string routeConfigWatcher *routeConfigWatcher routeConfigUpdateRecvd bool - currentRouteConfig xdsresource.RouteConfigUpdate + currentRouteConfig *xdsresource.RouteConfigUpdate currentVirtualHost *xdsresource.VirtualHost // Matched virtual host for quick access. // activeClusters is a map from cluster name to information about the @@ -461,7 +461,7 @@ func (r *xdsResolver) onResolutionComplete() { r.curConfigSelector = cs } -func (r *xdsResolver) applyRouteConfigUpdate(update xdsresource.RouteConfigUpdate) { +func (r *xdsResolver) applyRouteConfigUpdate(update *xdsresource.RouteConfigUpdate) { matchVh := xdsresource.FindBestMatchingVirtualHost(r.dataplaneAuthority, update.VirtualHosts) if matchVh == nil { // TODO(purnesh42h): Should this be a resource or ambient error? Note @@ -527,7 +527,7 @@ func (r *xdsResolver) onListenerResourceUpdate(update *xdsresource.ListenerUpdat r.routeConfigWatcher = nil } - r.applyRouteConfigUpdate(*update.InlineRouteConfig) + r.applyRouteConfigUpdate(update.InlineRouteConfig) return } @@ -580,7 +580,7 @@ func (r *xdsResolver) onListenerResourceError(err error) { } // Only executed in the context of a serializer callback. -func (r *xdsResolver) onRouteConfigResourceUpdate(name string, update xdsresource.RouteConfigUpdate) { +func (r *xdsResolver) onRouteConfigResourceUpdate(name string, update *xdsresource.RouteConfigUpdate) { if r.logger.V(2) { r.logger.Infof("Received update for RouteConfiguration resource %q: %v", name, pretty.ToJSON(update)) } diff --git a/internal/xds/server/listener_wrapper.go b/internal/xds/server/listener_wrapper.go index 49cfdb635a20..15f6c17c3a5f 100644 --- a/internal/xds/server/listener_wrapper.go +++ b/internal/xds/server/listener_wrapper.go @@ -59,8 +59,7 @@ type ServingModeCallback func(addr net.Addr, mode connectivity.ServingMode, err // XDSClient wraps the methods on the XDSClient which are required by // the listenerWrapper. type XDSClient interface { - WatchResource(rType xdsresource.Type, resourceName string, watcher xdsresource.ResourceWatcher) (cancel func()) - WatchResourceV2(typeURL, resourceName string, watcher xdsclient.ResourceWatcher) (cancel func()) + WatchResource(typeURL, resourceName string, watcher xdsclient.ResourceWatcher) (cancel func()) BootstrapConfig() *bootstrap.Config } diff --git a/internal/xds/server/rds_handler.go b/internal/xds/server/rds_handler.go index bf78c37c8292..53ebef5e36da 100644 --- a/internal/xds/server/rds_handler.go +++ b/internal/xds/server/rds_handler.go @@ -135,7 +135,7 @@ type rdsWatcher struct { canceled bool // eats callbacks if true } -func (rw *rdsWatcher) ResourceChanged(update *xdsresource.RouteConfigResourceData, onDone func()) { +func (rw *rdsWatcher) ResourceChanged(update *xdsresource.RouteConfigUpdate, onDone func()) { defer onDone() rw.mu.Lock() if rw.canceled { @@ -144,11 +144,11 @@ func (rw *rdsWatcher) ResourceChanged(update *xdsresource.RouteConfigResourceDat } rw.mu.Unlock() if rw.logger.V(2) { - rw.logger.Infof("RDS watch for resource %q received update: %#v", rw.routeName, update.Resource) + rw.logger.Infof("RDS watch for resource %q received update: %#v", rw.routeName, update) } routeName := rw.routeName - rwu := rdsWatcherUpdate{data: &update.Resource} + rwu := rdsWatcherUpdate{data: update} rw.parent.updates[routeName] = rwu rw.parent.callback(routeName, rwu) } diff --git a/internal/xds/xdsclient/client.go b/internal/xds/xdsclient/client.go index 1d803402c4d5..d05382d0b112 100644 --- a/internal/xds/xdsclient/client.go +++ b/internal/xds/xdsclient/client.go @@ -26,7 +26,6 @@ import ( "google.golang.org/grpc/internal/xds/bootstrap" "google.golang.org/grpc/internal/xds/clients/lrsclient" "google.golang.org/grpc/internal/xds/clients/xdsclient" - "google.golang.org/grpc/internal/xds/xdsclient/xdsresource" v3statuspb "github.com/envoyproxy/go-control-plane/envoy/service/status/v3" ) @@ -49,12 +48,7 @@ type XDSClient interface { // During a race (e.g. an xDS response is received while the user is calling // cancel()), there's a small window where the callback can be called after // the watcher is canceled. Callers need to handle this case. - WatchResource(rType xdsresource.Type, resourceName string, watcher xdsresource.ResourceWatcher) (cancel func()) - - // WatchResourceV2 matches the API of the external xdsclient interface. - // Once all users of xdsclient have been moved to this watch API, we can - // remove the WatchResource API above, and rename this to WatchResource. - WatchResourceV2(typeURL, resourceName string, watcher xdsclient.ResourceWatcher) (cancel func()) + WatchResource(typeURL, resourceName string, watcher xdsclient.ResourceWatcher) (cancel func()) ReportLoad(*bootstrap.ServerConfig) (*lrsclient.LoadStore, func(context.Context)) diff --git a/internal/xds/xdsclient/clientimpl_test.go b/internal/xds/xdsclient/clientimpl_test.go index 87a35b8ab079..dd3e4073ca1e 100644 --- a/internal/xds/xdsclient/clientimpl_test.go +++ b/internal/xds/xdsclient/clientimpl_test.go @@ -82,9 +82,9 @@ func (s) TestBuildXDSClientConfig_Success(t *testing.T) { Authorities: map[string]xdsclient.Authority{}, ResourceTypes: map[string]xdsclient.ResourceType{ version.V3ListenerURL: {TypeURL: version.V3ListenerURL, TypeName: xdsresource.ListenerResourceTypeName, AllResourcesRequiredInSotW: true, Decoder: xdsresource.NewListenerResourceTypeDecoder(c)}, - version.V3RouteConfigURL: {TypeURL: version.V3RouteConfigURL, TypeName: xdsresource.RouteConfigTypeName, AllResourcesRequiredInSotW: false, Decoder: xdsresource.NewGenericRouteConfigResourceTypeDecoder()}, - version.V3ClusterURL: {TypeURL: version.V3ClusterURL, TypeName: xdsresource.ClusterResourceTypeName, AllResourcesRequiredInSotW: true, Decoder: xdsresource.NewGenericClusterResourceTypeDecoder(c, gServerCfgMap)}, - version.V3EndpointsURL: {TypeURL: version.V3EndpointsURL, TypeName: xdsresource.EndpointsResourceTypeName, AllResourcesRequiredInSotW: false, Decoder: xdsresource.NewGenericEndpointsResourceTypeDecoder()}, + version.V3RouteConfigURL: {TypeURL: version.V3RouteConfigURL, TypeName: xdsresource.RouteConfigTypeName, AllResourcesRequiredInSotW: false, Decoder: xdsresource.NewRouteConfigResourceTypeDecoder(c)}, + version.V3ClusterURL: {TypeURL: version.V3ClusterURL, TypeName: xdsresource.ClusterResourceTypeName, AllResourcesRequiredInSotW: true, Decoder: xdsresource.NewClusterResourceTypeDecoder(c, gServerCfgMap)}, + version.V3EndpointsURL: {TypeURL: version.V3EndpointsURL, TypeName: xdsresource.EndpointsResourceTypeName, AllResourcesRequiredInSotW: false, Decoder: xdsresource.NewEndpointsResourceTypeDecoder(c)}, }, MetricsReporter: &metricsReporter{recorder: stats.NewTestMetricsRecorder(), target: testTargetName}, TransportBuilder: grpctransport.NewBuilder(map[string]grpctransport.Config{ @@ -113,16 +113,16 @@ func (s) TestBuildXDSClientConfig_Success(t *testing.T) { topLevelSCfg, auth2SCfg := c.XDSServers()[0], c.Authorities()["auth2"].XDSServers[0] expTopLevelS := xdsclient.ServerConfig{ServerIdentifier: clients.ServerIdentifier{ServerURI: topLevelSCfg.ServerURI(), Extensions: grpctransport.ServerIdentifierExtension{ConfigName: "insecure"}}} expAuth2S := xdsclient.ServerConfig{ServerIdentifier: clients.ServerIdentifier{ServerURI: auth2SCfg.ServerURI(), Extensions: grpctransport.ServerIdentifierExtension{ConfigName: "insecure"}}} - gSCfgMap := map[xdsclient.ServerConfig]*bootstrap.ServerConfig{expTopLevelS: topLevelSCfg, expAuth2S: auth2SCfg} + gServerCfgMap := map[xdsclient.ServerConfig]*bootstrap.ServerConfig{expTopLevelS: topLevelSCfg, expAuth2S: auth2SCfg} return xdsclient.Config{ Servers: []xdsclient.ServerConfig{expTopLevelS}, Node: clients.Node{ID: node.GetId(), Cluster: node.GetCluster(), Metadata: node.Metadata, UserAgentName: node.UserAgentName, UserAgentVersion: node.GetUserAgentVersion()}, Authorities: map[string]xdsclient.Authority{"auth1": {XDSServers: []xdsclient.ServerConfig{expTopLevelS}}, "auth2": {XDSServers: []xdsclient.ServerConfig{expAuth2S}}}, ResourceTypes: map[string]xdsclient.ResourceType{ version.V3ListenerURL: {TypeURL: version.V3ListenerURL, TypeName: xdsresource.ListenerResourceTypeName, AllResourcesRequiredInSotW: true, Decoder: xdsresource.NewListenerResourceTypeDecoder(c)}, - version.V3RouteConfigURL: {TypeURL: version.V3RouteConfigURL, TypeName: xdsresource.RouteConfigTypeName, AllResourcesRequiredInSotW: false, Decoder: xdsresource.NewGenericRouteConfigResourceTypeDecoder()}, - version.V3ClusterURL: {TypeURL: version.V3ClusterURL, TypeName: xdsresource.ClusterResourceTypeName, AllResourcesRequiredInSotW: true, Decoder: xdsresource.NewGenericClusterResourceTypeDecoder(c, gSCfgMap)}, - version.V3EndpointsURL: {TypeURL: version.V3EndpointsURL, TypeName: xdsresource.EndpointsResourceTypeName, AllResourcesRequiredInSotW: false, Decoder: xdsresource.NewGenericEndpointsResourceTypeDecoder()}, + version.V3RouteConfigURL: {TypeURL: version.V3RouteConfigURL, TypeName: xdsresource.RouteConfigTypeName, AllResourcesRequiredInSotW: false, Decoder: xdsresource.NewRouteConfigResourceTypeDecoder(c)}, + version.V3ClusterURL: {TypeURL: version.V3ClusterURL, TypeName: xdsresource.ClusterResourceTypeName, AllResourcesRequiredInSotW: true, Decoder: xdsresource.NewClusterResourceTypeDecoder(c, gServerCfgMap)}, + version.V3EndpointsURL: {TypeURL: version.V3EndpointsURL, TypeName: xdsresource.EndpointsResourceTypeName, AllResourcesRequiredInSotW: false, Decoder: xdsresource.NewEndpointsResourceTypeDecoder(c)}, }, MetricsReporter: &metricsReporter{recorder: stats.NewTestMetricsRecorder(), target: testTargetName}, TransportBuilder: grpctransport.NewBuilder(map[string]grpctransport.Config{ @@ -152,9 +152,9 @@ func (s) TestBuildXDSClientConfig_Success(t *testing.T) { Authorities: map[string]xdsclient.Authority{}, ResourceTypes: map[string]xdsclient.ResourceType{ version.V3ListenerURL: {TypeURL: version.V3ListenerURL, TypeName: xdsresource.ListenerResourceTypeName, AllResourcesRequiredInSotW: true, Decoder: xdsresource.NewListenerResourceTypeDecoder(c)}, - version.V3RouteConfigURL: {TypeURL: version.V3RouteConfigURL, TypeName: xdsresource.RouteConfigTypeName, AllResourcesRequiredInSotW: false, Decoder: xdsresource.NewGenericRouteConfigResourceTypeDecoder()}, - version.V3ClusterURL: {TypeURL: version.V3ClusterURL, TypeName: xdsresource.ClusterResourceTypeName, AllResourcesRequiredInSotW: true, Decoder: xdsresource.NewGenericClusterResourceTypeDecoder(c, gServerCfgMap)}, - version.V3EndpointsURL: {TypeURL: version.V3EndpointsURL, TypeName: xdsresource.EndpointsResourceTypeName, AllResourcesRequiredInSotW: false, Decoder: xdsresource.NewGenericEndpointsResourceTypeDecoder()}, + version.V3RouteConfigURL: {TypeURL: version.V3RouteConfigURL, TypeName: xdsresource.RouteConfigTypeName, AllResourcesRequiredInSotW: false, Decoder: xdsresource.NewRouteConfigResourceTypeDecoder(c)}, + version.V3ClusterURL: {TypeURL: version.V3ClusterURL, TypeName: xdsresource.ClusterResourceTypeName, AllResourcesRequiredInSotW: true, Decoder: xdsresource.NewClusterResourceTypeDecoder(c, gServerCfgMap)}, + version.V3EndpointsURL: {TypeURL: version.V3EndpointsURL, TypeName: xdsresource.EndpointsResourceTypeName, AllResourcesRequiredInSotW: false, Decoder: xdsresource.NewEndpointsResourceTypeDecoder(c)}, }, MetricsReporter: &metricsReporter{recorder: stats.NewTestMetricsRecorder(), target: testTargetName}, TransportBuilder: grpctransport.NewBuilder(map[string]grpctransport.Config{ @@ -184,9 +184,9 @@ func (s) TestBuildXDSClientConfig_Success(t *testing.T) { Authorities: map[string]xdsclient.Authority{}, ResourceTypes: map[string]xdsclient.ResourceType{ version.V3ListenerURL: {TypeURL: version.V3ListenerURL, TypeName: xdsresource.ListenerResourceTypeName, AllResourcesRequiredInSotW: true, Decoder: xdsresource.NewListenerResourceTypeDecoder(c)}, - version.V3RouteConfigURL: {TypeURL: version.V3RouteConfigURL, TypeName: xdsresource.RouteConfigTypeName, AllResourcesRequiredInSotW: false, Decoder: xdsresource.NewGenericRouteConfigResourceTypeDecoder()}, - version.V3ClusterURL: {TypeURL: version.V3ClusterURL, TypeName: xdsresource.ClusterResourceTypeName, AllResourcesRequiredInSotW: true, Decoder: xdsresource.NewGenericClusterResourceTypeDecoder(c, gServerCfgMap)}, - version.V3EndpointsURL: {TypeURL: version.V3EndpointsURL, TypeName: xdsresource.EndpointsResourceTypeName, AllResourcesRequiredInSotW: false, Decoder: xdsresource.NewGenericEndpointsResourceTypeDecoder()}, + version.V3RouteConfigURL: {TypeURL: version.V3RouteConfigURL, TypeName: xdsresource.RouteConfigTypeName, AllResourcesRequiredInSotW: false, Decoder: xdsresource.NewRouteConfigResourceTypeDecoder(c)}, + version.V3ClusterURL: {TypeURL: version.V3ClusterURL, TypeName: xdsresource.ClusterResourceTypeName, AllResourcesRequiredInSotW: true, Decoder: xdsresource.NewClusterResourceTypeDecoder(c, gServerCfgMap)}, + version.V3EndpointsURL: {TypeURL: version.V3EndpointsURL, TypeName: xdsresource.EndpointsResourceTypeName, AllResourcesRequiredInSotW: false, Decoder: xdsresource.NewEndpointsResourceTypeDecoder(c)}, }, MetricsReporter: &metricsReporter{recorder: stats.NewTestMetricsRecorder(), target: testTargetName}, TransportBuilder: grpctransport.NewBuilder(map[string]grpctransport.Config{ diff --git a/internal/xds/xdsclient/clientimpl_watchers.go b/internal/xds/xdsclient/clientimpl_watchers.go deleted file mode 100644 index b90d3d36c669..000000000000 --- a/internal/xds/xdsclient/clientimpl_watchers.go +++ /dev/null @@ -1,36 +0,0 @@ -/* - * - * Copyright 2020 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package xdsclient - -import ( - "google.golang.org/grpc/internal/xds/clients/xdsclient" - "google.golang.org/grpc/internal/xds/xdsclient/xdsresource" -) - -// WatchResource uses xDS to discover the resource associated with the provided -// resource name. The resource type implementation determines how xDS responses -// are are deserialized and validated, as received from the xDS management -// server. Upon receipt of a response from the management server, an -// appropriate callback on the watcher is invoked. -func (c *clientImpl) WatchResource(rType xdsresource.Type, resourceName string, watcher xdsresource.ResourceWatcher) (cancel func()) { - return c.XDSClient.WatchResource(rType.TypeURL(), resourceName, xdsresource.GenericResourceWatcher(watcher)) -} - -func (c *clientImpl) WatchResourceV2(typeURL, resourceName string, watcher xdsclient.ResourceWatcher) (cancel func()) { - return c.XDSClient.WatchResource(typeURL, resourceName, watcher) -} diff --git a/internal/xds/xdsclient/resource_types.go b/internal/xds/xdsclient/resource_types.go index a044eedf8294..29d1ed4cbf4c 100644 --- a/internal/xds/xdsclient/resource_types.go +++ b/internal/xds/xdsclient/resource_types.go @@ -36,19 +36,19 @@ func supportedResourceTypes(config *bootstrap.Config, gServerCfgMap map[xdsclien TypeURL: version.V3RouteConfigURL, TypeName: xdsresource.RouteConfigTypeName, AllResourcesRequiredInSotW: false, - Decoder: xdsresource.NewGenericRouteConfigResourceTypeDecoder(), + Decoder: xdsresource.NewRouteConfigResourceTypeDecoder(config), }, version.V3ClusterURL: { TypeURL: version.V3ClusterURL, TypeName: xdsresource.ClusterResourceTypeName, AllResourcesRequiredInSotW: true, - Decoder: xdsresource.NewGenericClusterResourceTypeDecoder(config, gServerCfgMap), + Decoder: xdsresource.NewClusterResourceTypeDecoder(config, gServerCfgMap), }, version.V3EndpointsURL: { TypeURL: version.V3EndpointsURL, TypeName: xdsresource.EndpointsResourceTypeName, AllResourcesRequiredInSotW: false, - Decoder: xdsresource.NewGenericEndpointsResourceTypeDecoder(), + Decoder: xdsresource.NewEndpointsResourceTypeDecoder(config), }, } } diff --git a/internal/xds/xdsclient/tests/authority_test.go b/internal/xds/xdsclient/tests/authority_test.go index fbea7cb8ec3c..2f821b653274 100644 --- a/internal/xds/xdsclient/tests/authority_test.go +++ b/internal/xds/xdsclient/tests/authority_test.go @@ -316,13 +316,13 @@ func (s) TestAuthority_Fallback(t *testing.T) { if err != nil { t.Fatalf("Error when waiting for a resource update callback: %v", err) } - gotUpdate := v.(xdsresource.ClusterUpdate) + gotUpdate := v.(*xdsresource.ClusterUpdate) wantUpdate := xdsresource.ClusterUpdate{ ClusterName: clusterName, EDSServiceName: edsSecondaryName, } cmpOpts := []cmp.Option{cmpopts.EquateEmpty(), cmpopts.IgnoreFields(xdsresource.ClusterUpdate{}, "Raw", "LBPolicy", "TelemetryLabels")} - if diff := cmp.Diff(wantUpdate, gotUpdate, cmpOpts...); diff != "" { + if diff := cmp.Diff(wantUpdate, *gotUpdate, cmpOpts...); diff != "" { t.Fatalf("Diff in the cluster resource update: (-want, got):\n%s", diff) } @@ -351,8 +351,8 @@ func newClusterWatcherV2() *clusterWatcherV2 { } } -func (cw *clusterWatcherV2) ResourceChanged(update *xdsresource.ClusterResourceData, onDone func()) { - cw.updateCh.Send(update.Resource) +func (cw *clusterWatcherV2) ResourceChanged(update *xdsresource.ClusterUpdate, onDone func()) { + cw.updateCh.Send(update) onDone() } diff --git a/internal/xds/xdsclient/tests/cds_watchers_test.go b/internal/xds/xdsclient/tests/cds_watchers_test.go index e6368340294f..e8f60bfa6e87 100644 --- a/internal/xds/xdsclient/tests/cds_watchers_test.go +++ b/internal/xds/xdsclient/tests/cds_watchers_test.go @@ -44,7 +44,7 @@ import ( type noopClusterWatcher struct{} -func (noopClusterWatcher) ResourceChanged(_ *xdsresource.ClusterResourceData, onDone func()) { +func (noopClusterWatcher) ResourceChanged(_ *xdsresource.ClusterUpdate, onDone func()) { onDone() } func (noopClusterWatcher) ResourceError(_ error, onDone func()) { @@ -67,8 +67,8 @@ func newClusterWatcher() *clusterWatcher { return &clusterWatcher{updateCh: testutils.NewChannel()} } -func (cw *clusterWatcher) ResourceChanged(update *xdsresource.ClusterResourceData, onDone func()) { - cw.updateCh.Send(clusterUpdateErrTuple{update: update.Resource}) +func (cw *clusterWatcher) ResourceChanged(update *xdsresource.ClusterUpdate, onDone func()) { + cw.updateCh.Send(clusterUpdateErrTuple{update: *update}) onDone() } diff --git a/internal/xds/xdsclient/tests/eds_watchers_test.go b/internal/xds/xdsclient/tests/eds_watchers_test.go index a76c58641439..d367e12d7a51 100644 --- a/internal/xds/xdsclient/tests/eds_watchers_test.go +++ b/internal/xds/xdsclient/tests/eds_watchers_test.go @@ -53,7 +53,7 @@ const ( type noopEndpointsWatcher struct{} -func (noopEndpointsWatcher) ResourceChanged(_ *xdsresource.EndpointsResourceData, onDone func()) { +func (noopEndpointsWatcher) ResourceChanged(_ *xdsresource.EndpointsUpdate, onDone func()) { onDone() } func (noopEndpointsWatcher) ResourceError(_ error, onDone func()) { @@ -76,8 +76,8 @@ func newEndpointsWatcher() *endpointsWatcher { return &endpointsWatcher{updateCh: testutils.NewChannel()} } -func (ew *endpointsWatcher) ResourceChanged(update *xdsresource.EndpointsResourceData, onDone func()) { - ew.updateCh.Send(endpointsUpdateErrTuple{update: update.Resource}) +func (ew *endpointsWatcher) ResourceChanged(update *xdsresource.EndpointsUpdate, onDone func()) { + ew.updateCh.Send(endpointsUpdateErrTuple{update: *update}) onDone() } diff --git a/internal/xds/xdsclient/tests/rds_watchers_test.go b/internal/xds/xdsclient/tests/rds_watchers_test.go index e71dfbd36243..c5b7478ee97a 100644 --- a/internal/xds/xdsclient/tests/rds_watchers_test.go +++ b/internal/xds/xdsclient/tests/rds_watchers_test.go @@ -43,7 +43,7 @@ import ( type noopRouteConfigWatcher struct{} -func (noopRouteConfigWatcher) ResourceChanged(_ *xdsresource.RouteConfigResourceData, onDone func()) { +func (noopRouteConfigWatcher) ResourceChanged(_ *xdsresource.RouteConfigUpdate, onDone func()) { onDone() } func (noopRouteConfigWatcher) ResourceError(_ error, onDone func()) { @@ -66,8 +66,8 @@ func newRouteConfigWatcher() *routeConfigWatcher { return &routeConfigWatcher{updateCh: testutils.NewChannel()} } -func (rw *routeConfigWatcher) ResourceChanged(update *xdsresource.RouteConfigResourceData, onDone func()) { - rw.updateCh.Send(routeConfigUpdateErrTuple{update: update.Resource}) +func (rw *routeConfigWatcher) ResourceChanged(update *xdsresource.RouteConfigUpdate, onDone func()) { + rw.updateCh.Send(routeConfigUpdateErrTuple{update: *update}) onDone() } diff --git a/internal/xds/xdsclient/xdsresource/cluster_resource_type.go b/internal/xds/xdsclient/xdsresource/cluster_resource_type.go index 2a6a08f90647..af4e785b5a49 100644 --- a/internal/xds/xdsclient/xdsresource/cluster_resource_type.go +++ b/internal/xds/xdsclient/xdsresource/cluster_resource_type.go @@ -18,12 +18,12 @@ package xdsresource import ( - "google.golang.org/grpc/internal/pretty" + "bytes" + "fmt" + "google.golang.org/grpc/internal/xds/bootstrap" xdsclient "google.golang.org/grpc/internal/xds/clients/xdsclient" "google.golang.org/grpc/internal/xds/xdsclient/xdsresource/version" - "google.golang.org/protobuf/proto" - "google.golang.org/protobuf/types/known/anypb" ) const ( @@ -32,81 +32,62 @@ const ( ClusterResourceTypeName = "ClusterResource" ) -var ( - // Compile time interface checks. - _ Type = clusterResourceType{} - - // Singleton instantiation of the resource type implementation. - clusterType = clusterResourceType{ - resourceTypeState: resourceTypeState{ - typeURL: version.V3ClusterURL, - typeName: ClusterResourceTypeName, - allResourcesRequiredInSotW: true, - }, - } -) - -// clusterResourceType provides the resource-type specific functionality for a -// Cluster resource. -// -// Implements the Type interface. -type clusterResourceType struct { - resourceTypeState +// clusterResourceDecoder is an implementation of the xdsclient.Decoder +// interface for listener resources. +type clusterResourceDecoder struct { + bootstrapConfig *bootstrap.Config + serverConfigs map[xdsclient.ServerConfig]*bootstrap.ServerConfig } -// Decode deserializes and validates an xDS resource serialized inside the -// provided `Any` proto, as received from the xDS management server. -func (clusterResourceType) Decode(opts *DecodeOptions, resource *anypb.Any) (*DecodeResult, error) { - name, cluster, err := unmarshalClusterResource(resource, opts.ServerConfig) - switch { - case name == "": +func (d *clusterResourceDecoder) Decode(resource *xdsclient.AnyProto, opts xdsclient.DecodeOptions) (*xdsclient.DecodeResult, error) { + serverCfg, ok := d.serverConfigs[*opts.ServerConfig] + if !ok { + return nil, fmt.Errorf("no server config found for {%+v}", opts.ServerConfig) + } + name, cluster, err := unmarshalClusterResource(resource.ToAny(), serverCfg) + if name == "" { // Name is unset only when protobuf deserialization fails. return nil, err - case err != nil: + } + if err != nil { // Protobuf deserialization succeeded, but resource validation failed. - return &DecodeResult{Name: name, Resource: &ClusterResourceData{Resource: ClusterUpdate{}}}, err + return &xdsclient.DecodeResult{ + Name: name, + Resource: &ClusterResourceData{Resource: ClusterUpdate{}}, + }, err } // Perform extra validation here. - if err := securityConfigValidator(opts.BootstrapConfig, cluster.SecurityCfg); err != nil { - return &DecodeResult{Name: name, Resource: &ClusterResourceData{Resource: ClusterUpdate{}}}, err + if err := securityConfigValidator(d.bootstrapConfig, cluster.SecurityCfg); err != nil { + return &xdsclient.DecodeResult{ + Name: name, + Resource: &ClusterResourceData{Resource: ClusterUpdate{}}, + }, err } - return &DecodeResult{Name: name, Resource: &ClusterResourceData{Resource: cluster}}, nil - + return &xdsclient.DecodeResult{ + Name: name, + Resource: &ClusterResourceData{Resource: cluster}, + }, nil } // ClusterResourceData wraps the configuration of a Cluster resource as received // from the management server. -// -// Implements the ResourceData interface. type ClusterResourceData struct { - ResourceData - - // TODO: We have always stored update structs by value. See if this can be - // switched to a pointer? Resource ClusterUpdate } -// RawEqual returns true if other is equal to r. -func (c *ClusterResourceData) RawEqual(other ResourceData) bool { - if c == nil && other == nil { - return true - } - if (c == nil) != (other == nil) { +// Equal returns true if other is equal to c. +func (c *ClusterResourceData) Equal(other xdsclient.ResourceData) bool { + if other == nil { return false } - return proto.Equal(c.Resource.Raw, other.Raw()) -} - -// ToJSON returns a JSON string representation of the resource data. -func (c *ClusterResourceData) ToJSON() string { - return pretty.ToJSON(c.Resource) + return bytes.Equal(c.Bytes(), other.Bytes()) } -// Raw returns the underlying raw protobuf form of the cluster resource. -func (c *ClusterResourceData) Raw() *anypb.Any { - return c.Resource.Raw +// Bytes returns the protobuf serialized bytes of the cluster resource proto. +func (c *ClusterResourceData) Bytes() []byte { + return c.Resource.Raw.GetValue() } // ClusterWatcher wraps the callbacks to be invoked for different events @@ -114,7 +95,7 @@ func (c *ClusterResourceData) Raw() *anypb.Any { // exhaustive list of what method is invoked under what conditions. type ClusterWatcher interface { // ResourceChanged indicates a new version of the resource is available. - ResourceChanged(resource *ClusterResourceData, done func()) + ResourceChanged(resource *ClusterUpdate, done func()) // ResourceError indicates an error occurred while trying to fetch or // decode the associated resource. The previous version of the resource @@ -133,9 +114,9 @@ type delegatingClusterWatcher struct { watcher ClusterWatcher } -func (d *delegatingClusterWatcher) ResourceChanged(data ResourceData, onDone func()) { +func (d *delegatingClusterWatcher) ResourceChanged(data xdsclient.ResourceData, onDone func()) { c := data.(*ClusterResourceData) - d.watcher.ResourceChanged(c, onDone) + d.watcher.ResourceChanged(&c.Resource, onDone) } func (d *delegatingClusterWatcher) ResourceError(err error, onDone func()) { @@ -149,12 +130,11 @@ func (d *delegatingClusterWatcher) AmbientError(err error, onDone func()) { // WatchCluster uses xDS to discover the configuration associated with the // provided cluster resource name. func WatchCluster(p Producer, name string, w ClusterWatcher) (cancel func()) { - delegator := &delegatingClusterWatcher{watcher: w} - return p.WatchResource(clusterType, name, delegator) + return p.WatchResource(version.V3ClusterURL, name, &delegatingClusterWatcher{watcher: w}) } -// NewGenericClusterResourceTypeDecoder returns a xdsclient.Decoder that -// wraps the xdsresource.clusterType. -func NewGenericClusterResourceTypeDecoder(bc *bootstrap.Config, gServerCfgMap map[xdsclient.ServerConfig]*bootstrap.ServerConfig) xdsclient.Decoder { - return &GenericResourceTypeDecoder{ResourceType: clusterType, BootstrapConfig: bc, ServerConfigMap: gServerCfgMap} +// NewClusterResourceTypeDecoder returns a xdsclient.Decoder that wraps +// the xdsresource.clusterType. +func NewClusterResourceTypeDecoder(bc *bootstrap.Config, gServerCfgMap map[xdsclient.ServerConfig]*bootstrap.ServerConfig) xdsclient.Decoder { + return &clusterResourceDecoder{bootstrapConfig: bc, serverConfigs: gServerCfgMap} } diff --git a/internal/xds/xdsclient/xdsresource/endpoints_resource_type.go b/internal/xds/xdsclient/xdsresource/endpoints_resource_type.go index 7ca45ec6ad0c..cf72c49574a2 100644 --- a/internal/xds/xdsclient/xdsresource/endpoints_resource_type.go +++ b/internal/xds/xdsclient/xdsresource/endpoints_resource_type.go @@ -18,11 +18,11 @@ package xdsresource import ( - "google.golang.org/grpc/internal/pretty" + "bytes" + + "google.golang.org/grpc/internal/xds/bootstrap" xdsclient "google.golang.org/grpc/internal/xds/clients/xdsclient" "google.golang.org/grpc/internal/xds/xdsclient/xdsresource/version" - "google.golang.org/protobuf/proto" - "google.golang.org/protobuf/types/known/anypb" ) const ( @@ -31,77 +31,49 @@ const ( EndpointsResourceTypeName = "EndpointsResource" ) -var ( - // Compile time interface checks. - _ Type = endpointsResourceType{} - - // Singleton instantiation of the resource type implementation. - endpointsType = endpointsResourceType{ - resourceTypeState: resourceTypeState{ - typeURL: version.V3EndpointsURL, - typeName: "EndpointsResource", - allResourcesRequiredInSotW: false, - }, - } -) - -// endpointsResourceType provides the resource-type specific functionality for a -// ClusterLoadAssignment (or Endpoints) resource. -// -// Implements the Type interface. -type endpointsResourceType struct { - resourceTypeState +// endpointsResourceDecoder is an implementation of the xdsclient.Decoder +// interface for endpoints resources. +type endpointsResourceDecoder struct { + bootstrapConfig *bootstrap.Config } -// Decode deserializes and validates an xDS resource serialized inside the -// provided `Any` proto, as received from the xDS management server. -func (endpointsResourceType) Decode(_ *DecodeOptions, resource *anypb.Any) (*DecodeResult, error) { - name, rc, err := unmarshalEndpointsResource(resource) - switch { - case name == "": +func (d *endpointsResourceDecoder) Decode(resource *xdsclient.AnyProto, _ xdsclient.DecodeOptions) (*xdsclient.DecodeResult, error) { + name, endpoints, err := unmarshalEndpointsResource(resource.ToAny()) + if name == "" { // Name is unset only when protobuf deserialization fails. return nil, err - case err != nil: + } + if err != nil { // Protobuf deserialization succeeded, but resource validation failed. - return &DecodeResult{Name: name, Resource: &EndpointsResourceData{Resource: EndpointsUpdate{}}}, err + return &xdsclient.DecodeResult{ + Name: name, + Resource: &ListenerResourceData{Resource: ListenerUpdate{}}, + }, err } - return &DecodeResult{Name: name, Resource: &EndpointsResourceData{Resource: rc}}, nil - + return &xdsclient.DecodeResult{ + Name: name, + Resource: &EndpointsResourceData{Resource: endpoints}, + }, nil } -// EndpointsResourceData wraps the configuration of an Endpoints resource as -// received from the management server. -// -// Implements the ResourceData interface. +// EndpointsResourceData is an implementation of the xdsclient.ResourceData +// interface for endpoints resources. type EndpointsResourceData struct { - ResourceData - - // TODO: We have always stored update structs by value. See if this can be - // switched to a pointer? Resource EndpointsUpdate } -// RawEqual returns true if other is equal to r. -func (e *EndpointsResourceData) RawEqual(other ResourceData) bool { - if e == nil && other == nil { - return true - } - if (e == nil) != (other == nil) { +// Equal returns true if other is equal to e. +func (e *EndpointsResourceData) Equal(other xdsclient.ResourceData) bool { + if other == nil { return false } - return proto.Equal(e.Resource.Raw, other.Raw()) - -} - -// ToJSON returns a JSON string representation of the resource data. -func (e *EndpointsResourceData) ToJSON() string { - return pretty.ToJSON(e.Resource) + return bytes.Equal(e.Bytes(), other.Bytes()) } -// Raw returns the underlying raw protobuf form of the listener resource. -func (e *EndpointsResourceData) Raw() *anypb.Any { - return e.Resource.Raw +// Bytes returns the protobuf serialized bytes of the listener resource proto. +func (e *EndpointsResourceData) Bytes() []byte { + return e.Resource.Raw.GetValue() } // EndpointsWatcher wraps the callbacks to be invoked for different @@ -109,7 +81,7 @@ func (e *EndpointsResourceData) Raw() *anypb.Any { // contains an exhaustive list of what method is invoked under what conditions. type EndpointsWatcher interface { // ResourceChanged indicates a new version of the resource is available. - ResourceChanged(resource *EndpointsResourceData, done func()) + ResourceChanged(resource *EndpointsUpdate, done func()) // ResourceError indicates an error occurred while trying to fetch or // decode the associated resource. The previous version of the resource @@ -128,9 +100,9 @@ type delegatingEndpointsWatcher struct { watcher EndpointsWatcher } -func (d *delegatingEndpointsWatcher) ResourceChanged(data ResourceData, onDone func()) { +func (d *delegatingEndpointsWatcher) ResourceChanged(data xdsclient.ResourceData, onDone func()) { e := data.(*EndpointsResourceData) - d.watcher.ResourceChanged(e, onDone) + d.watcher.ResourceChanged(&e.Resource, onDone) } func (d *delegatingEndpointsWatcher) ResourceError(err error, onDone func()) { @@ -144,12 +116,11 @@ func (d *delegatingEndpointsWatcher) AmbientError(err error, onDone func()) { // WatchEndpoints uses xDS to discover the configuration associated with the // provided endpoints resource name. func WatchEndpoints(p Producer, name string, w EndpointsWatcher) (cancel func()) { - delegator := &delegatingEndpointsWatcher{watcher: w} - return p.WatchResource(endpointsType, name, delegator) + return p.WatchResource(version.V3EndpointsURL, name, &delegatingEndpointsWatcher{watcher: w}) } -// NewGenericEndpointsResourceTypeDecoder returns a xdsclient.Decoder that -// wraps the xdsresource.endpointsType. -func NewGenericEndpointsResourceTypeDecoder() xdsclient.Decoder { - return &GenericResourceTypeDecoder{ResourceType: endpointsType} +// NewEndpointsResourceTypeDecoder returns a xdsclient.Decoder that wraps +// the xdsresource.endpointsType. +func NewEndpointsResourceTypeDecoder(bc *bootstrap.Config) xdsclient.Decoder { + return &endpointsResourceDecoder{bootstrapConfig: bc} } diff --git a/internal/xds/xdsclient/xdsresource/listener_resource_type.go b/internal/xds/xdsclient/xdsresource/listener_resource_type.go index a1e1c672c88f..f4f69365abb5 100644 --- a/internal/xds/xdsclient/xdsresource/listener_resource_type.go +++ b/internal/xds/xdsclient/xdsresource/listener_resource_type.go @@ -149,8 +149,8 @@ func (d *delegatingListenerWatcher) AmbientError(err error, onDone func()) { // WatchListener uses xDS to discover the configuration associated with the // provided listener resource name. -func WatchListener(p ProducerV2, name string, w ListenerWatcher) (cancel func()) { - return p.WatchResourceV2(version.V3ListenerURL, name, &delegatingListenerWatcher{watcher: w}) +func WatchListener(p Producer, name string, w ListenerWatcher) (cancel func()) { + return p.WatchResource(version.V3ListenerURL, name, &delegatingListenerWatcher{watcher: w}) } // NewListenerResourceTypeDecoder returns a xdsclient.Decoder that wraps diff --git a/internal/xds/xdsclient/xdsresource/resource_type.go b/internal/xds/xdsclient/xdsresource/resource_type.go index aaa30cecfb11..05f0d58f50e5 100644 --- a/internal/xds/xdsclient/xdsresource/resource_type.go +++ b/internal/xds/xdsclient/xdsresource/resource_type.go @@ -25,8 +25,6 @@ package xdsresource import ( - "fmt" - "google.golang.org/grpc/internal/xds/bootstrap" "google.golang.org/grpc/internal/xds/clients/xdsclient" "google.golang.org/protobuf/types/known/anypb" @@ -42,16 +40,7 @@ type Producer interface { // xDS responses are are deserialized and validated, as received from the // xDS management server. Upon receipt of a response from the management // server, an appropriate callback on the watcher is invoked. - WatchResource(rType Type, resourceName string, watcher ResourceWatcher) (cancel func()) -} - -// ProducerV2 is like Producer, but uses the external xdsclient API. -// -// Once all resource type implementations have been migrated to use the external -// xdsclient API, this interface will be renamed to Producer and the existing -// Producer interface will be deleted. -type ProducerV2 interface { - WatchResourceV2(typeURL, resourceName string, watcher xdsclient.ResourceWatcher) (cancel func()) + WatchResource(typeURL, resourceName string, watcher xdsclient.ResourceWatcher) (cancel func()) } // ResourceWatcher is notified of the resource updates and errors that are @@ -148,136 +137,3 @@ type DecodeResult struct { // watched. Resource ResourceData } - -// resourceTypeState wraps the static state associated with concrete resource -// type implementations, which can then embed this struct and get the methods -// implemented here for free. -type resourceTypeState struct { - typeURL string - typeName string - allResourcesRequiredInSotW bool -} - -func (r resourceTypeState) TypeURL() string { - return r.typeURL -} - -func (r resourceTypeState) TypeName() string { - return r.typeName -} - -func (r resourceTypeState) AllResourcesRequiredInSotW() bool { - return r.allResourcesRequiredInSotW -} - -// GenericResourceTypeDecoder wraps an xdsresource.Type and implements -// xdsclient.Decoder. -// -// TODO: #8313 - Delete this once the internal xdsclient usages are updated -// to use the generic xdsclient.ResourceType interface directly. -type GenericResourceTypeDecoder struct { - ResourceType Type - BootstrapConfig *bootstrap.Config - ServerConfigMap map[xdsclient.ServerConfig]*bootstrap.ServerConfig -} - -// Decode deserialize and validate resource bytes of an xDS resource received -// from the xDS management server. -func (gd *GenericResourceTypeDecoder) Decode(resource *xdsclient.AnyProto, gOpts xdsclient.DecodeOptions) (*xdsclient.DecodeResult, error) { - opts := &DecodeOptions{BootstrapConfig: gd.BootstrapConfig} - if gOpts.ServerConfig != nil { - opts.ServerConfig = gd.ServerConfigMap[*gOpts.ServerConfig] - } - - result, err := gd.ResourceType.Decode(opts, resource.ToAny()) - if result == nil { - return nil, err - } - if err != nil { - return &xdsclient.DecodeResult{Name: result.Name}, err - } - - return &xdsclient.DecodeResult{Name: result.Name, Resource: &genericResourceData{resourceData: result.Resource}}, nil -} - -// genericResourceData embed an xdsresource.ResourceData and implements -// xdsclient.ResourceData. -// -// TODO: #8313 - Delete this once the internal xdsclient usages are updated -// to use the generic xdsclient.ResourceData interface directly. -type genericResourceData struct { - resourceData ResourceData -} - -// Equal returns true if the passed in xdsclient.ResourceData -// is equal to that of the receiver. -func (grd *genericResourceData) Equal(other xdsclient.ResourceData) bool { - if other == nil { - return false - } - otherResourceData, ok := other.(*genericResourceData) - if !ok { - return false - } - return grd.resourceData.RawEqual(otherResourceData.resourceData) -} - -// Bytes returns the underlying raw bytes of the wrapped resource. -func (grd *genericResourceData) Bytes() []byte { - rawAny := grd.resourceData.Raw() - if rawAny == nil { - return nil - } - return rawAny.Value -} - -// genericResourceWatcher wraps xdsresource.ResourceWatcher and implements -// xdsclient.ResourceWatcher. -// -// TODO: #8313 - Delete this once the internal xdsclient usages are updated -// to use the generic xdsclient.ResourceWatcher interface directly. -type genericResourceWatcher struct { - xdsResourceWatcher ResourceWatcher -} - -// ResourceChanged indicates a new version of the wrapped resource is -// available. -func (gw *genericResourceWatcher) ResourceChanged(gData xdsclient.ResourceData, done func()) { - if gData == nil { - gw.xdsResourceWatcher.ResourceChanged(nil, done) - return - } - - grd, ok := gData.(*genericResourceData) - if !ok { - err := fmt.Errorf("genericResourceWatcher received unexpected xdsclient.ResourceData type %T, want *genericResourceData", gData) - gw.xdsResourceWatcher.ResourceError(err, done) - return - } - gw.xdsResourceWatcher.ResourceChanged(grd.resourceData, done) -} - -// ResourceError indicates an error occurred while trying to fetch or -// decode the associated wrapped resource. The previous version of the -// wrapped resource should be considered invalid. -func (gw *genericResourceWatcher) ResourceError(err error, done func()) { - gw.xdsResourceWatcher.ResourceError(err, done) -} - -// AmbientError indicates an error occurred after a resource has been -// received that should not modify the use of that wrapped resource but may -// provide useful information about the state of the XDSClient for debugging -// purposes. The previous version of the wrapped resource should still be -// considered valid. -func (gw *genericResourceWatcher) AmbientError(err error, done func()) { - gw.xdsResourceWatcher.AmbientError(err, done) -} - -// GenericResourceWatcher returns a xdsclient.ResourceWatcher that wraps an -// xdsresource.ResourceWatcher to make it compatible with xdsclient.ResourceWatcher. -func GenericResourceWatcher(xdsResourceWatcher ResourceWatcher) xdsclient.ResourceWatcher { - if xdsResourceWatcher == nil { - return nil - } - return &genericResourceWatcher{xdsResourceWatcher: xdsResourceWatcher} -} diff --git a/internal/xds/xdsclient/xdsresource/route_config_resource_type.go b/internal/xds/xdsclient/xdsresource/route_config_resource_type.go index 912dc1b762b4..81033c91f5d9 100644 --- a/internal/xds/xdsclient/xdsresource/route_config_resource_type.go +++ b/internal/xds/xdsclient/xdsresource/route_config_resource_type.go @@ -18,11 +18,11 @@ package xdsresource import ( - "google.golang.org/grpc/internal/pretty" + "bytes" + + "google.golang.org/grpc/internal/xds/bootstrap" xdsclient "google.golang.org/grpc/internal/xds/clients/xdsclient" "google.golang.org/grpc/internal/xds/xdsclient/xdsresource/version" - "google.golang.org/protobuf/proto" - "google.golang.org/protobuf/types/known/anypb" ) const ( @@ -31,78 +31,49 @@ const ( RouteConfigTypeName = "RouteConfigResource" ) -var ( - // Compile time interface checks. - _ Type = routeConfigResourceType{} - - // Singleton instantiation of the resource type implementation. - routeConfigType = routeConfigResourceType{ - resourceTypeState: resourceTypeState{ - typeURL: version.V3RouteConfigURL, - typeName: "RouteConfigResource", - allResourcesRequiredInSotW: false, - }, - } -) - -// routeConfigResourceType provides the resource-type specific functionality for -// a RouteConfiguration resource. -// -// Implements the Type interface. -type routeConfigResourceType struct { - resourceTypeState +// routeConfigResourceDecoder is an implementation of the xdsclient.Decoder +// interface for route configuration resources. +type routeConfigResourceDecoder struct { + bootstrapConfig *bootstrap.Config } -// Decode deserializes and validates an xDS resource serialized inside the -// provided `Any` proto, as received from the xDS management server. -func (routeConfigResourceType) Decode(_ *DecodeOptions, resource *anypb.Any) (*DecodeResult, error) { - name, rc, err := unmarshalRouteConfigResource(resource) - switch { - case name == "": +func (d *routeConfigResourceDecoder) Decode(resource *xdsclient.AnyProto, _ xdsclient.DecodeOptions) (*xdsclient.DecodeResult, error) { + name, rc, err := unmarshalRouteConfigResource(resource.ToAny()) + if name == "" { // Name is unset only when protobuf deserialization fails. return nil, err - case err != nil: + } + if err != nil { // Protobuf deserialization succeeded, but resource validation failed. - return &DecodeResult{Name: name, Resource: &RouteConfigResourceData{Resource: RouteConfigUpdate{}}}, err + return &xdsclient.DecodeResult{ + Name: name, + Resource: &RouteConfigResourceData{Resource: RouteConfigUpdate{}}, + }, err } - return &DecodeResult{Name: name, Resource: &RouteConfigResourceData{Resource: rc}}, nil - + return &xdsclient.DecodeResult{ + Name: name, + Resource: &RouteConfigResourceData{Resource: rc}, + }, nil } -// RouteConfigResourceData wraps the configuration of a RouteConfiguration -// resource as received from the management server. -// -// Implements the ResourceData interface. +// RouteConfigResourceData is an implementation of the xdsclient.ResourceData +// interface for route configuration resources. type RouteConfigResourceData struct { - ResourceData - - // TODO: We have always stored update structs by value. See if this can be - // switched to a pointer? Resource RouteConfigUpdate } -// RawEqual returns true if other is equal to r. -func (r *RouteConfigResourceData) RawEqual(other ResourceData) bool { - if r == nil && other == nil { - return true - } - if (r == nil) != (other == nil) { +// Equal returns true if other is equal to er. +func (r *RouteConfigResourceData) Equal(other xdsclient.ResourceData) bool { + if other == nil { return false } - return proto.Equal(r.Resource.Raw, other.Raw()) - -} - -// ToJSON returns a JSON string representation of the resource data. -func (r *RouteConfigResourceData) ToJSON() string { - return pretty.ToJSON(r.Resource) + return bytes.Equal(r.Bytes(), other.Bytes()) } -// Raw returns the underlying raw protobuf form of the route configuration -// resource. -func (r *RouteConfigResourceData) Raw() *anypb.Any { - return r.Resource.Raw +// Bytes returns the protobuf serialized bytes of the route config resource proto. +func (r *RouteConfigResourceData) Bytes() []byte { + return r.Resource.Raw.GetValue() } // RouteConfigWatcher wraps the callbacks to be invoked for different @@ -111,7 +82,7 @@ func (r *RouteConfigResourceData) Raw() *anypb.Any { // conditions. type RouteConfigWatcher interface { // ResourceChanged indicates a new version of the resource is available. - ResourceChanged(resource *RouteConfigResourceData, done func()) + ResourceChanged(resource *RouteConfigUpdate, done func()) // ResourceError indicates an error occurred while trying to fetch or // decode the associated resource. The previous version of the resource @@ -130,9 +101,9 @@ type delegatingRouteConfigWatcher struct { watcher RouteConfigWatcher } -func (d *delegatingRouteConfigWatcher) ResourceChanged(data ResourceData, onDone func()) { +func (d *delegatingRouteConfigWatcher) ResourceChanged(data xdsclient.ResourceData, onDone func()) { rc := data.(*RouteConfigResourceData) - d.watcher.ResourceChanged(rc, onDone) + d.watcher.ResourceChanged(&rc.Resource, onDone) } func (d *delegatingRouteConfigWatcher) ResourceError(err error, onDone func()) { @@ -146,12 +117,11 @@ func (d *delegatingRouteConfigWatcher) AmbientError(err error, onDone func()) { // WatchRouteConfig uses xDS to discover the configuration associated with the // provided route configuration resource name. func WatchRouteConfig(p Producer, name string, w RouteConfigWatcher) (cancel func()) { - delegator := &delegatingRouteConfigWatcher{watcher: w} - return p.WatchResource(routeConfigType, name, delegator) + return p.WatchResource(version.V3RouteConfigURL, name, &delegatingRouteConfigWatcher{watcher: w}) } -// NewGenericRouteConfigResourceTypeDecoder returns a xdsclient.Decoder that -// wraps the xdsresource.routeConfigType. -func NewGenericRouteConfigResourceTypeDecoder() xdsclient.Decoder { - return &GenericResourceTypeDecoder{ResourceType: routeConfigType} +// NewRouteConfigResourceTypeDecoder returns a xdsclient.Decoder that wraps +// the xdsresource.routeConfigType. +func NewRouteConfigResourceTypeDecoder(bc *bootstrap.Config) xdsclient.Decoder { + return &routeConfigResourceDecoder{bootstrapConfig: bc} } diff --git a/xds/csds/csds_e2e_test.go b/xds/csds/csds_e2e_test.go index 25b643e1516b..252e693dc28c 100644 --- a/xds/csds/csds_e2e_test.go +++ b/xds/csds/csds_e2e_test.go @@ -83,7 +83,7 @@ func (nopListenerWatcher) AmbientError(_ error, onDone func()) { type nopRouteConfigWatcher struct{} -func (nopRouteConfigWatcher) ResourceChanged(_ *xdsresource.RouteConfigResourceData, onDone func()) { +func (nopRouteConfigWatcher) ResourceChanged(_ *xdsresource.RouteConfigUpdate, onDone func()) { onDone() } func (nopRouteConfigWatcher) ResourceError(_ error, onDone func()) { @@ -95,7 +95,7 @@ func (nopRouteConfigWatcher) AmbientError(_ error, onDone func()) { type nopClusterWatcher struct{} -func (nopClusterWatcher) ResourceChanged(_ *xdsresource.ClusterResourceData, onDone func()) { +func (nopClusterWatcher) ResourceChanged(_ *xdsresource.ClusterUpdate, onDone func()) { onDone() } func (nopClusterWatcher) ResourceError(_ error, onDone func()) { @@ -107,7 +107,7 @@ func (nopClusterWatcher) AmbientError(_ error, onDone func()) { type nopEndpointsWatcher struct{} -func (nopEndpointsWatcher) ResourceChanged(_ *xdsresource.EndpointsResourceData, onDone func()) { +func (nopEndpointsWatcher) ResourceChanged(_ *xdsresource.EndpointsUpdate, onDone func()) { onDone() } func (nopEndpointsWatcher) ResourceError(_ error, onDone func()) {