Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 99 additions & 4 deletions src/Checkpoint/CheckpointCoordinator_Checkpoint.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include <Checkpoint/CheckpointCoordinator.h>
#include <Checkpoint/FileCheckpoint.h>
#include <Checkpoint/LocalFileSystemCheckpointStorage.h>
#include <Checkpoint/LogStoreCheckpointContext.h>


#include <Common/Stopwatch.h>
Expand All @@ -19,8 +20,27 @@ extern const int RECOVER_CHECKPOINT_FAILED;

void CheckpointCoordinator::preCheckpoint(DB::CheckpointContextPtr ckpt_ctx)
{
/// Always use local storage directly
ckpt_ctx->storage.preCheckpoint(ckpt_ctx);
if (!ckpt_ctx->request_ctx)
{
ckpt_ctx->storage.preCheckpoint(ckpt_ctx);
return;
}

const auto & settings = ckpt_ctx->request_ctx->settings;
/// If the checkpoint is async, pre-checkpoint the storage locally first.
if (settings->isAsync() && !ckpt_ctx->storage.isLocal())
{
/// Always checkpoints to local storage so that we can recover from local disk fastly
local_ckpt_storage->preCheckpoint(ckpt_ctx);

/// Metadata needs to be checkpointed to the target storage directly
if (ckpt_ctx->epoch.empty())
ckpt_ctx->storage.preCheckpoint(ckpt_ctx);
}
else
{
ckpt_ctx->storage.preCheckpoint(ckpt_ctx);
}
}

void CheckpointCoordinator::checkpoint(
Expand Down Expand Up @@ -48,7 +68,59 @@ void CheckpointCoordinator::checkpoint(UInt32 node_id, CheckpointPtr ckpt, Check

void CheckpointCoordinator::checkpoint(const String & key, CheckpointPtr ckpt, CheckpointContextPtr ckpt_ctx)
{
ckpt_ctx->storage.checkpoint(key, std::move(ckpt), ckpt_ctx);
if (!ckpt_ctx->request_ctx)
{
ckpt_ctx->storage.checkpoint(key, std::move(ckpt), ckpt_ctx);
return;
}

const auto & settings = ckpt_ctx->request_ctx->settings;

bool incremental = settings->isIncremental() && ckpt->supportsIncremental() && ckpt_ctx->epoch.epoch > 1;
if (auto logstore_ckpt_ctx = ckpt_ctx->tryGetExtra<LogStoreCheckpointContext>();
logstore_ckpt_ctx && logstore_ckpt_ctx->force_request_full_ckpt)
incremental = false;

/// If the checkpoint is async, we need to checkpoint to local storage first.
if (settings->isAsync() && !ckpt_ctx->storage.isLocal())
{
if (incremental)
{
auto prev_ckpt_ctx = ckpt_ctx->cloneWithEpoch(ckpt_ctx->request_ctx->last_epoch);
CheckpointPtr prev_ckpt;
if (local_ckpt_storage->exists(key, prev_ckpt_ctx))
prev_ckpt = local_ckpt_storage->recover(key, prev_ckpt_ctx);
else if (ckpt_ctx->storage.exists(key, prev_ckpt_ctx))
prev_ckpt = ckpt_ctx->storage.recover(key, prev_ckpt_ctx);

if (prev_ckpt)
ckpt->enableIncremental(std::move(prev_ckpt));
}

/// Always checkpoints to local storage so that we can recover from local disk fastly
local_ckpt_storage->checkpoint(key, std::move(ckpt), ckpt_ctx);

/// Metadata needs to be checkpointed to the target storage directly
if (ckpt_ctx->epoch.empty())
{
auto local_ckpt = local_ckpt_storage->recover(key, ckpt_ctx);
ckpt_ctx->storage.checkpoint(key, std::move(local_ckpt), ckpt_ctx);
}
}
else
{
if (incremental)
{
auto prev_ckpt_ctx = ckpt_ctx->cloneWithEpoch(ckpt_ctx->request_ctx->last_epoch);
if (ckpt_ctx->storage.exists(key, prev_ckpt_ctx))
{
auto prev_ckpt = ckpt_ctx->storage.recover(key, std::move(prev_ckpt_ctx));
ckpt->enableIncremental(std::move(prev_ckpt));
}
}

ckpt_ctx->storage.checkpoint(key, std::move(ckpt), ckpt_ctx);
}
}

void CheckpointCoordinator::checkpointed(VersionType /*version*/, UInt32 node_id, CheckpointContextPtr ckpt_ctx)
Expand All @@ -64,7 +136,30 @@ void CheckpointCoordinator::checkpointed(VersionType /*version*/, UInt32 node_id
/// Unregistered
return;

chassert(ckpt_ctx->epoch == iter->second->current_epoch);
if (ckpt_ctx->epoch != iter->second->current_epoch)
{
if (!iter->second->ack_nodes_readonly.contains(node_id))
{
LOG_WARNING(
logger,
"Ignored checkpoint ack from non-ack node: node_id={} ack_epoch={} current_epoch={} query_id={} ack_nodes_count={}",
node_id,
ckpt_ctx->epoch,
iter->second->current_epoch,
ckpt_ctx->qid,
iter->second->ack_nodes.size());
return;
}

throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Checkpoint ack epoch mismatch: node_id={} ack_epoch={} current_epoch={} query_id={} ack_nodes_count={}",
node_id,
ckpt_ctx->epoch,
iter->second->current_epoch,
ckpt_ctx->qid,
iter->second->ack_nodes.size());
}
chassert(ckpt_ctx->request_ctx);

if (iter->second->ack(node_id))
Expand Down
16 changes: 13 additions & 3 deletions src/Checkpoint/CheckpointSettings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ extern const int INVALID_SETTING_VALUE;

namespace
{
#if 0
bool parseBoolTextWord(const std::string & str, std::string_view setting_name)
{
if (str.empty())
Expand All @@ -39,7 +38,6 @@ bool parseBoolTextWord(const std::string & str, std::string_view setting_name)
throw Exception(ErrorCodes::INVALID_SETTING_VALUE, "Invalid setting '{}': {}", setting_name, e.what());
}
}
#endif
}

void CheckpointSettings::serialize(VersionType, WriteBuffer & wb) const
Expand Down Expand Up @@ -68,7 +66,13 @@ void CheckpointSettings::deserialize(VersionType version, ReadBuffer & rb)

/// Repair raw settings
std::string replication_type_str = "local_file_system"; // Always local
raw_settings = fmt::format("type={};replication_type={};interval={};", "file", replication_type_str, interval);
raw_settings = fmt::format(
"type={};replication_type={};interval={};async={};incremental={}",
"file",
replication_type_str,
interval,
isAsync(),
isIncremental());
}
}

Expand Down Expand Up @@ -153,6 +157,12 @@ void CheckpointSettings::parseImpl()
if (auto iter = settings_map.find("interval"); iter != settings_map.end())
interval = std::stoul(iter->second);

if (auto iter = settings_map.find("async"); iter != settings_map.end())
strategy.async = parseBoolTextWord(iter->second, "async");

if (auto iter = settings_map.find("incremental"); iter != settings_map.end())
strategy.incremental = parseBoolTextWord(iter->second, "incremental");

/// Other unknown settings are ignored
}

Expand Down
Loading