Skip to content
Open
Show file tree
Hide file tree
Changes from 46 commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
467861d
Add storage functions to get job with failed tasks and partially rese…
sitaowang1998 May 15, 2025
409970b
Update retry count when resetting tasks
sitaowang1998 May 15, 2025
319c21f
Fix typo
sitaowang1998 May 15, 2025
39c2f97
Set the job fail only if we run out of retries
sitaowang1998 May 15, 2025
cfb49d2
Bug fix for SQL statements
sitaowang1998 May 15, 2025
7a8a0a2
Add unit test for partial job reset
sitaowang1998 May 15, 2025
4a30602
WIP for job recovery
sitaowang1998 May 20, 2025
66eb5d2
Add persisted in data
sitaowang1998 May 21, 2025
22fa399
Add data persistence in storage and client
sitaowang1998 May 21, 2025
cd3b530
Add data persistence test
sitaowang1998 May 21, 2025
d8a99ca
WIP
sitaowang1998 May 21, 2025
cb1a1e5
Add persisted in data
sitaowang1998 May 21, 2025
42411df
Add data persistence in storage and client
sitaowang1998 May 21, 2025
041679f
Add data persistence test
sitaowang1998 May 21, 2025
f9bcb0e
Fix the tests
sitaowang1998 May 21, 2025
027c835
Merge branch 'main' into data_persistence
sitaowang1998 May 26, 2025
6cc86a0
Merge branch 'data_persistence' into checkpoint-frontier
sitaowang1998 May 26, 2025
cc031dc
Fix relative header
sitaowang1998 May 26, 2025
46ae85a
Add job recovery
sitaowang1998 May 26, 2025
74aade0
Add getters for job recovery result
sitaowang1998 May 26, 2025
f93723d
Add docstring
sitaowang1998 May 26, 2025
11c2cd7
Fix clang-tidy
sitaowang1998 May 26, 2025
584ad73
Fix clang tidy
sitaowang1998 May 26, 2025
e9ccff6
Fix clang tidy
sitaowang1998 May 26, 2025
b7e0911
Add recovery loop in scheduler
sitaowang1998 May 26, 2025
24b858a
Fix clang tidy
sitaowang1998 May 26, 2025
668902b
Fix clang tidy
sitaowang1998 May 26, 2025
dc38efc
Bug fix
sitaowang1998 May 26, 2025
dc6918e
Add simple job recovery unit test
sitaowang1998 May 26, 2025
14f2b94
Add children of rolled-back task into queue
sitaowang1998 May 28, 2025
0307dda
Break up job recovery step into separate function
sitaowang1998 May 28, 2025
0ef1aa2
Fix clang-tidy
sitaowang1998 May 28, 2025
8838cda
Fix typo
sitaowang1998 May 29, 2025
7172eee
Add multiple task recovery test
sitaowang1998 May 29, 2025
ddfc24a
Merge branch 'main' into checkpoint-frontier
sitaowang1998 May 29, 2025
b09b56a
Add cleanup for tests
sitaowang1998 May 29, 2025
cfb0159
Add driver removal in tests
sitaowang1998 May 29, 2025
5a2d396
Add more tests for job recovery
sitaowang1998 May 29, 2025
084adc8
Fix not updating task state to ready when multiple children exists
sitaowang1998 May 29, 2025
0e96dc7
Fix multiple parents
sitaowang1998 May 29, 2025
cdc841f
Add more tests
sitaowang1998 May 29, 2025
7571304
Fix clang-tidy
sitaowang1998 May 29, 2025
accbdfd
Use dot graph for docstring
sitaowang1998 May 29, 2025
bd57749
Make JobRecovery getters const
sitaowang1998 May 29, 2025
d32dc7f
Use const for ready and pending tasks in test
sitaowang1998 May 29, 2025
f412bc3
Bug fix
sitaowang1998 May 29, 2025
c3f438c
Bug fix
sitaowang1998 May 29, 2025
55bfdaf
Fix interval
sitaowang1998 May 29, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/spider/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ set(SPIDER_CORE_SOURCES
core/DriverCleaner.cpp
core/JobCleaner.cpp
core/Task.cpp
core/JobRecovery.cpp
storage/mysql/MySqlConnection.cpp
storage/mysql/MySqlStorageFactory.cpp
storage/mysql/MySqlJobSubmissionBatch.cpp
Expand All @@ -27,6 +28,7 @@ set(SPIDER_CORE_HEADERS
core/Task.hpp
core/TaskGraph.hpp
core/JobMetadata.hpp
core/JobRecovery.hpp
io/BoostAsio.hpp
io/MsgPack.hpp
io/msgpack_message.hpp
Expand Down
32 changes: 32 additions & 0 deletions src/spider/client/Data.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,26 @@ class Data {
m_data_store->set_data_locality(*conn, *m_impl);
}

/**
* Sets the data as checkpointed, indicating the data should not be cleaned up.
*
* @throw spider::ConnectionException
*/
void set_checkpointed() {
m_impl->set_persisted(true);
if (nullptr != m_connection) {
m_data_store->set_data_persisted(*m_connection, *m_impl);
return;
}
std::variant<std::unique_ptr<core::StorageConnection>, core::StorageErr> conn_result
= m_storage_factory->provide_storage_connection();
if (std::holds_alternative<core::StorageErr>(conn_result)) {
throw ConnectionException(std::get<core::StorageErr>(conn_result).description);
}
auto conn = std::move(std::get<std::unique_ptr<core::StorageConnection>>(conn_result));
m_data_store->set_data_persisted(*conn, *m_impl);
}

class Builder {
public:
/**
Expand Down Expand Up @@ -106,6 +126,16 @@ class Data {
return *this;
}

/**
 * Marks the data under construction as checkpointed, i.e. it should survive
 * cleanup once built.
 *
 * @return self
 */
auto set_checkpointed() -> Builder& {
    this->m_persisted = true;
    return *this;
}

/**
* Builds the data object.
*
Expand All @@ -119,6 +149,7 @@ class Data {
auto data = std::make_unique<core::Data>(std::string{buffer.data(), buffer.size()});
data->set_locality(m_nodes);
data->set_hard_locality(m_hard_locality);
data->set_persisted(m_persisted);
std::shared_ptr<core::StorageConnection> conn = m_connection;
if (nullptr == conn) {
std::variant<std::unique_ptr<core::StorageConnection>, core::StorageErr> conn_result
Expand Down Expand Up @@ -166,6 +197,7 @@ class Data {
std::vector<std::string> m_nodes;
bool m_hard_locality = false;
std::function<void(T const&)> m_cleanup_func;
bool m_persisted = false;

std::shared_ptr<core::DataStorage> m_data_store;
std::shared_ptr<core::StorageFactory> m_storage_factory;
Expand Down
5 changes: 5 additions & 0 deletions src/spider/core/Data.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,16 @@ class Data {

void set_hard_locality(bool const hard) { m_hard_locality = hard; }

void set_persisted(bool const persisted) { this->m_persisted = persisted; }

// Returns whether the data has been marked persisted (checkpointed).
[[nodiscard]] auto is_persisted() const -> bool { return m_persisted; }

private:
boost::uuids::uuid m_id;
std::string m_value;
std::vector<std::string> m_locality;
bool m_hard_locality = false;
bool m_persisted = false;

void init_id() {
boost::uuids::random_generator gen;
Expand Down
167 changes: 167 additions & 0 deletions src/spider/core/JobRecovery.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
#include "JobRecovery.hpp"

#include <cstdint>
#include <deque>
#include <memory>
#include <optional>
#include <tuple>
#include <utility>
#include <vector>

#include <absl/container/flat_hash_set.h>
#include <boost/uuid/uuid.hpp>
#include <boost/uuid/uuid_io.hpp>
#include <fmt/format.h>

#include <spider/core/Data.hpp>
#include <spider/core/Error.hpp>
#include <spider/core/Task.hpp>
#include <spider/storage/DataStorage.hpp>
#include <spider/storage/MetadataStorage.hpp>
#include <spider/storage/StorageConnection.hpp>

namespace spider::core {
// Constructs a recovery helper for the given job, taking shared ownership of
// the storage connection and the data/metadata stores used by compute_graph.
JobRecovery::JobRecovery(
        boost::uuids::uuid const job_id,
        std::shared_ptr<StorageConnection> storage_connection,
        std::shared_ptr<DataStorage> data_store,
        std::shared_ptr<MetadataStorage> metadata_store
)
        : m_job_id{job_id},
          m_conn{std::move(storage_connection)},
          m_data_store{std::move(data_store)},
          m_metadata_store{std::move(metadata_store)} {}

/**
 * Loads the job's task graph and walks it to split the affected tasks into
 * ready and pending sets. Safe to call more than once on the same instance:
 * all intermediate state is reset up front.
 * @return StorageErr from any failed storage read; success otherwise.
 */
auto JobRecovery::compute_graph() -> StorageErr {
    // Reset state left over from a previous invocation so repeated calls do
    // not mix results across runs.
    m_task_set.clear();
    m_task_queue.clear();
    m_ready_tasks.clear();
    m_pending_tasks.clear();
    m_data_map.clear();

    StorageErr err = m_metadata_store->get_task_graph(*m_conn, m_job_id, &m_task_graph);
    if (false == err.success()) {
        return err;
    }

    // Seed the working queue with every failed task; these are the starting
    // points of the recovery subgraph.
    for (auto const& [task_id, task] : m_task_graph.get_tasks()) {
        if (TaskState::Failed == task.get_state()) {
            m_task_set.insert(task_id);
            m_task_queue.push_front(task_id);
        }
    }

    // Drain the queue; process_task may enqueue parents (whose outputs must be
    // regenerated) and non-pending children.
    while (!m_task_queue.empty()) {
        auto const task_id = m_task_queue.front();
        m_task_queue.pop_front();
        err = process_task(task_id);
        if (false == err.success()) {
            return err;
        }
    }

    return StorageErr{};
}

/**
 * Gets the data associated with the given data_id, serving from the in-memory
 * cache when possible to avoid repeated storage round trips.
 * @param data_id
 * @param data Receives the fetched data.
 * @return StorageErr from the data store on a cache miss that fails to load.
 */
auto JobRecovery::get_data(boost::uuids::uuid data_id, Data& data) -> StorageErr {
    auto const it = m_data_map.find(data_id);
    if (it != m_data_map.end()) {
        data = it->second;
        return StorageErr{};
    }
    StorageErr const err = m_data_store->get_data(*m_conn, data_id, &data);
    if (err.success()) {
        // emplace constructs the cached copy in place, avoiding the
        // default-construct-then-assign of operator[].
        m_data_map.emplace(data_id, data);
    }
    return err;
}

/**
 * Scans the task's inputs and records the parent tasks whose outputs feed this
 * task through non-persisted Data.
 * @param task Task whose inputs are inspected.
 * @param not_persisted Receives the ids of parents producing non-persisted Data.
 * @return StorageErr from any failed Data lookup; success otherwise.
 */
auto JobRecovery::check_task_input(
        Task const& task,
        absl::flat_hash_set<boost::uuids::uuid>& not_persisted
) -> StorageErr {
    for (auto const& input : task.get_inputs()) {
        auto const maybe_data_id = input.get_data_id();
        if (false == maybe_data_id.has_value()) {
            // Input is not backed by a Data object; nothing to check.
            continue;
        }
        Data input_data;
        StorageErr const err = get_data(maybe_data_id.value(), input_data);
        if (false == err.success()) {
            return err;
        }
        if (input_data.is_persisted()) {
            continue;
        }
        // Non-persisted data: remember the parent task that produced it, if any.
        auto const maybe_parent = input.get_task_output();
        if (maybe_parent.has_value()) {
            not_persisted.insert(std::get<0>(maybe_parent.value()));
        }
    }
    return StorageErr{};
}

// Processes one task popped from the working queue:
//  1. enqueues its not-yet-visited, non-Pending children,
//  2. classifies the task as ready (all inputs available) or pending
//     (some inputs must be regenerated by parents first), and
//  3. enqueues the parents that must regenerate non-persisted Data.
auto JobRecovery::process_task(boost::uuids::uuid task_id) -> StorageErr {
    // A missing task id indicates an inconsistent graph in storage.
    std::optional<Task*> const optional_task = m_task_graph.get_task(task_id);
    if (false == optional_task.has_value()) {
        return StorageErr{
                StorageErrType::KeyNotFoundErr,
                fmt::format("No task with id {}", to_string(task_id))
        };
    }

    // Children that are not Pending previously progressed past this task and
    // must also be revisited; Pending children will be scheduled normally.
    for (boost::uuids::uuid const& child_id : m_task_graph.get_child_tasks(task_id)) {
        if (m_task_set.contains(child_id)) {
            // Already enqueued (or processed) during this walk.
            continue;
        }
        std::optional<Task*> optional_child_task = m_task_graph.get_task(child_id);
        if (false == optional_child_task.has_value()) {
            return StorageErr{
                    StorageErrType::KeyNotFoundErr,
                    fmt::format("No task with id {}", to_string(child_id))
            };
        }
        Task const& child_task = *optional_child_task.value();
        if (TaskState::Pending != child_task.get_state()) {
            m_task_queue.push_back(child_id);
            m_task_set.insert(child_id);
        }
    }

    Task const& task = *optional_task.value();
    // Parents whose outputs feed this task through non-persisted Data.
    absl::flat_hash_set<boost::uuids::uuid> not_persisted;
    StorageErr err = check_task_input(task, not_persisted);
    if (false == err.success()) {
        return err;
    }

    if (not_persisted.empty()) {
        // Every input is persisted (or external): the task can rerun now.
        m_ready_tasks.insert(task_id);
    } else {
        // Some inputs must be regenerated first; the producing parents join
        // the working queue unless already visited.
        m_pending_tasks.insert(task_id);
        for (auto const& parent_id : not_persisted) {
            if (false == m_task_set.contains(parent_id)) {
                m_task_queue.push_back(parent_id);
                m_task_set.insert(parent_id);
            }
        }
    }

    return StorageErr{};
}

/**
 * @return Ids of the tasks classified as pending by compute_graph. Order is
 * unspecified (it follows the underlying hash-set iteration order).
 */
auto JobRecovery::get_pending_tasks() const -> std::vector<boost::uuids::uuid> {
    // The vector range constructor replaces the manual reserve-and-copy loop.
    return {m_pending_tasks.begin(), m_pending_tasks.end()};
}

/**
 * @return Ids of the tasks classified as ready by compute_graph. Order is
 * unspecified (it follows the underlying hash-set iteration order).
 */
auto JobRecovery::get_ready_tasks() const -> std::vector<boost::uuids::uuid> {
    // The vector range constructor replaces the manual reserve-and-copy loop.
    return {m_ready_tasks.begin(), m_ready_tasks.end()};
}
} // namespace spider::core
95 changes: 95 additions & 0 deletions src/spider/core/JobRecovery.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
#ifndef SPIDER_CORE_JOBRECOVERY_HPP
#define SPIDER_CORE_JOBRECOVERY_HPP

#include <deque>
#include <memory>
#include <vector>

#include <absl/container/flat_hash_map.h>
#include <absl/container/flat_hash_set.h>
#include <boost/uuid/uuid.hpp>

#include <spider/core/Data.hpp>
#include <spider/core/Error.hpp>
#include <spider/core/Task.hpp>
#include <spider/core/TaskGraph.hpp>
#include <spider/storage/DataStorage.hpp>
#include <spider/storage/MetadataStorage.hpp>
#include <spider/storage/StorageConnection.hpp>

namespace spider::core {
class JobRecovery {
public:
    /**
     * Constructs a recovery helper for the given job.
     * @param job_id Id of the job to recover.
     * @param storage_connection Connection used for all storage accesses.
     * @param data_store Store used to fetch Data objects.
     * @param metadata_store Store used to fetch the job's task graph.
     */
    JobRecovery(
            boost::uuids::uuid job_id,
            std::shared_ptr<StorageConnection> storage_connection,
            std::shared_ptr<DataStorage> data_store,
            std::shared_ptr<MetadataStorage> metadata_store
    );

    /**
     * Recover the job by loading the task graph and data from the storage,
     * compute the minimal subgraph that contains all the failed tasks and the
     * data across edge are all persisted.
     * The result is stored in m_ready_tasks and m_pending_tasks, where
     * m_ready_tasks contains the tasks on the boundary of the subgraph, and
     * m_pending_tasks contains the tasks that are not ready to run yet.
     * @return StorageErr from any failed storage read; success otherwise.
     */
    auto compute_graph() -> StorageErr;

    /**
     * @return Ids of the ready tasks computed by compute_graph.
     */
    [[nodiscard]] auto get_ready_tasks() const -> std::vector<boost::uuids::uuid>;

    /**
     * @return Ids of the pending tasks computed by compute_graph.
     */
    [[nodiscard]] auto get_pending_tasks() const -> std::vector<boost::uuids::uuid>;

private:
    /**
     * Check if task has any parents with non-persisted Data that feed into the task.
     * @param task Task whose inputs are inspected.
     * @param not_persisted Returns parents with non-persisted Data that feed into the task.
     * @return StorageErr from any failed Data lookup; success otherwise.
     */
    auto check_task_input(Task const& task, absl::flat_hash_set<boost::uuids::uuid>& not_persisted)
            -> StorageErr;

    /**
     * Get the data associated with the given data_id. If the data is cached in
     * m_data_map, return it. Otherwise, fetch it from the data store and cache
     * it.
     * @param data_id Id of the data to fetch.
     * @param data Receives the fetched data.
     * @return StorageErr from the data store on a failed fetch.
     */
    auto get_data(boost::uuids::uuid data_id, Data& data) -> StorageErr;

    /**
     * Process the task from the task queue with the given task_id.
     * 1. Add the non-pending children of the task to the working queue.
     * 2. Check if its inputs contains non-persisted Data.
     * 3. If the task has non-persisted Data input and has parents, add it to pending tasks and add
     * its parents with non-persistent Data to the working queue.
     * 4. Otherwise, add it to ready tasks.
     *
     * @param task_id Id of the task to process.
     * @return StorageErr on a missing task or failed Data lookup.
     */
    auto process_task(boost::uuids::uuid task_id) -> StorageErr;

    // Id of the job being recovered.
    boost::uuids::uuid m_job_id;

    std::shared_ptr<StorageConnection> m_conn;
    std::shared_ptr<DataStorage> m_data_store;
    std::shared_ptr<MetadataStorage> m_metadata_store;

    // Cache of Data objects fetched during compute_graph.
    absl::flat_hash_map<boost::uuids::uuid, Data> m_data_map;

    TaskGraph m_task_graph;

    // Visited set for the graph walk: ids that have been enqueued at least once.
    absl::flat_hash_set<boost::uuids::uuid> m_task_set;
    // Working queue for the graph walk in compute_graph.
    std::deque<boost::uuids::uuid> m_task_queue;
    // Result: tasks whose inputs are all available and can rerun immediately.
    absl::flat_hash_set<boost::uuids::uuid> m_ready_tasks;
    // Result: tasks waiting on parents to regenerate non-persisted Data.
    absl::flat_hash_set<boost::uuids::uuid> m_pending_tasks;
};
} // namespace spider::core

#endif
Loading
Loading