Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tickets/DM-43715 #894

Draft
wants to merge 59 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
59 commits
Select commit Hold shift + click to select a range
c3f20e8
Extended transient API of QMeta to read workers-to-chunks map from da…
iagaponenko Apr 5, 2024
a509b2c
Extended Replication Controller to update chunk map at Czar
iagaponenko Apr 5, 2024
c17c2d9
Added code to read chunk disposition map and organize for czar use.
jgates108 Mar 22, 2024
0e59a94
Added unit test.
jgates108 Mar 26, 2024
ee182f1
Made failure to create CzarChunkMaps a non-fatal error.
jgates108 Mar 27, 2024
1eb428a
Changed code to use QMeta::ChunkMap instead of json object.
jgates108 Apr 8, 2024
a06672c
Changes for review.
jgates108 Apr 9, 2024
2bd7c23
Added a basic test, which isn't working.
jgates108 Apr 9, 2024
8fefc59
Added CzarRegistry.
jgates108 Apr 15, 2024
196196a
Review changes.
jgates108 Apr 15, 2024
6e79737
Review changes.
jgates108 Apr 15, 2024
eaa7a33
Reformat.
jgates108 Jul 30, 2024
7c60f88
Added UberJob branch code.
jgates108 Apr 26, 2024
51253b6
Czar and workers can send http messages to each other.
jgates108 May 16, 2024
ba8ef43
Added new create tasks.
jgates108 May 28, 2024
b086f6b
Some integration test success.
jgates108 Jun 5, 2024
c70754b
Added some error handling.
jgates108 Jun 21, 2024
0760e4c
Errors reported correctly.
jgates108 Jun 27, 2024
61ec83d
Fails appropriately from max UberJob attempts.
jgates108 Jun 29, 2024
555119c
Fixed some issues with retries.
jgates108 Jul 2, 2024
5dfbb13
Integration tests past.
jgates108 Jul 10, 2024
418baee
Reformatted.
jgates108 Jul 10, 2024
aef42ed
Code cleanup.
jgates108 Jul 10, 2024
8919108
CzarFamilyMap create now waits for a usccessful read.
jgates108 Jul 22, 2024
c391a7c
Changed version numer to 35.
jgates108 Jul 23, 2024
7f40edc
Changed comments.
jgates108 Jul 30, 2024
41a8814
Rebase.
jgates108 Jul 30, 2024
35206c7
clang-format
fritzm Nov 26, 2024
16bd636
Removed QueryRequest and XrdSsiMocks.
jgates108 Aug 1, 2024
3359049
Removed unnecessary code.
jgates108 Aug 2, 2024
1874de6
Added ActiveWorker.
jgates108 Aug 6, 2024
6cc4ee3
Added unit test for query status message.
jgates108 Aug 30, 2024
6cc0484
Added cancellation code and for queries, uberjobs, and czar restart.
jgates108 Sep 3, 2024
845c871
More cancellation code added.
jgates108 Sep 10, 2024
e68750d
Added query retries.
jgates108 Sep 19, 2024
260e8a4
Added worker believed czar was dead handling.
jgates108 Oct 1, 2024
fc27d5a
Added dead message handling.
jgates108 Oct 4, 2024
408fc2c
Fixed problems with rowlimit and WorkerCzarComIssue.
jgates108 Oct 15, 2024
03dba84
Rebase.
jgates108 Oct 18, 2024
da56112
Added comments and removed dead code.
jgates108 Oct 21, 2024
0ccb646
Fixed dead worker check.
jgates108 Oct 23, 2024
dc1b36f
Created protojson namespace.
jgates108 Nov 18, 2024
37eed97
clang-format
fritzm Nov 26, 2024
b03c531
Added unit test.
jgates108 Nov 22, 2024
1df6bfd
Reworked the UberJob json message.
jgates108 Dec 5, 2024
541bef5
Enabled chunk Id replacement, and added connection pools.
jgates108 Dec 6, 2024
58dddd0
Rearranged UberJob building and removed chunkResultName.
jgates108 Dec 13, 2024
e95da66
Removed TaskMsgFactory.
jgates108 Dec 16, 2024
7e373f6
Changed Czar to catch 5GB limit.
jgates108 Dec 18, 2024
b49a978
Improved Job creation performance.
jgates108 Jan 9, 2025
b71af26
Contention testing.
jgates108 Jan 22, 2025
4b6534c
The blocking version of the FQDN retrieval function
jgates108 Feb 3, 2025
6306093
Reformated.
jgates108 Feb 3, 2025
e53e5a7
Fixed uninitialized variable.
jgates108 Feb 6, 2025
fddadc1
Message cleanup.
jgates108 Feb 8, 2025
ecdff0f
Improved UberJobFile recovery after file collection error.
jgates108 Feb 11, 2025
67df437
Added family map option to not use chunk size for distribution.
jgates108 Feb 12, 2025
7694472
Post-rebase corrections
iagaponenko Feb 21, 2025
36667f6
Configuration improvements.
jgates108 Feb 21, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions admin/local/docker/compose/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,7 @@ services:
--registry-host=repl-registry
--controller-auto-register-workers=1
--qserv-sync-force
--qserv-chunk-map-update
--debug
expose:
- "25081"
Expand Down
4 changes: 2 additions & 2 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ add_subdirectory(mysql)
add_subdirectory(parser)
add_subdirectory(partition)
add_subdirectory(proto)
add_subdirectory(protojson)
add_subdirectory(proxy)
add_subdirectory(qana)
add_subdirectory(qdisp)
Expand All @@ -89,7 +90,6 @@ add_subdirectory(wpublish)
add_subdirectory(wsched)
add_subdirectory(www)
add_subdirectory(xrdlog)
add_subdirectory(xrdreq)
add_subdirectory(xrdsvc)

#-----------------------------------------------------------------------------
Expand All @@ -103,6 +103,7 @@ target_link_libraries(qserv_common PUBLIC
mysql
sql
util
protojson
)

install(
Expand Down Expand Up @@ -143,7 +144,6 @@ target_link_libraries(qserv_czar PUBLIC
rproc
qserv_css
qserv_meta
xrdreq
)

install(
Expand Down
1 change: 1 addition & 0 deletions src/admin/python/lsst/qserv/admin/itest.py
Original file line number Diff line number Diff line change
Expand Up @@ -978,6 +978,7 @@ def compareQueryResults(run_cases: List[str], outputs_dir: str) -> List[ITestCas
if not os.path.exists(os.path.join(outputs_dir, case)):
_log.warn("There are no query results to compare for %s", case)
continue

comparisons = (
(query_mode_mysql, query_mode_qserv_attached),
(query_mode_mysql, query_mode_qserv_detached),
Expand Down
2 changes: 2 additions & 0 deletions src/admin/python/lsst/qserv/admin/qservCli/launch.py
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,9 @@ def cmake(
build_image,
"cmake",
"..",
"-DCMAKE_BUILD_TYPE=Debug"
]
# "-DCMAKE_BUILD_TYPE=Debug"
if dry:
print(" ".join(args))
return
Expand Down
2 changes: 0 additions & 2 deletions src/admin/templates/http/etc/qserv-czar.cnf.jinja
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,6 @@ largestPriority = 3
vectRunSizes = 50:50:50:50
# Minimum number of threads running for each queue. No spaces. Values separated by ':'
vectMinRunningSizes = 0:1:3:3
# Maximum number of QueryRequests allowed to be running at one time.
qReqPseudoFifoMaxRunning = 299

[replication]

Expand Down
10 changes: 4 additions & 6 deletions src/admin/templates/proxy/etc/qserv-czar.cnf.jinja
Original file line number Diff line number Diff line change
Expand Up @@ -98,21 +98,19 @@ notifyWorkersOnCzarRestart = 1
#[debug]
#chunkLimit = -1

# Please see qdisp/QdispPool.h QdispPool::QdispPool for more information
# Please see util/QdispPool.h QdispPool::QdispPool for more information
[qdisppool]
#size of the pool
poolSize = 50
poolSize = 1000
# Low numbers are higher priority. Largest priority 3 creates 4 priority queues 0, 1, 2, 3
# Must be greater than 0.
largestPriority = 3
# Maximum number of threads running for each queue. No spaces. Values separated by ':'
# Using largestPriority = 2 and vectRunsizes = 3:5:8
# queue 0 would have runSize 3, queue 1 would have runSize 5, and queue 2 would have runSize 8.
vectRunSizes = 50:50:50:50
vectRunSizes = 800:800:500:500
# Minimum number of threads running for each queue. No spaces. Values separated by ':'
vectMinRunningSizes = 0:1:3:3
# Maximum number of QueryRequests allowed to be running at one time.
qReqPseudoFifoMaxRunning = 299
vectMinRunningSizes = 0:3:3:3

[replication]

Expand Down
7 changes: 3 additions & 4 deletions src/cconfig/CzarConfig.cc
Original file line number Diff line number Diff line change
Expand Up @@ -62,18 +62,17 @@ namespace lsst::qserv::cconfig {

std::mutex CzarConfig::_mtxOnInstance;

std::shared_ptr<CzarConfig> CzarConfig::_instance;
CzarConfig::Ptr CzarConfig::_instance;

std::shared_ptr<CzarConfig> CzarConfig::create(std::string const& configFileName,
std::string const& czarName) {
CzarConfig::Ptr CzarConfig::create(std::string const& configFileName, std::string const& czarName) {
std::lock_guard<std::mutex> const lock(_mtxOnInstance);
if (_instance == nullptr) {
_instance = std::shared_ptr<CzarConfig>(new CzarConfig(util::ConfigStore(configFileName), czarName));
}
return _instance;
}

std::shared_ptr<CzarConfig> CzarConfig::instance() {
CzarConfig::Ptr CzarConfig::instance() {
std::lock_guard<std::mutex> const lock(_mtxOnInstance);
if (_instance == nullptr) {
throw std::logic_error("CzarConfig::" + std::string(__func__) + ": instance has not been created.");
Expand Down
93 changes: 53 additions & 40 deletions src/cconfig/CzarConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ namespace lsst::qserv::cconfig {
*/
class CzarConfig {
public:
using Ptr = std::shared_ptr<CzarConfig>;
/**
* Create an instance of CzarConfig and load parameters from the specifid file.
* @note One has to call this method at least once before trying to obtain
Expand All @@ -63,15 +64,15 @@ class CzarConfig {
* @param czarName - the unique name of Czar.
* @return the shared pointer to the configuration object
*/
static std::shared_ptr<CzarConfig> create(std::string const& configFileName, std::string const& czarName);
static Ptr create(std::string const& configFileName, std::string const& czarName);

/**
* Get a pointer to an instance that was created by the last call to
* the method 'create'.
* @return the shared pointer to the configuration object
* @throws std::logic_error when attempting to call the bethod before creating an instance.
*/
static std::shared_ptr<CzarConfig> instance();
static Ptr instance();

CzarConfig() = delete;
CzarConfig(CzarConfig const&) = delete;
Expand Down Expand Up @@ -108,36 +109,8 @@ class CzarConfig {
*/
int getInteractiveChunkLimit() const { return _interactiveChunkLimit->getVal(); }

/* Get hostname and port for xrootd manager
*
* "localhost:1094" is the most reasonable default, even though it is
* the wrong choice for all but small developer installations
*
* @return a string containing "<hostname>:<port>"
*/
std::string const& getXrootdFrontendUrl() const { return _xrootdFrontendUrl->getVal(); }

/* Get the maximum number of threads for xrootd to use.
*
* @return the maximum number of threads for xrootd to use.
*/
int getXrootdCBThreadsMax() const { return _xrootdCBThreadsMax->getVal(); }

/* Get the initial number of threads for xrootd to create and maintain.
*
* @return the initial number of threads for xrootd to use.
*/
int getXrootdCBThreadsInit() const { return _xrootdCBThreadsInit->getVal(); }

bool getQueryDistributionTestVer() const { return _queryDistributionTestVer->getVal(); }

/*
* @return A value of the "spread" parameter. This may improve a performance
* of xrootd for catalogs with the large number of chunks. The default value
* of this parameter in xrootd is 4.
*/
int getXrootdSpread() const { return _xrootdSpread->getVal(); }

/* Get minimum number of seconds between QMeta chunk completion updates.
*
* @return seconds between QMeta chunk completion updates.
Expand Down Expand Up @@ -198,6 +171,31 @@ class CzarConfig {
/// the OOM situation.
unsigned int czarStatsRetainPeriodSec() const { return _czarStatsRetainPeriodSec->getVal(); }

/// A worker is considered fully ALIVE if the last update from the worker has been
/// heard in less than _activeWorkerTimeoutAliveSecs seconds.
int getActiveWorkerTimeoutAliveSecs() const { return _activeWorkerTimeoutAliveSecs->getVal(); }

/// A worker is considered DEAD if it hasn't been heard from in more than
/// _activeWorkerTimeoutDeadSecs.
int getActiveWorkerTimeoutDeadSecs() const { return _activeWorkerTimeoutDeadSecs->getVal(); }

/// Max lifetime of a message to be sent to an active worker. If the czar has been
/// trying to send a message to a worker and has failed for this many seconds,
/// it gives up at this point, removing elements of the message to save memory.
int getActiveWorkerMaxLifetimeSecs() const { return _activeWorkerMaxLifetimeSecs->getVal(); }

/// The maximum number of chunks (basically Jobs) allowed in a single UberJob.
int getUberJobMaxChunks() const { return _uberJobMaxChunks->getVal(); }

/// Return the maximum number of http connections to use for czar commands.
int getCommandMaxHttpConnections() const { return _commandMaxHttpConnections->getVal(); }

/// Return the sleep time (in milliseconds) between messages sent to active workers.
int getMonitorSleepTimeMilliSec() const { return _monitorSleepTimeMilliSec->getVal(); }

/// Return true if family map chunk distribution should depend on chunk size.
bool getFamilyMapUsingChunkSize() const { return _familyMapUsingChunkSize->getVal(); }

// Parameters of the Czar management service

std::string const& replicationInstanceId() const { return _replicationInstanceId->getVal(); }
Expand Down Expand Up @@ -293,7 +291,7 @@ class CzarConfig {
CVTIntPtr _resultMaxConnections =
util::ConfigValTInt::create(_configValMap, "resultdb", "maxconnections", notReq, 40);
CVTIntPtr _resultMaxHttpConnections =
util::ConfigValTInt::create(_configValMap, "resultdb", "maxhttpconnections", notReq, 8192);
util::ConfigValTInt::create(_configValMap, "resultdb", "maxhttpconnections", notReq, 2000);
CVTIntPtr _oldestResultKeptDays =
util::ConfigValTInt::create(_configValMap, "resultdb", "oldestResultKeptDays", notReq, 30);

Expand Down Expand Up @@ -332,8 +330,6 @@ class CzarConfig {
CVTStrPtr _qstatusDb =
util::ConfigValTStr::create(_configValMap, "qstatus", "db", notReq, "qservStatusData");

CVTStrPtr _xrootdFrontendUrl =
util::ConfigValTStr::create(_configValMap, "frontend", "xrootd", notReq, "localhost:1094");
CVTStrPtr _emptyChunkPath =
util::ConfigValTStr::create(_configValMap, "partitioner", "emptyChunkPath", notReq, ".");
CVTIntPtr _maxMsgSourceStore =
Expand All @@ -344,19 +340,18 @@ class CzarConfig {
CVTIntPtr _qdispMaxPriority =
util::ConfigValTInt::create(_configValMap, "qdisppool", "largestPriority", notReq, 2);
CVTStrPtr _qdispVectRunSizes =
util::ConfigValTStr::create(_configValMap, "qdisppool", "vectRunSizes", notReq, "50:50:50:50");
util::ConfigValTStr::create(_configValMap, "qdisppool", "vectRunSizes", notReq, "800:800:500:50");
CVTStrPtr _qdispVectMinRunningSizes =
util::ConfigValTStr::create(_configValMap, "qdisppool", "vectMinRunningSizes", notReq, "0:1:3:3");
util::ConfigValTStr::create(_configValMap, "qdisppool", "vectMinRunningSizes", notReq, "0:3:3:3");

// UberJobs
CVTIntPtr _uberJobMaxChunks =
util::ConfigValTInt::create(_configValMap, "uberjob", "maxChunks", notReq, 10000);

CVTIntPtr _xrootdSpread = util::ConfigValTInt::create(_configValMap, "tuning", "xrootdSpread", notReq, 4);
CVTIntPtr _qMetaSecsBetweenChunkCompletionUpdates = util::ConfigValTInt::create(
_configValMap, "tuning", "qMetaSecsBetweenChunkCompletionUpdates", notReq, 60);
CVTIntPtr _interactiveChunkLimit =
util::ConfigValTInt::create(_configValMap, "tuning", "interactiveChunkLimit", notReq, 10);
CVTIntPtr _xrootdCBThreadsMax =
util::ConfigValTInt::create(_configValMap, "tuning", "xrootdCBThreadsMax", notReq, 500);
CVTIntPtr _xrootdCBThreadsInit =
util::ConfigValTInt::create(_configValMap, "tuning", "xrootdCBThreadsInit", notReq, 50);
CVTIntPtr _queryDistributionTestVer =
util::ConfigValTInt::create(_configValMap, "tuning", "queryDistributionTestVer", notReq, 0);
CVTBoolPtr _notifyWorkersOnQueryFinish =
Expand Down Expand Up @@ -385,6 +380,24 @@ class CzarConfig {
util::ConfigValTInt::create(_configValMap, "replication", "http_port", notReq, 0);
CVTUIntPtr _replicationNumHttpThreads =
util::ConfigValTUInt::create(_configValMap, "replication", "num_http_threads", notReq, 2);

// Active Worker
CVTIntPtr _activeWorkerTimeoutAliveSecs = // 5min
util::ConfigValTInt::create(_configValMap, "activeworker", "timeoutAliveSecs", notReq, 60 * 5);
CVTIntPtr _activeWorkerTimeoutDeadSecs = // 10min
util::ConfigValTInt::create(_configValMap, "activeworker", "timeoutDeadSecs", notReq, 60 * 10);
CVTIntPtr _activeWorkerMaxLifetimeSecs = // 1hr
util::ConfigValTInt::create(_configValMap, "activeworker", "maxLifetimeSecs", notReq, 60 * 60);
CVTIntPtr _monitorSleepTimeMilliSec = util::ConfigValTInt::create(
_configValMap, "activeworker", "monitorSleepTimeMilliSec", notReq, 15'000);

// FamilyMap
CVTBoolPtr _familyMapUsingChunkSize =
util::ConfigValTBool::create(_configValMap, "familymap", "usingChunkSize", notReq, 0);

/// This may impact `_resultMaxHttpConnections` as too many connections may cause kernel memory issues.
CVTIntPtr _commandMaxHttpConnections =
util::ConfigValTInt::create(_configValMap, "uberjob", "commandMaxHttpConnections", notReq, 2000);
};

} // namespace lsst::qserv::cconfig
Expand Down
2 changes: 0 additions & 2 deletions src/ccontrol/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ target_link_libraries(ccontrol PUBLIC
parser
replica
sphgeom
xrdreq
XrdCl
)

Expand All @@ -52,7 +51,6 @@ FUNCTION(ccontrol_tests)
qserv_meta
query
rproc
xrdreq
Boost::unit_test_framework
Threads::Threads
)
Expand Down
Loading