Skip to content

Commit

Permalink
[Enhancement (compaction) support parallel compaction for single tablet
Browse files Browse the repository at this point in the history
  • Loading branch information
gitccl committed Apr 26, 2023
1 parent 925efc1 commit 8fd6026
Show file tree
Hide file tree
Showing 15 changed files with 447 additions and 149 deletions.
57 changes: 48 additions & 9 deletions be/src/http/action/compaction_action.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,8 @@ Status CompactionAction::_handle_run_status_compaction(HttpRequest* req, std::st

{
// use try lock to check this tablet is running cumulative compaction
std::unique_lock<std::mutex> lock_cumulative(tablet->get_cumulative_compaction_lock(),
std::try_to_lock);
std::unique_lock lock_cumulative(tablet->get_cumulative_compaction_lock(),
std::try_to_lock);
if (!lock_cumulative.owns_lock()) {
msg = "compaction task for this tablet is running";
compaction_type = "cumulative";
Expand Down Expand Up @@ -199,31 +199,70 @@ Status CompactionAction::_execute_compaction_callback(TabletSharedPtr tablet,
}
Status res = Status::OK();
if (compaction_type == PARAM_COMPACTION_BASE) {
BaseCompaction base_compaction(tablet);
res = base_compaction.compact();
std::shared_ptr<BaseCompaction> base_compaction;
bool need_reset = false;
{
std::unique_lock compaction_meta_lock(tablet->get_compaction_meta_lock());
if (tablet->get_base_compaction() != nullptr) {
LOG(INFO) << "another base compaction is running, tablet=" << tablet->full_name();
res = Status::Error<TRY_LOCK_FAILED>();
} else {
StorageEngine::instance()->create_base_compaction(tablet, base_compaction);
Status res = base_compaction->prepare_compact();
if (res.ok()) {
tablet->set_base_compaction(base_compaction);
need_reset = true;
}
}
}

if (res.ok()) {
res = base_compaction->execute_compact();
}
if (need_reset) {
tablet->reset_compaction(BASE_COMPACTION, base_compaction);
}

if (!res) {
if (res.is<BE_NO_SUITABLE_VERSION>()) {
// Ignore this error code.
VLOG_NOTICE << "failed to init base compaction due to no suitable version, tablet="
<< tablet->full_name();
} else {
DorisMetrics::instance()->base_compaction_request_failed->increment(1);
LOG(WARNING) << "failed to init base compaction. res=" << res
LOG(WARNING) << "failed to do base compaction. res=" << res
<< ", tablet=" << tablet->full_name();
}
}
} else if (compaction_type == PARAM_COMPACTION_CUMULATIVE) {
CumulativeCompaction cumulative_compaction(tablet);
res = cumulative_compaction.compact();
std::shared_ptr<CumulativeCompaction> cumu_compaction;
StorageEngine::instance()->create_cumulative_compaction(tablet, cumu_compaction);
bool need_reset = false;
{
std::unique_lock compaction_meta_lock(tablet->get_compaction_meta_lock());
Status res = cumu_compaction->prepare_compact();
if (res.ok()) {
tablet->add_cumulative_compaction_unlocked(cumu_compaction);
need_reset = true;
}
}

if (res.ok()) {
res = cumu_compaction->execute_compact();
}
if (need_reset) {
tablet->reset_compaction(CUMULATIVE_COMPACTION, cumu_compaction);
}

if (!res) {
if (res.is<CUMULATIVE_NO_SUITABLE_VERSION>()) {
if (res.is<BE_NO_SUITABLE_VERSION>()) {
// Ignore this error code.
VLOG_NOTICE << "failed to init cumulative compaction due to no suitable version,"
<< "tablet=" << tablet->full_name();
} else {
DorisMetrics::instance()->cumulative_compaction_request_failed->increment(1);
LOG(WARNING) << "failed to do cumulative compaction. res=" << res
<< ", table=" << tablet->full_name();
<< ", tablet=" << tablet->full_name();
}
}
}
Expand Down
13 changes: 2 additions & 11 deletions be/src/olap/base_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,18 +47,10 @@ Status BaseCompaction::prepare_compact() {
return Status::Error<INVALID_ARGUMENT>();
}

std::unique_lock<std::mutex> lock(_tablet->get_base_compaction_lock(), std::try_to_lock);
if (!lock.owns_lock()) {
LOG(WARNING) << "another base compaction is running. tablet=" << _tablet->full_name();
return Status::Error<TRY_LOCK_FAILED>();
}
TRACE("got base compaction lock");

// 1. pick rowsets to compact
RETURN_NOT_OK(pick_rowsets_to_compact());
TRACE("rowsets picked");
TRACE_COUNTER_INCREMENT("input_rowsets_count", _input_rowsets.size());
_tablet->set_clone_occurred(false);

return Status::OK();
}
Expand All @@ -71,15 +63,14 @@ Status BaseCompaction::execute_compact_impl() {
#endif
std::unique_lock<std::mutex> lock(_tablet->get_base_compaction_lock(), std::try_to_lock);
if (!lock.owns_lock()) {
LOG(WARNING) << "another base compaction is running. tablet=" << _tablet->full_name();
LOG(INFO) << "The tablet is under clone, tablet=" << _tablet->full_name();
return Status::Error<TRY_LOCK_FAILED>();
}
TRACE("got base compaction lock");

// Clone task may happen after compaction task is submitted to thread pool, and rowsets picked
// for compaction may change. In this case, current compaction task should not be executed.
if (_tablet->get_clone_occurred()) {
_tablet->set_clone_occurred(false);
if (get_clone_occurred()) {
return Status::Error<BE_CLONE_OCCURRED>();
}

Expand Down
15 changes: 14 additions & 1 deletion be/src/olap/compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ Compaction::Compaction(const TabletSharedPtr& tablet, const std::string& label)
_input_row_num(0),
_input_num_segments(0),
_input_index_size(0),
_state(CompactionState::INITED) {
_state(CompactionState::INITED),
_is_clone_occurred(false) {
_mem_tracker = std::make_shared<MemTrackerLimiter>(MemTrackerLimiter::Type::COMPACTION, label);
}

Expand All @@ -84,6 +85,18 @@ Status Compaction::execute_compact() {
return st;
}

const std::vector<RowsetSharedPtr>& Compaction::get_input_rowsets() const {
return _input_rowsets;
}

void Compaction::set_clone_occurred() {
_is_clone_occurred = true;
}

bool Compaction::get_clone_occurred() {
return _is_clone_occurred;
}

Status Compaction::do_compaction(int64_t permits) {
TRACE("start to do compaction");
uint32_t checksum_before;
Expand Down
8 changes: 8 additions & 0 deletions be/src/olap/compaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,12 @@ class Compaction {
virtual Status prepare_compact() = 0;
Status execute_compact();
virtual Status execute_compact_impl() = 0;

const std::vector<RowsetSharedPtr>& get_input_rowsets() const;

void set_clone_occurred();
bool get_clone_occurred();

#ifdef BE_TEST
void set_input_rowset(const std::vector<RowsetSharedPtr>& rowsets);
RowsetSharedPtr output_rowset();
Expand Down Expand Up @@ -115,6 +121,8 @@ class Compaction {
RowIdConversion _rowid_conversion;
TabletSchemaSPtr _cur_tablet_schema;

std::atomic<bool> _is_clone_occurred;

DISALLOW_COPY_AND_ASSIGN(Compaction);
};

Expand Down
44 changes: 18 additions & 26 deletions be/src/olap/cumulative_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,6 @@ Status CumulativeCompaction::prepare_compact() {
return Status::Error<CUMULATIVE_INVALID_PARAMETERS>();
}

std::unique_lock<std::mutex> lock(_tablet->get_cumulative_compaction_lock(), std::try_to_lock);
if (!lock.owns_lock()) {
LOG(INFO) << "The tablet is under cumulative compaction. tablet=" << _tablet->full_name();
return Status::Error<TRY_LOCK_FAILED>();
}
TRACE("got cumulative compaction lock");

// 1. calculate cumulative point
_tablet->calculate_cumulative_point();
TRACE("calculated cumulative point");
Expand All @@ -64,23 +57,21 @@ Status CumulativeCompaction::prepare_compact() {
RETURN_NOT_OK(pick_rowsets_to_compact());
TRACE("rowsets picked");
TRACE_COUNTER_INCREMENT("input_rowsets_count", _input_rowsets.size());
_tablet->set_clone_occurred(false);

return Status::OK();
}

Status CumulativeCompaction::execute_compact_impl() {
std::unique_lock<std::mutex> lock(_tablet->get_cumulative_compaction_lock(), std::try_to_lock);
std::shared_lock lock(_tablet->get_cumulative_compaction_lock(), std::try_to_lock);
if (!lock.owns_lock()) {
LOG(INFO) << "The tablet is under cumulative compaction. tablet=" << _tablet->full_name();
LOG(INFO) << "The tablet is under clone, tablet=" << _tablet->full_name();
return Status::Error<TRY_LOCK_FAILED>();
}
TRACE("got cumulative compaction lock");

// Clone task may happen after compaction task is submitted to thread pool, and rowsets picked
// for compaction may change. In this case, current compaction task should not be executed.
if (_tablet->get_clone_occurred()) {
_tablet->set_clone_occurred(false);
if (get_clone_occurred()) {
return Status::Error<CUMULATIVE_CLONE_OCCURRED>();
}

Expand All @@ -95,8 +86,12 @@ Status CumulativeCompaction::execute_compact_impl() {
_state = CompactionState::SUCCESS;

// 5. set cumulative point
_tablet->cumulative_compaction_policy()->update_cumulative_point(
_tablet.get(), _input_rowsets, _output_rowset, _last_delete_version);
{
std::lock_guard compact_meta_lock(_tablet->get_compaction_meta_lock());
_tablet->cumulative_compaction_policy()->update_cumulative_point(
_tablet.get(), _input_rowsets, _output_rowset, _last_delete_version);
}

VLOG_CRITICAL << "after cumulative compaction, current cumulative point is "
<< _tablet->cumulative_layer_point() << ", tablet=" << _tablet->full_name();

Expand All @@ -114,27 +109,24 @@ Status CumulativeCompaction::pick_rowsets_to_compact() {
return Status::Error<CUMULATIVE_NO_SUITABLE_VERSION>();
}

// candidate_rowsets may not be continuous
// So we need to choose the longest continuous path from it.
std::vector<Version> missing_versions;
RETURN_NOT_OK(find_longest_consecutive_version(&candidate_rowsets, &missing_versions));
if (!missing_versions.empty()) {
DCHECK(missing_versions.size() == 2);
LOG(WARNING) << "There are missed versions among rowsets. "
<< "prev rowset verison=" << missing_versions[0]
<< ", next rowset version=" << missing_versions[1]
<< ", tablet=" << _tablet->full_name();
}

size_t compaction_score = 0;
_tablet->cumulative_compaction_policy()->pick_input_rowsets(
_tablet.get(), candidate_rowsets, config::cumulative_compaction_max_deltas,
config::cumulative_compaction_min_deltas, &_input_rowsets, &_last_delete_version,
&compaction_score);

if (candidate_rowsets.empty()) {
return Status::Error<CUMULATIVE_NO_SUITABLE_VERSION>();
}

// Cumulative compaction will process with at least 1 rowset.
// So when there is no rowset being chosen, we should return Status::Error<CUMULATIVE_NO_SUITABLE_VERSION>():
if (_input_rowsets.empty()) {
Version begin_version = candidate_rowsets[0]->version();
if (begin_version.first != _tablet->cumulative_layer_point()) {
return Status::Error<CUMULATIVE_NO_SUITABLE_VERSION>();
}

if (_last_delete_version.first != -1) {
// we meet a delete version, should increase the cumulative point to let base compaction handle the delete version.
// plus 1 to skip the delete version.
Expand Down
3 changes: 2 additions & 1 deletion be/src/olap/cumulative_compaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,11 @@ class CumulativeCompaction : public Compaction {
CumulativeCompaction(const TabletSharedPtr& tablet);
~CumulativeCompaction() override;

// caller should hold compaction_meta_lock when call this function
Status prepare_compact() override;
Status execute_compact_impl() override;

std::vector<RowsetSharedPtr> get_input_rowsets() { return _input_rowsets; }
const std::vector<RowsetSharedPtr>& get_input_rowsets() { return _input_rowsets; }

protected:
Status pick_rowsets_to_compact() override;
Expand Down
Loading

0 comments on commit 8fd6026

Please sign in to comment.