Skip to content

Commit

Permalink
Improved UberJobFile recovery after file collection error.
Browse files Browse the repository at this point in the history
  • Loading branch information
jgates108 committed Feb 11, 2025
1 parent 68418b5 commit 9495456
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 15 deletions.
18 changes: 13 additions & 5 deletions src/ccontrol/MergingHandler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@ lsst::qserv::TimeCountTracker<double>::CALLBACKFUNC const reportFileRecvRate =
}
};

/// If success, then everything is fine.
/// If not success, and not mergeHappened, the user query can be saved by abandoning
/// this UberJob. If mergeHappened, the result table is fouled and the user query is ruined.
/// @return bool success - true if operation was successful.
/// @return bool mergeHappened - true if merging was started.
std::tuple<bool, bool> readHttpFileAndMergeHttp(
lsst::qserv::qdisp::UberJob::Ptr const& uberJob, string const& httpUrl,
function<bool(char const*, uint32_t, bool&)> const& messageIsReady,
Expand Down Expand Up @@ -119,7 +124,7 @@ std::tuple<bool, bool> readHttpFileAndMergeHttp(
// The value stays 0 while reading the frame header.
uint32_t msgSizeBytes = 0;
bool success = true;
bool mergeSuccess = true;
bool mergeHappened = false;
int headerCount = 0;
uint64_t totalBytesRead = 0;
try {
Expand Down Expand Up @@ -204,9 +209,12 @@ std::tuple<bool, bool> readHttpFileAndMergeHttp(
}

// Parse and evaluate the message.
mergeSuccess = messageIsReady(msgBuf.get(), msgSizeBytes, last);
//&&&mergeHappened = messageIsReady(msgBuf.get(), msgSizeBytes, last);
mergeHappened = true;
bool messageReadyResult = messageIsReady(msgBuf.get(), msgSizeBytes, last);
totalBytesRead += msgSizeBytes;
if (!mergeSuccess) {
//&&&if (!mergeHappened) {
if (!messageReadyResult) {
success = false;
throw runtime_error("message processing failed at offset " +
to_string(offset - msgSizeBytes) + ", file: " + httpUrl);
Expand Down Expand Up @@ -247,8 +255,8 @@ std::tuple<bool, bool> readHttpFileAndMergeHttp(
}
// If the merge failed, that indicates something went wrong in the local database table;
// it is likely this user query is doomed and should be cancelled.
LOGS(_log, LOG_LVL_DEBUG, context << " end succes=" << success << " mergeSuccess=" << mergeSuccess);
return {success, mergeSuccess};
LOGS(_log, LOG_LVL_DEBUG, context << " end success=" << success << " mergeHappened=" << mergeHappened);
return {success, mergeHappened};
}

} // namespace
Expand Down
30 changes: 20 additions & 10 deletions src/qdisp/UberJob.cc
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,6 @@ void UberJob::runUberJob() {
_unassignJobs(); // locks _jobsMtx
setStatusIfOk(qmeta::JobStatus::RESPONSE_ERROR,
cName(__func__) + " not transmitSuccess " + exceptionWhat);

} else {
setStatusIfOk(qmeta::JobStatus::REQUEST, cName(__func__) + " transmitSuccess"); // locks _jobsMtx
}
Expand Down Expand Up @@ -310,26 +309,37 @@ json UberJob::importResultFile(string const& fileUrl, uint64_t rowCount, uint64_
return;
}
uint64_t resultRows = 0;
auto [flushSuccess, flushShouldCancel] =
auto [flushSuccess, mergeHappened] =
ujPtr->getRespHandler()->flushHttp(fileUrl, rowCount, resultRows);
LOGS(_log, LOG_LVL_DEBUG, ujPtr->cName(__func__) << "::fileCollectFunc");
LOGS(_log, LOG_LVL_TRACE,
ujPtr->cName(__func__) << "::fileCollectFunc success=" << flushSuccess
<< " mergeHappened=" << mergeHappened);
if (!flushSuccess) {
// This would probably indicate malformed file+rowCount or
// writing the result table failed.
bool flushShouldCancel = false;
if (mergeHappened) {
// This would probably indicate malformed file+rowCount or writing the result table failed.
// If any merging happened, the result table is ruined.
LOGS(_log, LOG_LVL_ERROR,
ujPtr->cName(__func__)
<< "::fileCollectFunc flushHttp failed after merging, results ruined.");
flushShouldCancel = true;
} else {
// Perhaps something went wrong with file collection, so it is worth trying the jobs again
// by abandoning this UberJob.
LOGS(_log, LOG_LVL_ERROR,
ujPtr->cName(__func__) << "::fileCollectFunc flushHttp failed, retrying Jobs.");
}
ujPtr->_importResultError(flushShouldCancel, "mergeError", "merging failed");
}

// At this point all data for this job have been read, there's no point in
// having XrdSsi wait for anything.
// At this point all data for this job have been read.
ujPtr->_importResultFinish(resultRows);
};

auto cmd = util::PriorityCommand::Ptr(new util::PriorityCommand(fileCollectFunc));
exec->queueFileCollect(cmd);

// If the query meets the limit row complete criteria, it will start
// squashing superfluous results so the answer can be returned quickly.

// The file collection has been queued for later, let the worker know that it's okay so far.
json jsRet = {{"success", 1}, {"errortype", ""}, {"note", "queued for collection"}};
return jsRet;
}
Expand Down

0 comments on commit 9495456

Please sign in to comment.