From 6bb55ac6a931fd4aacfa6985ede9397d6f28db1d Mon Sep 17 00:00:00 2001 From: Ilnaz Nizametdinov Date: Fri, 25 Apr 2025 20:21:49 +0300 Subject: [PATCH] Limit inflight config updates (#17725) --- ydb/core/cms/console/console.cpp | 2 +- .../cms/console/console_configs_manager.cpp | 2 +- .../cms/console/console_configs_manager.h | 6 +- .../cms/console/console_configs_provider.cpp | 82 +++++++++++++++---- .../cms/console/console_configs_provider.h | 46 ++++++++++- 5 files changed, 116 insertions(+), 22 deletions(-) diff --git a/ydb/core/cms/console/console.cpp b/ydb/core/cms/console/console.cpp index de9e88f686f0..e542eef3d5c9 100644 --- a/ydb/core/cms/console/console.cpp +++ b/ydb/core/cms/console/console.cpp @@ -31,7 +31,7 @@ void TConsole::OnActivateExecutor(const TActorContext &ctx) TValidatorsRegistry::Instance()->LockValidators(); - ConfigsManager = new TConfigsManager(*this); + ConfigsManager = new TConfigsManager(*this, Counters); ctx.RegisterWithSameMailbox(ConfigsManager); TenantsManager = new TTenantsManager(*this, domains->Domain, diff --git a/ydb/core/cms/console/console_configs_manager.cpp b/ydb/core/cms/console/console_configs_manager.cpp index 80bbe991408c..f3a00d79fb44 100644 --- a/ydb/core/cms/console/console_configs_manager.cpp +++ b/ydb/core/cms/console/console_configs_manager.cpp @@ -213,7 +213,7 @@ void TConfigsManager::Bootstrap(const TActorContext &ctx) ctx, false, NKikimrServices::CMS_CONFIGS); - ConfigsProvider = ctx.Register(new TConfigsProvider(ctx.SelfID)); + ConfigsProvider = ctx.Register(new TConfigsProvider(ctx.SelfID, Counters)); ui32 item = (ui32)NKikimrConsole::TConfigItem::AllowEditYamlInUiItem; ctx.Send(MakeConfigsDispatcherID(SelfId().NodeId()), diff --git a/ydb/core/cms/console/console_configs_manager.h b/ydb/core/cms/console/console_configs_manager.h index 2b7df208aff3..2079dc84e6b3 100644 --- a/ydb/core/cms/console/console_configs_manager.h +++ b/ydb/core/cms/console/console_configs_manager.h @@ -19,6 +19,8 @@ #include #include +#include + namespace NKikimr::NConsole { using NTabletFlatExecutor::ITransaction; @@ -301,8 +303,9 @@ class TConfigsManager : public TActorBootstrapped { } public: - TConfigsManager(TConsole &self) + TConfigsManager(TConsole &self, ::NMonitoring::TDynamicCounterPtr counters) : Self(self) + , Counters(counters) { } @@ -321,6 +324,7 @@ class TConfigsManager : public TActorBootstrapped { private: TConsole &Self; + ::NMonitoring::TDynamicCounterPtr Counters; TConfigsConfig Config; TString DomainName; // All config items by id. diff --git a/ydb/core/cms/console/console_configs_provider.cpp b/ydb/core/cms/console/console_configs_provider.cpp index a7c87f7878be..9ed9584f3426 100644 --- a/ydb/core/cms/console/console_configs_provider.cpp +++ b/ydb/core/cms/console/console_configs_provider.cpp @@ -323,6 +323,7 @@ class TSubscriptionClientSender : public TActorBootstrappedSubscriber.ToString() << ") received wake up"); + Send(OwnerId, new TConfigsProvider::TEvPrivate::TEvWorkerCoolDown(Subscription)); + } + void Handle(TEvInterconnect::TEvNodeDisconnected::TPtr &/*ev*/, const TActorContext &ctx) { LOG_DEBUG_S(ctx, NKikimrServices::CMS_CONFIGS, @@ -370,6 +378,8 @@ class TSubscriptionClientSender : public TActorBootstrappedSubscriber.ToString() << ") send TEvConfigSubscriptionNotificationRequest: " << notification.Get()->Record.ShortDebugString()); + const float mbytes = notification.Get()->GetCachedByteSize() / 1'000'000.f; + Schedule(TDuration::MilliSeconds(100) * mbytes, new TEvents::TEvWakeup()); Send(Subscription->Subscriber, notification.Release(), IEventHandle::FlagTrackDelivery | IEventHandle::FlagSubscribeOnSession); } @@ -522,7 +532,33 @@ void TConfigsProvider::CheckSubscriptions(const TInMemorySubscriptionSet &subscr const TActorContext &ctx) { for (auto &subscription : subscriptions) - CheckSubscription(subscription, ctx); + ScheduledUpdates[subscription->Subscriber] = EUpdate::All; + ProcessScheduledUpdates(ctx); +} + +void TConfigsProvider::ProcessScheduledUpdates(const TActorContext &ctx) +{ + while (!ScheduledUpdates.empty() && InflightUpdates.size() < MAX_INFLIGHT_UPDATES) { + auto it = ScheduledUpdates.begin(); + if (auto subscription = InMemoryIndex.GetSubscription(it->first)) { + switch (it->second) { + case EUpdate::All: + if (CheckSubscription(subscription, ctx)) { + InflightUpdates.insert(subscription->Subscriber); + } + break; + case EUpdate::Yaml: + if (UpdateConfig(subscription, ctx)) { + InflightUpdates.insert(subscription->Subscriber); + } + break; + } + } + ScheduledUpdates.erase(it); + } + + *Counters.ScheduledConfigUpdates = ScheduledUpdates.size(); + *Counters.InflightConfigUpdates = InflightUpdates.size(); } void TConfigsProvider::CheckSubscription(TSubscription::TPtr subscription, @@ -583,7 +619,7 @@ void TConfigsProvider::CheckSubscription(TSubscription::TPtr subscription, subscription->Worker = ctx.RegisterWithSameMailbox(worker); } -void TConfigsProvider::CheckSubscription(TInMemorySubscription::TPtr subscription, +bool TConfigsProvider::CheckSubscription(TInMemorySubscription::TPtr subscription, const TActorContext &ctx) { LOG_TRACE_S(ctx, NKikimrServices::CMS_CONFIGS, @@ -665,7 +701,7 @@ void TConfigsProvider::CheckSubscription(TInMemorySubscription::TPtr subscriptio LOG_TRACE_S(ctx, NKikimrServices::CMS_CONFIGS, "TConfigsProvider: no changes found for subscription" << " " << subscription->Subscriber.ToString() << ":" << subscription->Generation); - return; + return false; } LOG_TRACE_S(ctx, NKikimrServices::CMS_CONFIGS, @@ -693,13 +729,6 @@ void TConfigsProvider::CheckSubscription(TInMemorySubscription::TPtr subscriptio for (auto &[id, hash] : VolatileYamlConfigHashes) { auto *volatileConfig = request->Record.AddVolatileConfigs(); volatileConfig->SetId(id); - auto hashes = subscription->VolatileYamlConfigHashes.size(); - Y_UNUSED(hashes); - auto itt = subscription->VolatileYamlConfigHashes.find(id); - if (itt != subscription->VolatileYamlConfigHashes.end()) { - auto tmp = itt->second; - Y_UNUSED(tmp); - } if (auto it = subscription->VolatileYamlConfigHashes.find(id); it != subscription->VolatileYamlConfigHashes.end() && it->second == hash) { volatileConfig->SetNotChanged(true); } else { @@ -720,8 +749,9 @@ void TConfigsProvider::CheckSubscription(TInMemorySubscription::TPtr subscriptio } ctx.Send(subscription->Worker, request.Release()); - subscription->FirstUpdateSent = true; + + return true; } void TConfigsProvider::DumpStateHTML(IOutputStream &os) const { @@ -837,7 +867,8 @@ void TConfigsProvider::Handle(TEvConsole::TEvConfigSubscriptionRequest::TPtr &ev subscription->Worker = RegisterWithSameMailbox(new TSubscriptionClientSender(subscription, SelfId())); - CheckSubscription(subscription, ctx); + ScheduledUpdates[subscription->Subscriber] = EUpdate::All; + ProcessScheduledUpdates(ctx); } void TConfigsProvider::Handle(TEvConsole::TEvConfigSubscriptionCanceled::TPtr &ev, const TActorContext &ctx) @@ -865,11 +896,27 @@ void TConfigsProvider::Handle(TEvPrivate::TEvWorkerDisconnected::TPtr &ev, const auto existing = InMemoryIndex.GetSubscription(subscription->Subscriber); if (existing == subscription) { InMemoryIndex.RemoveSubscription(subscription->Subscriber); + ScheduledUpdates.erase(subscription->Subscriber); + InflightUpdates.erase(subscription->Subscriber); + Send(subscription->Subscriber, new TEvConsole::TEvConfigSubscriptionCanceled(subscription->Generation)); LOG_DEBUG_S(ctx, NKikimrServices::CMS_CONFIGS, "TConfigsProvider removed subscription " << subscription->Subscriber<< ":" << subscription->Generation << " (subscription worker died)"); } + + ProcessScheduledUpdates(ctx); +} + +void TConfigsProvider::Handle(TEvPrivate::TEvWorkerCoolDown::TPtr &ev, const TActorContext &ctx) +{ + auto subscription = ev->Get()->Subscription; + auto existing = InMemoryIndex.GetSubscription(subscription->Subscriber); + if (existing == subscription) { + InflightUpdates.erase(subscription->Subscriber); + } + + ProcessScheduledUpdates(ctx); } void TConfigsProvider::Handle(TEvConsole::TEvCheckConfigUpdatesRequest::TPtr &ev, const TActorContext &ctx) @@ -1259,19 +1306,21 @@ void TConfigsProvider::Handle(TEvPrivate::TEvUpdateYamlConfig::TPtr &ev, const T } for (auto &[_, subscription] : InMemoryIndex.GetSubscriptions()) { - UpdateConfig(subscription, ctx); + ScheduledUpdates.emplace(subscription->Subscriber, EUpdate::Yaml); } } else { const auto* subs = InMemoryIndex.GetSubscriptions(ev->Get()->ChangedDatabase); if (subs) { for (auto &subscription : *subs) { - UpdateConfig(subscription, ctx); + ScheduledUpdates.emplace(subscription->Subscriber, EUpdate::Yaml); } } } + + ProcessScheduledUpdates(ctx); } -void TConfigsProvider::UpdateConfig(TInMemorySubscription::TPtr subscription, +bool TConfigsProvider::UpdateConfig(TInMemorySubscription::TPtr subscription, const TActorContext &ctx) { if (subscription->ServeYaml) { @@ -1309,7 +1358,10 @@ void TConfigsProvider::UpdateConfig(TInMemorySubscription::TPtr subscription, } ctx.Send(subscription->Worker, request.Release()); + return true; } + + return false; } } // namespace NKikimr::NConsole diff --git a/ydb/core/cms/console/console_configs_provider.h b/ydb/core/cms/console/console_configs_provider.h index 412b467e9a35..541a44bcae70 100644 --- a/ydb/core/cms/console/console_configs_provider.h +++ b/ydb/core/cms/console/console_configs_provider.h @@ -8,9 +8,10 @@ #include #include #include - #include +#include + namespace NKikimr::NConsole { struct TDatabaseYamlConfig { @@ -31,6 +32,7 @@ class TConfigsProvider : public TActorBootstrapped { EvUpdateYamlConfig, EvUpdateSubscriptions, EvWorkerDisconnected, + EvWorkerCoolDown, EvEnd }; @@ -64,6 +66,15 @@ class TConfigsProvider : public TActorBootstrapped { TInMemorySubscription::TPtr Subscription; }; + struct TEvWorkerCoolDown: public TEventLocal { + explicit TEvWorkerCoolDown(TInMemorySubscription::TPtr subscription) + : Subscription(subscription) + { + } + + TInMemorySubscription::TPtr Subscription; + }; + struct TEvSetConfig : public TEventLocal { TEvSetConfig(const TConfigsConfig &config) : Config(config) @@ -178,12 +189,14 @@ class TConfigsProvider : public TActorBootstrapped { const TActorContext &ctx); void CheckSubscription(TSubscription::TPtr subscriptions, const TActorContext &ctx); - void CheckSubscription(TInMemorySubscription::TPtr subscriptions, + bool CheckSubscription(TInMemorySubscription::TPtr subscriptions, const TActorContext &ctx); - void UpdateConfig(TInMemorySubscription::TPtr subscription, + bool UpdateConfig(TInMemorySubscription::TPtr subscription, const TActorContext &ctx); + void ProcessScheduledUpdates(const TActorContext &ctx); + void Handle(NMon::TEvHttpInfo::TPtr &ev); void Handle(TEvConsole::TEvConfigSubscriptionRequest::TPtr &ev, const TActorContext &ctx); void Handle(TEvConsole::TEvConfigSubscriptionCanceled::TPtr &ev, const TActorContext &ctx); @@ -195,6 +208,7 @@ class TConfigsProvider : public TActorBootstrapped { void Handle(TEvConsole::TEvGetNodeConfigRequest::TPtr &ev, const TActorContext &ctx); void Handle(TEvConsole::TEvListConfigSubscriptionsRequest::TPtr &ev, const TActorContext &ctx); void Handle(TEvPrivate::TEvWorkerDisconnected::TPtr &ev, const TActorContext &ctx); + void Handle(TEvPrivate::TEvWorkerCoolDown::TPtr &ev, const TActorContext &ctx); void Handle(TEvPrivate::TEvNotificationTimeout::TPtr &ev, const TActorContext &ctx); void Handle(TEvPrivate::TEvSenderDied::TPtr &ev, const TActorContext &ctx); void Handle(TEvPrivate::TEvSetConfig::TPtr &ev, const TActorContext &ctx); @@ -225,6 +239,7 @@ class TConfigsProvider : public TActorBootstrapped { HFuncTraced(TEvConsole::TEvGetNodeConfigRequest, Handle); HFuncTraced(TEvConsole::TEvListConfigSubscriptionsRequest, Handle); HFuncTraced(TEvPrivate::TEvWorkerDisconnected, Handle); + HFuncTraced(TEvPrivate::TEvWorkerCoolDown, Handle); HFuncTraced(TEvPrivate::TEvNotificationTimeout, Handle); HFuncTraced(TEvPrivate::TEvSenderDied, Handle); HFuncTraced(TEvPrivate::TEvSetConfig, Handle); @@ -242,9 +257,22 @@ class TConfigsProvider : public TActorBootstrapped { } } + struct TCounters { + using TCounterPtr = ::NMonitoring::TDynamicCounters::TCounterPtr; + TCounterPtr ScheduledConfigUpdates; + TCounterPtr InflightConfigUpdates; + + explicit TCounters(::NMonitoring::TDynamicCounterPtr counters) + : ScheduledConfigUpdates(counters->GetCounter("ScheduledConfigUpdates", false)) + , InflightConfigUpdates(counters->GetCounter("InflightConfigUpdates", false)) + { + } + }; + public: - TConfigsProvider(TActorId ownerId) + TConfigsProvider(TActorId ownerId, ::NMonitoring::TDynamicCounterPtr counters) : ConfigsManager(ownerId) + , Counters(counters) { } @@ -264,10 +292,20 @@ class TConfigsProvider : public TActorBootstrapped { private: TActorId ConfigsManager; + TCounters Counters; TConfigsConfig Config; TConfigIndex ConfigIndex; TSubscriptionIndex SubscriptionIndex; + + enum class EUpdate { + All, + Yaml, + }; + TInMemorySubscriptionIndex InMemoryIndex; + THashMap ScheduledUpdates; + THashSet InflightUpdates; + static constexpr ui32 MAX_INFLIGHT_UPDATES = 50; TString MainYamlConfig; TMap VolatileYamlConfigs;