Skip to content

Commit

Permalink
Post-rebase corrections
Browse files Browse the repository at this point in the history
The corrections were made to reconcile changes made in both the main
branch and in the current one. The following problems were addressed:

- both branches introduced changes to the Qmeta schema (up to version 10)
- a configuration of the worker's result delivery REST service changed in the main branch
- class move was made in the feature branch: qdisp::MessageStore -> qmeta::MessageStore
  • Loading branch information
iagaponenko committed Feb 21, 2025
1 parent 67df437 commit 7694472
Show file tree
Hide file tree
Showing 8 changed files with 59 additions and 28 deletions.
4 changes: 2 additions & 2 deletions src/ccontrol/UserQueryQueries.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
// Qserv headers
#include "css/CssAccess.h"
#include "css/CssError.h"
#include "qdisp/MessageStore.h"
#include "qmeta/MessageStore.h"
#include "qmeta/Exceptions.h"
#include "qmeta/QMetaSelect.h"
#include "query/FromList.h"
Expand Down Expand Up @@ -68,7 +68,7 @@ UserQueryQueries::UserQueryQueries(std::shared_ptr<query::SelectStmt> const& sta
: _resultDbConn(resultDbConn),
_qMetaSelect(qMetaSelect),
_qMetaCzarId(qMetaCzarId),
_messageStore(std::make_shared<qdisp::MessageStore>()),
_messageStore(std::make_shared<qmeta::MessageStore>()),
_resultTableName(::g_nextResultTableId(userQueryId)),
_resultDb(resultDb) {
// The SQL statement should be mostly OK alredy but we need to change
Expand Down
4 changes: 2 additions & 2 deletions src/ccontrol/UserQueryQueries.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ class UserQueryQueries : public UserQuery {
void discard() override;

// Delegate objects
std::shared_ptr<qdisp::MessageStore> getMessageStore() override { return _messageStore; }
std::shared_ptr<qmeta::MessageStore> getMessageStore() override { return _messageStore; }

/// @return Name of the result table for this query, can be empty
std::string getResultTableName() const override { return _resultTableName; }
Expand All @@ -108,7 +108,7 @@ class UserQueryQueries : public UserQuery {
std::shared_ptr<qmeta::QMetaSelect> _qMetaSelect;
qmeta::CzarId const _qMetaCzarId; ///< Czar ID in QMeta database
QueryState _qState = UNKNOWN;
std::shared_ptr<qdisp::MessageStore> _messageStore;
std::shared_ptr<qmeta::MessageStore> _messageStore;
std::string _resultTableName;
std::string _query; ///< query to execute on QMeta database
std::string _orderBy;
Expand Down
2 changes: 1 addition & 1 deletion src/qmeta/QMetaMysql.cc
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ using namespace std;
namespace {

// Current version of QMeta schema
char const VERSION_STR[] = "10";
char const VERSION_STR[] = "11";

LOG_LOGGER _log = LOG_GET("lsst.qserv.qmeta.QMetaMysql");

Expand Down
13 changes: 13 additions & 0 deletions src/qmeta/schema/migrate-10-to-11.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
CREATE TABLE IF NOT EXISTS `chunkMap` (
`worker` VARCHAR(256) NOT NULL COMMENT 'A unique identifier of a worker hosting the chunk replica',
`database` VARCHAR(256) NOT NULL COMMENT 'The name of a database',
`table` VARCHAR(256) NOT NULL COMMENT 'The name of a table',
`chunk` INT UNSIGNED NOT NULL COMMENT 'The number of a chunk',
`size` BIGINT UNSIGNED NOT NULL COMMENT 'The size of a chunk')
ENGINE = InnoDB
COMMENT = 'Chunk disposition across workers';

CREATE TABLE IF NOT EXISTS `chunkMapStatus` (
`update_time` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'The most recent update time of the map')
ENGINE = InnoDB
COMMENT = 'Satus info on the chunk map';
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,28 @@ CREATE TABLE IF NOT EXISTS `QMessages` (
ENGINE = InnoDB
COMMENT = 'Table of messages generated during queries.';

-- -----------------------------------------------------
-- Table `chunkMap`
-- -----------------------------------------------------

CREATE TABLE IF NOT EXISTS `chunkMap` (
`worker` VARCHAR(256) NOT NULL COMMENT 'A unique identifier of a worker hosting the chunk replica',
`database` VARCHAR(256) NOT NULL COMMENT 'The name of a database',
`table` VARCHAR(256) NOT NULL COMMENT 'The name of a table',
`chunk` INT UNSIGNED NOT NULL COMMENT 'The number of a chunk',
`size` BIGINT UNSIGNED NOT NULL COMMENT 'The size of a chunk')
ENGINE = InnoDB
COMMENT = 'Chunk disposition across workers';

-- -----------------------------------------------------
-- Table `chunkMapStatus`
-- -----------------------------------------------------

CREATE TABLE IF NOT EXISTS `chunkMapStatus` (
`update_time` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'The most recent update time of the map')
ENGINE = InnoDB
COMMENT = 'Satus info on the chunk map';

-- Update version on every schema change.
-- Version 0 corresponds to initial QMeta release and it had no
-- QMetadata table at all.
Expand All @@ -208,4 +230,5 @@ COMMENT = 'Table of messages generated during queries.';
-- Version 8 replaced INT with BIGINT in the byte and row counter columns of QInfo.
-- Version 9 removed the full-text index on the query text from QInfo.
-- Version 10 redefined schema of the ProcessList tables.
INSERT INTO `QMetadata` (`metakey`, `value`) VALUES ('version', '10');
-- Version 11 added the worker-to-chunk map tables chunkMap and chunkMapStatus
INSERT INTO `QMetadata` (`metakey`, `value`) VALUES ('version', '11');
2 changes: 0 additions & 2 deletions src/wbase/Task.cc
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,6 @@ size_t const MB_SIZE_BYTES = 1024 * 1024;

namespace lsst::qserv::wbase {

string const Task::_fqdn = util::get_current_host_fqdn();

// Task::ChunkEqual functor
bool Task::ChunkEqual::operator()(Task::Ptr const& x, Task::Ptr const& y) {
if (!x || !y) {
Expand Down
27 changes: 11 additions & 16 deletions src/wbase/UberJobData.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@
using namespace std;
using namespace nlohmann;

namespace fs = boost::filesystem;

namespace {

LOG_LOGGER _log = LOG_GET("lsst.qserv.wbase.UberJobData");
Expand Down Expand Up @@ -180,35 +182,28 @@ void UberJobData::_queueUJResponse(http::Method method_, std::vector<std::string
}
}

string UberJobData::buildUjResultFilePath(string const& resultsDirname) {
if (resultsDirname.empty()) return resultsDirname;
boost::filesystem::path path(resultsDirname);
string UberJobData::_resultFileName() const {
// UberJobs have multiple chunks which can each have different attempt numbers.
// However, each CzarID + UberJobId should be unique as UberJobs are not retried.
path /= to_string(getCzarId()) + "-" + to_string(getQueryId()) + "-" + to_string(getUberJobId()) + "-0" +
".proto";
return path.string();
return to_string(getCzarId()) + "-" + to_string(getQueryId()) + "-" + to_string(getUberJobId()) + "-0" +
".proto";
}

string UberJobData::resultFilePath() {
auto const workerConfig = wconfig::WorkerConfig::instance();
string resultFilePath = buildUjResultFilePath(workerConfig->resultsDirname());
return resultFilePath;
string UberJobData::resultFilePath() const {
string const resultsDirname = wconfig::WorkerConfig::instance()->resultsDirname();
if (resultsDirname.empty()) return resultsDirname;
return (fs::path(resultsDirname) / _resultFileName()).string();
}

std::string UberJobData::resultFileHttpUrl() {
std::string UberJobData::resultFileHttpUrl() const {
auto const workerConfig = wconfig::WorkerConfig::instance();
auto const resultDeliveryProtocol = workerConfig->resultDeliveryProtocol();

string resFilePath = resultFilePath();
auto const fqdn = _foreman->getFqdn();
if (resultDeliveryProtocol != wconfig::ConfigValResultDeliveryProtocol::HTTP) {
throw runtime_error("wbase::Task::Task: unsupported results delivery protocol: " +
wconfig::ConfigValResultDeliveryProtocol::toString(resultDeliveryProtocol));
}
// TODO:UJ it seems like this should just be part of the FileChannelShared???
string resultFileHttpUrl = "http://" + fqdn + ":" + to_string(_resultsHttpPort) + resFilePath;
return resultFileHttpUrl;
return "http://" + _foreman->getFqdn() + ":" + to_string(_resultsHttpPort) + "/" + _resultFileName();
}

void UberJobData::cancelAllTasks() {
Expand Down
10 changes: 6 additions & 4 deletions src/wbase/UberJobData.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,11 +117,10 @@ class UberJobData : public std::enable_shared_from_this<UberJobData> {
/// that there is no limit to the number of rows sent back by the worker.
/// Workers can only safely limit rows for queries that have the LIMIT clause without other related
/// clauses like ORDER BY.
int getRowLimit() { return _rowLimit; }
int getRowLimit() const { return _rowLimit; }

std::string buildUjResultFilePath(std::string const& resultsDirname);
std::string resultFilePath();
std::string resultFileHttpUrl();
std::string resultFilePath() const;
std::string resultFileHttpUrl() const;

private:
UberJobData(UberJobId uberJobId, std::string const& czarName, qmeta::CzarId czarId, std::string czarHost,
Expand All @@ -130,6 +129,9 @@ class UberJobData : public std::enable_shared_from_this<UberJobData> {
std::string const& workerId, std::shared_ptr<wcontrol::Foreman> const& foreman,
std::string const& authKey, uint16_t resultsHttpPort);

/// Return the name of the file that will contain the results of the query.
std::string _resultFileName() const;

/// Queue the response to be sent to the originating czar.
void _queueUJResponse(http::Method method_, std::vector<std::string> const& headers_,
std::string const& url_, std::string const& requestContext_,
Expand Down

0 comments on commit 7694472

Please sign in to comment.