diff --git a/src/Checkpoint/CheckpointCoordinator_Checkpoint.cpp b/src/Checkpoint/CheckpointCoordinator_Checkpoint.cpp index 8e748cadf5..8013e58be3 100644 --- a/src/Checkpoint/CheckpointCoordinator_Checkpoint.cpp +++ b/src/Checkpoint/CheckpointCoordinator_Checkpoint.cpp @@ -1,6 +1,7 @@ #include #include #include +#include #include @@ -19,8 +20,27 @@ extern const int RECOVER_CHECKPOINT_FAILED; void CheckpointCoordinator::preCheckpoint(DB::CheckpointContextPtr ckpt_ctx) { - /// Always use local storage directly - ckpt_ctx->storage.preCheckpoint(ckpt_ctx); + if (!ckpt_ctx->request_ctx) + { + ckpt_ctx->storage.preCheckpoint(ckpt_ctx); + return; + } + + const auto & settings = ckpt_ctx->request_ctx->settings; + /// If the checkpoint is async, pre-checkpoint the storage locally first. + if (settings->isAsync() && !ckpt_ctx->storage.isLocal()) + { + /// Always checkpoints to local storage so that we can recover from local disk fastly + local_ckpt_storage->preCheckpoint(ckpt_ctx); + + /// Metadata needs to be checkpointed to the target storage directly + if (ckpt_ctx->epoch.empty()) + ckpt_ctx->storage.preCheckpoint(ckpt_ctx); + } + else + { + ckpt_ctx->storage.preCheckpoint(ckpt_ctx); + } } void CheckpointCoordinator::checkpoint( @@ -48,7 +68,59 @@ void CheckpointCoordinator::checkpoint(UInt32 node_id, CheckpointPtr ckpt, Check void CheckpointCoordinator::checkpoint(const String & key, CheckpointPtr ckpt, CheckpointContextPtr ckpt_ctx) { - ckpt_ctx->storage.checkpoint(key, std::move(ckpt), ckpt_ctx); + if (!ckpt_ctx->request_ctx) + { + ckpt_ctx->storage.checkpoint(key, std::move(ckpt), ckpt_ctx); + return; + } + + const auto & settings = ckpt_ctx->request_ctx->settings; + + bool incremental = settings->isIncremental() && ckpt->supportsIncremental() && ckpt_ctx->epoch.epoch > 1; + if (auto logstore_ckpt_ctx = ckpt_ctx->tryGetExtra(); + logstore_ckpt_ctx && logstore_ckpt_ctx->force_request_full_ckpt) + incremental = false; + + /// If the checkpoint is async, we need to checkpoint to local storage first. + if (settings->isAsync() && !ckpt_ctx->storage.isLocal()) + { + if (incremental) + { + auto prev_ckpt_ctx = ckpt_ctx->cloneWithEpoch(ckpt_ctx->request_ctx->last_epoch); + CheckpointPtr prev_ckpt; + if (local_ckpt_storage->exists(key, prev_ckpt_ctx)) + prev_ckpt = local_ckpt_storage->recover(key, prev_ckpt_ctx); + else if (ckpt_ctx->storage.exists(key, prev_ckpt_ctx)) + prev_ckpt = ckpt_ctx->storage.recover(key, prev_ckpt_ctx); + + if (prev_ckpt) + ckpt->enableIncremental(std::move(prev_ckpt)); + } + + /// Always checkpoints to local storage so that we can recover from local disk fastly + local_ckpt_storage->checkpoint(key, std::move(ckpt), ckpt_ctx); + + /// Metadata needs to be checkpointed to the target storage directly + if (ckpt_ctx->epoch.empty()) + { + auto local_ckpt = local_ckpt_storage->recover(key, ckpt_ctx); + ckpt_ctx->storage.checkpoint(key, std::move(local_ckpt), ckpt_ctx); + } + } + else + { + if (incremental) + { + auto prev_ckpt_ctx = ckpt_ctx->cloneWithEpoch(ckpt_ctx->request_ctx->last_epoch); + if (ckpt_ctx->storage.exists(key, prev_ckpt_ctx)) + { + auto prev_ckpt = ckpt_ctx->storage.recover(key, std::move(prev_ckpt_ctx)); + ckpt->enableIncremental(std::move(prev_ckpt)); + } + } + + ckpt_ctx->storage.checkpoint(key, std::move(ckpt), ckpt_ctx); + } } void CheckpointCoordinator::checkpointed(VersionType /*version*/, UInt32 node_id, CheckpointContextPtr ckpt_ctx) @@ -64,7 +136,30 @@ void CheckpointCoordinator::checkpointed(VersionType /*version*/, UInt32 node_id /// Unregistered return; - chassert(ckpt_ctx->epoch == iter->second->current_epoch); + if (ckpt_ctx->epoch != iter->second->current_epoch) + { + if (!iter->second->ack_nodes_readonly.contains(node_id)) + { + LOG_WARNING( + logger, + "Ignored checkpoint ack from non-ack node: node_id={} ack_epoch={} current_epoch={} query_id={} ack_nodes_count={}", + node_id, + ckpt_ctx->epoch, + iter->second->current_epoch, + ckpt_ctx->qid, + iter->second->ack_nodes.size()); + return; + } + + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "Checkpoint ack epoch mismatch: node_id={} ack_epoch={} current_epoch={} query_id={} ack_nodes_count={}", + node_id, + ckpt_ctx->epoch, + iter->second->current_epoch, + ckpt_ctx->qid, + iter->second->ack_nodes.size()); + } chassert(ckpt_ctx->request_ctx); if (iter->second->ack(node_id)) diff --git a/src/Checkpoint/CheckpointSettings.cpp b/src/Checkpoint/CheckpointSettings.cpp index 546123d324..794fb985bc 100644 --- a/src/Checkpoint/CheckpointSettings.cpp +++ b/src/Checkpoint/CheckpointSettings.cpp @@ -16,7 +16,6 @@ extern const int INVALID_SETTING_VALUE; namespace { -#if 0 bool parseBoolTextWord(const std::string & str, std::string_view setting_name) { if (str.empty()) @@ -39,7 +38,6 @@ bool parseBoolTextWord(const std::string & str, std::string_view setting_name) throw Exception(ErrorCodes::INVALID_SETTING_VALUE, "Invalid setting '{}': {}", setting_name, e.what()); } } -#endif } void CheckpointSettings::serialize(VersionType, WriteBuffer & wb) const @@ -68,7 +66,13 @@ void CheckpointSettings::deserialize(VersionType version, ReadBuffer & rb) /// Repair raw settings std::string replication_type_str = "local_file_system"; // Always local - raw_settings = fmt::format("type={};replication_type={};interval={};", "file", replication_type_str, interval); + raw_settings = fmt::format( + "type={};replication_type={};interval={};async={};incremental={}", + "file", + replication_type_str, + interval, + isAsync(), + isIncremental()); } } @@ -153,6 +157,12 @@ void CheckpointSettings::parseImpl() if (auto iter = settings_map.find("interval"); iter != settings_map.end()) interval = std::stoul(iter->second); + if (auto iter = settings_map.find("async"); iter != settings_map.end()) + strategy.async = parseBoolTextWord(iter->second, "async"); + + if (auto iter = settings_map.find("incremental"); iter != settings_map.end()) + strategy.incremental = parseBoolTextWord(iter->second, "incremental"); + /// Other unknown settings are ignored }