Skip to content

Commit

Permalink
[XdsClient] fix edge case when rapidly subscribing and unsubscribing
Browse files Browse the repository at this point in the history
  • Loading branch information
markdroth committed Feb 7, 2025
1 parent 780a9df commit e6cde8c
Show file tree
Hide file tree
Showing 3 changed files with 341 additions and 54 deletions.
128 changes: 98 additions & 30 deletions src/core/xds/xds_client/xds_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,10 @@ class XdsClient::XdsChannel::AdsCall final
// The ctor and dtor should not be used directly.
explicit AdsCall(RefCountedPtr<RetryableCall<AdsCall>> retryable_call);

void Orphan() override;
// Disable thread-safety analysis because this method is called via
// OrphanablePtr<>, but there's no way to pass the lock annotation
// through there.
void Orphan() override ABSL_NO_THREAD_SAFETY_ANALYSIS;

RetryableCall<AdsCall>* retryable_call() const {
return retryable_call_.get();
Expand Down Expand Up @@ -204,7 +207,7 @@ class XdsClient::XdsChannel::AdsCall final
// optimize by not resending the resource that we already have.
auto& authority_state =
ads_call->xds_client()->authority_state_map_[name_.authority];
ResourceState& state = authority_state.resource_map[type_][name_.key];
ResourceState& state = authority_state.type_map[type_][name_.key];
if (state.HasResource()) return;
// Start timer.
ads_call_ = std::move(ads_call);
Expand All @@ -230,7 +233,7 @@ class XdsClient::XdsChannel::AdsCall final
timer_handle_.reset();
auto& authority_state =
ads_call_->xds_client()->authority_state_map_[name_.authority];
ResourceState& state = authority_state.resource_map[type_][name_.key];
ResourceState& state = authority_state.type_map[type_][name_.key];
// We might have received the resource after the timer fired but before
// the callback ran.
if (!state.HasResource()) {
Expand Down Expand Up @@ -497,7 +500,7 @@ bool XdsClient::XdsChannel::MaybeFallbackLocked(
++i) {
authority_state.xds_channels.emplace_back(
xds_client_->GetOrCreateXdsChannelLocked(*xds_servers[i], "fallback"));
for (const auto& [type, resource_map] : authority_state.resource_map) {
for (const auto& [type, resource_map] : authority_state.type_map) {
for (const auto& [key, _] : resource_map) {
authority_state.xds_channels.back()->SubscribeLocked(type,
{authority, key});
Expand Down Expand Up @@ -564,7 +567,7 @@ void XdsClient::XdsChannel::SetChannelStatusLocked(absl::Status status) {
MaybeFallbackLocked(authority, authority_state)) {
continue;
}
for (const auto& [_, resource_map] : authority_state.resource_map) {
for (const auto& [_, resource_map] : authority_state.type_map) {
for (const auto& [_, resource_state] : resource_map) {
auto& watchers =
resource_state.HasResource() ? watchers_cached : watchers_uncached;
Expand Down Expand Up @@ -723,7 +726,7 @@ XdsClient::XdsChannel::AdsCall::AdsCall(
// Skip authorities that are not using this xDS channel. The channel can be
// anywhere in the list.
if (it == authority_state.xds_channels.end()) continue;
for (const auto& [type, resource_map] : authority_state.resource_map) {
for (const auto& [type, resource_map] : authority_state.type_map) {
for (const auto& [resource_key, _] : resource_map) {
SubscribeLocked(type, {authority, resource_key}, /*delay_send=*/true);
}
Expand All @@ -737,6 +740,7 @@ XdsClient::XdsChannel::AdsCall::AdsCall(
}

void XdsClient::XdsChannel::AdsCall::Orphan() {
xds_client()->MaybeRemoveUnsubscribedCacheEntriesLocked(xds_channel());
state_map_.clear();
// Note that the initial ref is held by the StreamEventHandler, which
// will be destroyed when streaming_call_ is destroyed, which may not happen
Expand Down Expand Up @@ -876,6 +880,8 @@ void XdsClient::XdsChannel::AdsCall::SendMessageLocked(
buffered_requests_.insert(type);
return;
}
xds_client()->MaybeRemoveUnsubscribedCacheEntriesForTypeLocked(xds_channel(),
type);
auto& state = state_map_[type];
std::string serialized_message = CreateAdsRequest(
type->type_url(), xds_channel()->resource_type_version_map_[type],
Expand Down Expand Up @@ -997,8 +1003,8 @@ void XdsClient::XdsChannel::AdsCall::ParseResource(
}
AuthorityState& authority_state = authority_it->second;
// Found authority, so look up type.
auto type_it = authority_state.resource_map.find(context->type);
if (type_it == authority_state.resource_map.end()) {
auto type_it = authority_state.type_map.find(context->type);
if (type_it == authority_state.type_map.end()) {
return; // Skip resource -- we don't have a subscription for it.
}
auto& type_map = type_it->second;
Expand Down Expand Up @@ -1108,8 +1114,8 @@ void XdsClient::XdsChannel::AdsCall::HandleServerReportedResourceError(
}
AuthorityState& authority_state = authority_it->second;
// Found authority, so look up type.
auto type_it = authority_state.resource_map.find(context->type);
if (type_it == authority_state.resource_map.end()) {
auto type_it = authority_state.type_map.find(context->type);
if (type_it == authority_state.type_map.end()) {
return; // Skip resource -- we don't have a subscription for it.
}
auto& type_map = type_it->second;
Expand Down Expand Up @@ -1313,8 +1319,8 @@ void XdsClient::XdsChannel::AdsCall::OnRecvMessage(absl::string_view payload) {
}
auto seen_authority_it = context.resources_seen.find(authority);
// Find this resource type.
auto type_it = authority_state.resource_map.find(context.type);
if (type_it == authority_state.resource_map.end()) continue;
auto type_it = authority_state.type_map.find(context.type);
if (type_it == authority_state.type_map.end()) continue;
// Iterate over resource ids.
for (auto& [resource_key, resource_state] : type_it->second) {
if (seen_authority_it == context.resources_seen.end() ||
Expand Down Expand Up @@ -1595,7 +1601,12 @@ void XdsClient::Orphaned() {
MutexLock lock(&mu_);
shutting_down_ = true;
// Clear cache and any remaining watchers that may not have been cancelled.
authority_state_map_.clear();
// Note: We move authority_state_map_ out of the way before clearing
// it, because clearing the map will trigger calls to
// MaybeRemoveUnsubscribedCacheEntriesLocked(), which would try to modify
// the map while we are iterating over it.
auto authority_state_map = std::move(authority_state_map_);
authority_state_map.clear();
invalid_watchers_.clear();
}

Expand All @@ -1614,7 +1625,7 @@ RefCountedPtr<XdsClient::XdsChannel> XdsClient::GetOrCreateXdsChannelLocked(
}

bool XdsClient::HasUncachedResources(const AuthorityState& authority_state) {
for (const auto& [_, resource_map] : authority_state.resource_map) {
for (const auto& [_, resource_map] : authority_state.type_map) {
for (const auto& [_, resource_state] : resource_map) {
if (resource_state.client_status() ==
ResourceState::ClientResourceStatus::REQUESTED) {
Expand Down Expand Up @@ -1662,12 +1673,11 @@ void XdsClient::WatchResource(const XdsResourceType* type,
MaybeRegisterResourceTypeLocked(type);
AuthorityState& authority_state =
authority_state_map_[resource_name->authority];
auto [it, first_watcher_for_resource] =
authority_state.resource_map[type].emplace(resource_name->key,
ResourceState());
auto [it, created_entry] = authority_state.type_map[type].emplace(
resource_name->key, ResourceState());
ResourceState& resource_state = it->second;
resource_state.AddWatcher(watcher);
if (first_watcher_for_resource) {
if (created_entry) {
// We try to add new channels in 2 cases:
// - This is the first resource for this authority (i.e., the list
// of channels is empty).
Expand All @@ -1690,9 +1700,6 @@ void XdsClient::WatchResource(const XdsResourceType* type,
}
}
}
for (const auto& channel : authority_state.xds_channels) {
channel->SubscribeLocked(type, *resource_name);
}
} else {
// If we already have a cached value for the resource, notify the new
// watcher immediately.
Expand All @@ -1711,6 +1718,10 @@ void XdsClient::WatchResource(const XdsResourceType* type,
{watcher});
}
}
// Make sure all channels are subscribing to the resource.
for (const auto& channel : authority_state.xds_channels) {
channel->SubscribeLocked(type, *resource_name);
}
// If the channel is not connected, report an error to the watcher.
absl::Status channel_status = authority_state.xds_channels.back()->status();
if (!channel_status.ok()) {
Expand All @@ -1737,8 +1748,8 @@ void XdsClient::CancelResourceWatch(const XdsResourceType* type,
if (authority_it == authority_state_map_.end()) return;
AuthorityState& authority_state = authority_it->second;
// Find type map.
auto type_it = authority_state.resource_map.find(type);
if (type_it == authority_state.resource_map.end()) return;
auto type_it = authority_state.type_map.find(type);
if (type_it == authority_state.type_map.end()) return;
auto& type_map = type_it->second;
// Find resource key.
auto resource_it = type_map.find(resource_name->key);
Expand All @@ -1752,10 +1763,64 @@ void XdsClient::CancelResourceWatch(const XdsResourceType* type,
xds_channel->UnsubscribeLocked(type, *resource_name,
delay_unsubscription);
}
type_map.erase(resource_it);
if (type_map.empty()) {
authority_state.resource_map.erase(type_it);
if (authority_state.resource_map.empty()) {
}
}

void XdsClient::MaybeRemoveUnsubscribedCacheEntriesLocked(
XdsChannel* xds_channel) {
// Look at each authority for which xds_channel is the last channel.
for (auto& [_, authority_state] : authority_state_map_) {
if (authority_state.xds_channels.back() != xds_channel) continue;
// Look at each resource type.
for (auto type_it = authority_state.type_map.begin();
type_it != authority_state.type_map.end();) {
auto& resource_map = type_it->second;
// Remove the cache entry for any resource without watchers.
for (auto resource_it = resource_map.begin();
resource_it != resource_map.end();) {
ResourceState& resource_state = resource_it->second;
if (!resource_state.HasWatchers()) {
resource_map.erase(resource_it++);
} else {
++resource_it;
}
}
// Clean up empty entries in the map.
if (resource_map.empty()) {
authority_state.type_map.erase(type_it++);
} else {
++type_it;
}
}
if (authority_state.type_map.empty()) {
authority_state.xds_channels.clear();
}
}
}

void XdsClient::MaybeRemoveUnsubscribedCacheEntriesForTypeLocked(
XdsChannel* xds_channel, const XdsResourceType* type) {
// Look at each authority for which xds_channel is the last channel.
for (auto& [_, authority_state] : authority_state_map_) {
if (authority_state.xds_channels.back() != xds_channel) continue;
// Find type map.
auto type_it = authority_state.type_map.find(type);
if (type_it == authority_state.type_map.end()) return;
auto& resource_map = type_it->second;
// Remove the cache entry for any resource without watchers.
for (auto resource_it = resource_map.begin();
resource_it != resource_map.end();) {
ResourceState& resource_state = resource_it->second;
if (!resource_state.HasWatchers()) {
resource_map.erase(resource_it++);
} else {
++resource_it;
}
}
// Clean up empty entries in the map.
if (resource_map.empty()) {
authority_state.type_map.erase(type_it);
if (authority_state.type_map.empty()) {
authority_state.xds_channels.clear();
}
}
Expand Down Expand Up @@ -1896,13 +1961,14 @@ void XdsClient::DumpClientConfig(
node, arena);
// Dump each resource.
for (const auto& [authority, authority_state] : authority_state_map_) {
for (const auto& [type, resource_map] : authority_state.resource_map) {
for (const auto& [type, resource_map] : authority_state.type_map) {
auto it =
string_pool
->emplace(absl::StrCat("type.googleapis.com/", type->type_url()))
.first;
upb_StringView type_url = StdStringToUpbString(*it);
for (const auto& [resource_key, resource_state] : resource_map) {
if (!resource_state.HasWatchers()) continue;
auto it2 = string_pool
->emplace(ConstructFullXdsResourceName(
authority, type->type_url(), resource_key))
Expand All @@ -1923,12 +1989,14 @@ void XdsClient::ReportResourceCounts(
ResourceCountLabels labels;
for (const auto& [authority, authority_state] : authority_state_map_) {
labels.xds_authority = authority;
for (const auto& [type, resource_map] : authority_state.resource_map) {
for (const auto& [type, resource_map] : authority_state.type_map) {
labels.resource_type = type->type_url();
// Count the number of entries in each state.
std::map<absl::string_view, uint64_t> counts;
for (const auto& [_, resource_state] : resource_map) {
++counts[resource_state.CacheStateString()];
if (resource_state.HasWatchers()) {
++counts[resource_state.CacheStateString()];
}
}
// Report the count for each state.
for (const auto& [state, count] : counts) {
Expand Down
9 changes: 8 additions & 1 deletion src/core/xds/xds_client/xds_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ class XdsClient : public DualRefCounted<XdsClient> {
struct AuthorityState {
std::vector<RefCountedPtr<XdsChannel>> xds_channels;
std::map<const XdsResourceType*, std::map<XdsResourceKey, ResourceState>>
resource_map;
type_map;
};

absl::Status AppendNodeToStatus(const absl::Status& status) const;
Expand Down Expand Up @@ -391,6 +391,13 @@ class XdsClient : public DualRefCounted<XdsClient> {

bool HasUncachedResources(const AuthorityState& authority_state);

void MaybeRemoveUnsubscribedCacheEntriesLocked(XdsChannel* xds_channel)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);

void MaybeRemoveUnsubscribedCacheEntriesForTypeLocked(
XdsChannel* xds_channel, const XdsResourceType* type)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);

absl::StatusOr<XdsResourceName> ParseXdsResourceName(
absl::string_view name, const XdsResourceType* type);
static std::string ConstructFullXdsResourceName(
Expand Down
Loading

0 comments on commit e6cde8c

Please sign in to comment.