From 8fd6026d6db501fd2d6459f27aac1feb62ae9faf Mon Sep 17 00:00:00 2001 From: gitccl Date: Tue, 25 Apr 2023 21:19:46 +0800 Subject: [PATCH] [Enhancement (compaction) support parallel compaction for single tablet --- be/src/http/action/compaction_action.cpp | 57 +++++- be/src/olap/base_compaction.cpp | 13 +- be/src/olap/compaction.cpp | 15 +- be/src/olap/compaction.h | 8 + be/src/olap/cumulative_compaction.cpp | 44 ++-- be/src/olap/cumulative_compaction.h | 3 +- be/src/olap/cumulative_compaction_policy.cpp | 202 ++++++++++++++----- be/src/olap/cumulative_compaction_policy.h | 11 +- be/src/olap/olap_common.h | 1 + be/src/olap/olap_server.cpp | 45 +++-- be/src/olap/storage_engine.h | 2 +- be/src/olap/tablet.cpp | 115 +++++++++-- be/src/olap/tablet.h | 70 ++++++- be/src/olap/tablet_manager.cpp | 7 +- be/src/olap/task/engine_clone_task.cpp | 3 +- 15 files changed, 447 insertions(+), 149 deletions(-) diff --git a/be/src/http/action/compaction_action.cpp b/be/src/http/action/compaction_action.cpp index 96d450c5990488..1cfd27f2e939a4 100644 --- a/be/src/http/action/compaction_action.cpp +++ b/be/src/http/action/compaction_action.cpp @@ -155,8 +155,8 @@ Status CompactionAction::_handle_run_status_compaction(HttpRequest* req, std::st { // use try lock to check this tablet is running cumulative compaction - std::unique_lock lock_cumulative(tablet->get_cumulative_compaction_lock(), - std::try_to_lock); + std::unique_lock lock_cumulative(tablet->get_cumulative_compaction_lock(), + std::try_to_lock); if (!lock_cumulative.owns_lock()) { msg = "compaction task for this tablet is running"; compaction_type = "cumulative"; @@ -199,8 +199,30 @@ Status CompactionAction::_execute_compaction_callback(TabletSharedPtr tablet, } Status res = Status::OK(); if (compaction_type == PARAM_COMPACTION_BASE) { - BaseCompaction base_compaction(tablet); - res = base_compaction.compact(); + std::shared_ptr base_compaction; + bool need_reset = false; + { + std::unique_lock compaction_meta_lock(tablet->get_compaction_meta_lock()); + if (tablet->get_base_compaction() != nullptr) { + LOG(INFO) << "another base compaction is running, tablet=" << tablet->full_name(); + res = Status::Error(); + } else { + StorageEngine::instance()->create_base_compaction(tablet, base_compaction); + Status res = base_compaction->prepare_compact(); + if (res.ok()) { + tablet->set_base_compaction(base_compaction); + need_reset = true; + } + } + } + + if (res.ok()) { + res = base_compaction->execute_compact(); + } + if (need_reset) { + tablet->reset_compaction(BASE_COMPACTION, base_compaction); + } + if (!res) { if (res.is()) { // Ignore this error code. @@ -208,22 +230,39 @@ Status CompactionAction::_execute_compaction_callback(TabletSharedPtr tablet, << tablet->full_name(); } else { DorisMetrics::instance()->base_compaction_request_failed->increment(1); - LOG(WARNING) << "failed to init base compaction. res=" << res + LOG(WARNING) << "failed to do base compaction. res=" << res << ", tablet=" << tablet->full_name(); } } } else if (compaction_type == PARAM_COMPACTION_CUMULATIVE) { - CumulativeCompaction cumulative_compaction(tablet); - res = cumulative_compaction.compact(); + std::shared_ptr cumu_compaction; + StorageEngine::instance()->create_cumulative_compaction(tablet, cumu_compaction); + bool need_reset = false; + { + std::unique_lock compaction_meta_lock(tablet->get_compaction_meta_lock()); + Status res = cumu_compaction->prepare_compact(); + if (res.ok()) { + tablet->add_cumulative_compaction_unlocked(cumu_compaction); + need_reset = true; + } + } + + if (res.ok()) { + res = cumu_compaction->execute_compact(); + } + if (need_reset) { + tablet->reset_compaction(CUMULATIVE_COMPACTION, cumu_compaction); + } + if (!res) { - if (res.is()) { + if (res.is()) { // Ignore this error code. VLOG_NOTICE << "failed to init cumulative compaction due to no suitable version," << "tablet=" << tablet->full_name(); } else { DorisMetrics::instance()->cumulative_compaction_request_failed->increment(1); LOG(WARNING) << "failed to do cumulative compaction. res=" << res - << ", table=" << tablet->full_name(); + << ", tablet=" << tablet->full_name(); } } } diff --git a/be/src/olap/base_compaction.cpp b/be/src/olap/base_compaction.cpp index c1fcbe3f339739..5346af84cebabf 100644 --- a/be/src/olap/base_compaction.cpp +++ b/be/src/olap/base_compaction.cpp @@ -47,18 +47,10 @@ Status BaseCompaction::prepare_compact() { return Status::Error(); } - std::unique_lock lock(_tablet->get_base_compaction_lock(), std::try_to_lock); - if (!lock.owns_lock()) { - LOG(WARNING) << "another base compaction is running. tablet=" << _tablet->full_name(); - return Status::Error(); - } - TRACE("got base compaction lock"); - // 1. pick rowsets to compact RETURN_NOT_OK(pick_rowsets_to_compact()); TRACE("rowsets picked"); TRACE_COUNTER_INCREMENT("input_rowsets_count", _input_rowsets.size()); - _tablet->set_clone_occurred(false); return Status::OK(); } @@ -71,15 +63,14 @@ Status BaseCompaction::execute_compact_impl() { #endif std::unique_lock lock(_tablet->get_base_compaction_lock(), std::try_to_lock); if (!lock.owns_lock()) { - LOG(WARNING) << "another base compaction is running. tablet=" << _tablet->full_name(); + LOG(INFO) << "The tablet is under clone, tablet=" << _tablet->full_name(); return Status::Error(); } TRACE("got base compaction lock"); // Clone task may happen after compaction task is submitted to thread pool, and rowsets picked // for compaction may change. In this case, current compaction task should not be executed. - if (_tablet->get_clone_occurred()) { - _tablet->set_clone_occurred(false); + if (get_clone_occurred()) { return Status::Error(); } diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp index 71470fc60ec9dd..fb153be3f53b6a 100644 --- a/be/src/olap/compaction.cpp +++ b/be/src/olap/compaction.cpp @@ -64,7 +64,8 @@ Compaction::Compaction(const TabletSharedPtr& tablet, const std::string& label) _input_row_num(0), _input_num_segments(0), _input_index_size(0), - _state(CompactionState::INITED) { + _state(CompactionState::INITED), + _is_clone_occurred(false) { _mem_tracker = std::make_shared(MemTrackerLimiter::Type::COMPACTION, label); } @@ -84,6 +85,18 @@ Status Compaction::execute_compact() { return st; } +const std::vector& Compaction::get_input_rowsets() const { + return _input_rowsets; +} + +void Compaction::set_clone_occurred() { + _is_clone_occurred = true; +} + +bool Compaction::get_clone_occurred() { + return _is_clone_occurred; +} + Status Compaction::do_compaction(int64_t permits) { TRACE("start to do compaction"); uint32_t checksum_before; diff --git a/be/src/olap/compaction.h b/be/src/olap/compaction.h index 17acb45710fc24..38bc98b7a28689 100644 --- a/be/src/olap/compaction.h +++ b/be/src/olap/compaction.h @@ -57,6 +57,12 @@ class Compaction { virtual Status prepare_compact() = 0; Status execute_compact(); virtual Status execute_compact_impl() = 0; + + const std::vector& get_input_rowsets() const; + + void set_clone_occurred(); + bool get_clone_occurred(); + #ifdef BE_TEST void set_input_rowset(const std::vector& rowsets); RowsetSharedPtr output_rowset(); @@ -115,6 +121,8 @@ class Compaction { RowIdConversion _rowid_conversion; TabletSchemaSPtr _cur_tablet_schema; + std::atomic _is_clone_occurred; + DISALLOW_COPY_AND_ASSIGN(Compaction); }; diff --git a/be/src/olap/cumulative_compaction.cpp b/be/src/olap/cumulative_compaction.cpp index 303b58eefe258c..301e5772be5112 100644 --- a/be/src/olap/cumulative_compaction.cpp +++ b/be/src/olap/cumulative_compaction.cpp @@ -47,13 +47,6 @@ Status CumulativeCompaction::prepare_compact() { return Status::Error(); } - std::unique_lock lock(_tablet->get_cumulative_compaction_lock(), std::try_to_lock); - if (!lock.owns_lock()) { - LOG(INFO) << "The tablet is under cumulative compaction. tablet=" << _tablet->full_name(); - return Status::Error(); - } - TRACE("got cumulative compaction lock"); - // 1. calculate cumulative point _tablet->calculate_cumulative_point(); TRACE("calculated cumulative point"); @@ -64,23 +57,21 @@ Status CumulativeCompaction::prepare_compact() { RETURN_NOT_OK(pick_rowsets_to_compact()); TRACE("rowsets picked"); TRACE_COUNTER_INCREMENT("input_rowsets_count", _input_rowsets.size()); - _tablet->set_clone_occurred(false); return Status::OK(); } Status CumulativeCompaction::execute_compact_impl() { - std::unique_lock lock(_tablet->get_cumulative_compaction_lock(), std::try_to_lock); + std::shared_lock lock(_tablet->get_cumulative_compaction_lock(), std::try_to_lock); if (!lock.owns_lock()) { - LOG(INFO) << "The tablet is under cumulative compaction. tablet=" << _tablet->full_name(); + LOG(INFO) << "The tablet is under clone, tablet=" << _tablet->full_name(); return Status::Error(); } TRACE("got cumulative compaction lock"); // Clone task may happen after compaction task is submitted to thread pool, and rowsets picked // for compaction may change. In this case, current compaction task should not be executed. - if (_tablet->get_clone_occurred()) { - _tablet->set_clone_occurred(false); + if (get_clone_occurred()) { return Status::Error(); } @@ -95,8 +86,12 @@ Status CumulativeCompaction::execute_compact_impl() { _state = CompactionState::SUCCESS; // 5. set cumulative point - _tablet->cumulative_compaction_policy()->update_cumulative_point( - _tablet.get(), _input_rowsets, _output_rowset, _last_delete_version); + { + std::lock_guard compact_meta_lock(_tablet->get_compaction_meta_lock()); + _tablet->cumulative_compaction_policy()->update_cumulative_point( + _tablet.get(), _input_rowsets, _output_rowset, _last_delete_version); + } + VLOG_CRITICAL << "after cumulative compaction, current cumulative point is " << _tablet->cumulative_layer_point() << ", tablet=" << _tablet->full_name(); @@ -114,27 +109,24 @@ Status CumulativeCompaction::pick_rowsets_to_compact() { return Status::Error(); } - // candidate_rowsets may not be continuous - // So we need to choose the longest continuous path from it. - std::vector missing_versions; - RETURN_NOT_OK(find_longest_consecutive_version(&candidate_rowsets, &missing_versions)); - if (!missing_versions.empty()) { - DCHECK(missing_versions.size() == 2); - LOG(WARNING) << "There are missed versions among rowsets. " - << "prev rowset verison=" << missing_versions[0] - << ", next rowset version=" << missing_versions[1] - << ", tablet=" << _tablet->full_name(); - } - size_t compaction_score = 0; _tablet->cumulative_compaction_policy()->pick_input_rowsets( _tablet.get(), candidate_rowsets, config::cumulative_compaction_max_deltas, config::cumulative_compaction_min_deltas, &_input_rowsets, &_last_delete_version, &compaction_score); + if (candidate_rowsets.empty()) { + return Status::Error(); + } + // Cumulative compaction will process with at least 1 rowset. // So when there is no rowset being chosen, we should return Status::Error(): if (_input_rowsets.empty()) { + Version begin_version = candidate_rowsets[0]->version(); + if (begin_version.first != _tablet->cumulative_layer_point()) { + return Status::Error(); + } + if (_last_delete_version.first != -1) { // we meet a delete version, should increase the cumulative point to let base compaction handle the delete version. // plus 1 to skip the delete version. diff --git a/be/src/olap/cumulative_compaction.h b/be/src/olap/cumulative_compaction.h index d74542a2ffe070..e2b1a4d038d65b 100644 --- a/be/src/olap/cumulative_compaction.h +++ b/be/src/olap/cumulative_compaction.h @@ -36,10 +36,11 @@ class CumulativeCompaction : public Compaction { CumulativeCompaction(const TabletSharedPtr& tablet); ~CumulativeCompaction() override; + // caller should hold compaction_meta_lock when call this function Status prepare_compact() override; Status execute_compact_impl() override; - std::vector get_input_rowsets() { return _input_rowsets; } + const std::vector& get_input_rowsets() { return _input_rowsets; } protected: Status pick_rowsets_to_compact() override; diff --git a/be/src/olap/cumulative_compaction_policy.cpp b/be/src/olap/cumulative_compaction_policy.cpp index d5d86ad14249b0..ffea1dbedb2fc5 100644 --- a/be/src/olap/cumulative_compaction_policy.cpp +++ b/be/src/olap/cumulative_compaction_policy.cpp @@ -141,33 +141,152 @@ void SizeBasedCumulativeCompactionPolicy::update_cumulative_point( // if tablet under alter process, do not update cumulative point return; } - // if rowsets have delete version, move to the last directly - if (last_delete_version.first != -1) { - tablet->set_cumulative_layer_point(output_rowset->end_version() + 1); + + int64_t promotion_size = tablet->cumulative_promotion_size(); + auto can_forward = [=](const RowsetMetaSharedPtr& rs_meta) { + return rs_meta->has_delete_predicate() || (!rs_meta->is_segments_overlapping() && + rs_meta->total_disk_size() >= promotion_size); + }; + + auto rowsets = tablet->pick_candidate_rowsets_to_cumulative_compaction(); + int64_t new_point = tablet->cumulative_layer_point(); + // first, forward cumulative_point if has delete predicate or size exceeded promotion_size + for (auto& rs : rowsets) { + if (rs->start_version() == new_point && can_forward(rs->rowset_meta())) { + new_point = rs->end_version() + 1; + } else { + break; + } + } + + bool need_forward_cumulative_point = false; + if (new_point == output_rowset->start_version()) { + if (last_delete_version.first != -1) { + new_point = output_rowset->end_version() + 1; + need_forward_cumulative_point = true; + } else if (output_rowset->rowset_meta()->total_disk_size() >= promotion_size) { + new_point = output_rowset->end_version() + 1; + need_forward_cumulative_point = true; + } + } + + if (need_forward_cumulative_point) { + for (auto& rs : rowsets) { + if (rs->start_version() < new_point) { + continue; + } + + if (rs->start_version() > new_point) { + break; + } + + if (can_forward(rs->rowset_meta())) { + new_point = rs->end_version() + 1; + } + } + } + + if (new_point > tablet->cumulative_layer_point()) { + tablet->set_cumulative_layer_point(new_point); + LOG(INFO) << "successfully forward cumulative_point to " << new_point + << ", output_rowset=" << output_rowset->version() + << ", tablet=" << tablet->full_name(); } else { - // if rowsets have no delete version, check output_rowset total disk size - // satisfies promotion size. - size_t total_size = output_rowset->rowset_meta()->total_disk_size(); - if (total_size >= tablet->cumulative_promotion_size()) { - tablet->set_cumulative_layer_point(output_rowset->end_version() + 1); + LOG(INFO) << "cannot forward cumulative_point, current point=" + << tablet->cumulative_layer_point() + << ", out size=" << output_rowset->rowset_meta()->total_disk_size() + << ", promotion_size=" << promotion_size + << ", output_rowset=" << output_rowset->version() + << ", tablet=" << tablet->full_name(); + } +} + +uint32_t SizeBasedCumulativeCompactionPolicy::_calc_max_score( + const std::vector& rowsets, int64_t promotion_size, + std::vector* max_score_rowsets) const { + uint32_t max_score = 0; + size_t rowsets_sz = rowsets.size(); + for (size_t idx = 0; idx < rowsets_sz;) { + // firstly skip the rowsets whose size exceeds promoto_size + for (; idx < rowsets_sz; ++idx) { + if (rowsets[idx]->is_segments_overlapping() || + rowsets[idx]->rowset_meta()->total_disk_size() < promotion_size) { + break; + } + } + + if (idx >= rowsets_sz) { + break; + } + + // get successive version + std::vector rowset_to_compact; + { + rowset_to_compact.push_back(rowsets[idx]); + size_t inner_idx = idx + 1; + for (; inner_idx < rowsets_sz; ++inner_idx) { + // break if the version isn't successive + if (rowsets[inner_idx]->start_version() != + rowset_to_compact.back()->end_version() + 1) { + break; + } + rowset_to_compact.push_back(rowsets[inner_idx]); + } + idx = inner_idx; + } + + // calcuate compaction score of the successive rowsets + uint32_t score = 0; + int64_t total_size = 0; + for (auto& rs : rowset_to_compact) { + total_size += rs->rowset_meta()->total_disk_size(); + score += rs->rowset_meta()->get_compaction_score(); + } + + // pruning + if (score <= max_score) { + continue; + } + + if (total_size < promotion_size) { + // calculate the rowsets to do cumulative compaction + // eg: size of rowset_to_compact are: + // 128, 16, 16, 16 + // we will choose [16,16,16] to compact. + for (auto& rs : rowset_to_compact) { + int current_level = _level_size(rs->rowset_meta()->total_disk_size()); + int remain_level = _level_size(total_size - rs->rowset_meta()->total_disk_size()); + // if current level less then remain level, score contains current rowset + // and process return; otherwise, score does not contains current rowset. + if (current_level <= remain_level) { + break; + } + total_size -= rs->rowset_meta()->total_disk_size(); + score -= rs->rowset_meta()->get_compaction_score(); + } + } + + if (score > max_score) { + max_score = score; + if (max_score_rowsets != nullptr) { + *max_score_rowsets = std::move(rowset_to_compact); + } } } + + return max_score; } uint32_t SizeBasedCumulativeCompactionPolicy::calc_cumulative_compaction_score(Tablet* tablet) { - uint32_t score = 0; bool base_rowset_exist = false; - const int64_t point = tablet->cumulative_layer_point(); int64_t promotion_size = 0; - std::vector rowset_to_compact; - int64_t total_size = 0; - RowsetMetaSharedPtr first_meta; int64_t first_version = INT64_MAX; - // NOTE: tablet._meta_lock is hold + // NOTE: tablet._meta_lock is held auto& rs_metas = tablet->tablet_meta()->all_rs_metas(); - // check the base rowset and collect the rowsets of cumulative part + + // check the base rowset for (auto& rs_meta : rs_metas) { if (rs_meta->start_version() < first_version) { first_version = rs_meta->start_version(); @@ -177,15 +296,6 @@ uint32_t SizeBasedCumulativeCompactionPolicy::calc_cumulative_compaction_score(T if (rs_meta->start_version() == 0) { base_rowset_exist = true; } - if (rs_meta->end_version() < point || !rs_meta->is_local()) { - // all_rs_metas() is not sorted, so we use _continue_ other than _break_ here. - continue; - } else { - // collect the rowsets of cumulative part - total_size += rs_meta->total_disk_size(); - score += rs_meta->get_compaction_score(); - rowset_to_compact.push_back(rs_meta); - } } if (first_meta == nullptr) { @@ -203,38 +313,30 @@ uint32_t SizeBasedCumulativeCompactionPolicy::calc_cumulative_compaction_score(T return 0; } - // if total_size is greater than promotion_size, return total score - if (total_size >= promotion_size) { - return score; - } - - // sort the rowsets of cumulative part - std::sort(rowset_to_compact.begin(), rowset_to_compact.end(), RowsetMeta::comparator); - - // calculate the rowsets to do cumulative compaction - // eg: size of rowset_to_compact are: - // 128, 16, 16, 16 - // we will choose [16,16,16] to compact. - for (auto& rs_meta : rowset_to_compact) { - int current_level = _level_size(rs_meta->total_disk_size()); - int remain_level = _level_size(total_size - rs_meta->total_disk_size()); - // if current level less then remain level, score contains current rowset - // and process return; otherwise, score does not contains current rowset. - if (current_level <= remain_level) { - return score; - } - total_size -= rs_meta->total_disk_size(); - score -= rs_meta->get_compaction_score(); - } - return score; + auto rowsets = tablet->pick_rowsets_not_in_compaction(); + return _calc_max_score(rowsets, promotion_size); } int SizeBasedCumulativeCompactionPolicy::pick_input_rowsets( - Tablet* tablet, const std::vector& candidate_rowsets, + Tablet* tablet, std::vector& candidate_rowsets, const int64_t max_compaction_score, const int64_t min_compaction_score, std::vector* input_rowsets, Version* last_delete_version, size_t* compaction_score) { size_t promotion_size = tablet->cumulative_promotion_size(); + { + // candidate_rowsets may not be continuous + // we need to choose the successive rowsets with max score. + std::vector rowsets_to_compact; + _calc_max_score(candidate_rowsets, promotion_size, &rowsets_to_compact); + + // change candidate_rowsets because the caller will use it + candidate_rowsets = std::move(rowsets_to_compact); + + if (candidate_rowsets.empty()) { + return 0; + } + } + auto max_version = tablet->max_version().first; int transient_size = 0; *compaction_score = 0; @@ -335,7 +437,7 @@ int SizeBasedCumulativeCompactionPolicy::pick_input_rowsets( return transient_size; } -int64_t SizeBasedCumulativeCompactionPolicy::_level_size(const int64_t size) { +int64_t SizeBasedCumulativeCompactionPolicy::_level_size(const int64_t size) const { if (size < 1024) return 0; int64_t max_level = (int64_t)1 << (sizeof(_promotion_size) * 8 - 1 - __builtin_clzl(_promotion_size / 2)); diff --git a/be/src/olap/cumulative_compaction_policy.h b/be/src/olap/cumulative_compaction_policy.h index a12d1d8e8b447e..cc0aff148555a2 100644 --- a/be/src/olap/cumulative_compaction_policy.h +++ b/be/src/olap/cumulative_compaction_policy.h @@ -66,8 +66,7 @@ class CumulativeCompactionPolicy { /// return input_rowsets, the vector container as return /// return last_delete_version, if has delete rowset, record the delete version from input_rowsets /// return compaction_score, calculate the compaction score of picked input rowset - virtual int pick_input_rowsets(Tablet* tablet, - const std::vector& candidate_rowsets, + virtual int pick_input_rowsets(Tablet* tablet, std::vector& candidate_rowsets, const int64_t max_compaction_score, const int64_t min_compaction_score, std::vector* input_rowsets, @@ -130,7 +129,7 @@ class SizeBasedCumulativeCompactionPolicy final : public CumulativeCompactionPol /// Its main policy is picking rowsets from candidate rowsets by comparing accumulative compaction_score, /// max_cumulative_compaction_num_singleton_deltas or checking whether there is delete version rowset, /// and choose those rowset in the same level to do cumulative compaction. - int pick_input_rowsets(Tablet* tablet, const std::vector& candidate_rowsets, + int pick_input_rowsets(Tablet* tablet, std::vector& candidate_rowsets, const int64_t max_compaction_score, const int64_t min_compaction_score, std::vector* input_rowsets, Version* last_delete_version, size_t* compaction_score) override; @@ -155,11 +154,15 @@ class SizeBasedCumulativeCompactionPolicy final : public CumulativeCompactionPol /// calculate the disk size belong to which level, the level is divide by power of 2 /// between compaction_promotion_size_mbytes and 1KB - int64_t _level_size(const int64_t size); + int64_t _level_size(const int64_t size) const; /// when policy calculate cumulative_compaction_score, update promotion size at the same time void _refresh_tablet_promotion_size(Tablet* tablet, int64_t promotion_size); + // calculate max compaction score of rowsets, and set max_score_rowsets to successive rowsets with max score + uint32_t _calc_max_score(const std::vector& rowsets, int64_t promotion_size, + std::vector* max_score_rowsets = nullptr) const; + private: /// cumulative compaction promotion size, unit is byte. int64_t _promotion_size; diff --git a/be/src/olap/olap_common.h b/be/src/olap/olap_common.h index ad81578a6ac8da..bd0903bad5b0bb 100644 --- a/be/src/olap/olap_common.h +++ b/be/src/olap/olap_common.h @@ -462,5 +462,6 @@ struct HashOfRowsetId { }; using RowsetIdUnorderedSet = std::unordered_set; +using VersionUnorderedSet = std::unordered_set; } // namespace doris diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp index 3f21adb229d705..cb4ff49d699396 100644 --- a/be/src/olap/olap_server.cpp +++ b/be/src/olap/olap_server.cpp @@ -443,7 +443,7 @@ void StorageEngine::_compaction_tasks_producer_callback() { #endif LOG(INFO) << "try to start compaction producer process!"; - std::unordered_set tablet_submitted_cumu; + std::unordered_multiset tablet_submitted_cumu; std::unordered_set tablet_submitted_base; std::vector data_dirs; for (auto& tmp_store : _store_map) { @@ -544,13 +544,15 @@ std::vector StorageEngine::_generate_compaction_tasks( // Copy _tablet_submitted_xxx_compaction map so that we don't need to hold _tablet_submitted_compaction_mutex // when traversing the data dir - std::map> copied_cumu_map; + std::map> copied_cumu_map; std::map> copied_base_map; { std::unique_lock lock(_tablet_submitted_compaction_mutex); copied_cumu_map = _tablet_submitted_cumu_compaction; copied_base_map = _tablet_submitted_base_compaction; } + + std::unordered_set empty_cumu_map; for (auto data_dir : data_dirs) { bool need_pick_tablet = true; // We need to reserve at least one Slot for cumulative compaction. @@ -586,7 +588,7 @@ std::vector StorageEngine::_generate_compaction_tasks( TabletSharedPtr tablet = _tablet_manager->find_best_tablet_to_compaction( compaction_type, data_dir, compaction_type == CompactionType::CUMULATIVE_COMPACTION - ? copied_cumu_map[data_dir] + ? empty_cumu_map : copied_base_map[data_dir], &disk_max_score, _cumulative_compaction_policy); if (tablet != nullptr) { @@ -630,9 +632,7 @@ bool StorageEngine::_push_tablet_into_submitted_compaction(TabletSharedPtr table bool already_existed = false; switch (compaction_type) { case CompactionType::CUMULATIVE_COMPACTION: - already_existed = !(_tablet_submitted_cumu_compaction[tablet->data_dir()] - .insert(tablet->tablet_id()) - .second); + _tablet_submitted_cumu_compaction[tablet->data_dir()].insert(tablet->tablet_id()); break; default: already_existed = !(_tablet_submitted_base_compaction[tablet->data_dir()] @@ -648,9 +648,14 @@ void StorageEngine::_pop_tablet_from_submitted_compaction(TabletSharedPtr tablet std::unique_lock lock(_tablet_submitted_compaction_mutex); int removed = 0; switch (compaction_type) { - case CompactionType::CUMULATIVE_COMPACTION: - removed = _tablet_submitted_cumu_compaction[tablet->data_dir()].erase(tablet->tablet_id()); + case CompactionType::CUMULATIVE_COMPACTION: { + auto it = _tablet_submitted_cumu_compaction[tablet->data_dir()].find(tablet->tablet_id()); + if (it != _tablet_submitted_cumu_compaction[tablet->data_dir()].end()) { + _tablet_submitted_cumu_compaction[tablet->data_dir()].erase(it); + removed = 1; + } break; + } default: removed = _tablet_submitted_base_compaction[tablet->data_dir()].erase(tablet->tablet_id()); break; @@ -672,23 +677,27 @@ Status StorageEngine::_submit_compaction_task(TabletSharedPtr tablet, tablet->tablet_id(), compaction_type); } int64_t permits = 0; - Status st = tablet->prepare_compaction_and_calculate_permits(compaction_type, tablet, &permits); + + std::shared_ptr compaction_task; + Status st = tablet->prepare_compaction_and_calculate_permits(compaction_type, tablet, &permits, + &compaction_task); if (st.ok() && permits > 0 && _permit_limiter.request(permits)) { std::unique_ptr& thread_pool = (compaction_type == CompactionType::CUMULATIVE_COMPACTION) ? _cumu_compaction_thread_pool : _base_compaction_thread_pool; - auto st = thread_pool->submit_func([tablet, compaction_type, permits, this]() { - tablet->execute_compaction(compaction_type); - _permit_limiter.release(permits); - // reset compaction - tablet->reset_compaction(compaction_type); - _pop_tablet_from_submitted_compaction(tablet, compaction_type); - }); + auto st = thread_pool->submit_func( + [tablet, compaction_type, permits, this, compaction_task]() { + tablet->execute_compaction(compaction_type, compaction_task); + _permit_limiter.release(permits); + // reset compaction + tablet->reset_compaction(compaction_type, compaction_task); + _pop_tablet_from_submitted_compaction(tablet, compaction_type); + }); if (!st.ok()) { _permit_limiter.release(permits); // reset compaction - tablet->reset_compaction(compaction_type); + tablet->reset_compaction(compaction_type, compaction_task); _pop_tablet_from_submitted_compaction(tablet, compaction_type); return Status::InternalError( "failed to submit compaction task to thread pool, " @@ -698,7 +707,7 @@ Status StorageEngine::_submit_compaction_task(TabletSharedPtr tablet, return Status::OK(); } else { // reset compaction - tablet->reset_compaction(compaction_type); + tablet->reset_compaction(compaction_type, compaction_task); _pop_tablet_from_submitted_compaction(tablet, compaction_type); if (!st.ok()) { return Status::InternalError( diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h index 481353ec0dcf7f..7f1285230d904f 100644 --- a/be/src/olap/storage_engine.h +++ b/be/src/olap/storage_engine.h @@ -387,7 +387,7 @@ class StorageEngine { std::mutex _tablet_submitted_compaction_mutex; // a tablet can do base and cumulative compaction at same time - std::map> _tablet_submitted_cumu_compaction; + std::map> _tablet_submitted_cumu_compaction; std::map> _tablet_submitted_base_compaction; std::atomic _wakeup_producer_flag {0}; diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index 96311c4c77dede..238523a36c9ed3 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -51,7 +51,7 @@ #include #include #include -#include +#include #include #include #include @@ -989,6 +989,8 @@ bool Tablet::can_do_compaction(size_t path_hash, CompactionType compaction_type) uint32_t Tablet::calc_compaction_score( CompactionType compaction_type, std::shared_ptr cumulative_compaction_policy) { + // Need _compaction_meta_lock, because it will iterator _cumulative_compactions. + std::lock_guard compaction_meta_lock(_compaction_meta_lock); // Need meta lock, because it will iterator "all_rs_metas" of tablet meta. std::shared_lock rdlock(_meta_lock); if (compaction_type == CompactionType::CUMULATIVE_COMPACTION) { @@ -1258,10 +1260,14 @@ std::vector Tablet::pick_candidate_rowsets_to_cumulative_compac if (_cumulative_point == K_INVALID_CUMULATIVE_POINT) { return candidate_rowsets; } + + VersionUnorderedSet version_in_compact; + get_in_compacted_rowsets(&version_in_compact); { std::shared_lock rlock(_meta_lock); for (const auto& [version, rs] : _rs_version_map) { - if (version.first >= _cumulative_point && rs->is_local()) { + if (version.first >= _cumulative_point && rs->is_local() && + version_in_compact.find(rs->version()) == version_in_compact.end()) { candidate_rowsets.push_back(rs); } } @@ -1270,6 +1276,24 @@ std::vector Tablet::pick_candidate_rowsets_to_cumulative_compac return candidate_rowsets; } +std::vector Tablet::pick_rowsets_not_in_compaction() const { + std::vector candidate_rowsets; + if (_cumulative_point == K_INVALID_CUMULATIVE_POINT) { + return candidate_rowsets; + } + VersionUnorderedSet version_in_compact; + get_in_compacted_rowsets(&version_in_compact); + + for (const auto& [version, rs] : _rs_version_map) { + if (version.first >= _cumulative_point && rs->is_local() && + version_in_compact.find(rs->version()) == version_in_compact.end()) { + candidate_rowsets.push_back(rs); + } + } + std::sort(candidate_rowsets.begin(), candidate_rowsets.end(), Rowset::comparator); + return candidate_rowsets; +} + std::vector Tablet::pick_candidate_rowsets_to_base_compaction() { std::vector candidate_rowsets; { @@ -1596,8 +1620,8 @@ void Tablet::generate_tablet_meta_copy_unlocked(TabletMetaSharedPtr new_tablet_m } Status Tablet::prepare_compaction_and_calculate_permits(CompactionType compaction_type, - TabletSharedPtr tablet, int64_t* permits) { - std::vector compaction_rowsets; + TabletSharedPtr tablet, int64_t* permits, + std::shared_ptr* compaction) { if (compaction_type == CompactionType::CUMULATIVE_COMPACTION) { scoped_refptr trace(new Trace); MonotonicStopWatch watch; @@ -1610,9 +1634,15 @@ Status Tablet::prepare_compaction_and_calculate_permits(CompactionType compactio ADOPT_TRACE(trace.get()); TRACE("create cumulative compaction"); - StorageEngine::instance()->create_cumulative_compaction(tablet, _cumulative_compaction); + std::shared_ptr cumu_compaction; + StorageEngine::instance()->create_cumulative_compaction(tablet, cumu_compaction); DorisMetrics::instance()->cumulative_compaction_request_total->increment(1); - Status res = _cumulative_compaction->prepare_compact(); + + // choose rowsets to compact and add compact task to _cumulative_compactions should be an atomic operation + std::unique_lock compaction_meta_lock(_compaction_meta_lock); + TRACE("got compaction meta lock"); + + Status res = cumu_compaction->prepare_compact(); if (!res.ok()) { set_last_cumu_compaction_failure_time(UnixMillis()); *permits = 0; @@ -1625,7 +1655,8 @@ Status Tablet::prepare_compaction_and_calculate_permits(CompactionType compactio // And because we set permits to 0, so even if we return OK here, nothing will be done. return Status::OK(); } - compaction_rowsets = _cumulative_compaction->get_input_rowsets(); + add_cumulative_compaction_unlocked(cumu_compaction); + *compaction = std::move(cumu_compaction); } else { DCHECK_EQ(compaction_type, CompactionType::BASE_COMPACTION); scoped_refptr trace(new Trace); @@ -1638,10 +1669,21 @@ Status Tablet::prepare_compaction_and_calculate_permits(CompactionType compactio }); ADOPT_TRACE(trace.get()); + std::unique_lock compaction_meta_lock(_compaction_meta_lock); + TRACE("got compaction meta lock"); + + if (get_base_compaction() != nullptr) { + *permits = 0; + LOG(WARNING) << "another base compaction is running, tablet=" << full_name(); + return Status::OK(); + } + TRACE("create base compaction"); - StorageEngine::instance()->create_base_compaction(tablet, _base_compaction); + std::shared_ptr base_compaction; + StorageEngine::instance()->create_base_compaction(tablet, base_compaction); DorisMetrics::instance()->base_compaction_request_total->increment(1); - Status res = _base_compaction->prepare_compact(); + + Status res = base_compaction->prepare_compact(); if (!res.ok()) { set_last_base_compaction_failure_time(UnixMillis()); *permits = 0; @@ -1654,16 +1696,30 @@ Status Tablet::prepare_compaction_and_calculate_permits(CompactionType compactio // And because we set permits to 0, so even if we return OK here, nothing will be done. return Status::OK(); } - compaction_rowsets = _base_compaction->get_input_rowsets(); + set_base_compaction(base_compaction); + *compaction = std::move(base_compaction); } + *permits = 0; + auto& compaction_rowsets = compaction->get()->get_input_rowsets(); for (auto rowset : compaction_rowsets) { *permits += rowset->rowset_meta()->get_compaction_score(); } + auto type_str = + compaction_type == CompactionType::CUMULATIVE_COMPACTION ? "Cumulative" : "Base"; + std::stringstream ss; + ss << "successfully prepare compact, type=" << type_str << ", permits=" << *permits + << ", tablet=" << full_name() << ", input_rowsets={ "; + for (auto rowset : compaction_rowsets) { + ss << rowset->version() << ", "; + } + ss << "}"; + LOG(INFO) << ss.str(); return Status::OK(); } -void Tablet::execute_compaction(CompactionType compaction_type) { +void Tablet::execute_compaction(CompactionType compaction_type, + const std::shared_ptr& compaction) { if (compaction_type == CompactionType::CUMULATIVE_COMPACTION) { scoped_refptr trace(new Trace); MonotonicStopWatch watch; @@ -1677,7 +1733,7 @@ void Tablet::execute_compaction(CompactionType compaction_type) { ADOPT_TRACE(trace.get()); TRACE("execute cumulative compaction"); - Status res = _cumulative_compaction->execute_compact(); + Status res = compaction->execute_compact(); if (!res.ok()) { set_last_cumu_compaction_failure_time(UnixMillis()); DorisMetrics::instance()->cumulative_compaction_request_failed->increment(1); @@ -1712,10 +1768,17 @@ void Tablet::execute_compaction(CompactionType compaction_type) { } } -void Tablet::reset_compaction(CompactionType compaction_type) { +void Tablet::reset_compaction(CompactionType compaction_type, + const std::shared_ptr& compaction) { + if (compaction == nullptr) { + return; + } + std::unique_lock cumu_compaction_meta_lock(_compaction_meta_lock); if (compaction_type == CompactionType::CUMULATIVE_COMPACTION) { - _cumulative_compaction.reset(); + remove_cumulative_compaction_unlocked( + std::dynamic_pointer_cast(compaction)); } else { + CHECK(compaction == _base_compaction); _base_compaction.reset(); } } @@ -2885,4 +2948,28 @@ bool Tablet::should_skip_compaction(CompactionType compaction_type, int64_t now) return false; } +void Tablet::get_in_compacted_rowsets(VersionUnorderedSet* rowset_versions) const { + for (auto& cumulative_compact : _cumulative_compactions) { + const auto& input_rowset = cumulative_compact->get_input_rowsets(); + for (auto& rs : input_rowset) { + rowset_versions->insert(rs->version()); + } + + // output_version may be added to the rowsets before + // the compaction task was removed from _cumulative_compactions, + // to prevent premature use of this version, we mark output_version as in compacted + rowset_versions->insert( + Version(input_rowset.front()->start_version(), input_rowset.back()->end_version())); + } +} + +void Tablet::set_clone_occurred() { + for (auto& cumu_compaction : _cumulative_compactions) { + cumu_compaction->set_clone_occurred(); + } + if (_base_compaction != nullptr) { + _base_compaction->set_clone_occurred(); + } +} + } // namespace doris diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h index 1fa6d1bc7fe61d..26c98955ca0167 100644 --- a/be/src/olap/tablet.h +++ b/be/src/olap/tablet.h @@ -33,6 +33,7 @@ #include #include #include +#include #include #include @@ -57,6 +58,7 @@ namespace doris { class Tablet; class CumulativeCompactionPolicy; +class Compaction; class CumulativeCompaction; class BaseCompaction; class RowsetWriter; @@ -192,7 +194,8 @@ class Tablet : public BaseTablet { std::mutex& get_rowset_update_lock() { return _rowset_update_lock; } std::mutex& get_push_lock() { return _ingest_lock; } std::mutex& get_base_compaction_lock() { return _base_compaction_lock; } - std::mutex& get_cumulative_compaction_lock() { return _cumulative_compaction_lock; } + std::shared_mutex& get_cumulative_compaction_lock() { return _cumulative_compaction_lock; } + std::mutex& get_compaction_meta_lock() { return _compaction_meta_lock; } std::shared_mutex& get_migration_lock() { return _migration_lock; } @@ -247,6 +250,9 @@ class Tablet : public BaseTablet { std::vector pick_candidate_rowsets_to_cumulative_compaction(); std::vector pick_candidate_rowsets_to_base_compaction(); + // similar to pick_candidate_rowsets_to_cumulative_compaction, but does not acquire _meta_lock, + // which means that caller should hold _meta_lock + std::vector pick_rowsets_not_in_compaction() const; void calculate_cumulative_point(); // TODO(ygl): @@ -276,13 +282,13 @@ class Tablet : public BaseTablet { // return a json string to show the compaction status of this tablet void get_compaction_status(std::string* json_result); - Status prepare_compaction_and_calculate_permits(CompactionType compaction_type, - TabletSharedPtr tablet, int64_t* permits); - void execute_compaction(CompactionType compaction_type); - void reset_compaction(CompactionType compaction_type); - - void set_clone_occurred(bool clone_occurred) { _is_clone_occurred = clone_occurred; } - bool get_clone_occurred() { return _is_clone_occurred; } + Status prepare_compaction_and_calculate_permits( + CompactionType compaction_type, TabletSharedPtr tablet, int64_t* permits, + std::shared_ptr* cumulative_compaction); + void execute_compaction(CompactionType compaction_type, + const std::shared_ptr& compaction); + void reset_compaction(CompactionType compaction_type, + const std::shared_ptr& compaction); void set_cumulative_compaction_policy( std::shared_ptr cumulative_compaction_policy) { @@ -456,6 +462,25 @@ class Tablet : public BaseTablet { return config::max_tablet_io_errors > 0 && _io_error_times >= config::max_tablet_io_errors; } + //////////////////////////////////////////////////////////////////////////// + // following functions are protected by _compaction_meta_lock + //////////////////////////////////////////////////////////////////////////// + void get_in_compacted_rowsets(VersionUnorderedSet* rowset_versions) const; + + void add_cumulative_compaction_unlocked( + const std::shared_ptr& cumu_compaction); + void remove_cumulative_compaction_unlocked( + const std::shared_ptr& cumu_compaction); + + const std::shared_ptr& get_base_compaction() const; + void set_base_compaction(const std::shared_ptr& base_compaction); + + void set_clone_occurred(); + + //////////////////////////////////////////////////////////////////////////// + // end + //////////////////////////////////////////////////////////////////////////// + private: Status _init_once_action(); void _print_missed_versions(const std::vector& missed_versions) const; @@ -515,9 +540,10 @@ class Tablet : public BaseTablet { std::shared_mutex _meta_store_lock; std::mutex _ingest_lock; std::mutex _base_compaction_lock; - std::mutex _cumulative_compaction_lock; + std::shared_mutex _cumulative_compaction_lock; std::mutex _schema_change_lock; std::shared_mutex _migration_lock; + std::mutex _compaction_meta_lock; // TODO(lingbin): There is a _meta_lock TabletMeta too, there should be a comment to // explain how these two locks work together. @@ -558,7 +584,8 @@ class Tablet : public BaseTablet { std::shared_ptr _cumulative_compaction_policy; std::string _cumulative_compaction_type; - std::shared_ptr _cumulative_compaction; + // protected by _compaction_meta_lock + std::vector> _cumulative_compactions; std::shared_ptr _base_compaction; // whether clone task occurred during the tablet is in thread pool queue to wait for compaction std::atomic _is_clone_occurred; @@ -598,6 +625,29 @@ class Tablet : public BaseTablet { std::atomic publised_count = 0; }; +inline void Tablet::add_cumulative_compaction_unlocked( + const std::shared_ptr& cumu_compaction) { + _cumulative_compactions.push_back(cumu_compaction); +} + +inline void Tablet::remove_cumulative_compaction_unlocked( + const std::shared_ptr& cumu_compaction) { + for (auto it = _cumulative_compactions.begin(); it != _cumulative_compactions.end(); ++it) { + if ((*it) == cumu_compaction) { + _cumulative_compactions.erase(it); + break; + } + } +} + +inline const std::shared_ptr& Tablet::get_base_compaction() const { + return _base_compaction; +} + +inline void Tablet::set_base_compaction(const std::shared_ptr& base_compaction) { + _base_compaction = base_compaction; +} + inline CumulativeCompactionPolicy* Tablet::cumulative_compaction_policy() { return _cumulative_compaction_policy.get(); } diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp index 94986cdcfb2990..6b192f67872390 100644 --- a/be/src/olap/tablet_manager.cpp +++ b/be/src/olap/tablet_manager.cpp @@ -712,10 +712,11 @@ TabletSharedPtr TabletManager::find_best_tablet_to_compaction( continue; } } else { - std::unique_lock lock(tablet_ptr->get_cumulative_compaction_lock(), - std::try_to_lock); + std::shared_lock lock(tablet_ptr->get_cumulative_compaction_lock(), + std::try_to_lock); if (!lock.owns_lock()) { - LOG(INFO) << "can not get cumu lock: " << tablet_ptr->tablet_id(); + // the tablet is under clone + LOG(INFO) << "can not get shared cumu lock: " << tablet_ptr->tablet_id(); continue; } } diff --git a/be/src/olap/task/engine_clone_task.cpp b/be/src/olap/task/engine_clone_task.cpp index dfb18ee37632f6..93c19cac46555a 100644 --- a/be/src/olap/task/engine_clone_task.cpp +++ b/be/src/olap/task/engine_clone_task.cpp @@ -559,7 +559,8 @@ Status EngineCloneTask::_finish_clone(Tablet* tablet, const std::string& clone_d std::lock_guard base_compaction_lock(tablet->get_base_compaction_lock()); std::lock_guard cumulative_compaction_lock(tablet->get_cumulative_compaction_lock()); std::lock_guard cold_compaction_lock(tablet->get_cold_compaction_lock()); - tablet->set_clone_occurred(true); + std::lock_guard compact_meta_lock(tablet->get_compaction_meta_lock()); + tablet->set_clone_occurred(); std::lock_guard push_lock(tablet->get_push_lock()); std::lock_guard rwlock(tablet->get_rowset_update_lock()); std::lock_guard wrlock(tablet->get_header_lock());