From 8573960ec224a5dcbf2d2b629c66a269a75c388d Mon Sep 17 00:00:00 2001 From: eldenmoon Date: Fri, 7 Mar 2025 22:58:43 +0800 Subject: [PATCH] add path stats check and fix sparse cache --- be/src/olap/compaction.cpp | 6 ++ be/src/olap/iterators.h | 3 +- .../olap/rowset/segment_v2/column_reader.cpp | 14 ++-- .../segment_v2/hierarchical_data_reader.h | 35 +++++---- .../rowset/segment_v2/segment_iterator.cpp | 2 +- .../olap/rowset/segment_v2/segment_writer.cpp | 12 ++- .../olap/rowset/segment_v2/segment_writer.h | 3 +- .../segment_v2/variant_column_writer_impl.cpp | 12 +-- be/src/vec/common/schema_util.cpp | 78 +++++++++++++------ be/src/vec/common/schema_util.h | 8 +- 10 files changed, 116 insertions(+), 57 deletions(-) diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp index acfcc34470f0ed..545096771d1266 100644 --- a/be/src/olap/compaction.cpp +++ b/be/src/olap/compaction.cpp @@ -74,6 +74,7 @@ #include "runtime/thread_context.h" #include "util/time.h" #include "util/trace.h" +#include "vec/common/schema_util.h" using std::vector; @@ -1329,6 +1330,11 @@ Status Compaction::check_correctness() { _tablet->tablet_id(), _input_row_num, _stats.merged_rows, _stats.filtered_rows, _output_rowset->num_rows()); } + if (_tablet->keys_type() == KeysType::DUP_KEYS) { + // only check path stats for dup_keys since the rows may be merged in other models + RETURN_IF_ERROR(vectorized::schema_util::check_path_stats(_input_rowsets, _output_rowset, + _tablet->tablet_id())); + } return Status::OK(); } diff --git a/be/src/olap/iterators.h b/be/src/olap/iterators.h index 963f4d23598cf3..f0cc7784f5c12e 100644 --- a/be/src/olap/iterators.h +++ b/be/src/olap/iterators.h @@ -121,7 +121,8 @@ class StorageReadOptions { RowRanges row_ranges; size_t topn_limit = 0; // Cache for sparse column data to avoid redundant reads - vectorized::ColumnPtr sparse_column_cache; + // col_unique_id -> cached column_ptr + std::unordered_map sparse_column_cache; }; struct CompactionSampleInfo { diff 
--git a/be/src/olap/rowset/segment_v2/column_reader.cpp b/be/src/olap/rowset/segment_v2/column_reader.cpp index 313eca9b0161b5..166dd9f0a9727c 100644 --- a/be/src/olap/rowset/segment_v2/column_reader.cpp +++ b/be/src/olap/rowset/segment_v2/column_reader.cpp @@ -324,9 +324,10 @@ Status VariantColumnReader::_create_sparse_merge_reader(ColumnIterator** iterato VLOG_DEBUG << "subcolumns to merge " << src_subcolumns_for_sparse.size(); // Create sparse column merge reader - *iterator = new SparseColumnMergeReader( - path_set_info.sub_path_set, std::unique_ptr(inner_iter), - std::move(src_subcolumns_for_sparse), const_cast(opts)); + *iterator = new SparseColumnMergeReader(path_set_info.sub_path_set, + std::unique_ptr(inner_iter), + std::move(src_subcolumns_for_sparse), + const_cast(opts), target_col); return Status::OK(); } @@ -385,7 +386,7 @@ Status VariantColumnReader::_new_iterator_with_flat_leaves(ColumnIterator** iter *iterator = new SparseColumnExtractReader( relative_path.get_path(), std::unique_ptr(inner_iter), // need to modify sparse_column_cache, so use const_cast here - const_cast(opts)); + const_cast(opts), target_col); return Status::OK(); } if (relative_path.get_path() == SPARSE_COLUMN_PATH) { @@ -465,8 +466,9 @@ Status VariantColumnReader::new_iterator(ColumnIterator** iterator, const Tablet ColumnIterator* inner_iter; RETURN_IF_ERROR(_sparse_column_reader->new_iterator(&inner_iter)); DCHECK(opt); - *iterator = new SparseColumnExtractReader( - relative_path.get_path(), std::unique_ptr(inner_iter), nullptr); + *iterator = new SparseColumnExtractReader(relative_path.get_path(), + std::unique_ptr(inner_iter), + nullptr, target_col); return Status::OK(); } diff --git a/be/src/olap/rowset/segment_v2/hierarchical_data_reader.h b/be/src/olap/rowset/segment_v2/hierarchical_data_reader.h index 591b706e0e7a88..5ea7ac59ad78f1 100644 --- a/be/src/olap/rowset/segment_v2/hierarchical_data_reader.h +++ b/be/src/olap/rowset/segment_v2/hierarchical_data_reader.h @@ -172,7 
+172,7 @@ class BaseSparseColumnProcessor : public ColumnIterator { vectorized::MutableColumnPtr _sparse_column; StorageReadOptions* _read_opts; // Shared cache pointer std::unique_ptr _sparse_column_reader; - + const TabletColumn& _col; // Pure virtual method for data processing when encounter existing sparse columns(to be implemented by subclasses) virtual void _process_data_with_existing_sparse_column(vectorized::MutableColumnPtr& dst, size_t num_rows) = 0; @@ -182,8 +182,9 @@ class BaseSparseColumnProcessor : public ColumnIterator { size_t num_rows) = 0; public: - BaseSparseColumnProcessor(std::unique_ptr&& reader, StorageReadOptions* opts) - : _read_opts(opts), _sparse_column_reader(std::move(reader)) { + BaseSparseColumnProcessor(std::unique_ptr&& reader, StorageReadOptions* opts, + const TabletColumn& col) + : _read_opts(opts), _sparse_column_reader(std::move(reader)), _col(col) { _sparse_column = vectorized::ColumnObject::create_sparse_column_fn(); } @@ -208,15 +209,17 @@ class BaseSparseColumnProcessor : public ColumnIterator { Status _process_batch(ReadMethod&& read_method, size_t nrows, vectorized::MutableColumnPtr& dst) { // Cache check and population logic - if (_read_opts && _read_opts->sparse_column_cache && + if (_read_opts && _read_opts->sparse_column_cache[_col.parent_unique_id()] && ColumnReader::is_compaction_reader_type(_read_opts->io_ctx.reader_type)) { - _sparse_column = _read_opts->sparse_column_cache->assume_mutable(); + _sparse_column = + _read_opts->sparse_column_cache[_col.parent_unique_id()]->assume_mutable(); } else { _sparse_column->clear(); RETURN_IF_ERROR(read_method()); if (_read_opts) { - _read_opts->sparse_column_cache = _sparse_column->assume_mutable(); + _read_opts->sparse_column_cache[_col.parent_unique_id()] = + _sparse_column->get_ptr(); } } @@ -231,6 +234,14 @@ class BaseSparseColumnProcessor : public ColumnIterator { } return Status::OK(); } +}; + +// Implementation for path extraction processor +class 
SparseColumnExtractReader : public BaseSparseColumnProcessor { +public: + SparseColumnExtractReader(std::string_view path, std::unique_ptr reader, + StorageReadOptions* opts, const TabletColumn& col) + : BaseSparseColumnProcessor(std::move(reader), opts, col), _path(path) {} // Batch processing using template method Status next_batch(size_t* n, vectorized::MutableColumnPtr& dst, bool* has_null) override { @@ -248,14 +259,6 @@ class BaseSparseColumnProcessor : public ColumnIterator { }, count, dst); } -}; - -// Implementation for path extraction processor -class SparseColumnExtractReader : public BaseSparseColumnProcessor { -public: - SparseColumnExtractReader(std::string_view path, std::unique_ptr reader, - StorageReadOptions* opts) - : BaseSparseColumnProcessor(std::move(reader), opts), _path(path) {} private: std::string _path; @@ -280,8 +283,8 @@ class SparseColumnMergeReader : public BaseSparseColumnProcessor { SparseColumnMergeReader(const TabletSchema::PathSet& path_map, std::unique_ptr&& sparse_column_reader, SubstreamReaderTree&& src_subcolumns_for_sparse, - StorageReadOptions* opts) - : BaseSparseColumnProcessor(std::move(sparse_column_reader), opts), + StorageReadOptions* opts, const TabletColumn& col) + : BaseSparseColumnProcessor(std::move(sparse_column_reader), opts, col), _src_subcolumn_map(path_map), _src_subcolumns_for_sparse(src_subcolumns_for_sparse) {} Status init(const ColumnIteratorOptions& opts) override; diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.cpp b/be/src/olap/rowset/segment_v2/segment_iterator.cpp index a9db0a8130782b..46af14e7479b55 100644 --- a/be/src/olap/rowset/segment_v2/segment_iterator.cpp +++ b/be/src/olap/rowset/segment_v2/segment_iterator.cpp @@ -2024,7 +2024,7 @@ Status SegmentIterator::_next_batch_internal(vectorized::Block* block) { bool is_mem_reuse = block->mem_reuse(); DCHECK(is_mem_reuse); // Clear the sparse column cache before processing a new batch - _opts.sparse_column_cache = nullptr; + 
_opts.sparse_column_cache.clear(); SCOPED_RAW_TIMER(&_opts.stats->block_load_ns); if (UNLIKELY(!_lazy_inited)) { RETURN_IF_ERROR(_lazy_init()); diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp b/be/src/olap/rowset/segment_v2/segment_writer.cpp index 1720b0ddba45ad..8d38964dd872b5 100644 --- a/be/src/olap/rowset/segment_v2/segment_writer.cpp +++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp @@ -809,7 +809,7 @@ Status SegmentWriter::append_block(const vectorized::Block* block, size_t row_po // caculate stats for variant type // TODO it's tricky here, maybe come up with a better idea - _maybe_calculate_variant_stats(block, id, cid); + _maybe_calculate_variant_stats(block, id, cid, row_pos, num_rows); } if (_has_key) { if (_is_mow_with_cluster_key()) { @@ -1329,8 +1329,11 @@ inline bool SegmentWriter::_is_mow_with_cluster_key() { // Compaction will extend sparse column and is visible during read and write, in order to // persit variant stats info, we should do extra caculation during flushing segment, otherwise // the info is lost -void SegmentWriter::_maybe_calculate_variant_stats(const vectorized::Block* block, size_t id, - size_t cid) { +void SegmentWriter::_maybe_calculate_variant_stats( + const vectorized::Block* block, + size_t id, // id is the offset of the column in the block + size_t cid, // cid is the column id in TabletSchema + size_t row_pos, size_t num_rows) { // Only process sparse columns during compaction if (!_tablet_schema->columns()[cid]->is_sparse_column() || _opts.write_type != DataWriteType::TYPE_COMPACTION) { @@ -1351,7 +1354,8 @@ void SegmentWriter::_maybe_calculate_variant_stats(const vectorized::Block* bloc // Found matching column, calculate statistics auto* stats = column.mutable_variant_statistics(); - vectorized::schema_util::calculate_variant_stats(*block->get_by_position(id).column, stats); + vectorized::schema_util::calculate_variant_stats(*block->get_by_position(id).column, stats, + row_pos, num_rows); VLOG_DEBUG << 
"sparse stats columns " << stats->sparse_column_non_null_size_size(); break; diff --git a/be/src/olap/rowset/segment_v2/segment_writer.h b/be/src/olap/rowset/segment_v2/segment_writer.h index f505aaeaebb578..1f6427594132e0 100644 --- a/be/src/olap/rowset/segment_v2/segment_writer.h +++ b/be/src/olap/rowset/segment_v2/segment_writer.h @@ -175,7 +175,8 @@ class SegmentWriter { Status _write_footer(); Status _write_raw_data(const std::vector& slices); void _maybe_invalid_row_cache(const std::string& key); - void _maybe_calculate_variant_stats(const vectorized::Block* block, size_t id, size_t cid); + void _maybe_calculate_variant_stats(const vectorized::Block* block, size_t id, size_t cid, + size_t row_pos, size_t num_rows); std::string _encode_keys(const std::vector& key_columns, size_t pos); // used for unique-key with merge on write and segment min_max key diff --git a/be/src/olap/rowset/segment_v2/variant_column_writer_impl.cpp b/be/src/olap/rowset/segment_v2/variant_column_writer_impl.cpp index 5c57db390dec32..34fe6e085ecdc4 100644 --- a/be/src/olap/rowset/segment_v2/variant_column_writer_impl.cpp +++ b/be/src/olap/rowset/segment_v2/variant_column_writer_impl.cpp @@ -83,9 +83,10 @@ Status _create_column_writer(uint32_t cid, const TabletColumn& column, opt->need_bloom_filter = column.is_bf_column(); opt->need_bitmap_index = column.has_bitmap_index(); const auto& index = tablet_schema->inverted_index(column.parent_unique_id()); - VLOG_DEBUG << "column: " << column.name() << " need_inverted_index: " << opt->need_inverted_index - << " need_bloom_filter: " << opt->need_bloom_filter - << " need_bitmap_index: " << opt->need_bitmap_index; + VLOG_DEBUG << "column: " << column.name() + << " need_inverted_index: " << opt->need_inverted_index + << " need_bloom_filter: " << opt->need_bloom_filter + << " need_bitmap_index: " << opt->need_bitmap_index; // init inverted index if (index != nullptr && @@ -660,8 +661,9 @@ Status VariantSubcolumnWriter::finalize() { 
_opts.rowset_ctx->tablet_schema->column_by_uid(_tablet_column->parent_unique_id()); // refresh opts and get writer with flush column vectorized::schema_util::inherit_column_attributes(parent_column, flush_column); - VLOG_DEBUG << "parent_column: " << parent_column.name() << " flush_column: " - << flush_column.name() << " is_bf_column: " << parent_column.is_bf_column() << " " + VLOG_DEBUG << "parent_column: " << parent_column.name() + << " flush_column: " << flush_column.name() + << " is_bf_column: " << parent_column.is_bf_column() << " " << flush_column.is_bf_column(); RETURN_IF_ERROR(_create_column_writer( 0, flush_column, _opts.rowset_ctx->tablet_schema, _opts.inverted_index_file_writer, diff --git a/be/src/vec/common/schema_util.cpp b/be/src/vec/common/schema_util.cpp index 50c8d0649fe609..59d6d9bc1e9c94 100644 --- a/be/src/vec/common/schema_util.cpp +++ b/be/src/vec/common/schema_util.cpp @@ -724,6 +724,36 @@ void get_subpaths(const TabletColumn& variant, } } +Status check_path_stats(const std::vector& intputs, RowsetSharedPtr output, + int64_t tablet_id) { + std::unordered_map original_uid_to_path_stats; + for (const auto& rs : intputs) { + RETURN_IF_ERROR(collect_path_stats(rs, original_uid_to_path_stats)); + } + std::unordered_map output_uid_to_path_stats; + RETURN_IF_ERROR(collect_path_stats(output, output_uid_to_path_stats)); + for (const auto& [uid, stats] : original_uid_to_path_stats) { + if (output_uid_to_path_stats.find(uid) == output_uid_to_path_stats.end()) { + return Status::InternalError("Path stats not found for uid {}, tablet_id {}", uid, + tablet_id); + } + if (stats.size() != output_uid_to_path_stats.at(uid).size()) { + return Status::InternalError("Path stats size not match for uid {}, tablet_id {}", uid, + tablet_id); + } + for (const auto& [path, size] : stats) { + if (output_uid_to_path_stats.at(uid).at(path) != size) { + return Status::InternalError( + "Path stats not match for uid {} with path `{}`, input size {}, output " + "size {}, " + 
"tablet_id {}", + uid, path, size, output_uid_to_path_stats.at(uid).at(path), tablet_id); + } + } + } + return Status::OK(); +} + // Build the temporary schema for compaction // 1. collect path stats from all rowsets // 2. get the subpaths and sparse paths for each unique id @@ -763,7 +793,8 @@ Status get_compaction_schema(const std::vector& rowsets, subcolumn.set_name(column->name_lower_case() + "." + subpath.to_string()); subcolumn.set_type(FieldType::OLAP_FIELD_TYPE_VARIANT); subcolumn.set_parent_unique_id(column->unique_id()); - subcolumn.set_path_info(PathInData(column->name_lower_case() + "." + subpath.to_string())); + subcolumn.set_path_info( + PathInData(column->name_lower_case() + "." + subpath.to_string())); subcolumn.set_aggregation_method(column->aggregation()); subcolumn.set_variant_max_subcolumns_count(column->variant_max_subcolumns_count()); subcolumn.set_is_nullable(true); @@ -783,7 +814,8 @@ Status get_compaction_schema(const std::vector& rowsets, // Calculate statistics about variant data paths from the encoded sparse column void calculate_variant_stats(const IColumn& encoded_sparse_column, - segment_v2::VariantStatisticsPB* stats) { + segment_v2::VariantStatisticsPB* stats, size_t row_pos, + size_t num_rows) { // Cast input column to ColumnMap type since sparse column is stored as a map const auto& map_column = assert_cast(encoded_sparse_column); @@ -793,21 +825,25 @@ void calculate_variant_stats(const IColumn& encoded_sparse_column, // Get the keys column which contains the paths as strings const auto& sparse_data_paths = assert_cast(map_column.get_keys_ptr().get()); - + const auto& serialized_sparse_column_offsets = + assert_cast(map_column.get_offsets()); // Iterate through all paths in the sparse column - for (size_t i = 0; i != sparse_data_paths->size(); ++i) { - auto path = sparse_data_paths->get_data_at(i); - - // If path already exists in statistics, increment its count - if (auto it = sparse_data_paths_statistics.find(path); - it != 
sparse_data_paths_statistics.end()) { - ++it->second; - } - // If path doesn't exist and we haven't hit the max statistics size limit, - // add it with count 1 - else if (sparse_data_paths_statistics.size() < - VariantStatistics::MAX_SPARSE_DATA_STATISTICS_SIZE) { - sparse_data_paths_statistics.emplace(path, 1); + for (size_t i = row_pos; i != row_pos + num_rows; ++i) { + size_t offset = serialized_sparse_column_offsets[i - 1]; + size_t end = serialized_sparse_column_offsets[i]; + for (size_t j = offset; j != end; ++j) { + auto path = sparse_data_paths->get_data_at(j); + // If path already exists in statistics, increment its count + if (auto it = sparse_data_paths_statistics.find(path); + it != sparse_data_paths_statistics.end()) { + ++it->second; + } + // If path doesn't exist and we haven't hit the max statistics size limit, + // add it with count 1 + else if (sparse_data_paths_statistics.size() < + VariantStatistics::MAX_SPARSE_DATA_STATISTICS_SIZE) { + sparse_data_paths_statistics.emplace(path, 1); + } } } @@ -815,13 +851,11 @@ void calculate_variant_stats(const IColumn& encoded_sparse_column, // This maps each path string to its frequency count for (const auto& [path, size] : sparse_data_paths_statistics) { const auto& sparse_path = path.to_string(); - auto it = stats->sparse_column_non_null_size().find(sparse_path); - if (it == stats->sparse_column_non_null_size().end()) { - stats->mutable_sparse_column_non_null_size()->emplace(sparse_path, size); + auto& count_map = *stats->mutable_sparse_column_non_null_size(); + if (auto it = count_map.find(sparse_path); it != count_map.end()) { + it->second += size; } else { - size_t original_size = it->second; - stats->mutable_sparse_column_non_null_size()->emplace(sparse_path, - original_size + size); + count_map.emplace(sparse_path, size); } } } diff --git a/be/src/vec/common/schema_util.h b/be/src/vec/common/schema_util.h index 6e3d049f1998da..a4101883fc97ae 100644 --- a/be/src/vec/common/schema_util.h +++ 
b/be/src/vec/common/schema_util.h @@ -136,8 +136,14 @@ TabletColumn create_sparse_column(const TabletColumn& variant); // Build the temporary schema for compaction, this will reduce the memory usage of compacting variant columns Status get_compaction_schema(const std::vector& rowsets, TabletSchemaSPtr& target); +// Check if the path stats are consistent between the input rowsets and the output rowset. +// Used to check the correctness of compaction. +Status check_path_stats(const std::vector& intputs, RowsetSharedPtr output, + int64_t tablet_id); + // Calculate statistics about variant data paths from the encoded sparse column void calculate_variant_stats(const IColumn& encoded_sparse_column, - segment_v2::VariantStatisticsPB* stats); + segment_v2::VariantStatisticsPB* stats, size_t row_pos, + size_t num_rows); } // namespace doris::vectorized::schema_util