diff --git a/be/src/cloud/cloud_meta_mgr.cpp b/be/src/cloud/cloud_meta_mgr.cpp index 41e60b5e264639..c7a86d19905ffa 100644 --- a/be/src/cloud/cloud_meta_mgr.cpp +++ b/be/src/cloud/cloud_meta_mgr.cpp @@ -40,6 +40,7 @@ #include "cloud/cloud_tablet.h" #include "cloud/config.h" #include "cloud/pb_convert.h" +#include "cloud/schema_cloud_dictionary_cache.h" #include "common/config.h" #include "common/logging.h" #include "common/status.h" @@ -342,6 +343,8 @@ static std::string debug_info(const Request& req) { req.tablet_id(), req.lock_id()); } else if constexpr (is_any_v) { return fmt::format(" tablet_id={}", req.tablet_id()); + } else if constexpr (is_any_v) { + return fmt::format(" index_id={}", req.index_id()); } else { static_assert(!sizeof(Request)); } @@ -473,10 +476,10 @@ Status CloudMetaMgr::sync_tablet_rowsets(CloudTablet* tablet, bool warmup_delta_ req.set_cumulative_point(tablet->cumulative_layer_point()); } req.set_end_version(-1); - // backend side use schema dict - if (config::variant_use_cloud_schema_dict) { - req.set_schema_op(GetRowsetRequest::RETURN_DICT); - } + // backend side use schema dict in cache if enable cloud schema dict cache + req.set_schema_op(config::variant_use_cloud_schema_dict_cache + ? GetRowsetRequest::NO_DICT + : GetRowsetRequest::RETURN_DICT); VLOG_DEBUG << "send GetRowsetRequest: " << req.ShortDebugString(); stub->get_rowset(&cntl, &req, &resp, nullptr); @@ -592,8 +595,28 @@ Status CloudMetaMgr::sync_tablet_rowsets(CloudTablet* tablet, bool warmup_delta_ existed_rowset->rowset_id().to_string() == cloud_rs_meta_pb.rowset_id_v2()) { continue; // Same rowset, skip it } - RowsetMetaPB meta_pb = cloud_rowset_meta_to_doris( - cloud_rs_meta_pb, resp.has_schema_dict() ? &resp.schema_dict() : nullptr); + RowsetMetaPB meta_pb; + // Check if the rowset meta contains a schema dictionary key list. + if (cloud_rs_meta_pb.has_schema_dict_key_list() && !resp.has_schema_dict()) { + // Use the locally cached dictionary. + RowsetMetaCloudPB copied_cloud_rs_meta_pb = cloud_rs_meta_pb; + CloudStorageEngine& engine = + ExecEnv::GetInstance()->storage_engine().to_cloud(); + { + wlock.unlock(); + RETURN_IF_ERROR( + engine.get_schema_cloud_dictionary_cache() + .replace_dict_keys_to_schema(cloud_rs_meta_pb.index_id(), + &copied_cloud_rs_meta_pb)); + wlock.lock(); + } + meta_pb = cloud_rowset_meta_to_doris(copied_cloud_rs_meta_pb); + } else { + // Otherwise, use the schema dictionary from the response (if available). + meta_pb = cloud_rowset_meta_to_doris( + cloud_rs_meta_pb, + resp.has_schema_dict() ? &resp.schema_dict() : nullptr); + } auto rs_meta = std::make_shared(); rs_meta->init_from_pb(meta_pb); RowsetSharedPtr rowset; @@ -835,6 +858,14 @@ Status CloudMetaMgr::commit_rowset(const RowsetMeta& rs_meta, RowsetMetaPB rs_meta_pb = rs_meta.get_rowset_pb(); doris_rowset_meta_to_cloud(req.mutable_rowset_meta(), std::move(rs_meta_pb)); + // Replace schema dictionary keys based on the rowset's index ID to maintain schema consistency. + CloudStorageEngine& engine = ExecEnv::GetInstance()->storage_engine().to_cloud(); + // if not enable dict cache, then directly return true to avoid refresh + bool replaced = + config::variant_use_cloud_schema_dict_cache + ? engine.get_schema_cloud_dictionary_cache().replace_schema_to_dict_keys( + rs_meta_pb.index_id(), req.mutable_rowset_meta()) + : true; Status st = retry_rpc("commit rowset", req, &resp, &MetaService_Stub::commit_rowset); if (!st.ok() && resp.status().code() == MetaServiceCode::ALREADY_EXISTED) { if (existed_rs_meta != nullptr && resp.has_existed_rowset_meta()) { @@ -845,6 +876,13 @@ Status CloudMetaMgr::commit_rowset(const RowsetMeta& rs_meta, } return Status::AlreadyExist("failed to commit rowset: {}", resp.status().msg()); } + // If dictionary replacement fails, it may indicate that the local schema dictionary is outdated. + // Refreshing the dictionary here ensures that the rowset metadata is updated with the latest schema definitions, + // which is critical for maintaining consistency between the rowset and its corresponding schema. + if (!replaced) { + RETURN_IF_ERROR( + engine.get_schema_cloud_dictionary_cache().refresh_dict(rs_meta_pb.index_id())); + } return st; } @@ -1389,5 +1427,33 @@ int64_t CloudMetaMgr::get_inverted_index_file_szie(const RowsetMeta& rs_meta) { return total_inverted_index_size; } +Status CloudMetaMgr::get_schema_dict(int64_t index_id, + std::shared_ptr* schema_dict) { + VLOG_DEBUG << "Sending GetSchemaDictRequest, index_id: " << index_id; + + // Create the request and response objects. + GetSchemaDictRequest req; + GetSchemaDictResponse resp; + req.set_cloud_unique_id(config::cloud_unique_id); + req.set_index_id(index_id); + + // Invoke RPC via the retry_rpc helper function. + // It will call the MetaService_Stub::get_schema_dict method. + Status st = retry_rpc("get schema dict", req, &resp, &MetaService_Stub::get_schema_dict); + if (!st.ok()) { + return st; + } + + // Optionally, additional checking of the response status can be done here. + // For example, if the returned status code indicates a parsing or not found error, + // you may return an error accordingly. + + // Copy the retrieved schema dictionary from the response. + *schema_dict = std::make_shared(); + (*schema_dict)->Swap(resp.mutable_schema_dict()); + VLOG_DEBUG << "Successfully obtained schema dict, index_id: " << index_id; + return Status::OK(); +} + #include "common/compile_check_end.h" } // namespace doris::cloud diff --git a/be/src/cloud/cloud_meta_mgr.h b/be/src/cloud/cloud_meta_mgr.h index f0a1b1a664887e..d06e55e69ad807 100644 --- a/be/src/cloud/cloud_meta_mgr.h +++ b/be/src/cloud/cloud_meta_mgr.h @@ -16,6 +16,8 @@ // under the License. #pragma once +#include + #include #include #include @@ -58,6 +60,8 @@ class CloudMetaMgr { Status get_tablet_meta(int64_t tablet_id, std::shared_ptr* tablet_meta); + Status get_schema_dict(int64_t index_id, std::shared_ptr* schema_dict); + Status sync_tablet_rowsets(CloudTablet* tablet, bool warmup_delta_data = false, bool sync_delete_bitmap = true, bool full_sync = false); diff --git a/be/src/cloud/cloud_storage_engine.cpp b/be/src/cloud/cloud_storage_engine.cpp index 574e17fbb9be89..9b74bdf7343f01 100644 --- a/be/src/cloud/cloud_storage_engine.cpp +++ b/be/src/cloud/cloud_storage_engine.cpp @@ -19,6 +19,7 @@ #include #include +#include #include #include #include @@ -37,6 +38,7 @@ #include "cloud/cloud_txn_delete_bitmap_cache.h" #include "cloud/cloud_warm_up_manager.h" #include "cloud/config.h" +#include "common/config.h" #include "io/cache/block_file_cache_downloader.h" #include "io/cache/block_file_cache_factory.h" #include "io/cache/file_cache_common.h" @@ -190,6 +192,9 @@ Status CloudStorageEngine::open() { _tablet_hotspot = std::make_unique(); + _schema_cloud_dictionary_cache = + std::make_unique(config::schema_dict_cache_capacity); + RETURN_NOT_OK_STATUS_WITH_WARN( init_stream_load_recorder(ExecEnv::GetInstance()->store_paths()[0].path), "init StreamLoadRecorder failed"); diff --git a/be/src/cloud/cloud_storage_engine.h b/be/src/cloud/cloud_storage_engine.h index 7d8e68c5f79906..f8dc58b6cd0334 100644 --- a/be/src/cloud/cloud_storage_engine.h +++ b/be/src/cloud/cloud_storage_engine.h @@ -24,6 +24,7 @@ //#include "cloud/cloud_full_compaction.h" #include "cloud/cloud_cumulative_compaction_policy.h" #include "cloud/cloud_tablet.h" +#include "cloud/schema_cloud_dictionary_cache.h" #include "cloud_txn_delete_bitmap_cache.h" #include "io/cache/block_file_cache_factory.h" #include "olap/storage_engine.h" @@ -69,6 +70,9 @@ class CloudStorageEngine final : public BaseStorageEngine { CloudTabletMgr& tablet_mgr() const { return *_tablet_mgr; } CloudTxnDeleteBitmapCache& txn_delete_bitmap_cache() const { return *_txn_delete_bitmap_cache; } + SchemaCloudDictionaryCache& get_schema_cloud_dictionary_cache() { + return *_schema_cloud_dictionary_cache; + } ThreadPool& calc_tablet_delete_bitmap_task_thread_pool() const { return *_calc_tablet_delete_bitmap_task_thread_pool; } @@ -163,6 +167,7 @@ class CloudStorageEngine final : public BaseStorageEngine { std::unique_ptr _tablet_mgr; std::unique_ptr _txn_delete_bitmap_cache; std::unique_ptr _calc_tablet_delete_bitmap_task_thread_pool; + std::unique_ptr _schema_cloud_dictionary_cache; // Components for cache warmup std::unique_ptr _file_cache_block_downloader; diff --git a/be/src/cloud/schema_cloud_dictionary_cache.cpp b/be/src/cloud/schema_cloud_dictionary_cache.cpp new file mode 100644 index 00000000000000..25f0b2327024fd --- /dev/null +++ b/be/src/cloud/schema_cloud_dictionary_cache.cpp @@ -0,0 +1,226 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "cloud/schema_cloud_dictionary_cache.h" + +#include +#include + +#include +#include +#include +#include + +#include "cloud/cloud_meta_mgr.h" +#include "cloud/cloud_storage_engine.h" +#include "cloud/cloud_tablet.h" +#include "common/config.h" +#include "gen_cpp/cloud.pb.h" // For GetSchemaDictResponse +#include "runtime/exec_env.h" + +namespace doris { + +bvar::Adder g_schema_dict_cache_count("schema_dict_cache_count"); +bvar::Adder g_replace_dict_keys_to_schema_hit_cache( + "schema_dict_cache_replace_dict_keys_to_schema_hit_count"); +bvar::Adder g_replace_schema_to_dict_keys_hit_cache( + "schema_dict_cache_replace_schema_to_dict_keys_hit_count"); +bvar::Adder g_schema_dict_cache_miss_count("schema_dict_cache_miss_count"); +bvar::Adder g_schema_dict_refresh_count("schema_dict_refresh_count"); + +void SchemaCloudDictionaryCache::_insert(int64_t index_id, const SchemaCloudDictionarySPtr& dict) { + auto* value = new CacheValue; + value->dict = dict; + auto* lru_handle = + LRUCachePolicy::insert(fmt::format("{}", index_id), value, 1, 0, CachePriority::NORMAL); + g_schema_dict_cache_count << 1; + _cache->release(lru_handle); +} + +SchemaCloudDictionarySPtr SchemaCloudDictionaryCache::_lookup(int64_t index_id) { + Cache::Handle* handle = LRUCachePolicy::lookup(fmt::format("{}", index_id)); + if (!handle) { + return nullptr; + } + auto* cache_val = static_cast(_cache->value(handle)); + SchemaCloudDictionarySPtr dict = cache_val ? cache_val->dict : nullptr; + _cache->release(handle); // release handle but dict's shared_ptr still alive + return dict; +} + +/** + * Processes dictionary entries by matching items from the given item map. + * It maps items to their dictionary keys, then adds these keys to the rowset metadata. + * If an item is missing in the dictionary, the dictionary key list in rowset meta is cleared + * and the function returns a NotFound status. + * + * @tparam ItemPB The protobuf message type for dictionary items (e.g., ColumnPB or TabletIndexPB). + * @param dict The SchemaCloudDictionary that holds the dictionary entries. + * @param item_dict A mapping from unique identifiers to the dictionary items. + * @param result Pointer to a repeated field where filtered (non-extended) items are stored. May be null. + * @param items The repeated field of items in the original rowset meta. + * @param filter A predicate that returns true if an item should be treated as an extended item and skipped. + * @param add_dict_key_fn A function to be called for each valid item that adds its key to the rowset meta. + * @param rowset_meta Pointer to the rowset metadata; it is cleared if any item is not found. + * + * @return Status::OK if all items are processed successfully; otherwise, a NotFound status. + */ +template +Status process_dictionary(SchemaCloudDictionary& dict, + const google::protobuf::Map& item_dict, + google::protobuf::RepeatedPtrField* result, + const google::protobuf::RepeatedPtrField& items, + const std::function& filter, + const std::function& add_dict_key_fn, + RowsetMetaCloudPB* rowset_meta) { + if (items.empty()) { + return Status::OK(); + } + // Use deterministic method to do serialization since structure like + // `google::protobuf::Map`'s serialization is unstable + auto serialize_fn = [](const ItemPB& item) -> std::string { + std::string output; + google::protobuf::io::StringOutputStream string_output_stream(&output); + google::protobuf::io::CodedOutputStream output_stream(&string_output_stream); + output_stream.SetSerializationDeterministic(true); + item.SerializeToCodedStream(&output_stream); + return output; + }; + + google::protobuf::RepeatedPtrField none_ext_items; + std::unordered_map reversed_dict; + for (const auto& [key, val] : item_dict) { + reversed_dict[serialize_fn(val)] = key; + } + + for (const auto& item : items) { + if (filter(item)) { + // Filter none extended items, mainly extended columns and extended indexes + *none_ext_items.Add() = item; + continue; + } + const std::string serialized_key = serialize_fn(item); + auto it = reversed_dict.find(serialized_key); + if (it == reversed_dict.end()) { + // If any required item is missing in the dictionary, clear the dict key list and return NotFound. + // ATTN: need to clear dict key list let MS to add key list + rowset_meta->clear_schema_dict_key_list(); + g_schema_dict_cache_miss_count << 1; + return Status::NotFound("Not found entry in dict"); + } + // Add existed dict key to related dict + add_dict_key_fn(it->second); + } + // clear extended items to prevent writing them to fdb + if (result != nullptr) { + result->Swap(&none_ext_items); + } + return Status::OK(); +} + +Status SchemaCloudDictionaryCache::replace_schema_to_dict_keys(int64_t index_id, + RowsetMetaCloudPB* rowset_meta) { + if (!rowset_meta->has_variant_type_in_schema()) { + return Status::OK(); + } + auto dict = _lookup(index_id); + if (!dict) { + g_schema_dict_cache_miss_count << 1; + return Status::NotFound("Not found dict {}", index_id); + } + auto* dict_list = rowset_meta->mutable_schema_dict_key_list(); + // Process column dictionary: add keys for non-extended columns. + auto column_filter = [&](const doris::ColumnPB& col) -> bool { return col.unique_id() >= 0; }; + auto column_dict_adder = [&](int32_t key) { dict_list->add_column_dict_key_list(key); }; + RETURN_IF_ERROR(process_dictionary( + *dict, dict->column_dict(), rowset_meta->mutable_tablet_schema()->mutable_column(), + rowset_meta->tablet_schema().column(), column_filter, column_dict_adder, rowset_meta)); + + // Process index dictionary: add keys for indexes with an empty index_suffix_name. + auto index_filter = [&](const doris::TabletIndexPB& index_pb) -> bool { + return index_pb.index_suffix_name().empty(); + }; + auto index_dict_adder = [&](int32_t key) { dict_list->add_index_info_dict_key_list(key); }; + RETURN_IF_ERROR(process_dictionary( + *dict, dict->index_dict(), rowset_meta->mutable_tablet_schema()->mutable_index(), + rowset_meta->tablet_schema().index(), index_filter, index_dict_adder, rowset_meta)); + g_replace_schema_to_dict_keys_hit_cache << 1; + return Status::OK(); +} + +Status SchemaCloudDictionaryCache::_try_fill_schema( + const std::shared_ptr& dict, const SchemaDictKeyList& dict_keys, + TabletSchemaCloudPB* schema) { + // Process column dictionary keys + for (int key : dict_keys.column_dict_key_list()) { + auto it = dict->column_dict().find(key); + if (it == dict->column_dict().end()) { + return Status::NotFound("Column dict key {} not found", key); + } + *schema->add_column() = it->second; + } + // Process index dictionary keys + for (int key : dict_keys.index_info_dict_key_list()) { + auto it = dict->index_dict().find(key); + if (it == dict->index_dict().end()) { + return Status::NotFound("Index dict key {} not found", key); + } + *schema->add_index() = it->second; + } + return Status::OK(); +} + +Status SchemaCloudDictionaryCache::refresh_dict(int64_t index_id, + SchemaCloudDictionarySPtr* new_dict) { + // First attempt: use the current cached dictionary. + auto refresh_dict = std::make_shared(); + RETURN_IF_ERROR(static_cast(ExecEnv::GetInstance()->storage_engine()) + .meta_mgr() + .get_schema_dict(index_id, &refresh_dict)); + _insert(index_id, refresh_dict); + if (new_dict != nullptr) { + *new_dict = refresh_dict; + } + LOG(INFO) << "refresh dict for index_id=" << index_id; + g_schema_dict_refresh_count << 1; + return Status::OK(); +} + +Status SchemaCloudDictionaryCache::replace_dict_keys_to_schema(int64_t index_id, + RowsetMetaCloudPB* out) { + // First attempt: use the current cached dictionary + SchemaCloudDictionarySPtr dict = _lookup(index_id); + Status st = + dict ? _try_fill_schema(dict, out->schema_dict_key_list(), out->mutable_tablet_schema()) + : Status::NotFound("Schema dict not found in cache"); + + // If filling fails (possibly due to outdated dictionary data), refresh the dictionary + if (!st.ok()) { + g_schema_dict_cache_miss_count << 1; + RETURN_IF_ERROR(refresh_dict(index_id, &dict)); + if (!dict) { + return Status::NotFound("Schema dict not found after refresh, index_id={}", + index_id); + } + // Retry filling the schema with the refreshed dictionary + st = _try_fill_schema(dict, out->schema_dict_key_list(), out->mutable_tablet_schema()); + } + g_replace_dict_keys_to_schema_hit_cache << 1; + return st; +} + +} // namespace doris diff --git a/be/src/cloud/schema_cloud_dictionary_cache.h b/be/src/cloud/schema_cloud_dictionary_cache.h new file mode 100644 index 00000000000000..ed21b7db909ad2 --- /dev/null +++ b/be/src/cloud/schema_cloud_dictionary_cache.h @@ -0,0 +1,101 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include + +#include +#include +#include + +#include "runtime/memory/lru_cache_policy.h" + +namespace doris { + +class SchemaCloudDictionary; +class RowsetMetaCloudPB; + +using SchemaCloudDictionarySPtr = std::shared_ptr; + +/* + * SchemaCloudDictionaryCache provides a local cache for SchemaCloudDictionary. + * + * Caching logic: + * - If the dictionary associated with a given key has not had any new columns added + * (determined by comparing the serialized data for consistency), + * the cached dictionary is directly used to update the dictionary list in the rowset meta + * (similar to the process_dictionary logic in write_schema_dict). + * - If new columns have been detected, the local cache is disregarded, and the updated + * dictionary should be fetched via the meta service. + */ +class SchemaCloudDictionaryCache : public LRUCachePolicy { +public: + SchemaCloudDictionaryCache(size_t capacity) + : LRUCachePolicy(CachePolicy::CacheType::SCHEMA_CLOUD_DICTIONARY_CACHE, capacity, + LRUCacheType::NUMBER, 512) {} + /** + * Refreshes the dictionary for the given index_id by calling an RPC via the meta manager. + * The refreshed dictionary is then inserted into the cache. + * + * @param index_id The identifier for the index. + * @param new_dict Optional output parameter; if provided, it will be set to point to the refreshed dictionary. + * + * @return Status::OK if the dictionary is successfully refreshed; otherwise, an error status. + */ + virtual Status refresh_dict(int64_t index_id, SchemaCloudDictionarySPtr* new_dict = nullptr); + + /** + * Refreshes the dictionary for the given index_id by calling an RPC via the meta manager. + * The refreshed dictionary is then inserted into the cache. + * + * @param index_id The identifier for the index. + * @param new_dict Optional output parameter; if provided, it will be set to point to the refreshed dictionary. + * + * @return Status::OK if the dictionary is successfully refreshed; otherwise, an error status. + */ + Status replace_schema_to_dict_keys(int64_t index_id, RowsetMetaCloudPB* out); + + /** + * Replaces dictionary keys in the given RowsetMetaCloudPB by using the cached dictionary. + * If the cached dictionary is missing or its data is outdated (i.e. missing required keys), + * an RPC call is triggered to refresh the dictionary, which is then used to fill the tablet schema. + * + * @param index_id The identifier for the index. + * @param out Pointer to the RowsetMetaCloudPB whose tablet schema will be updated. + * + * @return Status::OK if the tablet schema is successfully updated; otherwise, an error status. + */ + Status replace_dict_keys_to_schema(int64_t index_id, RowsetMetaCloudPB* out); + +private: + // ut + friend class FakeSchemaCloudDictionaryCache; + // insert dict + void _insert(int64_t index_id, const SchemaCloudDictionarySPtr& dict); + // lookup dict + SchemaCloudDictionarySPtr _lookup(int64_t index_id); + // Attempts to fill the tablet schema information in a SchemaCloudDictionary into a TabletSchemaCloudPB + // based on a given set of dictionary keys. If any required key is missing in the dictionary, a NotFound status is returned. + Status _try_fill_schema(const SchemaCloudDictionarySPtr& dict, + const SchemaDictKeyList& dict_keys, TabletSchemaCloudPB* schema); + struct CacheValue : public LRUCacheValueBase { + SchemaCloudDictionarySPtr dict; + }; +}; + +} // namespace doris \ No newline at end of file diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 6526061ff44220..301a82336b7d15 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1029,7 +1029,7 @@ DEFINE_Bool(enable_workload_group_for_scan, "false"); DEFINE_mInt64(workload_group_scan_task_wait_timeout_ms, "10000"); // Whether use schema dict in backend side instead of MetaService side(cloud mode) -DEFINE_mBool(variant_use_cloud_schema_dict, "true"); +DEFINE_mBool(variant_use_cloud_schema_dict_cache, "true"); DEFINE_mDouble(variant_ratio_of_defaults_as_sparse_column, "1"); DEFINE_mInt64(variant_threshold_rows_to_estimate_sparse_column, "2048"); DEFINE_mBool(variant_throw_exeception_on_invalid_json, "false"); @@ -1437,6 +1437,7 @@ DEFINE_mInt32(compaction_num_per_round, "1"); DEFINE_mInt32(check_tablet_delete_bitmap_interval_seconds, "300"); DEFINE_mInt32(check_tablet_delete_bitmap_score_top_n, "10"); DEFINE_mBool(enable_check_tablet_delete_bitmap_score, "true"); +DEFINE_mInt32(schema_dict_cache_capacity, "4096"); // clang-format off #ifdef BE_TEST diff --git a/be/src/common/config.h b/be/src/common/config.h index d5299393187c0f..845488139be0a4 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1234,7 +1234,7 @@ DECLARE_mInt64(LZ4_HC_compression_level); // Threshold of a column as sparse column // Notice: TEST ONLY DECLARE_mDouble(variant_ratio_of_defaults_as_sparse_column); -DECLARE_mBool(variant_use_cloud_schema_dict); +DECLARE_mBool(variant_use_cloud_schema_dict_cache); // Threshold to estimate a column is sparsed // Notice: TEST ONLY DECLARE_mInt64(variant_threshold_rows_to_estimate_sparse_column); @@ -1512,6 +1512,7 @@ DECLARE_mInt32(compaction_num_per_round); DECLARE_mInt32(check_tablet_delete_bitmap_interval_seconds); DECLARE_mInt32(check_tablet_delete_bitmap_score_top_n); DECLARE_mBool(enable_check_tablet_delete_bitmap_score); +DECLARE_mInt32(schema_dict_cache_capacity); #ifdef BE_TEST // test s3 diff --git a/be/src/runtime/memory/cache_policy.h b/be/src/runtime/memory/cache_policy.h index 72e61fed2e0013..f7f4bda34fffeb 100644 --- a/be/src/runtime/memory/cache_policy.h +++ b/be/src/runtime/memory/cache_policy.h @@ -17,6 +17,8 @@ #pragma once +#include + #include #include "util/runtime_profile.h" @@ -52,6 +54,7 @@ class CachePolicy { FOR_UT_CACHE_NUMBER = 19, QUERY_CACHE = 20, TABLET_COLUMN_OBJECT_POOL = 21, + SCHEMA_CLOUD_DICTIONARY_CACHE = 22, }; static std::string type_string(CacheType type) { @@ -98,6 +101,8 @@ class CachePolicy { return "QueryCache"; case CacheType::TABLET_COLUMN_OBJECT_POOL: return "TabletColumnObjectPool"; + case CacheType::SCHEMA_CLOUD_DICTIONARY_CACHE: + return "SchemaCloudDictionaryCache"; default: throw Exception(Status::FatalError("not match type of cache policy :{}", static_cast(type))); @@ -126,7 +131,9 @@ class CachePolicy { {"CloudTxnDeleteBitmapCache", CacheType::CLOUD_TXN_DELETE_BITMAP_CACHE}, {"ForUTCacheNumber", CacheType::FOR_UT_CACHE_NUMBER}, {"QueryCache", CacheType::QUERY_CACHE}, - {"TabletColumnObjectPool", CacheType::TABLET_COLUMN_OBJECT_POOL}}; + {"TabletColumnObjectPool", CacheType::TABLET_COLUMN_OBJECT_POOL}, + {"SchemaCloudDictionaryCache", CacheType::SCHEMA_CLOUD_DICTIONARY_CACHE}, + }; static CacheType string_to_type(std::string type) { if (StringToType.contains(type)) { diff --git a/be/test/cloud/test_schema_cloud_dictionary_cache.cpp b/be/test/cloud/test_schema_cloud_dictionary_cache.cpp new file mode 100644 index 00000000000000..3d05eb67e457fa --- /dev/null +++ b/be/test/cloud/test_schema_cloud_dictionary_cache.cpp @@ -0,0 +1,178 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "cloud/schema_cloud_dictionary_cache.h" +#include "gen_cpp/olap_file.pb.h" +#include "gtest/gtest.h" + +namespace doris { + +using SchemaCloudDictionarySPtr = std::shared_ptr; + +/* + * FakeSchemaCloudDictionaryCache is a test subclass which allows injection of dictionary entries + * and overrides refresh_dict to simulate RPC refresh. + */ +class FakeSchemaCloudDictionaryCache : public SchemaCloudDictionaryCache { +public: + FakeSchemaCloudDictionaryCache(size_t capacity) : SchemaCloudDictionaryCache(capacity) {} + + // For unit testing, we override refresh_dict to simulate different scenarios. + // (Assume the base class method is declared virtual for testing or we hide it in our subclass) + Status refresh_dict(int64_t index_id, SchemaCloudDictionarySPtr* new_dict = nullptr) override { + if (simulate_refresh_success) { + // Simulate a successful refresh by creating a valid dictionary. + SchemaCloudDictionarySPtr valid_dict = createValidDictionary(); + // Inject the dictionary into cache. + TestInsert(index_id, valid_dict); + if (new_dict) { + *new_dict = valid_dict; + } + return Status::OK(); + } else { + return Status::InternalError("Simulated refresh failure"); + } + } + + // Public wrapper for injection (assume _insert is accessible, e.g. changed to protected for unit test) + void TestInsert(int64_t index_id, const SchemaCloudDictionarySPtr& dict) { + _insert(index_id, dict); + } + + // Flag to control refresh_dict to simulate refresh results. + bool simulate_refresh_success = true; + + // Create a valid SchemaCloudDictionary with expected keys. + static SchemaCloudDictionarySPtr createValidDictionary() { + auto* dict = new SchemaCloudDictionary(); + // Populate valid column entry with key 101. + auto& col_dict = *dict->mutable_column_dict(); + ColumnPB* col_pb = &(col_dict)[101]; + col_pb->set_unique_id(101); + // Populate valid index entry with key 201. Set index_suffix_name to empty. + auto& idx_dict = *dict->mutable_index_dict(); + TabletIndexPB* idx_pb = &(idx_dict)[201]; + idx_pb->set_index_suffix_name(""); + return SchemaCloudDictionarySPtr(dict); + } + + // Create an invalid SchemaCloudDictionary (missing column key 101) + static SchemaCloudDictionarySPtr createInvalidDictionary() { + auto* dict = new SchemaCloudDictionary(); + // Insert a column with a wrong key example 999 rather than 101. + auto& col_dict = *dict->mutable_column_dict(); + ColumnPB* col_pb = &(col_dict)[999]; + col_pb->set_unique_id(999); + // 正常的 index 数据. + auto& idx_dict = *dict->mutable_index_dict(); + TabletIndexPB* idx_pb = &(idx_dict)[201]; + idx_pb->set_index_suffix_name(""); + return SchemaCloudDictionarySPtr(dict); + } +}; + +// Test case 1: Cached dictionary valid, _try_fill_schema returns OK. +TEST(SchemaCloudDictionaryCacheTest, ReplaceDictKeysToSchema_ValidCache) { + int64_t index_id = 100; + FakeSchemaCloudDictionaryCache cache(10); + // Inject a valid dictionary into cache. + SchemaCloudDictionarySPtr valid_dict = FakeSchemaCloudDictionaryCache::createValidDictionary(); + cache.TestInsert(index_id, valid_dict); + + // Create a RowsetMetaCloudPB with schema dictionary key list. + RowsetMetaCloudPB rs_meta; + // For testing, add expected column key (101) and index key (201). + SchemaDictKeyList* dict_keys = rs_meta.mutable_schema_dict_key_list(); + dict_keys->add_column_dict_key_list(101); + dict_keys->add_index_info_dict_key_list(201); + // Ensure tablet schema message is created. + rs_meta.mutable_tablet_schema(); + + // Call replace_dict_keys_to_schema. + Status st = cache.replace_dict_keys_to_schema(index_id, &rs_meta); + EXPECT_TRUE(st.ok()); + + // Check that the tablet schema was filled. + const TabletSchemaCloudPB& schema = rs_meta.tablet_schema(); + EXPECT_EQ(schema.column_size(), 1); + EXPECT_EQ(schema.index_size(), 1); +} + +// Test case 2: Cached dictionary invalid, triggers refresh then succeeds. +TEST(SchemaCloudDictionaryCacheTest, ReplaceDictKeysToSchema_InvalidCache_ThenRefresh) { + int64_t index_id = 200; + FakeSchemaCloudDictionaryCache cache(10); + // Inject an invalid dictionary (missing required column key 101). + SchemaCloudDictionarySPtr invalid_dict = + FakeSchemaCloudDictionaryCache::createInvalidDictionary(); + cache.TestInsert(index_id, invalid_dict); + + // Create rowset meta with keys expecting valid dictionary. + RowsetMetaCloudPB rs_meta; + SchemaDictKeyList* dict_keys = rs_meta.mutable_schema_dict_key_list(); + dict_keys->add_column_dict_key_list(101); // invalid dict does not contain 101. + dict_keys->add_index_info_dict_key_list(201); + rs_meta.mutable_tablet_schema(); + + cache.simulate_refresh_success = true; + Status st = cache.replace_dict_keys_to_schema(index_id, &rs_meta); + EXPECT_TRUE(st.ok()); + + // After refresh, the valid dictionary should be used. + const TabletSchemaCloudPB& schema = rs_meta.tablet_schema(); + EXPECT_EQ(schema.column_size(), 1); + EXPECT_EQ(schema.index_size(), 1); +} + +// Test case 3: No dictionary in cache, refresh is triggered and succeeds. +TEST(SchemaCloudDictionaryCacheTest, ReplaceDictKeysToSchema_NoCache_ThenRefresh) { + int64_t index_id = 300; + FakeSchemaCloudDictionaryCache cache(10); + // Not injecting any dictionary so that _lookup returns null. + RowsetMetaCloudPB rs_meta; + SchemaDictKeyList* dict_keys = rs_meta.mutable_schema_dict_key_list(); + dict_keys->add_column_dict_key_list(101); + dict_keys->add_index_info_dict_key_list(201); + rs_meta.mutable_tablet_schema(); + + // Refresh should be triggered. + cache.simulate_refresh_success = true; + Status st = cache.replace_dict_keys_to_schema(index_id, &rs_meta); + EXPECT_TRUE(st.ok()); + + const TabletSchemaCloudPB& schema = rs_meta.tablet_schema(); + EXPECT_EQ(schema.column_size(), 1); + EXPECT_EQ(schema.index_size(), 1); +} + +// Test case 4: Refresh fails, replace_dict_keys_to_schema returns error. +TEST(SchemaCloudDictionaryCacheTest, ReplaceDictKeysToSchema_RefreshFailure) { + int64_t index_id = 400; + FakeSchemaCloudDictionaryCache cache(10); + // Ensure no valid dictionary in cache. + RowsetMetaCloudPB rs_meta; + SchemaDictKeyList* dict_keys = rs_meta.mutable_schema_dict_key_list(); + dict_keys->add_column_dict_key_list(101); + dict_keys->add_index_info_dict_key_list(201); + rs_meta.mutable_tablet_schema(); + + cache.simulate_refresh_success = false; + Status st = cache.replace_dict_keys_to_schema(index_id, &rs_meta); + EXPECT_FALSE(st.ok()); +} + +} // namespace doris \ No newline at end of file diff --git a/cloud/src/common/bvars.cpp b/cloud/src/common/bvars.cpp index 6385fc7c9e815f..fe887760f7f32d 100644 --- a/cloud/src/common/bvars.cpp +++ b/cloud/src/common/bvars.cpp @@ -85,12 +85,12 @@ BvarLatencyRecorderWithTag g_bvar_ms_finish_tablet_job("ms", "finish_tablet_job" BvarLatencyRecorderWithTag g_bvar_ms_get_cluster_status("ms", "get_cluster_status"); BvarLatencyRecorderWithTag g_bvar_ms_set_cluster_status("ms", "set_cluster_status"); BvarLatencyRecorderWithTag g_bvar_ms_check_kv("ms", "check_kv"); +BvarLatencyRecorderWithTag g_bvar_ms_get_schema_dict("ms", "get_schema_dict"); bvar::Adder g_bvar_update_delete_bitmap_fail_counter; bvar::Window > g_bvar_update_delete_bitmap_fail_counter_minute("ms", "update_delete_bitmap_fail", &g_bvar_update_delete_bitmap_fail_counter, 60); bvar::Adder g_bvar_get_delete_bitmap_fail_counter; bvar::Window > g_bvar_get_delete_bitmap_fail_counter_minute("ms", "get_delete_bitmap_fail", &g_bvar_get_delete_bitmap_fail_counter, 60); - // recycler's bvars // TODO: use mbvar for per instance, https://github.com/apache/brpc/blob/master/docs/cn/mbvar_c++.md BvarStatusWithTag g_bvar_recycler_recycle_index_earlest_ts("recycler", "recycle_index_earlest_ts"); diff --git a/cloud/src/common/bvars.h b/cloud/src/common/bvars.h index ff1d3520b30dd5..7f616615394781 100644 --- a/cloud/src/common/bvars.h +++ b/cloud/src/common/bvars.h @@ -160,6 +160,7 @@ extern BvarLatencyRecorderWithTag g_bvar_ms_get_rl_task_commit_attach; extern BvarLatencyRecorderWithTag g_bvar_ms_reset_rl_progress; extern BvarLatencyRecorderWithTag g_bvar_ms_get_txn_id; extern BvarLatencyRecorderWithTag g_bvar_ms_check_kv; +extern BvarLatencyRecorderWithTag g_bvar_ms_get_schema_dict; extern bvar::Adder g_bvar_update_delete_bitmap_fail_counter; extern bvar::Adder g_bvar_get_delete_bitmap_fail_counter; diff --git a/cloud/src/meta-service/meta_service.cpp b/cloud/src/meta-service/meta_service.cpp index bc8af94496a0cf..ebe46e4a438d8b 100644 --- a/cloud/src/meta-service/meta_service.cpp +++ b/cloud/src/meta-service/meta_service.cpp @@ -1691,7 +1691,7 @@ void MetaServiceImpl::get_rowset(::google::protobuf::RpcController* controller, } } - if (need_read_schema_dict) { + if (need_read_schema_dict && request->schema_op() != GetRowsetRequest::NO_DICT) { read_schema_dict(code, msg, instance_id, idx.index_id(), txn.get(), response->mutable_rowset_meta(), response->mutable_schema_dict(), request->schema_op()); @@ -2589,4 +2589,57 @@ std::size_t get_segments_key_bounds_bytes(const doris::RowsetMetaCloudPB& rowset return ret; } +void MetaServiceImpl::get_schema_dict(::google::protobuf::RpcController* controller, + const GetSchemaDictRequest* request, + GetSchemaDictResponse* response, + ::google::protobuf::Closure* done) { + RPC_PREPROCESS(get_schema_dict); + instance_id = get_instance_id(resource_mgr_, request->cloud_unique_id()); + if (instance_id.empty()) { + code = MetaServiceCode::INVALID_ARGUMENT; + msg = "empty instance_id"; + LOG(WARNING) << msg << ", cloud_unique_id=" << request->cloud_unique_id(); + return; + } + + if (!request->has_index_id()) { + code = MetaServiceCode::INVALID_ARGUMENT; + msg = "missing index_id in request"; + return; + } + + RPC_RATE_LIMIT(get_schema_dict) + + std::unique_ptr txn; + TxnErrorCode err = txn_kv_->create_txn(&txn); + if (err != TxnErrorCode::TXN_OK) { + code = cast_as(err); + msg = "failed to init txn"; + return; + } + + std::string dict_key = meta_schema_pb_dictionary_key({instance_id, request->index_id()}); + ValueBuf dict_val; + err = cloud::get(txn.get(), dict_key, &dict_val); + LOG(INFO) << "Retrieved column pb dictionary, index_id=" << request->index_id() + << " key=" << hex(dict_key) << " error=" << err; + if (err != TxnErrorCode::TXN_KEY_NOT_FOUND && err != TxnErrorCode::TXN_OK) { + // Handle retrieval error. + ss << "Failed to retrieve column pb dictionary, instance_id=" << instance_id + << " table_id=" << request->index_id() << " key=" << hex(dict_key) << " error=" << err; + msg = ss.str(); + code = cast_as(err); + return; + } + SchemaCloudDictionary schema_dict; + if (err == TxnErrorCode::TXN_OK && !dict_val.to_pb(&schema_dict)) { + // Handle parse error. + code = MetaServiceCode::PROTOBUF_PARSE_ERR; + msg = fmt::format("Malformed tablet dictionary value, key={}", hex(dict_key)); + return; + } + + response->mutable_schema_dict()->Swap(&schema_dict); +} + } // namespace doris::cloud diff --git a/cloud/src/meta-service/meta_service.h b/cloud/src/meta-service/meta_service.h index 5374cbea741fb0..6df09bd2c20702 100644 --- a/cloud/src/meta-service/meta_service.h +++ b/cloud/src/meta-service/meta_service.h @@ -301,6 +301,10 @@ class MetaServiceImpl : public cloud::MetaService { void get_txn_id(::google::protobuf::RpcController* controller, const GetTxnIdRequest* request, GetTxnIdResponse* response, ::google::protobuf::Closure* done) override; + void get_schema_dict(::google::protobuf::RpcController* controller, + const GetSchemaDictRequest* request, GetSchemaDictResponse* response, + ::google::protobuf::Closure* done) override; + // ATTN: If you add a new method, please also add the corresponding implementation in `MetaServiceProxy`. std::pair get_instance_info(const std::string& instance_id, @@ -694,6 +698,12 @@ class MetaServiceProxy final : public MetaService { call_impl(&cloud::MetaService::get_txn_id, controller, request, response, done); } + void get_schema_dict(::google::protobuf::RpcController* controller, + const GetSchemaDictRequest* request, GetSchemaDictResponse* response, + ::google::protobuf::Closure* done) override { + call_impl(&cloud::MetaService::get_schema_dict, controller, request, response, done); + } + private: template using MetaServiceMethod = void (cloud::MetaService::*)(::google::protobuf::RpcController*, diff --git a/cloud/src/meta-service/meta_service_schema.cpp b/cloud/src/meta-service/meta_service_schema.cpp index ff88e82cf200c8..bbbedd1e3abe8e 100644 --- a/cloud/src/meta-service/meta_service_schema.cpp +++ b/cloud/src/meta-service/meta_service_schema.cpp @@ -212,6 +212,10 @@ void process_dictionary(SchemaCloudDictionary& dict, // such restored schema in fdb. And also add extra dict key info to RowsetMetaCloudPB. void write_schema_dict(MetaServiceCode& code, std::string& msg, const std::string& instance_id, Transaction* txn, RowsetMetaCloudPB* rowset_meta) { + // if schema_dict_key_list is not empty, then the schema already replaced in BE side, and no need to update dict + if (rowset_meta->has_schema_dict_key_list()) { + return; + } std::stringstream ss; // wrtie dict to rowset meta and update dict SchemaCloudDictionary dict; @@ -311,6 +315,7 @@ void write_schema_dict(MetaServiceCode& code, std::string& msg, const std::strin cloud::put(txn, dict_key, dict_val, 0); LOG(INFO) << "Dictionary saved, key=" << hex(dict_key) << " txn_id=" << rowset_meta->txn_id() << " Dict size=" << dict.column_dict_size() + << ", index_id=" << rowset_meta->index_id() << ", Current column ID=" << dict.current_column_dict_id() << ", Current index ID=" << dict.current_index_dict_id() << ", Dict bytes=" << dict_val.size(); @@ -322,7 +327,6 @@ void read_schema_dict(MetaServiceCode& code, std::string& msg, const std::string google::protobuf::RepeatedPtrField* rsp_metas, SchemaCloudDictionary* rsp_dict, GetRowsetRequest::SchemaOp schema_op) { std::stringstream ss; - // read dict if any rowset has dict key list SchemaCloudDictionary dict; std::string column_dict_key = meta_schema_pb_dictionary_key({instance_id, index_id}); diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto index 5d541547b97feb..4b9ad50d5d54ba 100644 --- a/gensrc/proto/cloud.proto +++ b/gensrc/proto/cloud.proto @@ -995,6 +995,16 @@ message GetRowsetResponse { optional SchemaCloudDictionary schema_dict = 4; } +message GetSchemaDictRequest { + optional string cloud_unique_id = 1; // For auth + optional int64 index_id = 2; +} + +message GetSchemaDictResponse { + optional MetaServiceResponseStatus status = 1; + optional SchemaCloudDictionary schema_dict = 2; +} + message IndexRequest { optional string cloud_unique_id = 1; // For auth repeated int64 index_ids = 2; @@ -1390,6 +1400,8 @@ enum MetaServiceCode { // The meta service retries KV_TXN_CONFLICT error but is exceeded the max times. KV_TXN_CONFLICT_RETRY_EXCEEDED_MAX_TIMES = 10001; + SCHEMA_DICT_NOT_FOUND = 11001; + UNDEFINED_ERR = 1000000; } @@ -1554,6 +1566,7 @@ service MetaService { rpc commit_rowset(CreateRowsetRequest) returns (CreateRowsetResponse); rpc update_tmp_rowset(CreateRowsetRequest) returns (CreateRowsetResponse); rpc get_rowset(GetRowsetRequest) returns (GetRowsetResponse); + rpc get_schema_dict(GetSchemaDictRequest) returns (GetSchemaDictResponse); rpc prepare_index(IndexRequest) returns (IndexResponse); rpc commit_index(IndexRequest) returns (IndexResponse); rpc drop_index(IndexRequest) returns (IndexResponse);