Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions src/yb/tools/yb-admin_cli.cc
Original file line number Diff line number Diff line change
Expand Up @@ -794,6 +794,24 @@ Status delete_index_by_id_action(
return Status::OK();
}

const auto clear_cache_args = "[<timeout_in_seconds>] (default 20)";
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Followed @arybochkin's comment on #26078 (comment)


Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be good to add automated test into yb-admin_client-test.cc that will:

  • create table
  • put some data into it
  • flush table to disk (so we have SST files)
  • run scan query other the whole table to load SST blocks into block_cache
  • make sure block cache usage is above expected level
  • clear block cache
  • make sure block cache usage is zero

Status clear_cache_action(const ClusterAdminCli::CLIArguments& args, ClusterAdminClient* client) {
RETURN_NOT_OK(CheckArgumentsCount(args.size(), 0, 1));
std::optional<int> timeout_secs;
if (!args.empty()) {
int parsed_timeout = VERIFY_RESULT(CheckedStoi(args[0]));
if (parsed_timeout <= 0) {
return ClusterAdminCli::kInvalidArguments;
}
timeout_secs = parsed_timeout;
}
RETURN_NOT_OK_PREPEND(
client->ClearCache(MonoDelta::FromSeconds(timeout_secs.value_or(20))),
"Unable to clear cache");
return Status::OK();
}

YB_DEFINE_ENUM(FlushTableFlag, (ADD_INDEXES));

Result<pair<std::optional<int>, bool>> ParseFlushTableArgs(
Expand Down Expand Up @@ -2893,6 +2911,7 @@ void ClusterAdminCli::RegisterCommandHandlers() {
REGISTER_COMMAND(flush_table);
REGISTER_COMMAND(flush_table_by_id);
REGISTER_COMMAND(flush_sys_catalog);
REGISTER_COMMAND(clear_cache);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

clear_block_cache should be better to avoid confusion with other caches

REGISTER_COMMAND(compact_sys_catalog);
REGISTER_COMMAND(compact_table);
REGISTER_COMMAND(compact_table_by_id);
Expand Down
55 changes: 55 additions & 0 deletions src/yb/tools/yb-admin_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2008,6 +2008,61 @@ Status ClusterAdminClient::FlushSysCatalog() {
return res.ok() ? Status::OK() : res.status();
}

Status ClusterAdminClient::ClearCache(MonoDelta timeout) {
cout << "Clearing block cache on all tablet servers..." << endl;
// Get all tablet servers in the cluster
RepeatedPtrField<master::ListTabletServersResponsePB::Entry> servers;
RETURN_NOT_OK(ListTabletServers(&servers));
if (servers.empty()) {
return STATUS(IllegalState, "No tablet servers found in cluster");
}
LOG(INFO) << "Found " << servers.size() << " tablet servers" << endl;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discussed internally that we shouldn't be using LOG(INFO) for yb-admin tool going forward


// Send ClearCache RPC to each tablet server
int successful_clears = 0;
for (const auto& server : servers) {
if (!server.has_alive() || !server.alive()) {
LOG(WARNING) << "Skipping dead tablet server " << server.instance_id().permanent_uuid();
continue;
}
if (!server.has_registration() ||
server.registration().common().private_rpc_addresses().empty()) {
LOG(WARNING) << "Skipping tablet server " << server.instance_id().permanent_uuid()
<< " with no RPC address";
continue;
}
auto ts_uuid = server.instance_id().permanent_uuid();
HostPort ts_addr = HostPortFromPB(server.registration().common().private_rpc_addresses(0));
tserver::ClearCacheRequestPB req;
tserver::ClearCacheResponsePB resp;
TabletServerAdminServiceProxy ts_proxy(proxy_cache_.get(), ts_addr);
LOG(INFO) << "Clearing cache on tablet server " << ts_uuid << " at " << ts_addr;
auto result = InvokeRpc(&TabletServerAdminServiceProxy::ClearCache, ts_proxy, req);
if (!result.ok()) {
LOG(WARNING) << "Failed to clear cache on tablet server " << ts_uuid << ": " << result.status();
continue;
}
resp = *result;
if (resp.has_error()) {
LOG(WARNING) << "Tablet server " << ts_uuid << " returned error: "
<< StatusFromPB(resp.error().status());
continue;
}
successful_clears++;
LOG(INFO) << "Cleared cache on tablet server " << ts_uuid;
if (resp.has_cache_capacity_bytes()) {
LOG(INFO) << " (capacity: " << resp.cache_capacity_bytes() << " bytes)";
}
LOG(INFO) << endl;
}
if (successful_clears == 0) {
return STATUS(RuntimeError, "Failed to clear cache on any tablet servers");
}
cout << "Successfully cleared cache on " << successful_clears << "/"
<< servers.size() << " tablet servers" << endl;
return Status::OK();
}

Status ClusterAdminClient::CompactSysCatalog() {
master::CompactSysCatalogRequestPB req;
auto res = InvokeRpc(
Expand Down
2 changes: 2 additions & 0 deletions src/yb/tools/yb-admin_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,8 @@ class ClusterAdminClient {

Status FlushSysCatalog();

Status ClearCache(MonoDelta timeout = MonoDelta::FromSeconds(30));

Status CompactSysCatalog();

Status ModifyTablePlacementInfo(const client::YBTableName& table_name,
Expand Down
87 changes: 87 additions & 0 deletions src/yb/tserver/tablet_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,10 @@
#include "yb/tserver/ysql_advisory_lock_table.h"
#include "yb/tserver/ysql_lease.h"

#include "yb/rocksdb/db.h"
#include "yb/rocksdb/table.h"
#include "yb/rocksdb/table/block_based_table_factory.h"

#include "yb/util/async_util.h"
#include "yb/util/backoff_waiter.h"
#include "yb/util/callsite_profiling.h"
Expand Down Expand Up @@ -2556,6 +2560,89 @@ void TabletServiceAdminImpl::GetPgSocketDir(
context.RespondSuccess();
}

uint64_t TabletServiceAdminImpl::ClearRocksDbBlockCache(rocksdb::DB* db, const std::string& db_type, const std::string& tablet_id) {
if (!db) {
LOG(WARNING) << "Null " << db_type << " DB for tablet " << tablet_id;
return 0;
}

try {
const rocksdb::Options& options = db->GetOptions();
auto block_table_factory = std::dynamic_pointer_cast<rocksdb::BlockBasedTableFactory>(options.table_factory);

if (!block_table_factory) {
LOG(WARNING) << "No BlockBasedTableFactory found for " << db_type
<< " DB in tablet " << tablet_id;
return 0;
}

auto* block_cache = block_table_factory->table_options().block_cache.get();
if (!block_cache) {
LOG(WARNING) << "No block cache found for " << db_type
<< " DB in tablet " << tablet_id;
return 0;
}

// Setting the cache capacity to 0 forces the cache to evict all stored entries, effectively clearing its contents.
// Immediately restoring the capacity to its original value allows the cache to resume normal operation
// without permanently restricting its size.
auto original_capacity = block_cache->GetCapacity();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: const auto

block_cache->SetCapacity(0);
block_cache->SetCapacity(original_capacity);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

inspired by

void PurgeBlockCache() {
auto* block_cache = table_factory_->table_options().block_cache.get();
auto capacity = block_cache->GetCapacity();
block_cache->SetCapacity(0);
block_cache->SetCapacity(capacity);
LOG(INFO) << "Purged block cache";
}


LOG(INFO) << "Purged " << db_type << " block cache for tablet " << tablet_id
<< " (capacity: " << original_capacity << " bytes)";

return original_capacity;

} catch (const std::exception& e) {
LOG(ERROR) << "Exception while clearing " << db_type << " cache for tablet "
<< tablet_id << ": " << e.what();
return 0;
}
}

void TabletServiceAdminImpl::ClearCache(
const ClearCacheRequestPB* req, ClearCacheResponsePB* resp, rpc::RpcContext context) {
LOG(INFO) << "Received ClearCache RPC request from " << context.requestor_string();

TabletPeers tablet_peers = server_->tablet_manager()->GetTabletPeers();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't have to do that for every peers because RocksDB block cache is shared across all tablets:

tablet::TabletOptions tablet_options_;

LOG(INFO) << "Found " << tablet_peers.size() << " tablet peers on this tserver";

uint64_t total_cache_capacity = 0;
int successful_clears = 0;

for (const auto& tablet_peer : tablet_peers) {
auto tablet_ptr = tablet_peer->shared_tablet();
if (!tablet_ptr) {
LOG(WARNING) << "Failed to get tablet " << tablet_peer->tablet_id();
continue;
}

auto tablet = *tablet_ptr;
if (!tablet) {
LOG(WARNING) << "Null tablet for " << tablet_peer->tablet_id();
continue;
}

if (tablet->regular_db()) {
auto capacity = ClearRocksDbBlockCache(tablet->regular_db(), "regular", tablet_peer->tablet_id());
total_cache_capacity += capacity;
}

if (tablet->intents_db()) {
auto capacity = ClearRocksDbBlockCache(tablet->intents_db(), "intents", tablet_peer->tablet_id());
total_cache_capacity += capacity;
}

successful_clears++;
}

LOG(INFO) << "Successfully cleared cache on " << successful_clears << " tablets";
resp->set_cache_capacity_bytes(total_cache_capacity);
context.RespondSuccess();
}

bool EmptyWriteBatch(const docdb::KeyValueWriteBatchPB& write_batch) {
return write_batch.write_pairs().empty() && write_batch.apply_external_transactions().empty();
}
Expand Down
10 changes: 10 additions & 0 deletions src/yb/tserver/tablet_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@
#include "yb/tserver/tserver_admin.service.h"
#include "yb/tserver/tserver_service.service.h"

namespace rocksdb {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

needs for forward declaration

class DB;
}

namespace yb {
class Schema;
class Status;
Expand Down Expand Up @@ -375,6 +379,10 @@ class TabletServiceAdminImpl : public TabletServerAdminServiceIf {
const GetPgSocketDirRequestPB* req, GetPgSocketDirResponsePB* resp,
rpc::RpcContext context) override;

void ClearCache(const ClearCacheRequestPB* req,
ClearCacheResponsePB* resp,
rpc::RpcContext context) override;

private:
TabletServer* const server_;

Expand All @@ -388,6 +396,8 @@ class TabletServiceAdminImpl : public TabletServerAdminServiceIf {
Status DoEnableDbConns(
const EnableDbConnsRequestPB* req, EnableDbConnsResponsePB* resp);

uint64_t ClearRocksDbBlockCache(rocksdb::DB* db, const std::string& db_type, const std::string& tablet_id);

Status SetupCDCSDKRetention(
const tablet::ChangeMetadataRequestPB* req, ChangeMetadataResponsePB* resp,
const tablet::TabletPeerPtr& peer);
Expand Down
9 changes: 9 additions & 0 deletions src/yb/tserver/tserver_admin.proto
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,13 @@ message GetActiveRbsInfoResponsePB {
repeated RbsInfo rbs_infos = 2;
}

message ClearCacheRequestPB {}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if you want to expand this command in the future (e.g. adding flag to decide if excluding system tables, targeting specific tablets, etc.) this is a good starting point


message ClearCacheResponsePB {
optional TabletServerErrorPB error = 1;
optional uint64 cache_capacity_bytes = 2;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this field ss more for logging purpose

}

service TabletServerAdminService {
// Create a new, empty tablet with the specified parameters. Only used for
// brand-new tablets, not for "moves".
Expand Down Expand Up @@ -466,4 +473,6 @@ service TabletServerAdminService {
rpc GetPgSocketDir(GetPgSocketDirRequestPB) returns (GetPgSocketDirResponsePB);

rpc GetActiveRbsInfo(GetActiveRbsInfoRequestPB) returns (GetActiveRbsInfoResponsePB);

rpc ClearCache(ClearCacheRequestPB) returns (ClearCacheResponsePB);
}