From c1dce2b10dcbd13bd556a059094cba0d9b877faa Mon Sep 17 00:00:00 2001 From: Martin Date: Thu, 27 Mar 2025 15:23:56 -0500 Subject: [PATCH 01/10] RetryAction compiles --- .../SonicCore/interface/RetryActionBase.h | 26 +++++++++++++++++++ .../interface/RetrySameServerAction.h | 14 ++++++++++ .../SonicCore/src/RetryActionBase.cc | 15 +++++++++++ .../SonicCore/src/RetrySameServerAction.cc | 11 ++++++++ 4 files changed, 66 insertions(+) create mode 100644 HeterogeneousCore/SonicCore/interface/RetryActionBase.h create mode 100644 HeterogeneousCore/SonicCore/interface/RetrySameServerAction.h create mode 100644 HeterogeneousCore/SonicCore/src/RetryActionBase.cc create mode 100644 HeterogeneousCore/SonicCore/src/RetrySameServerAction.cc diff --git a/HeterogeneousCore/SonicCore/interface/RetryActionBase.h b/HeterogeneousCore/SonicCore/interface/RetryActionBase.h new file mode 100644 index 0000000000000..3a95578783b3d --- /dev/null +++ b/HeterogeneousCore/SonicCore/interface/RetryActionBase.h @@ -0,0 +1,26 @@ +#ifndef RETRY_ACTION_BASE_H +#define RETRY_ACTION_BASE_H + +#include "FWCore/PluginManager/interface/PluginFactory.h" +#include "FWCore/ParameterSet/interface/ParameterSet.h" +#include "HeterogeneousCore/SonicCore/interface/SonicClientBase.h" +#include +#include + +// Base class for retry actions +class RetryActionBase { +public: + RetryActionBase(const edm::ParameterSet& conf, SonicClientBase* client); + virtual ~RetryActionBase() = default; +protected: + virtual void retry() = 0; // Pure virtual function for execution logic + void eval(); // interface for calling evaluate in client + +protected: + SonicClientBase* client_; +}; + +// Define the factory for creating retry actions +using RetryActionFactory = edmplugin::PluginFactory; + +#endif // RETRY_ACTION_BASE_H diff --git a/HeterogeneousCore/SonicCore/interface/RetrySameServerAction.h b/HeterogeneousCore/SonicCore/interface/RetrySameServerAction.h new file mode 100644 index 0000000000000..cb752262dce28 --- /dev/null +++ b/HeterogeneousCore/SonicCore/interface/RetrySameServerAction.h @@ -0,0 +1,14 @@ +#include "HeterogeneousCore/SonicCore/interface/RetryActionBase.h" +#include "HeterogeneousCore/SonicCore/interface/SonicClientBase.h" + +class RetrySameServerAction : public RetryActionBase { +public: + RetrySameServerAction(const edm::ParameterSet& pset, SonicClientBase* client) + : RetryActionBase(pset, client), + allowedTries_(pset.getUntrackedParameter("allowedTries", 0)) {} +protected: + void retry(); + +private: + unsigned allowedTries_,tries_; +}; diff --git a/HeterogeneousCore/SonicCore/src/RetryActionBase.cc b/HeterogeneousCore/SonicCore/src/RetryActionBase.cc new file mode 100644 index 0000000000000..ecdae15543654 --- /dev/null +++ b/HeterogeneousCore/SonicCore/src/RetryActionBase.cc @@ -0,0 +1,15 @@ +#include "HeterogeneousCore/SonicCore/interface/RetryActionBase.h" + +// Constructor implementation +RetryActionBase::RetryActionBase(const edm::ParameterSet& conf,SonicClientBase* client) : client_(client) {} + + +void RetryActionBase::eval() { + if (client_) { + client_->evaluate(); + } else { + edm::LogError("RetryActionBase") << "Client pointer is null, cannot evaluate."; + } +} + +EDM_REGISTER_PLUGINFACTORY(RetryActionFactory, "RetryActionFactory"); diff --git a/HeterogeneousCore/SonicCore/src/RetrySameServerAction.cc b/HeterogeneousCore/SonicCore/src/RetrySameServerAction.cc new file mode 100644 index 0000000000000..637c4fe2bbff9 --- /dev/null +++ b/HeterogeneousCore/SonicCore/src/RetrySameServerAction.cc @@ -0,0 +1,11 @@ +#include "HeterogeneousCore/SonicCore/interface/RetrySameServerAction.h" +#include "HeterogeneousCore/SonicCore/interface/SonicClientBase.h" + +void RetrySameServerAction::retry() { + ++tries_; + //if max retries has not been exceeded, call evaluate again + if (tries_ < allowedTries_) { + eval(); + return; + } +} From 4dde1c61684f0adc50a2aa1cf5ec1ce922820b61 Mon Sep 17 00:00:00 2001 From: Martin Date: Wed, 2 Apr 2025 09:52:35 -0500 Subject: [PATCH 02/10] Include RetryAction in SonicClientBase --- .../SonicCore/interface/RetryActionBase.h | 23 +++++--- .../interface/RetrySameServerAction.h | 12 ++-- .../SonicCore/interface/SonicClientBase.h | 12 ++++ .../SonicCore/src/RetryActionBase.cc | 13 ++--- .../SonicCore/src/RetrySameServerAction.cc | 19 ++++--- .../SonicCore/src/SonicClientBase.cc | 55 ++++++++++++++++--- 6 files changed, 99 insertions(+), 35 deletions(-) diff --git a/HeterogeneousCore/SonicCore/interface/RetryActionBase.h b/HeterogeneousCore/SonicCore/interface/RetryActionBase.h index 3a95578783b3d..4732abc27a38f 100644 --- a/HeterogeneousCore/SonicCore/interface/RetryActionBase.h +++ b/HeterogeneousCore/SonicCore/interface/RetryActionBase.h @@ -10,17 +10,24 @@ // Base class for retry actions class RetryActionBase { public: - RetryActionBase(const edm::ParameterSet& conf, SonicClientBase* client); - virtual ~RetryActionBase() = default; + RetryActionBase(const edm::ParameterSet& conf, SonicClientBase* client); + virtual ~RetryActionBase() = default; + + bool shouldRetry() const { return shouldRetry_; } // Getter for shouldRetry_ + + virtual void retry() = 0; // Pure virtual function for execution logic + virtual void start() = 0; // Pure virtual function for execution logic for initialization + protected: - virtual void retry() = 0; // Pure virtual function for execution logic - void eval(); // interface for calling evaluate in client - + void eval(); // interface for calling evaluate in client + protected: - SonicClientBase* client_; + SonicClientBase* client_; + bool shouldRetry_; // Flag to track if further retries should happen }; // Define the factory for creating retry actions -using RetryActionFactory = edmplugin::PluginFactory; +using RetryActionFactory = + edmplugin::PluginFactory; -#endif // RETRY_ACTION_BASE_H +#endif // RETRY_ACTION_BASE_H diff --git a/HeterogeneousCore/SonicCore/interface/RetrySameServerAction.h b/HeterogeneousCore/SonicCore/interface/RetrySameServerAction.h index cb752262dce28..cd8cda3a2d435 100644 --- a/HeterogeneousCore/SonicCore/interface/RetrySameServerAction.h +++ b/HeterogeneousCore/SonicCore/interface/RetrySameServerAction.h @@ -3,12 +3,14 @@ class RetrySameServerAction : public RetryActionBase { public: - RetrySameServerAction(const edm::ParameterSet& pset, SonicClientBase* client) - : RetryActionBase(pset, client), - allowedTries_(pset.getUntrackedParameter("allowedTries", 0)) {} + RetrySameServerAction(const edm::ParameterSet& pset, SonicClientBase* client) + : RetryActionBase(pset, client), allowedTries_(pset.getUntrackedParameter("allowedTries", 0)) {} + + void start() override { tries_=0;}; + protected: - void retry(); + void retry() override; private: - unsigned allowedTries_,tries_; + unsigned allowedTries_, tries_; }; diff --git a/HeterogeneousCore/SonicCore/interface/SonicClientBase.h b/HeterogeneousCore/SonicCore/interface/SonicClientBase.h index 47caaae8b2052..5038f566dbc27 100644 --- a/HeterogeneousCore/SonicCore/interface/SonicClientBase.h +++ b/HeterogeneousCore/SonicCore/interface/SonicClientBase.h @@ -9,12 +9,15 @@ #include "HeterogeneousCore/SonicCore/interface/SonicDispatcherPseudoAsync.h" #include +#include #include #include #include enum class SonicMode { Sync = 1, Async = 2, PseudoAsync = 3 }; +class RetryActionBase; + class SonicClientBase { public: //constructor @@ -57,11 +60,20 @@ class SonicClientBase { unsigned allowedTries_, tries_; std::optional holder_; + // Use a unique_ptr with a custom deleter to avoid incomplete type issues + struct RetryDeleter { + void operator()(RetryActionBase* ptr) const; + }; + + using RetryActionPtr = std::unique_ptr; + std::vector retryActions_; + //for logging/debugging std::string debugName_, clientName_, fullDebugName_; friend class SonicDispatcher; friend class SonicDispatcherPseudoAsync; + friend class RetryActionBase; }; #endif diff --git a/HeterogeneousCore/SonicCore/src/RetryActionBase.cc b/HeterogeneousCore/SonicCore/src/RetryActionBase.cc index ecdae15543654..c595458570b0d 100644 --- a/HeterogeneousCore/SonicCore/src/RetryActionBase.cc +++ b/HeterogeneousCore/SonicCore/src/RetryActionBase.cc @@ -1,15 +1,14 @@ #include "HeterogeneousCore/SonicCore/interface/RetryActionBase.h" // Constructor implementation -RetryActionBase::RetryActionBase(const edm::ParameterSet& conf,SonicClientBase* client) : client_(client) {} - +RetryActionBase::RetryActionBase(const edm::ParameterSet& conf, SonicClientBase* client) : client_(client), shouldRetry_(true) {} void RetryActionBase::eval() { - if (client_) { - client_->evaluate(); - } else { - edm::LogError("RetryActionBase") << "Client pointer is null, cannot evaluate."; - } + if (client_) { + client_->evaluate(); + } else { + edm::LogError("RetryActionBase") << "Client pointer is null, cannot evaluate."; + } } EDM_REGISTER_PLUGINFACTORY(RetryActionFactory, "RetryActionFactory"); diff --git a/HeterogeneousCore/SonicCore/src/RetrySameServerAction.cc b/HeterogeneousCore/SonicCore/src/RetrySameServerAction.cc index 637c4fe2bbff9..16959bec547a1 100644 --- a/HeterogeneousCore/SonicCore/src/RetrySameServerAction.cc +++ b/HeterogeneousCore/SonicCore/src/RetrySameServerAction.cc @@ -1,11 +1,16 @@ #include "HeterogeneousCore/SonicCore/interface/RetrySameServerAction.h" #include "HeterogeneousCore/SonicCore/interface/SonicClientBase.h" -void RetrySameServerAction::retry() { - ++tries_; - //if max retries has not been exceeded, call evaluate again - if (tries_ < allowedTries_) { - eval(); - return; - } +void RetrySameServerAction::retry() { + ++tries_; + //if max retries has not been exceeded, call evaluate again + if (tries_ < allowedTries_) { + eval(); + return; + }else{ + shouldRetry_ = false; // Flip flag when max retries are reached + edm::LogInfo("RetrySameServerAction") << "Max retry attempts reached. No further retries."; + } } + +DEFINE_EDM_PLUGIN(RetryActionFactory, RetrySameServerAction, "RetrySameServerAction"); diff --git a/HeterogeneousCore/SonicCore/src/SonicClientBase.cc b/HeterogeneousCore/SonicCore/src/SonicClientBase.cc index 745c51f17aaf3..2a4bb73a128b8 100644 --- a/HeterogeneousCore/SonicCore/src/SonicClientBase.cc +++ b/HeterogeneousCore/SonicCore/src/SonicClientBase.cc @@ -1,7 +1,14 @@ #include "HeterogeneousCore/SonicCore/interface/SonicClientBase.h" +#include "HeterogeneousCore/SonicCore/interface/RetryActionBase.h" #include "FWCore/Utilities/interface/Exception.h" #include "FWCore/ParameterSet/interface/allowedValues.h" + +// Custom deleter implementation +void SonicClientBase::RetryDeleter::operator()(RetryActionBase* ptr) const { + delete ptr; +} + SonicClientBase::SonicClientBase(const edm::ParameterSet& params, const std::string& debugName, const std::string& clientName) @@ -12,6 +19,18 @@ SonicClientBase::SonicClientBase(const edm::ParameterSet& params, if (!clientName_.empty()) fullDebugName_ += ":" + clientName_; + std::vector retryPSetList = params.getParameter>("Retry"); + + for (const auto& retryPSet : retryPSetList) { + std::string actionType = retryPSet.getParameter("retryType"); + + auto retryAction = RetryActionFactory::get()->create(actionType, retryPSet, this); + if (retryAction) { + //Convert to RetryActionPtr Type from raw pointer of retryAction + retryActions_.emplace_back(RetryActionPtr(retryAction.release())); + } + } + std::string modeName(params.getParameter("mode")); if (modeName == "Sync") setMode(SonicMode::Sync); @@ -40,19 +59,39 @@ void SonicClientBase::start(edm::WaitingTaskWithArenaHolder holder) { holder_ = std::move(holder); } -void SonicClientBase::start() { tries_ = 0; } +void SonicClientBase::start() { + tries_ = 0; + // initialize all actions + for (const auto& action : retryActions_) { + action->start(); + } +} void SonicClientBase::finish(bool success, std::exception_ptr eptr) { //retries are only allowed if no exception was raised if (!success and !eptr) { - ++tries_; - //if max retries has not been exceeded, call evaluate again - if (tries_ < allowedTries_) { - evaluate(); - //avoid calling doneWaiting() twice - return; + //++tries_; + ////if max retries has not been exceeded, call evaluate again + //if (tries_ < allowedTries_) { + // evaluate(); + // //avoid calling doneWaiting() twice + // return; + //} + + // Check if any retry actions are still valid + bool anyRetryAllowed = false; + for (const auto& action : retryActions_) { + if (action->shouldRetry()) { + action->retry(); // Call retry only if shouldRetry_ is true + return; + } + } + // If no actions allow retries, stop retrying + if (!anyRetryAllowed) { + edm::LogInfo("SonicClientBase") << "No retry actions available. Stopping retries."; + return; } - //prepare an exception if exceeded + //prepare an exception if no more retries left else { edm::Exception ex(edm::errors::ExternalFailure); ex << "SonicCallFailed: call failed after max " << tries_ << " tries"; From 5b093e4f8d610955dc4876b0efbc729b5d6111ae Mon Sep 17 00:00:00 2001 From: Martin Date: Mon, 7 Apr 2025 08:55:41 -0500 Subject: [PATCH 03/10] Update PR comments --- .../SonicCore/interface/RetryActionBase.h | 8 +- .../interface/RetrySameServerAction.h | 2 +- .../SonicCore/interface/SonicClientBase.h | 4 +- .../SonicCore/src/RetryActionBase.cc | 3 +- .../SonicCore/src/RetrySameServerAction.cc | 6 +- .../SonicCore/src/SonicClientBase.cc | 82 +++++++++---------- .../SonicCore/test/DummyClient.h | 2 +- .../SonicCore/test/sonicTest_cfg.py | 44 ++++++++-- 8 files changed, 91 insertions(+), 60 deletions(-) diff --git a/HeterogeneousCore/SonicCore/interface/RetryActionBase.h b/HeterogeneousCore/SonicCore/interface/RetryActionBase.h index 4732abc27a38f..d81183df39a47 100644 --- a/HeterogeneousCore/SonicCore/interface/RetryActionBase.h +++ b/HeterogeneousCore/SonicCore/interface/RetryActionBase.h @@ -1,5 +1,5 @@ -#ifndef RETRY_ACTION_BASE_H -#define RETRY_ACTION_BASE_H +#ifndef HeterogeneousCore_SonicCore_RetryActionBase +#define HeterogeneousCore_SonicCore_RetryActionBase #include "FWCore/PluginManager/interface/PluginFactory.h" #include "FWCore/ParameterSet/interface/ParameterSet.h" @@ -19,7 +19,7 @@ class RetryActionBase { virtual void start() = 0; // Pure virtual function for execution logic for initialization protected: - void eval(); // interface for calling evaluate in client + void eval(); // interface for calling evaluate in client protected: SonicClientBase* client_; @@ -30,4 +30,4 @@ class RetryActionBase { using RetryActionFactory = edmplugin::PluginFactory; -#endif // RETRY_ACTION_BASE_H +#endif diff --git a/HeterogeneousCore/SonicCore/interface/RetrySameServerAction.h b/HeterogeneousCore/SonicCore/interface/RetrySameServerAction.h index cd8cda3a2d435..8ecce2a170847 100644 --- a/HeterogeneousCore/SonicCore/interface/RetrySameServerAction.h +++ b/HeterogeneousCore/SonicCore/interface/RetrySameServerAction.h @@ -6,7 +6,7 @@ class RetrySameServerAction : public RetryActionBase { RetrySameServerAction(const edm::ParameterSet& pset, SonicClientBase* client) : RetryActionBase(pset, client), allowedTries_(pset.getUntrackedParameter("allowedTries", 0)) {} - void start() override { tries_=0;}; + void start() override { tries_ = 0; }; protected: void retry() override; diff --git a/HeterogeneousCore/SonicCore/interface/SonicClientBase.h b/HeterogeneousCore/SonicCore/interface/SonicClientBase.h index 5038f566dbc27..45a089701ed12 100644 --- a/HeterogeneousCore/SonicCore/interface/SonicClientBase.h +++ b/HeterogeneousCore/SonicCore/interface/SonicClientBase.h @@ -57,12 +57,12 @@ class SonicClientBase { SonicMode mode_; bool verbose_; std::unique_ptr dispatcher_; - unsigned allowedTries_, tries_; + unsigned totalTries_; std::optional holder_; // Use a unique_ptr with a custom deleter to avoid incomplete type issues struct RetryDeleter { - void operator()(RetryActionBase* ptr) const; + void operator()(RetryActionBase* ptr) const; }; using RetryActionPtr = std::unique_ptr; diff --git a/HeterogeneousCore/SonicCore/src/RetryActionBase.cc b/HeterogeneousCore/SonicCore/src/RetryActionBase.cc index c595458570b0d..41b9a6186da2b 100644 --- a/HeterogeneousCore/SonicCore/src/RetryActionBase.cc +++ b/HeterogeneousCore/SonicCore/src/RetryActionBase.cc @@ -1,7 +1,8 @@ #include "HeterogeneousCore/SonicCore/interface/RetryActionBase.h" // Constructor implementation -RetryActionBase::RetryActionBase(const edm::ParameterSet& conf, SonicClientBase* client) : client_(client), shouldRetry_(true) {} +RetryActionBase::RetryActionBase(const edm::ParameterSet& conf, SonicClientBase* client) + : client_(client), shouldRetry_(true) {} void RetryActionBase::eval() { if (client_) { diff --git a/HeterogeneousCore/SonicCore/src/RetrySameServerAction.cc b/HeterogeneousCore/SonicCore/src/RetrySameServerAction.cc index 16959bec547a1..b5a24af935596 100644 --- a/HeterogeneousCore/SonicCore/src/RetrySameServerAction.cc +++ b/HeterogeneousCore/SonicCore/src/RetrySameServerAction.cc @@ -7,9 +7,9 @@ void RetrySameServerAction::retry() { if (tries_ < allowedTries_) { eval(); return; - }else{ - shouldRetry_ = false; // Flip flag when max retries are reached - edm::LogInfo("RetrySameServerAction") << "Max retry attempts reached. No further retries."; + } else { + shouldRetry_ = false; // Flip flag when max retries are reached + edm::LogInfo("RetrySameServerAction") << "Max retry attempts reached. No further retries."; } } diff --git a/HeterogeneousCore/SonicCore/src/SonicClientBase.cc b/HeterogeneousCore/SonicCore/src/SonicClientBase.cc index 2a4bb73a128b8..514a680b2518b 100644 --- a/HeterogeneousCore/SonicCore/src/SonicClientBase.cc +++ b/HeterogeneousCore/SonicCore/src/SonicClientBase.cc @@ -3,35 +3,31 @@ #include "FWCore/Utilities/interface/Exception.h" #include "FWCore/ParameterSet/interface/allowedValues.h" - // Custom deleter implementation -void SonicClientBase::RetryDeleter::operator()(RetryActionBase* ptr) const { - delete ptr; -} +void SonicClientBase::RetryDeleter::operator()(RetryActionBase* ptr) const { delete ptr; } SonicClientBase::SonicClientBase(const edm::ParameterSet& params, const std::string& debugName, const std::string& clientName) - : allowedTries_(params.getUntrackedParameter("allowedTries", 0)), - debugName_(debugName), - clientName_(clientName), - fullDebugName_(debugName_) { + : debugName_(debugName), clientName_(clientName), fullDebugName_(debugName_) { if (!clientName_.empty()) fullDebugName_ += ":" + clientName_; - std::vector retryPSetList = params.getParameter>("Retry"); + const auto& retryPSetList = params.getParameter>("Retry"); + std::string modeName(params.getParameter("mode")); for (const auto& retryPSet : retryPSetList) { - std::string actionType = retryPSet.getParameter("retryType"); + const std::string& actionType = retryPSet.getParameter("retryType"); - auto retryAction = RetryActionFactory::get()->create(actionType, retryPSet, this); - if (retryAction) { - //Convert to RetryActionPtr Type from raw pointer of retryAction - retryActions_.emplace_back(RetryActionPtr(retryAction.release())); - } + auto retryAction = RetryActionFactory::get()->create(actionType, retryPSet, this); + if (retryAction) { + //Convert to RetryActionPtr Type from raw pointer of retryAction + retryActions_.emplace_back(RetryActionPtr(retryAction.release())); + } else { + throw cms::Exception("Configuration") << "Unknown Retry type" << actionType << " for SonicClient: " << modeName; + } } - std::string modeName(params.getParameter("mode")); if (modeName == "Sync") setMode(SonicMode::Sync); else if (modeName == "Async") @@ -59,42 +55,32 @@ void SonicClientBase::start(edm::WaitingTaskWithArenaHolder holder) { holder_ = std::move(holder); } -void SonicClientBase::start() { - tries_ = 0; - // initialize all actions - for (const auto& action : retryActions_) { - action->start(); - } +void SonicClientBase::start() { + totalTries_ = 0; + // initialize all actions + for (const auto& action : retryActions_) { + action->start(); + } } void SonicClientBase::finish(bool success, std::exception_ptr eptr) { //retries are only allowed if no exception was raised if (!success and !eptr) { - //++tries_; - ////if max retries has not been exceeded, call evaluate again - //if (tries_ < allowedTries_) { - // evaluate(); - // //avoid calling doneWaiting() twice - // return; - //} - + ++totalTries_; // Check if any retry actions are still valid bool anyRetryAllowed = false; for (const auto& action : retryActions_) { - if (action->shouldRetry()) { - action->retry(); // Call retry only if shouldRetry_ is true - return; - } - } - // If no actions allow retries, stop retrying - if (!anyRetryAllowed) { - edm::LogInfo("SonicClientBase") << "No retry actions available. Stopping retries."; + if (action->shouldRetry()) { + action->retry(); // Call retry only if shouldRetry_ is true return; + } } //prepare an exception if no more retries left - else { + if (!anyRetryAllowed) { + edm::LogInfo("SonicClientBase") << "SonicCallFailed: call failed, no retry actions available after " + << totalTries_ << " tries."; edm::Exception ex(edm::errors::ExternalFailure); - ex << "SonicCallFailed: call failed after max " << tries_ << " tries"; + ex << "SonicCallFailed: call failed, no retry actions available after " << totalTries_ << " tries."; eptr = make_exception_ptr(ex); } } @@ -113,7 +99,19 @@ void SonicClientBase::fillBasePSetDescription(edm::ParameterSetDescription& desc //restrict allowed values desc.ifValue(edm::ParameterDescription("mode", "PseudoAsync", true), edm::allowedValues("Sync", "Async", "PseudoAsync")); - if (allowRetry) - desc.addUntracked("allowedTries", 0); + if (allowRetry) { + // Defines the structure of each entry in the VPSet + edm::ParameterSetDescription retryDesc; + retryDesc.add("retryType", "RetrySameServerAction"); + + // Define a default retry action + edm::ParameterSet defaultRetry; + defaultRetry.addParameter("retryType", "RetrySameServerAction"); + defaultRetry.addUntrackedParameter("allowedTries", 0); + + // Add the VPSet with the default retry action + desc.addVPSet("Retry", retryDesc, {defaultRetry}); + } + desc.add("sonicClientBase", desc); desc.addUntracked("verbose", false); } diff --git a/HeterogeneousCore/SonicCore/test/DummyClient.h b/HeterogeneousCore/SonicCore/test/DummyClient.h index ccef888ad9f7d..6504843926c0a 100644 --- a/HeterogeneousCore/SonicCore/test/DummyClient.h +++ b/HeterogeneousCore/SonicCore/test/DummyClient.h @@ -36,7 +36,7 @@ class DummyClient : public SonicClient { this->output_ = this->input_ * factor_; //simulate a failure - if (this->tries_ < fails_) + if (this->totalTries_ < fails_) this->finish(false); else this->finish(true); diff --git a/HeterogeneousCore/SonicCore/test/sonicTest_cfg.py b/HeterogeneousCore/SonicCore/test/sonicTest_cfg.py index 2cc429138b85c..43a183372dc33 100644 --- a/HeterogeneousCore/SonicCore/test/sonicTest_cfg.py +++ b/HeterogeneousCore/SonicCore/test/sonicTest_cfg.py @@ -26,8 +26,13 @@ mode = cms.string("Sync"), factor = cms.int32(-1), wait = cms.int32(10), - allowedTries = cms.untracked.uint32(0), fails = cms.uint32(0), + Retry = cms.VPSet( + cms.PSet( + retryType = cms.string('RetrySameServerAction'), + allowedTries = cms.untracked.uint32(0) + ) + ) ), ) @@ -37,8 +42,14 @@ mode = cms.string("PseudoAsync"), factor = cms.int32(2), wait = cms.int32(10), - allowedTries = cms.untracked.uint32(0), fails = cms.uint32(0), + Retry = cms.VPSet( + cms.PSet( + retryType = cms.string('RetrySameServerAction'), + allowedTries = cms.untracked.uint32(0) + ) + ) + ), ) @@ -48,32 +59,53 @@ mode = cms.string("Async"), factor = cms.int32(5), wait = cms.int32(10), - allowedTries = cms.untracked.uint32(0), fails = cms.uint32(0), + Retry = cms.VPSet( + cms.PSet( + retryType = cms.string('RetrySameServerAction'), + allowedTries = cms.untracked.uint32(0) + ) + ) ), ) process.dummySyncRetry = process.dummySync.clone( Client = dict( wait = 2, - allowedTries = 2, fails = 1, + Retry = cms.VPSet( + cms.PSet( + retryType = cms.string('RetrySameServerAction'), + allowedTries = cms.untracked.uint32(2) + ) + ) + ) ) process.dummyPseudoAsyncRetry = process.dummyPseudoAsync.clone( Client = dict( wait = 2, - allowedTries = 2, fails = 1, + Retry = cms.VPSet( + cms.PSet( + retryType = cms.string('RetrySameServerAction'), + allowedTries = cms.untracked.uint32(2) + ) + ) ) ) process.dummyAsyncRetry = process.dummyAsync.clone( Client = dict( wait = 2, - allowedTries = 2, fails = 1, + Retry = cms.VPSet( + cms.PSet( + allowedTries = cms.untracked.uint32(2), + retryType = cms.string('RetrySameServerAction') + ) + ) ) ) From c062112a0673f1fe3c847e6919cce900929bdc59 Mon Sep 17 00:00:00 2001 From: Martin Date: Fri, 11 Apr 2025 11:43:38 -0500 Subject: [PATCH 04/10] PR comments, fix fillDescriptions --- .../SonicCore/interface/RetryActionBase.h | 2 ++ .../SonicCore/src/RetrySameServerAction.cc | 2 +- .../SonicCore/src/SonicClientBase.cc | 17 +++++++---------- .../SonicCore/test/sonicTest_cfg.py | 1 - 4 files changed, 10 insertions(+), 12 deletions(-) diff --git a/HeterogeneousCore/SonicCore/interface/RetryActionBase.h b/HeterogeneousCore/SonicCore/interface/RetryActionBase.h index d81183df39a47..e3fc0bbb8af9a 100644 --- a/HeterogeneousCore/SonicCore/interface/RetryActionBase.h +++ b/HeterogeneousCore/SonicCore/interface/RetryActionBase.h @@ -31,3 +31,5 @@ using RetryActionFactory = edmplugin::PluginFactory; #endif + +#define DEFINE_RETRY_ACTION(type) DEFINE_EDM_PLUGIN(RetryActionFactory, type, #type); diff --git a/HeterogeneousCore/SonicCore/src/RetrySameServerAction.cc b/HeterogeneousCore/SonicCore/src/RetrySameServerAction.cc index b5a24af935596..31c4fec227500 100644 --- a/HeterogeneousCore/SonicCore/src/RetrySameServerAction.cc +++ b/HeterogeneousCore/SonicCore/src/RetrySameServerAction.cc @@ -13,4 +13,4 @@ void RetrySameServerAction::retry() { } } -DEFINE_EDM_PLUGIN(RetryActionFactory, RetrySameServerAction, "RetrySameServerAction"); +DEFINE_RETRY_ACTION(RetrySameServerAction) diff --git a/HeterogeneousCore/SonicCore/src/SonicClientBase.cc b/HeterogeneousCore/SonicCore/src/SonicClientBase.cc index 514a680b2518b..9949d9d1f2ea2 100644 --- a/HeterogeneousCore/SonicCore/src/SonicClientBase.cc +++ b/HeterogeneousCore/SonicCore/src/SonicClientBase.cc @@ -24,7 +24,7 @@ SonicClientBase::SonicClientBase(const edm::ParameterSet& params, //Convert to RetryActionPtr Type from raw pointer of retryAction retryActions_.emplace_back(RetryActionPtr(retryAction.release())); } else { - throw cms::Exception("Configuration") << "Unknown Retry type" << actionType << " for SonicClient: " << modeName; + throw cms::Exception("Configuration") << "Unknown Retry type " << actionType << " for SonicClient: " << modeName; } } @@ -67,8 +67,6 @@ void SonicClientBase::finish(bool success, std::exception_ptr eptr) { //retries are only allowed if no exception was raised if (!success and !eptr) { ++totalTries_; - // Check if any retry actions are still valid - bool anyRetryAllowed = false; for (const auto& action : retryActions_) { if (action->shouldRetry()) { action->retry(); // Call retry only if shouldRetry_ is true @@ -76,13 +74,11 @@ void SonicClientBase::finish(bool success, std::exception_ptr eptr) { } } //prepare an exception if no more retries left - if (!anyRetryAllowed) { - edm::LogInfo("SonicClientBase") << "SonicCallFailed: call failed, no retry actions available after " - << totalTries_ << " tries."; - edm::Exception ex(edm::errors::ExternalFailure); - ex << "SonicCallFailed: call failed, no retry actions available after " << totalTries_ << " tries."; - eptr = make_exception_ptr(ex); - } + edm::LogInfo("SonicClientBase") << "SonicCallFailed: call failed, no retry actions available after " << totalTries_ + << " tries."; + edm::Exception ex(edm::errors::ExternalFailure); + ex << "SonicCallFailed: call failed, no retry actions available after " << totalTries_ << " tries."; + eptr = make_exception_ptr(ex); } if (holder_) { holder_->doneWaiting(eptr); @@ -103,6 +99,7 @@ void SonicClientBase::fillBasePSetDescription(edm::ParameterSetDescription& desc // Defines the structure of each entry in the VPSet edm::ParameterSetDescription retryDesc; retryDesc.add("retryType", "RetrySameServerAction"); + retryDesc.addUntracked("allowedTries", 0); // Define a default retry action edm::ParameterSet defaultRetry; diff --git a/HeterogeneousCore/SonicCore/test/sonicTest_cfg.py b/HeterogeneousCore/SonicCore/test/sonicTest_cfg.py index 43a183372dc33..bf7b44cb01519 100644 --- a/HeterogeneousCore/SonicCore/test/sonicTest_cfg.py +++ b/HeterogeneousCore/SonicCore/test/sonicTest_cfg.py @@ -19,7 +19,6 @@ process.options.numberOfThreads = 2 process.options.numberOfStreams = 0 - process.dummySync = _moduleClass(_moduleName, input = cms.int32(1), Client = cms.PSet( From ca570d7399b56e07fc538842568ffd8280216500 Mon Sep 17 00:00:00 2001 From: Martin Date: Fri, 11 Apr 2025 15:47:34 -0500 Subject: [PATCH 05/10] Move RetrySameServerAction to plugins. SonicTriton test works. --- HeterogeneousCore/SonicCore/BuildFile.xml | 3 ++- .../SonicCore/plugins/BuildFile.xml | 6 ++++++ .../RetrySameServerAction.cc} | 14 ++++++++++++++ .../SonicCore/src/RetrySameServerAction.cc | 16 ---------------- .../SonicTriton/src/TritonClient.cc | 2 +- .../SonicTriton/test/tritonTest_cfg.py | 7 ++++++- 6 files changed, 29 insertions(+), 19 deletions(-) create mode 100644 HeterogeneousCore/SonicCore/plugins/BuildFile.xml rename HeterogeneousCore/SonicCore/{interface/RetrySameServerAction.h => plugins/RetrySameServerAction.cc} (56%) delete mode 100644 HeterogeneousCore/SonicCore/src/RetrySameServerAction.cc diff --git a/HeterogeneousCore/SonicCore/BuildFile.xml b/HeterogeneousCore/SonicCore/BuildFile.xml index b0d5e2a08b98f..9796c4363c612 100644 --- a/HeterogeneousCore/SonicCore/BuildFile.xml +++ b/HeterogeneousCore/SonicCore/BuildFile.xml @@ -2,7 +2,8 @@ + - +i diff --git a/HeterogeneousCore/SonicCore/plugins/BuildFile.xml b/HeterogeneousCore/SonicCore/plugins/BuildFile.xml new file mode 100644 index 0000000000000..0ecf2187a0f82 --- /dev/null +++ b/HeterogeneousCore/SonicCore/plugins/BuildFile.xml @@ -0,0 +1,6 @@ + + + + + + diff --git a/HeterogeneousCore/SonicCore/interface/RetrySameServerAction.h b/HeterogeneousCore/SonicCore/plugins/RetrySameServerAction.cc similarity index 56% rename from HeterogeneousCore/SonicCore/interface/RetrySameServerAction.h rename to HeterogeneousCore/SonicCore/plugins/RetrySameServerAction.cc index 8ecce2a170847..9877013b93d5b 100644 --- a/HeterogeneousCore/SonicCore/interface/RetrySameServerAction.h +++ b/HeterogeneousCore/SonicCore/plugins/RetrySameServerAction.cc @@ -14,3 +14,17 @@ class RetrySameServerAction : public RetryActionBase { private: unsigned allowedTries_, tries_; }; + +void RetrySameServerAction::retry() { + ++tries_; + //if max retries has not been exceeded, call evaluate again + if (tries_ < allowedTries_) { + eval(); + return; + } else { + shouldRetry_ = false; // Flip flag when max retries are reached + edm::LogInfo("RetrySameServerAction") << "Max retry attempts reached. No further retries."; + } +} + +DEFINE_RETRY_ACTION(RetrySameServerAction) diff --git a/HeterogeneousCore/SonicCore/src/RetrySameServerAction.cc b/HeterogeneousCore/SonicCore/src/RetrySameServerAction.cc deleted file mode 100644 index 31c4fec227500..0000000000000 --- a/HeterogeneousCore/SonicCore/src/RetrySameServerAction.cc +++ /dev/null @@ -1,16 +0,0 @@ -#include "HeterogeneousCore/SonicCore/interface/RetrySameServerAction.h" -#include "HeterogeneousCore/SonicCore/interface/SonicClientBase.h" - -void RetrySameServerAction::retry() { - ++tries_; - //if max retries has not been exceeded, call evaluate again - if (tries_ < allowedTries_) { - eval(); - return; - } else { - shouldRetry_ = false; // Flip flag when max retries are reached - edm::LogInfo("RetrySameServerAction") << "Max retry attempts reached. No further retries."; - } -} - -DEFINE_RETRY_ACTION(RetrySameServerAction) diff --git a/HeterogeneousCore/SonicTriton/src/TritonClient.cc b/HeterogeneousCore/SonicTriton/src/TritonClient.cc index ddcdff83448d0..729b6b74ca8dc 100644 --- a/HeterogeneousCore/SonicTriton/src/TritonClient.cc +++ b/HeterogeneousCore/SonicTriton/src/TritonClient.cc @@ -369,7 +369,7 @@ void TritonClient::getResults(const std::vector //default case for sync and pseudo async void TritonClient::evaluate() { //undo previous signal from TritonException - if (tries_ > 0) { + if (totalTries_ > 0) { // If we are retrying then the evaluate method is called outside the frameworks TBB thread pool. // So we need to setup the service token for the current thread to access the service registry. edm::ServiceRegistry::Operate op(token_); diff --git a/HeterogeneousCore/SonicTriton/test/tritonTest_cfg.py b/HeterogeneousCore/SonicTriton/test/tritonTest_cfg.py index 9cede0e496706..f27d7711665af 100644 --- a/HeterogeneousCore/SonicTriton/test/tritonTest_cfg.py +++ b/HeterogeneousCore/SonicTriton/test/tritonTest_cfg.py @@ -123,9 +123,14 @@ modelVersion = cms.string(""), modelConfigPath = cms.FileInPath("HeterogeneousCore/SonicTriton/data/models/{}/config.pbtxt".format(model)), verbose = cms.untracked.bool(options.verbose or options.verboseClient), - allowedTries = cms.untracked.uint32(options.tries), useSharedMemory = cms.untracked.bool(not options.noShm), compression = cms.untracked.string(options.compression), + Retry = cms.VPSet( + cms.PSet( + retryType = cms.string('RetrySameServerAction'), + allowedTries = cms.untracked.uint32(options.tries) + ) + ) ) ) ) From fe03eb4f5a8cd4dde2b2d672c52ee9e4796b8084 Mon Sep 17 00:00:00 2001 From: Martin Date: Tue, 3 Jun 2025 18:04:44 -0500 Subject: [PATCH 06/10] Add update server function for client --- .../SonicTriton/interface/TritonClient.h | 1 + .../SonicTriton/src/TritonClient.cc | 41 +++++++++++-------- 2 files changed, 25 insertions(+), 17 deletions(-) diff --git a/HeterogeneousCore/SonicTriton/interface/TritonClient.h b/HeterogeneousCore/SonicTriton/interface/TritonClient.h index df8f9b559427c..670e1a750bf0a 100644 --- a/HeterogeneousCore/SonicTriton/interface/TritonClient.h +++ b/HeterogeneousCore/SonicTriton/interface/TritonClient.h @@ -65,6 +65,7 @@ class TritonClient : public SonicClient { bool handle_exception(F&& call); void reportServerSideStats(const ServerSideStats& stats) const; + void updateServer(std::string serverName); ServerSideStats summarizeServerStats(const inference::ModelStatistics& start_status, const inference::ModelStatistics& end_status) const; diff --git a/HeterogeneousCore/SonicTriton/src/TritonClient.cc b/HeterogeneousCore/SonicTriton/src/TritonClient.cc index 729b6b74ca8dc..f232414c1e9e5 100644 --- a/HeterogeneousCore/SonicTriton/src/TritonClient.cc +++ b/HeterogeneousCore/SonicTriton/src/TritonClient.cc @@ -61,7 +61,7 @@ TritonClient::TritonClient(const edm::ParameterSet& params, const std::string& d useSharedMemory_(params.getUntrackedParameter("useSharedMemory")), compressionAlgo_(getCompressionAlgo(params.getUntrackedParameter("compression"))) { options_.emplace_back(params.getParameter("modelName")); - //get appropriate server for this model + edm::Service ts; // We save the token to be able to notify the service in case of an exception in the evaluate method. @@ -70,22 +70,9 @@ TritonClient::TritonClient(const edm::ParameterSet& params, const std::string& d // create the context. token_ = edm::ServiceRegistry::instance().presentToken(); - const auto& server = - ts->serverInfo(options_[0].model_name_, params.getUntrackedParameter("preferredServer")); - serverType_ = server.type; - edm::LogInfo("TritonDiscovery") << debugName_ << " assigned server: " << server.url; - //enforce sync mode for fallback CPU server to avoid contention - //todo: could enforce async mode otherwise (unless mode was specified by user?) - if (serverType_ == TritonServerType::LocalCPU) - setMode(SonicMode::Sync); - isLocal_ = serverType_ == TritonServerType::LocalCPU or serverType_ == TritonServerType::LocalGPU; - - //connect to the server - TRITON_THROW_IF_ERROR( - tc::InferenceServerGrpcClient::Create(&client_, server.url, false, server.useSsl, server.sslOptions), - "TritonClient(): unable to create inference context", - isLocal_); - + //Connect to server + updateServer(params.getUntrackedParameter("preferredServer")); + //set options options_[0].model_version_ = params.getParameter("modelVersion"); options_[0].client_timeout_ = params.getUntrackedParameter("timeout"); @@ -574,6 +561,26 @@ inference::ModelStatistics TritonClient::getServerSideStatus() const { return inference::ModelStatistics{}; } +void TritonClient::updateServer(std::string serverName){ + //get appropriate server for this model + edm::Service ts; + + const auto& server = ts->serverInfo(options_[0].model_name_, serverName); + serverType_ = server.type; + edm::LogInfo("TritonDiscovery") << debugName_ << " assigned server: " << server.url; + //enforce sync mode for fallback CPU server to avoid contention + //todo: could enforce async mode otherwise (unless mode was specified by user?) + if (serverType_ == TritonServerType::LocalCPU) + setMode(SonicMode::Sync); + isLocal_ = serverType_ == TritonServerType::LocalCPU or serverType_ == TritonServerType::LocalGPU; + + //connect to the server + TRITON_THROW_IF_ERROR( + tc::InferenceServerGrpcClient::Create(&client_, server.url, false, server.useSsl, server.sslOptions), + "TritonClient(): unable to create inference context", + isLocal_); +} + //for fillDescriptions void TritonClient::fillPSetDescription(edm::ParameterSetDescription& iDesc) { edm::ParameterSetDescription descClient; From 7586344cb0a35971988efd35f7d0317d378f8507 Mon Sep 17 00:00:00 2001 From: Trevin Lee Date: Mon, 28 Jul 2025 06:26:58 +0200 Subject: [PATCH 07/10] Add test for Triton retry action in BuildFile.xml --- .../interface/RetryActionDiffServer.h | 20 +++++++++ .../SonicTriton/src/RetryActionDiffServer.cc | 12 +++++ .../SonicTriton/test/BuildFile.xml | 1 + .../test/tritonRetryActionTest_cfg.py | 44 +++++++++++++++++++ 4 files changed, 77 insertions(+) create mode 100644 HeterogeneousCore/SonicTriton/interface/RetryActionDiffServer.h create mode 100644 HeterogeneousCore/SonicTriton/src/RetryActionDiffServer.cc create mode 100644 HeterogeneousCore/SonicTriton/test/tritonRetryActionTest_cfg.py diff --git a/HeterogeneousCore/SonicTriton/interface/RetryActionDiffServer.h b/HeterogeneousCore/SonicTriton/interface/RetryActionDiffServer.h new file mode 100644 index 0000000000000..7fd5050762252 --- /dev/null +++ b/HeterogeneousCore/SonicTriton/interface/RetryActionDiffServer.h @@ -0,0 +1,20 @@ +#ifndef HeterogeneousCore_SonicTriton_RetryActionDiffServer_h +#define HeterogeneousCore_SonicTriton_RetryActionDiffServer_h + +#include "HeterogeneousCore/SonicCore/interface/RetryActionBase.h" + +class RetryActionDiffServer : public RetryActionBase { +public: + RetryActionDiffServer(const edm::ParameterSet& conf, SonicClientBase* client); + ~RetryActionDiffServer() override = default; + + void retry() override; + void start() override; + +private: + std::string diff_server_url_; + std::string diff_server_token_; +}; + +#endif + diff --git a/HeterogeneousCore/SonicTriton/src/RetryActionDiffServer.cc b/HeterogeneousCore/SonicTriton/src/RetryActionDiffServer.cc new file mode 100644 index 0000000000000..1056490b262d6 --- /dev/null +++ b/HeterogeneousCore/SonicTriton/src/RetryActionDiffServer.cc @@ -0,0 +1,12 @@ +#include "HeterogeneousCore/SonicTriton/interface/RetryActionDiffServer.h" + +RetryActionDiffServer::RetryActionDiffServer(const edm::ParameterSet& conf, SonicClientBase* client) + : RetryActionBase(conf, client) {} + +void RetryActionDiffServer::start() { + // to-do +} + +void RetryActionDiffServer::retry() { + // to-do +} \ No newline at end of file diff --git a/HeterogeneousCore/SonicTriton/test/BuildFile.xml b/HeterogeneousCore/SonicTriton/test/BuildFile.xml index e4ff7a0bb56f3..25cf78abe2664 100644 --- a/HeterogeneousCore/SonicTriton/test/BuildFile.xml +++ b/HeterogeneousCore/SonicTriton/test/BuildFile.xml @@ -1,5 +1,6 @@ + diff --git a/HeterogeneousCore/SonicTriton/test/tritonRetryActionTest_cfg.py b/HeterogeneousCore/SonicTriton/test/tritonRetryActionTest_cfg.py new file mode 100644 index 0000000000000..f07461f4cad9b --- /dev/null +++ b/HeterogeneousCore/SonicTriton/test/tritonRetryActionTest_cfg.py @@ -0,0 +1,44 @@ +import FWCore.ParameterSet.Config as cms +import os, sys, json +from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter +from Configuration.ProcessModifiers.enableSonicTriton_cff import enableSonicTriton + +process = cms.Process('tritonTest', enableSonicTriton) + +process.load("HeterogeneousCore.SonicTriton.TritonService_cff") + +process.maxEvents = cms.untracked.PSet(input=cms.untracked.int32(10)) + +process.source = cms.Source("EmptySource") + +process.myProducer = cms.EDProducer("TritonGraphProducer", + # minimal inputs for testing + nodeMin = cms.uint32(1), + nodeMax = cms.uint32(10), + edgeMin = cms.uint32(20), + edgeMax = cms.uint32(40), + # client setup + Client = cms.PSet( + # This address is fake to force an error + address = cms.string("localhost:9999"), + mode = cms.string("Sync"), + # This is your retry logic + Retry = cms.VPSet( + cms.PSet( + retryType = cms.string("RetryActionDiffServer"), + # The address of the real server will be filled in by the TritonService + diff_server_url = cms.string(""), + diff_server_token = cms.string("") + ) + ) + ) +) + +process.p = cms.Path(process.myProducer) + +process.load('FWCore/MessageService/MessageLogger_cfi') +process.MessageLogger.cerr.FwkReport.reportEvery = 1 +# enable verbose output for everything +process.MessageLogger.cerr.default = cms.untracked.PSet( + limit = cms.untracked.int32(10000000) +) \ No newline at end of file From ab9b0982d83a9c79b03fb881f59825617e55c449 Mon Sep 17 00:00:00 2001 From: Trevin Lee Date: Mon, 4 Aug 2025 07:28:31 +0200 Subject: [PATCH 08/10] Implement retry logic in RetryActionDiffServer and add connectToServer method in TritonClient. Update BuildFile.xml and fix formatting in header files. --- HeterogeneousCore/SonicCore/BuildFile.xml | 2 +- .../interface/RetryActionDiffServer.h | 2 +- .../SonicTriton/interface/TritonClient.h | 1 + .../SonicTriton/src/RetryActionDiffServer.cc | 44 ++++++++++++++++--- .../SonicTriton/src/TritonClient.cc | 19 ++++++++ 5 files changed, 61 insertions(+), 7 deletions(-) diff --git a/HeterogeneousCore/SonicCore/BuildFile.xml b/HeterogeneousCore/SonicCore/BuildFile.xml index 9796c4363c612..5208c91638f37 100644 --- a/HeterogeneousCore/SonicCore/BuildFile.xml +++ b/HeterogeneousCore/SonicCore/BuildFile.xml @@ -6,4 +6,4 @@ -i + diff --git a/HeterogeneousCore/SonicTriton/interface/RetryActionDiffServer.h b/HeterogeneousCore/SonicTriton/interface/RetryActionDiffServer.h index 7fd5050762252..7a7abfe84db5d 100644 --- a/HeterogeneousCore/SonicTriton/interface/RetryActionDiffServer.h +++ b/HeterogeneousCore/SonicTriton/interface/RetryActionDiffServer.h @@ -14,7 +14,7 @@ class RetryActionDiffServer : public RetryActionBase { private: std::string diff_server_url_; std::string diff_server_token_; -}; +}; #endif diff --git a/HeterogeneousCore/SonicTriton/interface/TritonClient.h b/HeterogeneousCore/SonicTriton/interface/TritonClient.h index 670e1a750bf0a..93d41568a853c 100644 --- a/HeterogeneousCore/SonicTriton/interface/TritonClient.h +++ b/HeterogeneousCore/SonicTriton/interface/TritonClient.h @@ -50,6 +50,7 @@ class TritonClient : public SonicClient { void reset() override; TritonServerType serverType() const { return serverType_; } bool isLocal() const { return isLocal_; } + void connectToServer(const std::string& url); //for fillDescriptions static void fillPSetDescription(edm::ParameterSetDescription& iDesc); diff --git a/HeterogeneousCore/SonicTriton/src/RetryActionDiffServer.cc b/HeterogeneousCore/SonicTriton/src/RetryActionDiffServer.cc index 1056490b262d6..c92310e189abb 100644 --- a/HeterogeneousCore/SonicTriton/src/RetryActionDiffServer.cc +++ b/HeterogeneousCore/SonicTriton/src/RetryActionDiffServer.cc @@ -1,12 +1,46 @@ #include "HeterogeneousCore/SonicTriton/interface/RetryActionDiffServer.h" +#include "HeterogeneousCore/SonicTriton/interface/TritonClient.h" +#include "FWCore/MessageLogger/interface/MessageLogger.h" -RetryActionDiffServer::RetryActionDiffServer(const edm::ParameterSet& conf, SonicClientBase* client) - : RetryActionBase(conf, client) {} +RetryActionDiffServer::RetryActionDiffServer( const edm::ParameterSet& conf, SonicClientBase* client) +: RetryActionBase(conf, client), + diff_server_url_(conf.getUntrackedParameter("diffServerUrl", "")), + diff_server_token_(conf.getUntrackedParameter("diffServerToken", "")) +{ + if (this->diff_server_url_.empty()) { + edm::LogWarning("RetryActionDiffServer") << "No alternative server URL provided. This retry action will be disabled."; + this->shouldRetry_ = false; + } +} void RetryActionDiffServer::start() { - // to-do + this->shouldRetry_ = true; } void RetryActionDiffServer::retry() { - // to-do -} \ No newline at end of file + if (!this->shouldRetry_ || this->diff_server_url_.empty()) { + this->shouldRetry_ = false; + edm::LogInfo("RetryActionDiffServer") << "No alternative server available for retry."; + return; + } + + try { + TritonClient* tritonClient = static_cast(client_); + + edm::LogInfo("RetryActionDiffServer") + << "Attempting retry by switching to server: " + << this->diff_server_url_; + + tritonClient->connectToServer(this->diff_server_url_); + eval(); + + } catch (const std::exception& e) { + edm::LogError("RetryActionDiffServer") + << "Failed to retry with alternative server: " + << e.what(); + } + + this->shouldRetry_ = false; +} + +DEFINE_RETRY_ACTION(RetryActionDiffServer); \ No newline at end of file diff --git a/HeterogeneousCore/SonicTriton/src/TritonClient.cc b/HeterogeneousCore/SonicTriton/src/TritonClient.cc index f232414c1e9e5..5f6d4ad48ad76 100644 --- a/HeterogeneousCore/SonicTriton/src/TritonClient.cc +++ b/HeterogeneousCore/SonicTriton/src/TritonClient.cc @@ -598,3 +598,22 @@ void TritonClient::fillPSetDescription(edm::ParameterSetDescription& iDesc) { descClient.addUntracked>("outputs", {}); iDesc.add("Client", descClient); } + +void TritonClient::connectToServer(const std::string& url) { + // Update client state for a generic remote server + serverType_ = TritonServerType::Remote; + isLocal_ = false; + + edm::LogInfo("TritonDiscovery") << debugName_ << " connecting to server: " << url; + + // Use default SSL options + triton::client::SslOptions sslOptions; + bool useSsl = false; // Assuming no SSL for direct URL connection + + // Connect to the server + TRITON_THROW_IF_ERROR( + triton::client::InferenceServerGrpcClient::Create(&client_, url, false, useSsl, sslOptions), + "TritonClient::connectToServer(): unable to create inference context", + false // isLocal is false + ); +} From 5b2c3c38c5594cadf23ccc872a592afb2b491a2c Mon Sep 17 00:00:00 2001 From: Trevin Lee Date: Mon, 4 Aug 2025 08:12:40 +0200 Subject: [PATCH 09/10] Add RetryActionDiffServer class documentation, implement testing constructor for TritonClient, and update BuildFile.xml to include Catch2 for testing. --- HeterogeneousCore/SonicTriton/BuildFile.xml | 6 +- .../interface/RetryActionDiffServer.h | 13 ++ .../SonicTriton/interface/TritonClient.h | 221 +++++++++--------- .../SonicTriton/src/TritonClient.cc | 4 + .../SonicTriton/test/RetryActionDiffServer.cc | 111 +++++++++ 5 files changed, 250 insertions(+), 105 deletions(-) create mode 100644 HeterogeneousCore/SonicTriton/test/RetryActionDiffServer.cc diff --git a/HeterogeneousCore/SonicTriton/BuildFile.xml b/HeterogeneousCore/SonicTriton/BuildFile.xml index b93d51e711e87..0f6e5f6bd24a6 100644 --- a/HeterogeneousCore/SonicTriton/BuildFile.xml +++ b/HeterogeneousCore/SonicTriton/BuildFile.xml @@ -7,9 +7,13 @@ + + - + + + diff --git a/HeterogeneousCore/SonicTriton/interface/RetryActionDiffServer.h b/HeterogeneousCore/SonicTriton/interface/RetryActionDiffServer.h index 7a7abfe84db5d..2b24d4a643786 100644 --- a/HeterogeneousCore/SonicTriton/interface/RetryActionDiffServer.h +++ b/HeterogeneousCore/SonicTriton/interface/RetryActionDiffServer.h @@ -3,6 +3,19 @@ #include "HeterogeneousCore/SonicCore/interface/RetryActionBase.h" +/** + * @class RetryActionDiffServer + * @brief A concrete implementation of RetryActionBase that attempts to retry an inference + * request on a different, user-specified Triton server. + * + * This class is designed to provide a fallback mechanism. If an initial inference + * request fails (e.g., due to server unavailability or a model-specific error), + * this action will be triggered. It reads an alternative server URL from the + * ParameterSet and instructs the TritonClient to reconnect to this new server + * for the retry attempt. This action is designed for one-time use per inference + * call; after the retry attempt, it disables itself until the next `start()` call. + */ + class RetryActionDiffServer : public RetryActionBase { public: RetryActionDiffServer(const edm::ParameterSet& conf, SonicClientBase* client); diff --git a/HeterogeneousCore/SonicTriton/interface/TritonClient.h b/HeterogeneousCore/SonicTriton/interface/TritonClient.h index 93d41568a853c..bb5cdab4164a8 100644 --- a/HeterogeneousCore/SonicTriton/interface/TritonClient.h +++ b/HeterogeneousCore/SonicTriton/interface/TritonClient.h @@ -1,104 +1,117 @@ -#ifndef HeterogeneousCore_SonicTriton_TritonClient -#define HeterogeneousCore_SonicTriton_TritonClient - -#include "FWCore/ParameterSet/interface/ParameterSet.h" -#include "FWCore/ParameterSet/interface/ParameterSetDescription.h" -#include "FWCore/ServiceRegistry/interface/ServiceToken.h" -#include "HeterogeneousCore/SonicCore/interface/SonicClient.h" -#include "HeterogeneousCore/SonicTriton/interface/TritonData.h" -#include "HeterogeneousCore/SonicTriton/interface/TritonService.h" - -#include -#include -#include -#include -#include - -#include "grpc_client.h" -#include "grpc_service.pb.h" - -enum class TritonBatchMode { Rectangular = 1, Ragged = 2 }; - -class TritonClient : public SonicClient { -public: - struct ServerSideStats { - uint64_t inference_count_; - uint64_t execution_count_; - uint64_t success_count_; - uint64_t cumm_time_ns_; - uint64_t queue_time_ns_; - uint64_t compute_input_time_ns_; - uint64_t compute_infer_time_ns_; - uint64_t compute_output_time_ns_; - }; - - //constructor - TritonClient(const edm::ParameterSet& params, const std::string& debugName); - - //destructor - ~TritonClient() override; - - //accessors - unsigned batchSize() const; - TritonBatchMode batchMode() const { return batchMode_; } - bool verbose() const { return verbose_; } - bool useSharedMemory() const { return useSharedMemory_; } - void setUseSharedMemory(bool useShm) { useSharedMemory_ = useShm; } - bool setBatchSize(unsigned bsize); - void setBatchMode(TritonBatchMode batchMode); - void resetBatchMode(); - void reset() override; - TritonServerType serverType() const { return serverType_; } - bool isLocal() const { return isLocal_; } - void connectToServer(const std::string& url); - - //for fillDescriptions - static void fillPSetDescription(edm::ParameterSetDescription& iDesc); - -protected: - //helpers - bool noOuterDim() const { return noOuterDim_; } - unsigned outerDim() const { return outerDim_; } - unsigned nEntries() const; - void getResults(const std::vector>& results); - void evaluate() override; - template - bool handle_exception(F&& call); - - void reportServerSideStats(const ServerSideStats& stats) const; - void updateServer(std::string serverName); - ServerSideStats summarizeServerStats(const inference::ModelStatistics& start_status, - const inference::ModelStatistics& end_status) const; - - inference::ModelStatistics getServerSideStatus() const; - - //members - unsigned maxOuterDim_; - unsigned outerDim_; - bool noOuterDim_; - unsigned nEntries_; - TritonBatchMode batchMode_; - bool manualBatchMode_; - bool verbose_; - bool useSharedMemory_; - TritonServerType serverType_; - bool isLocal_; - grpc_compression_algorithm compressionAlgo_; - triton::client::Headers headers_; - - std::unique_ptr client_; - //stores timeout, model name and version - std::vector options_; - edm::ServiceToken token_; - -private: - friend TritonInputData; - friend TritonOutputData; - - //private accessors only used by data - auto client() { return client_.get(); } - void addEntry(unsigned entry); - void resizeEntries(unsigned entry); -}; - -#endif +#ifndef HeterogeneousCore_SonicTriton_TritonClient +#define HeterogeneousCore_SonicTriton_TritonClient + +#include "FWCore/ParameterSet/interface/ParameterSet.h" +#include "FWCore/ParameterSet/interface/ParameterSetDescription.h" +#include "FWCore/ServiceRegistry/interface/ServiceToken.h" +#include "HeterogeneousCore/SonicCore/interface/SonicClient.h" +#include "HeterogeneousCore/SonicTriton/interface/TritonData.h" +#include "HeterogeneousCore/SonicTriton/interface/TritonService.h" + +#include +#include +#include +#include +#include + +#include "grpc_client.h" +#include "grpc_service.pb.h" + +enum class TritonBatchMode { Rectangular = 1, Ragged = 2 }; + +class TritonClient : public SonicClient { +public: + struct ServerSideStats { + uint64_t inference_count_; + uint64_t execution_count_; + uint64_t success_count_; + uint64_t cumm_time_ns_; + uint64_t queue_time_ns_; + uint64_t compute_input_time_ns_; + uint64_t compute_infer_time_ns_; + uint64_t compute_output_time_ns_; + }; + + //constructor + TritonClient(const edm::ParameterSet& params, const std::string& debugName); + + //destructor + ~TritonClient() override; + + //accessors + unsigned batchSize() const; + TritonBatchMode batchMode() const { return batchMode_; } + bool verbose() const { return verbose_; } + bool useSharedMemory() const { return useSharedMemory_; } + void setUseSharedMemory(bool useShm) { useSharedMemory_ = useShm; } + bool setBatchSize(unsigned bsize); + void setBatchMode(TritonBatchMode batchMode); + void resetBatchMode(); + void reset() override; + TritonServerType serverType() const { return serverType_; } + bool isLocal() const { return isLocal_; } + virtual void connectToServer(const std::string& url); + + //for fillDescriptions + static void fillPSetDescription(edm::ParameterSetDescription& iDesc); + +protected: + /** + * @brief Constructor for unit testing purposes only. + * + * This constructor is provided to allow the creation of a TritonClient + * instance (or a mock derived from it) without needing the full CMSSW + * Service framework, which is required by the standard constructor. + * This is essential for writing isolated unit tests that do not depend + * on external services. It initializes the base SonicClient with dummy + * parameters. + * @param is_testing A boolean flag to select this constructor. + */ + TritonClient(bool is_testing); + + //helpers + bool noOuterDim() const { return noOuterDim_; } + unsigned outerDim() const { return outerDim_; } + unsigned nEntries() const; + void getResults(const std::vector>& results); + virtual void evaluate() override; + template + bool handle_exception(F&& call); + + void reportServerSideStats(const ServerSideStats& stats) const; + void updateServer(std::string serverName); + ServerSideStats summarizeServerStats(const inference::ModelStatistics& start_status, + const inference::ModelStatistics& end_status) const; + + inference::ModelStatistics getServerSideStatus() const; + + //members + unsigned maxOuterDim_; + unsigned outerDim_; + bool noOuterDim_; + unsigned nEntries_; + TritonBatchMode batchMode_; + bool manualBatchMode_; + bool verbose_; + bool useSharedMemory_; + TritonServerType serverType_; + bool isLocal_; + grpc_compression_algorithm compressionAlgo_; + triton::client::Headers headers_; + + std::unique_ptr client_; + //stores timeout, model name and version + std::vector options_; + edm::ServiceToken token_; + +private: + friend TritonInputData; + friend TritonOutputData; + + //private accessors only used by data + auto client() { return client_.get(); } + void addEntry(unsigned entry); + void resizeEntries(unsigned entry); +}; + +#endif diff --git a/HeterogeneousCore/SonicTriton/src/TritonClient.cc b/HeterogeneousCore/SonicTriton/src/TritonClient.cc index 5f6d4ad48ad76..a112896e33026 100644 --- a/HeterogeneousCore/SonicTriton/src/TritonClient.cc +++ b/HeterogeneousCore/SonicTriton/src/TritonClient.cc @@ -617,3 +617,7 @@ void TritonClient::connectToServer(const std::string& url) { false // isLocal is false ); } + +//constructor for testing +TritonClient::TritonClient(bool /*is_testing*/) : SonicClient(edm::ParameterSet(), "TritonClient_test", "TritonClient") {} + diff --git a/HeterogeneousCore/SonicTriton/test/RetryActionDiffServer.cc b/HeterogeneousCore/SonicTriton/test/RetryActionDiffServer.cc new file mode 100644 index 0000000000000..6c429a95a9816 --- /dev/null +++ b/HeterogeneousCore/SonicTriton/test/RetryActionDiffServer.cc @@ -0,0 +1,111 @@ +#define CATCH_CONFIG_MAIN +#include "catch.hpp" + +#include "FWCore/ParameterSet/interface/ParameterSet.h" +#include "HeterogeneousCore/SonicTriton/interface/TritonClient.h" +#include "HeterogeneousCore/SonicTriton/interface/RetryActionDiffServer.h" + +#include + +// Anonymous namespace to hold our mock object, keeping it local to this test file. +namespace { + // Mock TritonClient to intercept and verify method calls without needing a real server or CMSSW services. + class MockTritonClient : public TritonClient { + public: + // Use the protected, testing-only constructor from the base class. + MockTritonClient() : TritonClient(true) {} + + // --- Methods to override for testing --- + void evaluate() override { + // This method is called by RetryActionBase::eval() + // We can leave it empty as the test directly calls retry(). + } + + void connectToServer(const std::string& url) override { + connectToServer_called_ = true; + last_url_ = url; + } + + // --- Test utility methods --- + bool connectToServerCalled() const { return connectToServer_called_; } + const std::string& getLastUrl() const { return last_url_; } + void reset() { + connectToServer_called_ = false; + last_url_ = ""; + } + + private: + bool connectToServer_called_ = false; + std::string last_url_; + }; +} + +TEST_CASE("Test RetryActionDiffServer Logic", "[RetryActionDiffServer]") { + + // 1. Create the mock client object. + MockTritonClient mockClient; + + // 2. Create the ParameterSet that configures the retry action. + edm::ParameterSet retryPSet; + const std::string alternate_server = "grpc://new-server-for-retry.com:8001"; + retryPSet.addUntrackedParameter("diffServerUrl", alternate_server); + + // 3. Create an instance of the class we are testing. + RetryActionDiffServer retryAction(retryPSet, &mockClient); + + SECTION("Retry calls connectToServer with the correct URL") { + // ARRANGE: Reset state before the test. + mockClient.reset(); + retryAction.start(); // Arms the action, setting shouldRetry_ = true + + // ACT: Manually call the retry method to simulate a failure event. + retryAction.retry(); + + // ASSERT: Verify that our mock's overridden method was called with the expected arguments. + REQUIRE(mockClient.connectToServerCalled()); + REQUIRE(mockClient.getLastUrl() == alternate_server); + } + + SECTION("Retry action is a one-shot") { + // ARRANGE + mockClient.reset(); + retryAction.start(); + + // ACT + retryAction.retry(); // First retry, should work. + + // After the first retry, the internal `shouldRetry_` flag should be false. + // A second call to retry() should do nothing. + // We can verify this by checking that connectToServer was not called a second time. + mockClient.reset(); // Reset our trackers. + retryAction.retry(); // Second retry, should fail silently. + + // ASSERT + REQUIRE_FALSE(mockClient.connectToServerCalled()); + } + + SECTION("Start method re-arms the action") { + // ARRANGE + mockClient.reset(); + retryAction.start(); + retryAction.retry(); // Use up the action. + REQUIRE_FALSE(retryAction.shouldRetry()); // Verify it's spent. + + // ACT: A new inference call begins, so `start()` is called again. + retryAction.start(); + + // ASSERT: The action should now be ready for another retry. + REQUIRE(retryAction.shouldRetry()); + } + + SECTION("Constructor disables action if URL is missing") { + // ARRANGE: Create a PSet with no URL. + edm::ParameterSet emptyPSet; + + // ACT + RetryActionDiffServer disabledAction(emptyPSet, &mockClient); + + // ASSERT + REQUIRE_FALSE(disabledAction.shouldRetry()); + } +} From 0212e703576691af1759b503b63a38c88ce3737d Mon Sep 17 00:00:00 2001 From: Trevin Lee Date: Tue, 12 Aug 2025 01:30:49 +0200 Subject: [PATCH 10/10] SonicTriton: implement retry action against different server; update tests; remove old cfg --- .../interface/RetryActionDiffServer.h | 4 +- .../SonicTriton/src/RetryActionDiffServer.cc | 32 +++++++------- .../SonicTriton/test/RetryActionDiffServer.cc | 2 - .../test/tritonRetryActionTest_cfg.py | 44 ------------------- 4 files changed, 18 insertions(+), 64 deletions(-) delete mode 100644 HeterogeneousCore/SonicTriton/test/tritonRetryActionTest_cfg.py diff --git a/HeterogeneousCore/SonicTriton/interface/RetryActionDiffServer.h b/HeterogeneousCore/SonicTriton/interface/RetryActionDiffServer.h index 2b24d4a643786..d823d78d3a960 100644 --- a/HeterogeneousCore/SonicTriton/interface/RetryActionDiffServer.h +++ b/HeterogeneousCore/SonicTriton/interface/RetryActionDiffServer.h @@ -25,8 +25,8 @@ class RetryActionDiffServer : public RetryActionBase { void start() override; private: - std::string diff_server_url_; - std::string diff_server_token_; + std::string alt_server_url_; + std::string alt_server_token_; }; #endif diff --git a/HeterogeneousCore/SonicTriton/src/RetryActionDiffServer.cc b/HeterogeneousCore/SonicTriton/src/RetryActionDiffServer.cc index c92310e189abb..f516289e655da 100644 --- a/HeterogeneousCore/SonicTriton/src/RetryActionDiffServer.cc +++ b/HeterogeneousCore/SonicTriton/src/RetryActionDiffServer.cc @@ -2,15 +2,19 @@ #include "HeterogeneousCore/SonicTriton/interface/TritonClient.h" #include "FWCore/MessageLogger/interface/MessageLogger.h" -RetryActionDiffServer::RetryActionDiffServer( const edm::ParameterSet& conf, SonicClientBase* client) -: RetryActionBase(conf, client), - diff_server_url_(conf.getUntrackedParameter("diffServerUrl", "")), - diff_server_token_(conf.getUntrackedParameter("diffServerToken", "")) -{ - if (this->diff_server_url_.empty()) { - edm::LogWarning("RetryActionDiffServer") << "No alternative server URL provided. This retry action will be disabled."; - this->shouldRetry_ = false; - } +RetryActionDiffServer::RetryActionDiffServer( + const edm::ParameterSet& conf, + SonicClientBase* client +): RetryActionBase(conf, client) { + alt_server_url_ = conf.getUntrackedParameter("altServerUrl", ""); + alt_server_token_ = conf.getUntrackedParameter("altServerToken", ""); + + if (this->alt_server_url_.empty()) { + edm::LogWarning("RetryActionDiffServer") + << "No alternative server URL provided. " + << "This retry action will be disabled."; + this->shouldRetry_ = false; + } } void RetryActionDiffServer::start() { @@ -18,7 +22,7 @@ void RetryActionDiffServer::start() { } void RetryActionDiffServer::retry() { - if (!this->shouldRetry_ || this->diff_server_url_.empty()) { + if (!this->shouldRetry_ || this->alt_server_url_.empty()) { this->shouldRetry_ = false; edm::LogInfo("RetryActionDiffServer") << "No alternative server available for retry."; return; @@ -26,20 +30,16 @@ void RetryActionDiffServer::retry() { try { TritonClient* tritonClient = static_cast(client_); - edm::LogInfo("RetryActionDiffServer") << "Attempting retry by switching to server: " - << this->diff_server_url_; - - tritonClient->connectToServer(this->diff_server_url_); + << this->alt_server_url_; + tritonClient->connectToServer(this->alt_server_url_); eval(); - } catch (const std::exception& e) { edm::LogError("RetryActionDiffServer") << "Failed to retry with alternative server: " << e.what(); } - this->shouldRetry_ = false; } diff --git a/HeterogeneousCore/SonicTriton/test/RetryActionDiffServer.cc b/HeterogeneousCore/SonicTriton/test/RetryActionDiffServer.cc index 6c429a95a9816..2648abfc202d2 100644 --- a/HeterogeneousCore/SonicTriton/test/RetryActionDiffServer.cc +++ b/HeterogeneousCore/SonicTriton/test/RetryActionDiffServer.cc @@ -7,9 +7,7 @@ #include -// Anonymous namespace to hold our mock object, keeping it local to this test file. namespace { - // Mock TritonClient to intercept and verify method calls without needing a real server or CMSSW services. class MockTritonClient : public TritonClient { public: // Use the protected, testing-only constructor from the base class. diff --git a/HeterogeneousCore/SonicTriton/test/tritonRetryActionTest_cfg.py b/HeterogeneousCore/SonicTriton/test/tritonRetryActionTest_cfg.py deleted file mode 100644 index f07461f4cad9b..0000000000000 --- a/HeterogeneousCore/SonicTriton/test/tritonRetryActionTest_cfg.py +++ /dev/null @@ -1,44 +0,0 @@ -import FWCore.ParameterSet.Config as cms -import os, sys, json -from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter -from Configuration.ProcessModifiers.enableSonicTriton_cff import enableSonicTriton - -process = cms.Process('tritonTest', enableSonicTriton) - -process.load("HeterogeneousCore.SonicTriton.TritonService_cff") - -process.maxEvents = cms.untracked.PSet(input=cms.untracked.int32(10)) - -process.source = cms.Source("EmptySource") - -process.myProducer = cms.EDProducer("TritonGraphProducer", - # minimal inputs for testing - nodeMin = cms.uint32(1), - nodeMax = cms.uint32(10), - edgeMin = cms.uint32(20), - edgeMax = cms.uint32(40), - # client setup - Client = cms.PSet( - # This address is fake to force an error - address = cms.string("localhost:9999"), - mode = cms.string("Sync"), - # This is your retry logic - Retry = cms.VPSet( - cms.PSet( - retryType = cms.string("RetryActionDiffServer"), - # The address of the real server will be filled in by the TritonService - diff_server_url = cms.string(""), - diff_server_token = cms.string("") - ) - ) - ) -) - -process.p = cms.Path(process.myProducer) - -process.load('FWCore/MessageService/MessageLogger_cfi') -process.MessageLogger.cerr.FwkReport.reportEvery = 1 -# enable verbose output for everything -process.MessageLogger.cerr.default = cms.untracked.PSet( - limit = cms.untracked.int32(10000000) -) \ No newline at end of file