Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(manual_compact): fix replica lose manual compact finished status after replica migrate bug #1961

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions src/base/meta_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ const std::string meta_store::DATA_VERSION = "pegasus_data_version";
const std::string meta_store::LAST_FLUSHED_DECREE = "pegasus_last_flushed_decree";
const std::string meta_store::LAST_MANUAL_COMPACT_FINISH_TIME =
"pegasus_last_manual_compact_finish_time";
const std::string meta_store::LAST_MANUAL_COMPACT_USED_TIME =
"pegasus_last_manual_compact_used_time";

const std::string meta_store::ROCKSDB_ENV_USAGE_SCENARIO_NORMAL = "normal";
const std::string meta_store::ROCKSDB_ENV_USAGE_SCENARIO_PREFER_WRITE = "prefer_write";
const std::string meta_store::ROCKSDB_ENV_USAGE_SCENARIO_BULK_LOAD = "bulk_load";
Expand All @@ -47,6 +50,21 @@ meta_store::meta_store(const char *log_prefix,
_wt_opts.disableWAL = true;
}

dsn::error_code meta_store::get_last_manual_compact_used_time(uint64_t *ts) const
{
LOG_AND_RETURN_NOT_OK(ERROR_PREFIX,
get_value_from_meta_cf(false, LAST_MANUAL_COMPACT_USED_TIME, ts),
"get_value_from_meta_cf failed");
return dsn::ERR_OK;
}

void meta_store::set_last_manual_compact_used_time(uint64_t last_manual_compact_used_time) const
{
CHECK_EQ_PREFIX(
::dsn::ERR_OK,
set_value_to_meta_cf(LAST_MANUAL_COMPACT_USED_TIME, last_manual_compact_used_time));
}

dsn::error_code meta_store::get_last_flushed_decree(uint64_t *decree) const
{
LOG_AND_RETURN_NOT_OK(ERROR_PREFIX,
Expand Down
5 changes: 5 additions & 0 deletions src/base/meta_store.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,16 @@ class meta_store
rocksdb::ColumnFamilyHandle *meta_cf) const;
dsn::error_code get_data_version(uint32_t *version) const;
dsn::error_code get_last_manual_compact_finish_time(uint64_t *ts) const;
dsn::error_code get_last_manual_compact_used_time(uint64_t *ts) const;
std::string get_usage_scenario() const;

void set_last_flushed_decree(uint64_t decree) const;
void set_data_version(uint32_t version) const;
void set_last_manual_compact_finish_time(uint64_t last_manual_compact_finish_time) const;
void set_usage_scenario(const std::string &usage_scenario) const;

void set_last_manual_compact_used_time(uint64_t last_manual_compact_used_time) const;

private:
::dsn::error_code
get_value_from_meta_cf(bool read_flushed_data, const std::string &key, uint64_t *value) const;
Expand Down Expand Up @@ -92,6 +95,8 @@ class meta_store
static const std::string DATA_VERSION;
static const std::string LAST_FLUSHED_DECREE;
static const std::string LAST_MANUAL_COMPACT_FINISH_TIME;
static const std::string LAST_MANUAL_COMPACT_USED_TIME;

static const std::string ROCKSDB_ENV_USAGE_SCENARIO_NORMAL;
static const std::string ROCKSDB_ENV_USAGE_SCENARIO_PREFER_WRITE;
static const std::string ROCKSDB_ENV_USAGE_SCENARIO_BULK_LOAD;
Expand Down
7 changes: 6 additions & 1 deletion src/server/pegasus_manual_compact_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,11 @@ void pegasus_manual_compact_service::init_last_finish_time_ms(uint64_t last_fini
_manual_compact_last_finish_time_ms.store(last_finish_time_ms);
}

void pegasus_manual_compact_service::init_last_used_time_ms(uint64_t last_used_time_ms)
{
_manual_compact_last_time_used_ms.store(last_used_time_ms);
}

void pegasus_manual_compact_service::start_manual_compact_if_needed(
const std::map<std::string, std::string> &envs)
{
Expand Down Expand Up @@ -312,7 +317,7 @@ void pegasus_manual_compact_service::manual_compact(const rocksdb::CompactRangeO
}

uint64_t start = begin_manual_compact();
uint64_t finish = _app->do_manual_compact(options);
uint64_t finish = _app->do_manual_compact(options, start);
end_manual_compact(start, finish);
}

Expand Down
2 changes: 2 additions & 0 deletions src/server/pegasus_manual_compact_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ class pegasus_manual_compact_service : public dsn::replication::replica_base

void init_last_finish_time_ms(uint64_t last_finish_time_ms);

void init_last_used_time_ms(uint64_t last_used_time_ms);

void start_manual_compact_if_needed(const std::map<std::string, std::string> &envs);

// Called by pegasus_manual_compaction.sh
Expand Down
29 changes: 24 additions & 5 deletions src/server/pegasus_server_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1758,14 +1758,22 @@ dsn::error_code pegasus_server_impl::start(int argc, char **argv)
dsn::ERR_LOCAL_APP_FAILURE,
"open app failed, unsupported data version {}",
_pegasus_data_version);
// update last manual compact finish timestamp
uint64_t last_manual_compact_used_time = 0;
LOG_AND_RETURN_NOT_OK(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When upgrade from old versions, ERR_OBJECT_NOT_FOUND will be returned, right?

ERROR_PREFIX,
_meta_store->get_last_manual_compact_used_time(&last_manual_compact_used_time),
"get_last_manual_compact_used_time failed");

// update last manual compact finish & used timestamp
_manual_compact_svc.init_last_finish_time_ms(last_manual_compact_finish_time);
_manual_compact_svc.init_last_used_time_ms(last_manual_compact_used_time);
cleanup.cancel();
} else {
// Write initial meta data to meta CF and flush when create new DB.
_meta_store->set_data_version(PEGASUS_DATA_VERSION_MAX);
_meta_store->set_last_flushed_decree(0);
_meta_store->set_last_manual_compact_finish_time(0);
_meta_store->set_last_manual_compact_used_time(0);
flush_all_family_columns(true);
}

Expand Down Expand Up @@ -3334,7 +3342,8 @@ ::dsn::error_code pegasus_server_impl::check_column_families(const std::string &
return ::dsn::ERR_OK;
}

uint64_t pegasus_server_impl::do_manual_compact(const rocksdb::CompactRangeOptions &options)
uint64_t pegasus_server_impl::do_manual_compact(const rocksdb::CompactRangeOptions &options,
uint64_t start)
{
// wait flush before compact to make all data compacted.
uint64_t start_time = dsn_now_ms();
Expand All @@ -3355,6 +3364,11 @@ uint64_t pegasus_server_impl::do_manual_compact(const rocksdb::CompactRangeOptio
status.ToString(),
end_time - start_time);
_meta_store->set_last_manual_compact_finish_time(end_time);
uint64_t last_manual_compact_finish_time = 0;
CHECK_OK_PREFIX(
_meta_store->get_last_manual_compact_finish_time(&last_manual_compact_finish_time));
after_manual_compact(start_time, last_manual_compact_finish_time);

// generate new checkpoint and remove old checkpoints, in order to release storage asap
if (!release_storage_after_manual_compact()) {
// it is possible that the new checkpoint is not generated, if there was no data
Expand All @@ -3375,12 +3389,17 @@ uint64_t pegasus_server_impl::do_manual_compact(const rocksdb::CompactRangeOptio
// update rocksdb statistics immediately
update_replica_rocksdb_statistics();

uint64_t last_manual_compact_finish_time = 0;
CHECK_OK_PREFIX(
_meta_store->get_last_manual_compact_finish_time(&last_manual_compact_finish_time));
return last_manual_compact_finish_time;
}

void pegasus_server_impl::after_manual_compact(std::uint64_t starttime, uint64_t endtime)
{
// store last manual compact used time to meta store for learn situation
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the replica server shutdown before the replicas complete the manual compaction, can this patch resolve this issue?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this patch will still work. Cause if replica server shutdown before the replicas complete the manual compaction,the last_manual_compact_finish_time would not be update. So it will start compaction again.

uint64_t used_time = endtime - starttime;
_meta_store->set_last_manual_compact_used_time(used_time);
flush_all_family_columns(true);
}

bool pegasus_server_impl::release_storage_after_manual_compact()
{
int64_t old_last_durable = last_durable_decree();
Expand Down
4 changes: 3 additions & 1 deletion src/server/pegasus_server_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,9 @@ class pegasus_server_impl : public pegasus_read_service
std::string compression_type_to_str(rocksdb::CompressionType type);

// return finish time recorded in rocksdb
uint64_t do_manual_compact(const rocksdb::CompactRangeOptions &options);
uint64_t do_manual_compact(const rocksdb::CompactRangeOptions &options, uint64_t start);

void after_manual_compact(uint64_t starttime, uint64_t endtime);

// generate new checkpoint and remove old checkpoints, in order to release storage asap
// return true if release succeed (new checkpointed generated).
Expand Down
Loading