Skip to content

Commit

Permalink
[feature](compaction) Add an http action for visibility of compaction…
Browse files Browse the repository at this point in the history
… score on each tablet (#38489) (#40826)

pick: #38489 

Usage:
1. `curl http://be_ip:be_host/api/compaction_score?top_n=10` Returns a
json object contains compaction score for top n, n=top_n.
```
[
    {
        "compaction_score": "5",
        "tablet_id": "42595"
    },
    {
        "compaction_score": "5",
        "tablet_id": "42587"
    },
    {
        "compaction_score": "5",
        "tablet_id": "42593"
    },
    {
        "compaction_score": "5",
        "tablet_id": "42597"
    },
    {
        "compaction_score": "5",
        "tablet_id": "42589"
    },
    {
        "compaction_score": "5",
        "tablet_id": "42599"
    },
    {
        "compaction_score": "5",
        "tablet_id": "42601"
    },
    {
        "compaction_score": "5",
        "tablet_id": "42591"
    },
    {
        "compaction_score": "5",
        "tablet_id": "42585"
    },
    {
        "compaction_score": "4",
        "tablet_id": "10034"
    }
]
```
If top_n is not specified, return all compaction score for all tablets.
If top_n is illegal, raise an error.
```
invalid argument: top_n=wrong
```

2. `curl http://be_ip:be_host/api/compaction_score?sync_meta=true`
`sync_meta` is only available on cloud mode, will sync meta from meta
service. It can cooperate with top_n.
If add param `sync_meta` on non-cloud mode, will raise an error.
```
sync meta is only available for cloud mode
```

3. In the future, this endpoint may extend other utility, like fetching
tablet compaction score by table id, etc.

## Proposed changes

Issue Number: close #xxx

<!--Describe your changes.-->
  • Loading branch information
TangSiyang2001 authored Sep 21, 2024
1 parent 9877a08 commit d1d52ae
Show file tree
Hide file tree
Showing 7 changed files with 298 additions and 0 deletions.
160 changes: 160 additions & 0 deletions be/src/http/action/compaction_score_action.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "http/action/compaction_score_action.h"

#include <gen_cpp/FrontendService_types.h>
#include <gen_cpp/Types_types.h>
#include <glog/logging.h>
#include <rapidjson/document.h>
#include <rapidjson/prettywriter.h>
#include <rapidjson/stringbuffer.h>

#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <exception>
#include <functional>
#include <iterator>
#include <limits>
#include <memory>
#include <span>
#include <stdexcept>
#include <string>
#include <string_view>
#include <vector>

#include "common/status.h"
#include "http/http_channel.h"
#include "http/http_handler_with_auth.h"
#include "http/http_headers.h"
#include "http/http_request.h"
#include "http/http_status.h"
#include "olap/tablet_fwd.h"
#include "olap/tablet_manager.h"
#include "util/stopwatch.hpp"

namespace doris {

const std::string TOP_N = "top_n";
const std::string COMPACTION_SCORE = "compaction_score";
constexpr size_t DEFAULT_TOP_N = std::numeric_limits<size_t>::max();
constexpr std::string_view TABLET_ID = "tablet_id";

template <typename T>
concept CompactionScoreAccessble = requires(T t) {
{ t.get_real_compaction_score() } -> std::same_as<uint32_t>;
};

template <CompactionScoreAccessble T>
std::vector<CompactionScoreResult> calculate_compaction_scores(
std::span<std::shared_ptr<T>> tablets) {
std::vector<CompactionScoreResult> result;
result.reserve(tablets.size());
std::ranges::transform(tablets, std::back_inserter(result),
[](const std::shared_ptr<T>& tablet) -> CompactionScoreResult {
return {.tablet_id = tablet->tablet_id(),
.compaction_score = tablet->get_real_compaction_score()};
});
return result;
}

struct LocalCompactionScoreAccessor final : CompactionScoresAccessor {
LocalCompactionScoreAccessor(TabletManager* tablet_mgr) : tablet_mgr(tablet_mgr) {}

std::vector<CompactionScoreResult> get_all_tablet_compaction_scores() override {
auto tablets = tablet_mgr->get_all_tablet();
std::span<TabletSharedPtr> s = {tablets.begin(), tablets.end()};
return calculate_compaction_scores(s);
}

TabletManager* tablet_mgr;
};

static rapidjson::Value jsonfy_tablet_compaction_score(
const CompactionScoreResult& result, rapidjson::MemoryPoolAllocator<>& allocator) {
rapidjson::Value node;
node.SetObject();

rapidjson::Value tablet_id_key;
tablet_id_key.SetString(TABLET_ID.data(), TABLET_ID.length(), allocator);
rapidjson::Value tablet_id_val;
auto tablet_id_str = std::to_string(result.tablet_id);
tablet_id_val.SetString(tablet_id_str.c_str(), tablet_id_str.length(), allocator);

rapidjson::Value score_key;
score_key.SetString(COMPACTION_SCORE.data(), COMPACTION_SCORE.size());
rapidjson::Value score_val;
auto score_str = std::to_string(result.compaction_score);
score_val.SetString(score_str.c_str(), score_str.length(), allocator);
node.AddMember(score_key, score_val, allocator);

node.AddMember(tablet_id_key, tablet_id_val, allocator);
return node;
}

CompactionScoreAction::CompactionScoreAction(ExecEnv* exec_env, TPrivilegeHier::type hier,
TPrivilegeType::type type, TabletManager* tablet_mgr)
: HttpHandlerWithAuth(exec_env, hier, type),
_accessor(std::make_unique<LocalCompactionScoreAccessor>(tablet_mgr)) {}

void CompactionScoreAction::handle(HttpRequest* req) {
req->add_output_header(HttpHeaders::CONTENT_TYPE, HttpHeaders::JsonType.data());
auto top_n_param = req->param(TOP_N);

size_t top_n = DEFAULT_TOP_N;
if (!top_n_param.empty()) {
try {
auto tmp_top_n = std::stoll(top_n_param);
if (tmp_top_n < 0) {
throw std::invalid_argument("`top_n` cannot less than 0");
}
top_n = tmp_top_n;
} catch (const std::exception& e) {
LOG(WARNING) << "convert failed:" << e.what();
auto msg = fmt::format("invalid argument: top_n={}", top_n_param);
HttpChannel::send_reply(req, HttpStatus::BAD_REQUEST, msg);
return;
}
}

std::string result;
if (auto st = _handle(top_n, &result); !st) {
HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR, st.to_json());
return;
}
HttpChannel::send_reply(req, HttpStatus::OK, result);
}

Status CompactionScoreAction::_handle(size_t top_n, std::string* result) {
auto scores = _accessor->get_all_tablet_compaction_scores();
top_n = std::min(top_n, scores.size());
std::partial_sort(scores.begin(), scores.begin() + top_n, scores.end(), std::greater<>());

rapidjson::Document root;
root.SetArray();
auto& allocator = root.GetAllocator();
std::for_each(scores.begin(), scores.begin() + top_n, [&](const auto& score) {
root.PushBack(jsonfy_tablet_compaction_score(score, allocator), allocator);
});
rapidjson::StringBuffer str_buf;
rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(str_buf);
root.Accept(writer);
*result = str_buf.GetString();
return Status::OK();
}

} // namespace doris
62 changes: 62 additions & 0 deletions be/src/http/action/compaction_score_action.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <gen_cpp/FrontendService_types.h>

#include <cstddef>
#include <memory>
#include <string>

#include "common/status.h"
#include "http/http_handler_with_auth.h"
#include "http/http_request.h"
#include "olap/storage_engine.h"
#include "runtime/exec_env.h"
namespace doris {

struct CompactionScoreResult {
int64_t tablet_id;
size_t compaction_score;
};

inline bool operator>(const CompactionScoreResult& lhs, const CompactionScoreResult& rhs) {
return lhs.compaction_score > rhs.compaction_score;
}

struct CompactionScoresAccessor {
virtual ~CompactionScoresAccessor() = default;

virtual std::vector<CompactionScoreResult> get_all_tablet_compaction_scores() = 0;
};

// topn, sync
class CompactionScoreAction : public HttpHandlerWithAuth {
public:
explicit CompactionScoreAction(ExecEnv* exec_env, TPrivilegeHier::type hier,
TPrivilegeType::type type, TabletManager* tablet_mgr);

void handle(HttpRequest* req) override;

private:
Status _handle(size_t top_n, std::string* result);

std::unique_ptr<CompactionScoresAccessor> _accessor;
};

} // namespace doris
8 changes: 8 additions & 0 deletions be/src/olap/base_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,4 +83,12 @@ Status BaseTablet::update_by_least_common_schema(const TabletSchemaSPtr& update_
return Status::OK();
}

uint32_t BaseTablet::get_real_compaction_score() const {
const auto& rs_metas = _tablet_meta->all_rs_metas();
return std::accumulate(rs_metas.begin(), rs_metas.end(), 0,
[](uint32_t score, const RowsetMetaSharedPtr& rs_meta) {
return score + rs_meta->get_compaction_score();
});
}

} /* namespace doris */
4 changes: 4 additions & 0 deletions be/src/olap/base_tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,10 @@ class BaseTablet {

virtual size_t tablet_footprint() = 0;

// this method just return the compaction sum on each rowset
// note(tsy): we should unify the compaction score calculation finally
uint32_t get_real_compaction_score() const;

protected:
mutable std::shared_mutex _meta_lock;
const TabletMetaSharedPtr _tablet_meta;
Expand Down
3 changes: 3 additions & 0 deletions be/src/olap/tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1147,6 +1147,9 @@ uint32_t Tablet::calc_cold_data_compaction_score() const {

uint32_t Tablet::_calc_cumulative_compaction_score(
std::shared_ptr<CumulativeCompactionPolicy> cumulative_compaction_policy) {
if (cumulative_compaction_policy == nullptr) [[unlikely]] {
return 0;
}
#ifndef BE_TEST
if (_cumulative_compaction_policy == nullptr ||
_cumulative_compaction_policy->name() != cumulative_compaction_policy->name()) {
Expand Down
8 changes: 8 additions & 0 deletions be/src/service/http_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include <event2/bufferevent.h>
#include <event2/http.h>
#include <gen_cpp/FrontendService_types.h>

#include <string>
#include <vector>
Expand All @@ -33,6 +34,7 @@
#include "http/action/checksum_action.h"
#include "http/action/clear_cache_action.h"
#include "http/action/compaction_action.h"
#include "http/action/compaction_score_action.h"
#include "http/action/config_action.h"
#include "http/action/debug_point_action.h"
#include "http/action/download_action.h"
Expand Down Expand Up @@ -258,6 +260,12 @@ Status HttpService::start() {
SnapshotAction* snapshot_action =
_pool.add(new SnapshotAction(_env, TPrivilegeHier::GLOBAL, TPrivilegeType::ADMIN));
_ev_http_server->register_handler(HttpMethod::GET, "/api/snapshot", snapshot_action);

CompactionScoreAction* compaction_score_action =
_pool.add(new CompactionScoreAction(_env, TPrivilegeHier::GLOBAL, TPrivilegeType::ADMIN,
_env->get_storage_engine()->tablet_manager()));
_ev_http_server->register_handler(HttpMethod::GET, "/api/compaction_score",
compaction_score_action);
#endif

// 2 compaction actions
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

suite("test_compaction_score_action") {
def tableName = "test_compaction_score_action";

sql """ DROP TABLE IF EXISTS ${tableName} """
sql """
CREATE TABLE IF NOT EXISTS ${tableName} (
id INT NOT NULL,
name STRING NOT NULL
) DUPLICATE KEY (`id`)
PROPERTIES ("replication_num" = "1", "disable_auto_compaction" = "true");
"""
for (i in 0..<30) {
sql """ INSERT INTO ${tableName} VALUES(1, "Vedal") """
sql """ INSERT INTO ${tableName} VALUES(2, "Neuro") """
sql """ INSERT INTO ${tableName} VALUES(3, "Evil") """
}

def backendId_to_backendIP = [:]
def backendId_to_backendHttpPort = [:]
getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);

for (int i=0;i<backendId_to_backendIP.size();i++){
def beHttpAddress =backendId_to_backendIP.entrySet()[i].getValue()+":"+backendId_to_backendHttpPort.entrySet()[i].getValue()
if (isCloudMode()) {
def (code, text, err) = curl("GET",beHttpAddress+ "/api/compaction_score?top_n=1&sync_meta=true")
def score_str = parseJson(text).get(0).get("compaction_score")
def score = Integer.parseInt(score_str)
assertTrue(score >= 90)
} else {
def (code, text, err) = curl("GET",beHttpAddress+"/api/compaction_score?top_n=1")
def score_str = parseJson(text).get(0).get("compaction_score")
def score = Integer.parseInt(score_str)
assertTrue(score >= 90)
}
}
}

0 comments on commit d1d52ae

Please sign in to comment.