Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions src/spider/client/Data.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,26 @@ class Data {
m_data_store->set_data_locality(*conn, *m_impl);
}

/**
* Sets the data as checkpointed, indicating the data should not be cleaned up.
*
* @throw spider::ConnectionException
*/
void set_checkpointed() {
m_impl->set_persisted(true);
if (nullptr != m_connection) {
m_data_store->set_data_persisted(*m_connection, *m_impl);
return;
}
std::variant<std::unique_ptr<core::StorageConnection>, core::StorageErr> conn_result
= m_storage_factory->provide_storage_connection();
if (std::holds_alternative<core::StorageErr>(conn_result)) {
throw ConnectionException(std::get<core::StorageErr>(conn_result).description);
}
auto conn = std::move(std::get<std::unique_ptr<core::StorageConnection>>(conn_result));
m_data_store->set_data_persisted(*conn, *m_impl);
}

class Builder {
public:
/**
Expand Down Expand Up @@ -107,6 +127,16 @@ class Data {
return *this;
}

/**
* Sets the data as checkpointed, indicating the data should not be cleaned up.
*
* @return self
*/
auto set_checkpointed() -> Builder& {
m_persisted = true;
return *this;
}

/**
* Builds the data object.
*
Expand All @@ -120,6 +150,7 @@ class Data {
auto data = std::make_unique<core::Data>(std::string{buffer.data(), buffer.size()});
data->set_locality(m_nodes);
data->set_hard_locality(m_hard_locality);
data->set_persisted(m_persisted);
std::shared_ptr<core::StorageConnection> conn = m_connection;
if (nullptr == conn) {
std::variant<std::unique_ptr<core::StorageConnection>, core::StorageErr> conn_result
Expand Down Expand Up @@ -176,6 +207,7 @@ class Data {
std::vector<std::string> m_nodes;
bool m_hard_locality = false;
std::function<void(T const&)> m_cleanup_func;
bool m_persisted = false;

std::shared_ptr<core::DataStorage> m_data_store;
std::shared_ptr<core::StorageFactory> m_storage_factory;
Expand Down
5 changes: 5 additions & 0 deletions src/spider/core/Data.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,16 @@ class Data {

void set_hard_locality(bool const hard) { m_hard_locality = hard; }

void set_persisted(bool const persisted) { this->m_persisted = persisted; }

[[nodiscard]] auto is_persisted() const -> bool { return m_persisted; }

private:
boost::uuids::uuid m_id;
std::string m_value;
std::vector<std::string> m_locality;
bool m_hard_locality = false;
bool m_persisted = false;

void init_id() {
boost::uuids::random_generator gen;
Expand Down
1 change: 1 addition & 0 deletions src/spider/storage/DataStorage.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ class DataStorage {
) -> StorageErr
= 0;
virtual auto set_data_locality(StorageConnection& conn, Data const& data) -> StorageErr = 0;
virtual auto set_data_persisted(StorageConnection& conn, Data const& data) -> StorageErr = 0;
virtual auto remove_data(StorageConnection& conn, boost::uuids::uuid id) -> StorageErr = 0;
virtual auto
add_task_reference(StorageConnection& conn, boost::uuids::uuid id, boost::uuids::uuid task_id)
Expand Down
30 changes: 27 additions & 3 deletions src/spider/storage/mysql/MySqlStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1987,13 +1987,15 @@ auto MySqlDataStorage::add_driver_data(
try {
std::unique_ptr<sql::PreparedStatement> statement(
static_cast<MySqlConnection&>(conn)->prepareStatement(
"INSERT INTO `data` (`id`, `value`, `hard_locality`) VALUES(?, ?, ?)"
"INSERT INTO `data` (`id`, `value`, `hard_locality`, `persisted`) "
"VALUES(?, ?, ?, ?)"
)
);
sql::bytes id_bytes = uuid_get_bytes(data.get_id());
statement->setBytes(1, &id_bytes);
statement->setString(2, data.get_value());
statement->setBoolean(3, data.is_hard_locality());
statement->setBoolean(4, data.is_persisted());
statement->executeUpdate();

for (std::string const& addr : data.get_locality()) {
Expand Down Expand Up @@ -2035,13 +2037,15 @@ auto MySqlDataStorage::add_task_data(
try {
std::unique_ptr<sql::PreparedStatement> statement(
static_cast<MySqlConnection&>(conn)->prepareStatement(
"INSERT INTO `data` (`id`, `value`, `hard_locality`) VALUES(?, ?, ?)"
"INSERT INTO `data` (`id`, `value`, `hard_locality`, `persisted`) "
"VALUES(?, ?, ?, ?)"
)
);
sql::bytes id_bytes = uuid_get_bytes(data.get_id());
statement->setBytes(1, &id_bytes);
statement->setString(2, data.get_value());
statement->setBoolean(3, data.is_hard_locality());
statement->setBoolean(4, data.is_persisted());
statement->executeUpdate();

for (std::string const& addr : data.get_locality()) {
Expand Down Expand Up @@ -2081,7 +2085,7 @@ auto MySqlDataStorage::get_data_with_locality(
) -> StorageErr {
std::unique_ptr<sql::PreparedStatement> statement(
static_cast<MySqlConnection&>(conn)->prepareStatement(
"SELECT `id`, `value`, `hard_locality` FROM `data` WHERE `id` = ?"
"SELECT `id`, `value`, `hard_locality`, `persisted` FROM `data` WHERE `id` = ?"
)
);
sql::bytes id_bytes = uuid_get_bytes(id);
Expand All @@ -2097,6 +2101,7 @@ auto MySqlDataStorage::get_data_with_locality(
res->next();
*data = Data{id, get_sql_string(res->getString(2))};
data->set_hard_locality(res->getBoolean(3));
data->set_persisted(res->getBoolean(4));

std::unique_ptr<sql::PreparedStatement> locality_statement(
static_cast<MySqlConnection&>(conn)->prepareStatement(
Expand Down Expand Up @@ -2226,6 +2231,25 @@ auto MySqlDataStorage::set_data_locality(StorageConnection& conn, Data const& da
return StorageErr{};
}

auto MySqlDataStorage::set_data_persisted(StorageConnection& conn, Data const& data) -> StorageErr {
try {
sql::bytes id_bytes = uuid_get_bytes(data.get_id());
std::unique_ptr<sql::PreparedStatement> statement(
static_cast<MySqlConnection&>(conn)->prepareStatement(
"UPDATE `data` SET `persisted` = ? WHERE `id` = ?"
)
);
statement->setBoolean(1, data.is_persisted());
statement->setBytes(2, &id_bytes);
statement->executeUpdate();
} catch (sql::SQLException& e) {
static_cast<MySqlConnection&>(conn)->rollback();
return StorageErr{StorageErrType::OtherErr, e.what()};
}
static_cast<MySqlConnection&>(conn)->commit();
return StorageErr{};
}

auto MySqlDataStorage::remove_data(StorageConnection& conn, boost::uuids::uuid id) -> StorageErr {
try {
std::unique_ptr<sql::PreparedStatement> statement(
Expand Down
1 change: 1 addition & 0 deletions src/spider/storage/mysql/MySqlStorage.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ class MySqlDataStorage : public DataStorage {
Data* data
) -> StorageErr override;
auto set_data_locality(StorageConnection& conn, Data const& data) -> StorageErr override;
auto set_data_persisted(StorageConnection& conn, Data const& data) -> StorageErr override;
auto remove_data(StorageConnection& conn, boost::uuids::uuid id) -> StorageErr override;
auto
add_task_reference(StorageConnection& conn, boost::uuids::uuid id, boost::uuids::uuid task_id)
Expand Down
9 changes: 8 additions & 1 deletion tests/storage/test-DataStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ TEMPLATE_LIST_TEST_CASE(
auto conn = std::move(std::get<std::unique_ptr<spider::core::StorageConnection>>(conn_result));

// Add driver and data
spider::core::Data const data{"value"};
spider::core::Data data{"value"};
boost::uuids::random_generator gen;
boost::uuids::uuid const driver_id = gen();
REQUIRE(metadata_storage->add_driver(*conn, spider::core::Driver{driver_id}).success());
Expand All @@ -56,6 +56,13 @@ TEMPLATE_LIST_TEST_CASE(
REQUIRE(data_storage->get_data(*conn, data.get_id(), &result).success());
REQUIRE(spider::test::data_equal(data, result));

// Set data persisted should succeed
data.set_persisted(true);
REQUIRE(data_storage->set_data_persisted(*conn, data).success());
// Get data should match
REQUIRE(data_storage->get_data(*conn, data.get_id(), &result).success());
REQUIRE(spider::test::data_equal(data, result));

// Remove data should succeed
REQUIRE(data_storage->remove_data(*conn, data.get_id()).success());

Expand Down
4 changes: 4 additions & 0 deletions tests/utils/CoreDataUtils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ inline auto data_equal(core::Data const& d1, core::Data const& d2) -> bool {
return false;
}

if (d1.is_persisted() != d2.is_persisted()) {
return false;
}

return true;
}
} // namespace spider::test
Expand Down