From aeb68ceb1c8c78c492d1b0bb6604a10e33985578 Mon Sep 17 00:00:00 2001 From: zhangyifan27 Date: Mon, 8 Dec 2025 11:44:12 +0800 Subject: [PATCH] IMPALA-4568: Support FooterCache for parquet and orc files Change-Id: Ie3f0ecc082745a811fb35e347597b82c198d4e65 --- be/src/exec/orc/hdfs-orc-scanner.cc | 110 +++++++- be/src/exec/orc/hdfs-orc-scanner.h | 36 ++- be/src/exec/parquet/hdfs-parquet-scanner.cc | 241 +++++++++++------ be/src/exec/parquet/hdfs-parquet-scanner.h | 36 ++- be/src/exec/parquet/parquet-column-readers.cc | 2 +- be/src/exec/parquet/parquet-column-readers.h | 2 +- be/src/exec/parquet/parquet-page-index.cc | 2 +- be/src/runtime/exec-env.cc | 2 +- be/src/runtime/exec-env.h | 1 + be/src/runtime/io/CMakeLists.txt | 3 + be/src/runtime/io/disk-io-mgr.cc | 28 ++ be/src/runtime/io/disk-io-mgr.h | 9 + be/src/runtime/io/footer-cache-test.cc | 218 ++++++++++++++++ be/src/runtime/io/footer-cache.cc | 247 ++++++++++++++++++ be/src/runtime/io/footer-cache.h | 155 +++++++++++ be/src/util/impalad-metrics.cc | 43 +++ be/src/util/impalad-metrics.h | 27 ++ common/thrift/metrics.json | 60 +++++ .../queries/QueryTest/footer-cache.test | 61 +++++ tests/custom_cluster/test_data_cache.py | 2 + tests/custom_cluster/test_footer_cache.py | 157 +++++++++++ 21 files changed, 1356 insertions(+), 86 deletions(-) create mode 100644 be/src/runtime/io/footer-cache-test.cc create mode 100644 be/src/runtime/io/footer-cache.cc create mode 100644 be/src/runtime/io/footer-cache.h create mode 100644 testdata/workloads/functional-query/queries/QueryTest/footer-cache.test create mode 100644 tests/custom_cluster/test_footer_cache.py diff --git a/be/src/exec/orc/hdfs-orc-scanner.cc b/be/src/exec/orc/hdfs-orc-scanner.cc index a8c15126dd..a7e57a8f15 100644 --- a/be/src/exec/orc/hdfs-orc-scanner.cc +++ b/be/src/exec/orc/hdfs-orc-scanner.cc @@ -27,6 +27,7 @@ #include "exprs/scalar-expr.h" #include "runtime/collection-value-builder.h" #include "runtime/exec-env.h" +#include "runtime/io/footer-cache.h" 
#include "runtime/io/request-context.h" #include "runtime/mem-tracker.h" #include "runtime/runtime-filter.inline.h" @@ -329,6 +330,10 @@ Status HdfsOrcScanner::Open(ScannerContext* context) { num_pushed_down_runtime_filters_counter_ = ADD_COUNTER(scan_node_->runtime_profile(), "NumPushedDownRuntimeFilters", TUnit::UNIT); + num_footer_cache_hits_counter_ = + ADD_COUNTER(scan_node_->runtime_profile(), "NumOrcFooterCacheHits", TUnit::UNIT); + num_footer_cache_misses_counter_ = + ADD_COUNTER(scan_node_->runtime_profile(), "NumOrcFooterCacheMisses", TUnit::UNIT); codegend_process_scratch_batch_fn_ = scan_node_->GetCodegenFn(THdfsFileFormat::ORC); if (codegend_process_scratch_batch_fn_ == nullptr) { @@ -503,7 +508,59 @@ void HdfsOrcScanner::Close(RowBatch* row_batch) { CloseInternal(); } -Status HdfsOrcScanner::ProcessFileTail() { +Status HdfsOrcScanner::TryGetFooterFromCache(FooterCache* footer_cache, + std::string* cached_footer, bool* cache_hit) { + DCHECK(footer_cache != nullptr); + DCHECK(cached_footer != nullptr); + DCHECK(cache_hit != nullptr); + + *cache_hit = false; + const HdfsFileDesc* file_desc = stream_->file_desc(); + FooterCacheValue cached_value = + footer_cache->GetFooter(file_desc->filename, file_desc->mtime); + + // Check if we got an ORC footer from cache + if (!std::holds_alternative(cached_value)) { + return Status::OK(); + } + *cached_footer = std::get(cached_value); + if (cached_footer->empty()) { + return Status::OK(); + } + + COUNTER_ADD(num_footer_cache_hits_counter_, 1); + VLOG(2) << "Footer cache hit for ORC file: " << filename(); + *cache_hit = true; + return Status::OK(); +} + +Status HdfsOrcScanner::CreateOrcReaderWithCachedFooter( + const std::string& cached_footer) { + try { + unique_ptr input_stream(new ScanRangeInputStream(this)); + VLOG_FILE << "Processing FileTail of ORC file: " << input_stream->getName() + << ", file_length: " << input_stream->getLength(); + reader_options_.setSerializedFileTail(cached_footer); + reader_ = 
orc::createReader(move(input_stream), reader_options_); + + // When using cached footer, we need to position the stream as if we had read + // the footer from disk. Skip to the end of the footer range to match the state + // that would exist after reading the footer from disk. + int64_t bytes_to_skip = reader_->getSerializedFileTail().length(); + if (bytes_to_skip > 0) { + Status status; + if (!stream_->SkipBytes(bytes_to_skip, &status)) { + return Status(Substitute("Failed to position stream after cached footer: $0", + status.GetDetail())); + } + VLOG(2) << Substitute("Skipped $0 bytes to position stream at footer start after " + "using cached footer", bytes_to_skip); + } + } RETURN_ON_ORC_EXCEPTION("Encountered parse error in tail of ORC file $0: $1"); + return Status::OK(); +} + +Status HdfsOrcScanner::CreateOrcReaderFromDisk() { try { // ScanRangeInputStream keeps a pointer to this HdfsOrcScanner so we can hack // async IO behind the orc::InputStream interface. The ranges of the @@ -513,8 +570,28 @@ Status HdfsOrcScanner::ProcessFileTail() { << ", file_length: " << input_stream->getLength(); reader_ = orc::createReader(move(input_stream), reader_options_); } RETURN_ON_ORC_EXCEPTION("Encountered parse error in tail of ORC file $0: $1"); + return Status::OK(); +} - if (reader_->getNumberOfRows() == 0) return Status::OK(); +void HdfsOrcScanner::CacheOrcFooter(FooterCache* footer_cache) { + DCHECK(footer_cache != nullptr); + DCHECK(reader_ != nullptr); + + const HdfsFileDesc* file_desc = stream_->file_desc(); + std::string serialized_tail = reader_->getSerializedFileTail(); + Status cache_status = footer_cache->PutOrcFooter( + file_desc->filename, file_desc->mtime, serialized_tail); + if (!cache_status.ok()) { + // Cache insertion failure is not fatal, just log it + VLOG(2) << "Failed to cache ORC footer for file '" << filename() + << "': " << cache_status.GetDetail(); + } +} + +Status HdfsOrcScanner::ValidateOrcReader() { + DCHECK(reader_ != nullptr); + + if 
(reader_->getNumberOfRows() == 0) return Status::OK(); if (reader_->getNumberOfStripes() == 0) { return Status(Substitute("Invalid ORC file: $0. No stripes in this file but" " numberOfRows in footer is $1", filename(), reader_->getNumberOfRows())); @@ -522,6 +599,35 @@ Status HdfsOrcScanner::ProcessFileTail() { return Status::OK(); } +Status HdfsOrcScanner::ProcessFileTail() { + FooterCache* footer_cache = ExecEnv::GetInstance()->disk_io_mgr()->footer_cache(); + + // Try to get footer from cache first if footer cache is enabled + if (footer_cache != nullptr) { + std::string cached_footer; + bool cache_hit = false; + RETURN_IF_ERROR(TryGetFooterFromCache(footer_cache, &cached_footer, &cache_hit)); + if (cache_hit) { + RETURN_IF_ERROR(CreateOrcReaderWithCachedFooter(cached_footer)); + return ValidateOrcReader(); + } + } + + // Cache miss or cache disabled - read footer from disk + if (footer_cache != nullptr) { + COUNTER_ADD(num_footer_cache_misses_counter_, 1); + } + + RETURN_IF_ERROR(CreateOrcReaderFromDisk()); + + // Write to cache if footer cache is enabled + if (footer_cache != nullptr) { + CacheOrcFooter(footer_cache); + } + + return ValidateOrcReader(); +} + inline THdfsCompression::type HdfsOrcScanner::TranslateCompressionKind( orc::CompressionKind kind) { switch (kind) { diff --git a/be/src/exec/orc/hdfs-orc-scanner.h b/be/src/exec/orc/hdfs-orc-scanner.h index a066105de1..99c4a2f1bb 100644 --- a/be/src/exec/orc/hdfs-orc-scanner.h +++ b/be/src/exec/orc/hdfs-orc-scanner.h @@ -25,6 +25,7 @@ #include "runtime/exec-env.h" #include "runtime/io/disk-io-mgr.h" +#include "runtime/io/footer-cache.h" #include "runtime/runtime-state.h" #include "exec/acid-metadata-utils.h" #include "exec/hdfs-columnar-scanner.h" @@ -38,6 +39,10 @@ struct HdfsFileDesc; class OrcStructReader; class OrcComplexColumnReader; +namespace io { + class FooterCache; +} + /// This scanner leverage the ORC library to parse ORC files located in HDFS. 
Data is /// transformed into Impala in-memory representation (i.e. Tuples, RowBatches) by /// different kinds of OrcColumnReaders. @@ -151,13 +156,13 @@ class HdfsOrcScanner : public HdfsColumnarScanner { return filename_; } + /// Default read implementation for non async IO. + Status readRandom(void* buf, uint64_t length, uint64_t offset); + private: HdfsOrcScanner* scanner_; const HdfsFileDesc* file_desc_; std::string filename_; - - /// Default read implementation for non async IO. - Status readRandom(void* buf, uint64_t length, uint64_t offset); }; HdfsOrcScanner(HdfsScanNodeBase* scan_node, RuntimeState* state); @@ -307,6 +312,12 @@ class HdfsOrcScanner : public HdfsColumnarScanner { /// Number of runtime filters that are pushed down to the ORC reader. RuntimeProfile::Counter* num_pushed_down_runtime_filters_counter_ = nullptr; + /// Number of footer cache hits. + RuntimeProfile::Counter* num_footer_cache_hits_counter_ = nullptr; + + /// Number of footer cache misses. + RuntimeProfile::Counter* num_footer_cache_misses_counter_ = nullptr; + /// Number of arrived runtime IN-list filters that can be pushed down. /// Used in ShouldUpdateSearchArgument(). Init to -1 so the check can pass at first. int num_pushable_in_list_filters_ = -1; @@ -359,6 +370,25 @@ class HdfsOrcScanner : public HdfsColumnarScanner { /// last ORC_FOOTER_SIZE bytes in context_. Status ProcessFileTail() WARN_UNUSED_RESULT; + /// Helper methods for ProcessFileTail() to reduce complexity: + + /// Try to get ORC footer from cache. Sets cache_hit to true if cache hit, + /// false if cache miss. Returns error status on failure. + Status TryGetFooterFromCache(io::FooterCache* footer_cache, + std::string* cached_footer, bool* cache_hit) WARN_UNUSED_RESULT; + /// Create ORC reader using cached footer. + Status CreateOrcReaderWithCachedFooter(const std::string& cached_footer) + WARN_UNUSED_RESULT; + + /// Create ORC reader by reading footer from disk. 
+ Status CreateOrcReaderFromDisk() WARN_UNUSED_RESULT; + + /// Cache the ORC footer after reading from disk. + void CacheOrcFooter(io::FooterCache* footer_cache); + + /// Validate the ORC reader after creation. + Status ValidateOrcReader() WARN_UNUSED_RESULT; + /// Resolve SchemaPath in TupleDescriptors and translate them to ORC type ids into /// 'selected_nodes'. Track the position slots by pre-order traversal in the /// descriptors and push them to a stack as 'pos_slots'. diff --git a/be/src/exec/parquet/hdfs-parquet-scanner.cc b/be/src/exec/parquet/hdfs-parquet-scanner.cc index d569358216..2feee0bb50 100644 --- a/be/src/exec/parquet/hdfs-parquet-scanner.cc +++ b/be/src/exec/parquet/hdfs-parquet-scanner.cc @@ -43,6 +43,7 @@ #include "runtime/collection-value-builder.h" #include "runtime/exec-env.h" #include "runtime/io/disk-io-mgr.h" +#include "runtime/io/footer-cache.h" #include "runtime/io/request-context.h" #include "runtime/runtime-filter.inline.h" #include "runtime/runtime-state.h" @@ -117,6 +118,8 @@ HdfsParquetScanner::HdfsParquetScanner(HdfsScanNodeBase* scan_node, RuntimeState num_row_groups_counter_(nullptr), num_minmax_filtered_pages_counter_(nullptr), num_dict_filtered_row_groups_counter_(nullptr), + num_footer_cache_hits_counter_(nullptr), + num_footer_cache_misses_counter_(nullptr), parquet_compressed_page_size_counter_(nullptr), parquet_uncompressed_page_size_counter_(nullptr), coll_items_read_counter_(0), @@ -159,6 +162,10 @@ Status HdfsParquetScanner::Open(ScannerContext* context) { ADD_COUNTER(scan_node_->runtime_profile(), "NumTopLevelValuesSkipped", TUnit::UNIT); num_dict_filtered_row_groups_counter_ = ADD_COUNTER(scan_node_->runtime_profile(), "NumDictFilteredRowGroups", TUnit::UNIT); + num_footer_cache_hits_counter_ = + ADD_COUNTER(scan_node_->runtime_profile(), "ParquetFooterCacheHits", TUnit::UNIT); + num_footer_cache_misses_counter_ = + ADD_COUNTER(scan_node_->runtime_profile(), "ParquetFooterCacheMisses", TUnit::UNIT); 
parquet_compressed_page_size_counter_ = ADD_SUMMARY_STATS_COUNTER( scan_node_->runtime_profile(), "ParquetCompressedPageSize", TUnit::BYTES); parquet_uncompressed_page_size_counter_ = ADD_SUMMARY_STATS_COUNTER( @@ -242,7 +249,7 @@ Status HdfsParquetScanner::Open(ScannerContext* context) { file_metadata_utils_, state_->query_options().parquet_fallback_schema_resolution, state_->query_options().parquet_array_resolution)); - RETURN_IF_ERROR(schema_resolver_->Init(&file_metadata_, filename())); + RETURN_IF_ERROR(schema_resolver_->Init(file_metadata_.get(), filename())); // We've processed the metadata and there are columns that need to be materialized. RETURN_IF_ERROR(CreateColumnReaders( @@ -478,14 +485,14 @@ Status HdfsParquetScanner::GetNextInternal(RowBatch* row_batch) { RETURN_IF_ERROR( RowBatch::ResizeAndAllocateTupleBuffer(state_, row_batch->tuple_data_pool(), row_batch->row_desc()->GetRowSize(), &capacity, &tuple_buf_size, &tuple_buf)); - if (file_metadata_.num_rows > 0) { + if (file_metadata_->num_rows > 0) { COUNTER_ADD(num_file_metadata_read_, 1); Tuple* dst_tuple = reinterpret_cast(tuple_buf); TupleRow* dst_row = row_batch->GetRow(row_batch->AddRow()); InitTuple(template_tuple_, dst_tuple); int64_t* dst_slot = dst_tuple->GetBigIntSlot(scan_node_->count_star_slot_offset()); *dst_slot = 0; - for (const auto &row_group : file_metadata_.row_groups) { + for (const auto &row_group : file_metadata_->row_groups) { *dst_slot += row_group.num_rows; } dst_row->SetTuple(0, dst_tuple); @@ -499,14 +506,14 @@ Status HdfsParquetScanner::GetNextInternal(RowBatch* row_batch) { // There are no materialized slots and we are not optimizing count(*), e.g. // "select 1 from alltypes". We can serve this query from just the file metadata. // We don't need to read the column data. 
- if (row_group_rows_read_ == file_metadata_.num_rows) { + if (row_group_rows_read_ == file_metadata_->num_rows) { eos_ = true; return Status::OK(); } COUNTER_ADD(num_file_metadata_read_, 1); assemble_rows_timer_.Start(); - DCHECK_LE(row_group_rows_read_, file_metadata_.num_rows); - int64_t rows_remaining = file_metadata_.num_rows - row_group_rows_read_; + DCHECK_LE(row_group_rows_read_, file_metadata_->num_rows); + int64_t rows_remaining = file_metadata_->num_rows - row_group_rows_read_; int max_tuples = min(row_batch->capacity(), rows_remaining); TupleRow* current_row = row_batch->GetRow(row_batch->AddRow()); int num_to_commit = WriteTemplateTuples(current_row, max_tuples); @@ -537,8 +544,8 @@ Status HdfsParquetScanner::GetNextInternal(RowBatch* row_batch) { if (!status.ok()) RETURN_IF_ERROR(state_->LogOrReturnError(status.msg())); } RETURN_IF_ERROR(NextRowGroup()); - DCHECK_LE(row_group_idx_, file_metadata_.row_groups.size()); - if (row_group_idx_ == file_metadata_.row_groups.size()) { + DCHECK_LE(row_group_idx_, file_metadata_->row_groups.size()); + if (row_group_idx_ == file_metadata_->row_groups.size()) { eos_ = true; DCHECK(parse_status_.ok()); return Status::OK(); @@ -604,7 +611,7 @@ ColumnStatsReader HdfsParquetScanner::CreateStatsReader( int col_idx = node->col_idx; DCHECK_LT(col_idx, row_group.columns.size()); - const vector& col_orders = file_metadata_.column_orders; + const vector& col_orders = file_metadata_->column_orders; const parquet::ColumnOrder* col_order = col_idx < col_orders.size() ? &col_orders[col_idx] : nullptr; @@ -911,7 +918,7 @@ Status HdfsParquetScanner::NextRowGroup() { DCHECK_EQ(0, context_->NumStreams()); ++row_group_idx_; - if (row_group_idx_ >= file_metadata_.row_groups.size()) { + if (row_group_idx_ >= file_metadata_->row_groups.size()) { if (start_with_first_row_group && misaligned_row_group_skipped) { // We started with the first row group and skipped all the row groups because // they were misaligned. 
The execution flow won't reach this point if there is at @@ -920,17 +927,16 @@ Status HdfsParquetScanner::NextRowGroup() { } break; } - const parquet::RowGroup& row_group = file_metadata_.row_groups[row_group_idx_]; - // Also check 'file_metadata_.num_rows' to make sure 'select count(*)' and 'select *' - // behave consistently for corrupt files that have 'file_metadata_.num_rows == 0' - // but some data in row groups. - if (row_group.num_rows == 0 || file_metadata_.num_rows == 0) continue; - + const parquet::RowGroup& row_group = file_metadata_->row_groups[row_group_idx_]; + // Also check 'file_metadata_->num_rows' to make sure 'select count(*)' and 'select *' + // behave consistently for corrupt files that have 'file_metadata_->num_rows == 0' + // but non-empty row groups. + if (row_group.num_rows == 0 || file_metadata_->num_rows == 0) continue; // Let's find the index of the first row in this row group. It's needed to track the // file position of each row. int64_t row_group_first_row = 0; for (int i = 0; i < row_group_idx_; ++i) { - const parquet::RowGroup& row_group = file_metadata_.row_groups[i]; + const parquet::RowGroup& row_group = file_metadata_->row_groups[i]; row_group_first_row += row_group.num_rows; } @@ -1477,7 +1483,7 @@ Status HdfsParquetScanner::FindSkipRangesForPagesWithMinMaxFilters( } min_max_tuple_->Init(min_max_tuple_desc->byte_size()); - parquet::RowGroup& row_group = file_metadata_.row_groups[row_group_idx_]; + parquet::RowGroup& row_group = file_metadata_->row_groups[row_group_idx_]; int filtered_pages = 0; @@ -1580,7 +1586,7 @@ Status HdfsParquetScanner::FindSkipRangesForPagesWithMinMaxFilters( } Status HdfsParquetScanner::EvaluatePageIndex() { - parquet::RowGroup& row_group = file_metadata_.row_groups[row_group_idx_]; + parquet::RowGroup& row_group = file_metadata_->row_groups[row_group_idx_]; vector skip_ranges; for (int i = 0; i < stats_conjunct_evals_.size(); ++i) { @@ -1670,7 +1676,7 @@ Status HdfsParquetScanner::EvaluatePageIndex() { 
Status HdfsParquetScanner::ComputeCandidatePagesForColumns() { if (candidate_ranges_.empty()) return Status::OK(); - parquet::RowGroup& row_group = file_metadata_.row_groups[row_group_idx_]; + parquet::RowGroup& row_group = file_metadata_->row_groups[row_group_idx_]; for (BaseScalarColumnReader* scalar_reader : scalar_readers_) { const auto& page_locations = scalar_reader->offset_index_.page_locations; if (!ComputeCandidatePages(page_locations, candidate_ranges_, row_group.num_rows, @@ -2758,32 +2764,33 @@ inline bool HdfsParquetScanner::ReadCollectionItem( return continue_execution; } -Status HdfsParquetScanner::ProcessFooter() { - const int64_t file_len = stream_->file_desc()->file_length; - const int64_t scan_range_len = stream_->scan_range()->len(); +bool HdfsParquetScanner::TryGetFooterFromCache() { + FooterCache* footer_cache = ExecEnv::GetInstance()->disk_io_mgr()->footer_cache(); + if (footer_cache == nullptr) return false; - // We're processing the scan range issued in IssueInitialRanges(). The scan range should - // be the last FOOTER_BYTES of the file. !success means the file is shorter than we - // expect. Note we can't detect if the file is larger than we expect without attempting - // to read past the end of the scan range, but in this case we'll fail below trying to - // parse the footer. 
- DCHECK_LE(scan_range_len, PARQUET_FOOTER_SIZE); - uint8_t* buffer; - bool success = stream_->ReadBytes(scan_range_len, &buffer, &parse_status_); - if (!success) { - DCHECK(!parse_status_.ok()); - if (parse_status_.code() == TErrorCode::SCANNER_INCOMPLETE_READ) { - VLOG_QUERY << "Metadata for file '" << filename() << "' appears stale: " - << "metadata states file size to be " - << PrettyPrinter::Print(file_len, TUnit::BYTES) - << ", but could only read " - << PrettyPrinter::Print(stream_->total_bytes_returned(), TUnit::BYTES); - return Status(TErrorCode::STALE_METADATA_FILE_TOO_SHORT, filename(), - scan_node_->hdfs_table()->fully_qualified_name()); + const HdfsFileDesc* file_desc = stream_->file_desc(); + FooterCacheValue cached_value = + footer_cache->GetFooter(file_desc->filename, file_desc->mtime); + + // Check if we got a Parquet footer from cache + if (std::holds_alternative>(cached_value)) { + auto cached_metadata = std::get>(cached_value); + if (cached_metadata != nullptr) { + // Cache hit - share the cached FileMetaData, skip disk I/O and deserialization + file_metadata_ = cached_metadata; + COUNTER_ADD(num_footer_cache_hits_counter_, 1); + VLOG(2) << "Footer cache hit for file: " << filename(); + return true; } - return parse_status_; } - DCHECK(stream_->eosr()); + + COUNTER_ADD(num_footer_cache_misses_counter_, 1); + return false; +} + +Status HdfsParquetScanner::ValidateFooterBuffer(int64_t scan_range_len, uint8_t* buffer, + uint8_t** metadata_ptr, uint32_t* metadata_size, int64_t* metadata_start) { + const int64_t file_len = stream_->file_desc()->file_length; // Number of bytes in buffer after the fixed size footer is accounted for. 
int remaining_bytes_buffered = scan_range_len - sizeof(int32_t) - @@ -2807,23 +2814,93 @@ Status HdfsParquetScanner::ProcessFooter() { // The size of the metadata is encoded as a 4 byte little endian value before // the magic number uint8_t* metadata_size_ptr = magic_number_ptr - sizeof(int32_t); - uint32_t metadata_size = *reinterpret_cast(metadata_size_ptr); + *metadata_size = *reinterpret_cast(metadata_size_ptr); + // The start of the metadata is: // file_len - 4-byte footer length field - 4-byte version number field - metadata size - int64_t metadata_start = file_len - sizeof(int32_t) - sizeof(PARQUET_VERSION_NUMBER) - - metadata_size; - if (UNLIKELY(metadata_start < 0)) { + *metadata_start = file_len - sizeof(int32_t) - sizeof(PARQUET_VERSION_NUMBER) - + *metadata_size; + if (UNLIKELY(*metadata_start < 0)) { return Status(Substitute("File '$0' is invalid. Invalid metadata size in file " "footer: $1 bytes. File size: $2 bytes.", - filename(), metadata_size, file_len)); + filename(), *metadata_size, file_len)); + } + + *metadata_ptr = metadata_size_ptr - *metadata_size; + + return Status::OK(); +} + +Status HdfsParquetScanner::DeserializeFooterMetadata(uint8_t* metadata_ptr, + uint32_t metadata_size, int64_t metadata_start) { + const int64_t file_len = stream_->file_desc()->file_length; + + // Deserialize file footer + // TODO: this takes ~7ms for a 1000-column table, figure out how to reduce this. 
+ file_metadata_ = std::make_shared(); + Status status = + DeserializeThriftMsg(metadata_ptr, &metadata_size, true, file_metadata_.get()); + if (!status.ok()) { + return Status(Substitute("File '$0' of length $1 bytes has invalid file metadata " + "at file offset $2, Error = $3.", filename(), file_len, metadata_start, + status.GetDetail())); + } + + // Write to cache if footer cache is enabled + FooterCache* footer_cache = ExecEnv::GetInstance()->disk_io_mgr()->footer_cache(); + if (footer_cache != nullptr) { + const HdfsFileDesc* file_desc = stream_->file_desc(); + // Share the FileMetaData with cache (zero-copy) + Status cache_status = footer_cache->PutParquetFooter( + file_desc->filename, file_desc->mtime, file_metadata_); + if (!cache_status.ok()) { + // Cache insertion failure is not fatal, just log it + VLOG(2) << "Failed to cache footer for file '" << filename() + << "': " << cache_status.GetDetail(); + } + } + + return Status::OK(); +} + +Status HdfsParquetScanner::ReadFooterFromDisk() { + // We're processing the scan range issued in IssueInitialRanges(). The scan range should + // be the last FOOTER_BYTES of the file. !success means the file is shorter than we + // expect. Note we can't detect if the file is larger than we expect without attempting + // to read past the end of the scan range, but in this case we'll fail below trying to + // parse the footer. 
+ const int64_t file_len = stream_->file_desc()->file_length; + const int64_t scan_range_len = stream_->scan_range()->len(); + DCHECK_LE(scan_range_len, PARQUET_FOOTER_SIZE); + + uint8_t* buffer; + bool success = stream_->ReadBytes(scan_range_len, &buffer, &parse_status_); + if (!success) { + DCHECK(!parse_status_.ok()); + if (parse_status_.code() == TErrorCode::SCANNER_INCOMPLETE_READ) { + VLOG_QUERY << "Metadata for file '" << filename() << "' appears stale: " + << "metadata states file size to be " + << PrettyPrinter::Print(file_len, TUnit::BYTES) + << ", but could only read " + << PrettyPrinter::Print(stream_->total_bytes_returned(), TUnit::BYTES); + return Status(TErrorCode::STALE_METADATA_FILE_TOO_SHORT, filename(), + scan_node_->hdfs_table()->fully_qualified_name()); + } + return parse_status_; } - uint8_t* metadata_ptr = metadata_size_ptr - metadata_size; + DCHECK(stream_->eosr()); - // If the metadata was too big, we need to read it into a contiguous buffer before - // deserializing it. + uint8_t* metadata_ptr; + uint32_t metadata_size; + int64_t metadata_start; + RETURN_IF_ERROR(ValidateFooterBuffer(scan_range_len, buffer, &metadata_ptr, + &metadata_size, &metadata_start)); + + // Check if we need to read additional metadata + int remaining_bytes_buffered = scan_range_len - sizeof(int32_t) - + sizeof(PARQUET_VERSION_NUMBER); ScopedBuffer metadata_buffer(scan_node_->mem_tracker()); - DCHECK(metadata_range_ != nullptr); if (UNLIKELY(metadata_size > remaining_bytes_buffered)) { // In this case, the metadata is bigger than our guess meaning there are // not enough bytes in the footer range from IssueInitialRanges(). @@ -2843,31 +2920,27 @@ Status HdfsParquetScanner::ProcessFooter() { RETURN_IF_ERROR(ReadToBuffer(metadata_start, metadata_ptr, metadata_size)); } - // Deserialize file footer - // TODO: this takes ~7ms for a 1000-column table, figure out how to reduce this. 
- Status status = - DeserializeThriftMsg(metadata_ptr, &metadata_size, true, &file_metadata_); - if (!status.ok()) { - return Status(Substitute("File '$0' of length $1 bytes has invalid file metadata " - "at file offset $2, Error = $3.", filename(), file_len, metadata_start, - status.GetDetail())); - } + RETURN_IF_ERROR(DeserializeFooterMetadata(metadata_ptr, metadata_size, metadata_start)); + + return Status::OK(); +} - RETURN_IF_ERROR(ParquetMetadataUtils::ValidateFileVersion(file_metadata_, filename())); +Status HdfsParquetScanner::ValidateFileMetadata() { + RETURN_IF_ERROR(ParquetMetadataUtils::ValidateFileVersion(*file_metadata_, filename())); if (VLOG_FILE_IS_ON) { VLOG_FILE << "Parquet metadata for " << filename() << " created by " - << file_metadata_.created_by << ":\n" - << join(file_metadata_.key_value_metadata | transformed( + << file_metadata_->created_by << ":\n" + << join(file_metadata_->key_value_metadata | transformed( [](parquet::KeyValue kv) { return kv.key + "=" + kv.value; }), "\n"); } // IMPALA-3943: Do not throw an error for empty files for backwards compatibility. - if (file_metadata_.num_rows == 0) { + if (file_metadata_->num_rows == 0) { // Warn if the num_rows is inconsistent with the row group metadata. 
- if (!file_metadata_.row_groups.empty()) { + if (!file_metadata_->row_groups.empty()) { bool has_non_empty_row_group = false; - for (const parquet::RowGroup& row_group : file_metadata_.row_groups) { + for (const parquet::RowGroup& row_group : file_metadata_->row_groups) { if (row_group.num_rows > 0) { has_non_empty_row_group = true; break; @@ -2883,17 +2956,35 @@ Status HdfsParquetScanner::ProcessFooter() { } // Parse out the created by application version string - if (file_metadata_.__isset.created_by) { - file_version_ = ParquetFileVersion(file_metadata_.created_by); + if (file_metadata_->__isset.created_by) { + file_version_ = ParquetFileVersion(file_metadata_->created_by); } - if (file_metadata_.row_groups.empty()) { + + if (file_metadata_->row_groups.empty()) { return Status( Substitute("Invalid file. This file: $0 has no row groups", filename())); } - if (file_metadata_.num_rows < 0) { + + if (file_metadata_->num_rows < 0) { return Status(Substitute("Corrupt Parquet file '$0': negative row count $1 in " - "file metadata", filename(), file_metadata_.num_rows)); + "file metadata", filename(), file_metadata_->num_rows)); } + + return Status::OK(); +} + +Status HdfsParquetScanner::ProcessFooter() { + // Try to get footer from cache first if footer cache is enabled + bool cache_hit = TryGetFooterFromCache(); + + // Cache miss or cache disabled - read footer from disk + if (!cache_hit) { + RETURN_IF_ERROR(ReadFooterFromDisk()); + } + + // Validate the deserialized file metadata + RETURN_IF_ERROR(ValidateFileMetadata()); + return Status::OK(); } @@ -3070,7 +3161,7 @@ Status HdfsParquetScanner::InitScalarColumns(int64_t row_group_first_row) { int64_t partition_id = context_->partition_descriptor()->id(); const HdfsFileDesc* file_desc = scan_node_->GetFileDesc(partition_id, filename()); DCHECK(file_desc != nullptr); - parquet::RowGroup& row_group = file_metadata_.row_groups[row_group_idx_]; + parquet::RowGroup& row_group = file_metadata_->row_groups[row_group_idx_]; 
// Used to validate that the number of values in each reader in column_readers_ at the // same SchemaElement is the same. @@ -3128,7 +3219,7 @@ Status HdfsParquetScanner::ValidateEndOfRowGroup( // These column readers materialize table-level values (vs. collection values). // Test if the expected number of rows from the file metadata matches the actual // number of rows read from the file. - int64_t expected_rows_in_group = file_metadata_.row_groups[row_group_idx].num_rows; + int64_t expected_rows_in_group = file_metadata_->row_groups[row_group_idx].num_rows; if (rows_read != expected_rows_in_group) { return Status(TErrorCode::PARQUET_GROUP_ROW_COUNT_ERROR, filename(), row_group_idx, expected_rows_in_group, rows_read); @@ -3207,7 +3298,7 @@ ParquetTimestampDecoder HdfsParquetScanner::CreateTimestampDecoder( bool HdfsParquetScanner::GetHiveZoneConversionLegacy() const { string writer_zone_conversion_legacy; string writer_time_zone; - for (const parquet::KeyValue& kv : file_metadata_.key_value_metadata) { + for (const parquet::KeyValue& kv : file_metadata_->key_value_metadata) { if (kv.key == "writer.zone.conversion.legacy") { writer_zone_conversion_legacy = kv.value; } else if (kv.key == "writer.time.zone") { diff --git a/be/src/exec/parquet/hdfs-parquet-scanner.h b/be/src/exec/parquet/hdfs-parquet-scanner.h index dcc0c63096..696f5218a6 100644 --- a/be/src/exec/parquet/hdfs-parquet-scanner.h +++ b/be/src/exec/parquet/hdfs-parquet-scanner.h @@ -444,8 +444,8 @@ class HdfsParquetScanner : public HdfsColumnarScanner { /// Column readers among 'column_readers_' not used for filtering std::vector non_filter_readers_; - /// File metadata thrift object - parquet::FileMetaData file_metadata_; + /// File metadata thrift object (shared_ptr for efficient caching) + std::shared_ptr file_metadata_; /// Version of the application that wrote this file. 
ParquetFileVersion file_version_; @@ -548,6 +548,12 @@ class HdfsParquetScanner : public HdfsColumnarScanner { /// and runtime bloom filters on the dictionary entries. RuntimeProfile::Counter* num_dict_filtered_row_groups_counter_; + /// Number of times the footer was found in the footer cache. + RuntimeProfile::Counter* num_footer_cache_hits_counter_; + + /// Number of times the footer was not found in the footer cache. + RuntimeProfile::Counter* num_footer_cache_misses_counter_; + /// Tracks the size of any compressed pages read. If no compressed pages are read, this /// counter is empty RuntimeProfile::SummaryStatsCounter* parquet_compressed_page_size_counter_; @@ -804,6 +810,32 @@ class HdfsParquetScanner : public HdfsColumnarScanner { /// last PARQUET_FOOTER_SIZE bytes in context_. Status ProcessFooter() WARN_UNUSED_RESULT; + /// Helper functions for ProcessFooter() to reduce complexity: + + /// Try to get footer from cache. Returns true if cache hit, false otherwise. + /// On cache hit, 'file_metadata_' is populated with cached data. + bool TryGetFooterFromCache(); + + /// Read footer from disk into 'file_metadata_'. Returns OK on success. + Status ReadFooterFromDisk() WARN_UNUSED_RESULT; + + /// Validate the footer buffer and extract metadata pointer and size. + /// 'scan_range_len' is the length of the scan range. + /// 'buffer' points to the buffer containing footer data. + /// On success, sets 'metadata_ptr' to point to metadata and 'metadata_size' to its size. + /// 'metadata_start' is set to the file offset where metadata starts. + Status ValidateFooterBuffer(int64_t scan_range_len, uint8_t* buffer, + uint8_t** metadata_ptr, uint32_t* metadata_size, int64_t* metadata_start) + WARN_UNUSED_RESULT; + + /// Deserialize footer metadata from 'metadata_ptr' of size 'metadata_size'. + /// Populates 'file_metadata_' on success. 
+ Status DeserializeFooterMetadata(uint8_t* metadata_ptr, uint32_t metadata_size, + int64_t metadata_start) WARN_UNUSED_RESULT; + + /// Validate the deserialized file metadata in 'file_metadata_'. + Status ValidateFileMetadata() WARN_UNUSED_RESULT; + /// Populates 'column_readers' for the slots in 'tuple_desc', including creating child /// readers for any collections. Schema resolution is handled in this function as /// well. Fills in the appropriate template tuple slot with NULL for any materialized diff --git a/be/src/exec/parquet/parquet-column-readers.cc b/be/src/exec/parquet/parquet-column-readers.cc index b131b2f2de..5902f4689e 100644 --- a/be/src/exec/parquet/parquet-column-readers.cc +++ b/be/src/exec/parquet/parquet-column-readers.cc @@ -1052,7 +1052,7 @@ Status BaseScalarColumnReader::Reset(const HdfsFileDesc& file_desc, const parquet::ColumnChunk& col_chunk, int row_group_idx, int64_t row_group_first_row) { // Ensure metadata is valid before using it to initialize the reader. - RETURN_IF_ERROR(ParquetMetadataUtils::ValidateRowGroupColumn(parent_->file_metadata_, + RETURN_IF_ERROR(ParquetMetadataUtils::ValidateRowGroupColumn(*parent_->file_metadata_, parent_->filename(), row_group_idx, col_idx(), schema_element(), parent_->state_)); num_buffered_values_ = 0; diff --git a/be/src/exec/parquet/parquet-column-readers.h b/be/src/exec/parquet/parquet-column-readers.h index 73eb45a0d8..b6d72fe6d2 100644 --- a/be/src/exec/parquet/parquet-column-readers.h +++ b/be/src/exec/parquet/parquet-column-readers.h @@ -636,7 +636,7 @@ class BaseScalarColumnReader : public ParquetColumnReader { int64_t LastRowIdxInCurrentPage() const { DCHECK(!candidate_data_pages_.empty()); int64_t num_rows = - parent_->file_metadata_.row_groups[parent_->row_group_idx_].num_rows; + parent_->file_metadata_->row_groups[parent_->row_group_idx_].num_rows; // Find the next valid page. 
int page_idx = candidate_data_pages_[candidate_page_idx_] + 1; while (page_idx < offset_index_.page_locations.size()) { diff --git a/be/src/exec/parquet/parquet-page-index.cc b/be/src/exec/parquet/parquet-page-index.cc index 9925714ac1..be42fa6fa1 100644 --- a/be/src/exec/parquet/parquet-page-index.cc +++ b/be/src/exec/parquet/parquet-page-index.cc @@ -68,7 +68,7 @@ bool ParquetPageIndex::DeterminePageIndexRangesInRowGroup( Status ParquetPageIndex::ReadAll(int row_group_idx) { DCHECK(page_index_buffer_.buffer() == nullptr); bool has_page_index = DeterminePageIndexRangesInRowGroup( - scanner_->file_metadata_.row_groups[row_group_idx], + scanner_->file_metadata_->row_groups[row_group_idx], &column_index_base_offset_, &column_index_size_, &offset_index_base_offset_, &offset_index_size_); diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc index 6dc9d1bd64..125fa7d0ca 100644 --- a/be/src/runtime/exec-env.cc +++ b/be/src/runtime/exec-env.cc @@ -39,6 +39,7 @@ #include "runtime/hbase-table-factory.h" #include "runtime/hdfs-fs-cache.h" #include "runtime/io/disk-io-mgr.h" +#include "runtime/io/footer-cache.h" #include "runtime/krpc-data-stream-mgr.h" #include "runtime/lib-cache.h" #include "runtime/mem-tracker.h" @@ -106,7 +107,6 @@ DEFINE_int32(admission_control_slots, 0, "value for dedicated coordinators)."); DEFINE_string(codegen_cache_capacity, "1GB", "Specify the capacity of the codegen cache. If set to 0, codegen cache is disabled."); - DEFINE_bool(use_local_catalog, true, "Use the on-demand metadata feature in coordinators. If this is set, coordinators " "pull metadata as needed from catalogd and cache it locally. The cached metadata " diff --git a/be/src/runtime/exec-env.h b/be/src/runtime/exec-env.h index 2043ead77d..18398c726f 100644 --- a/be/src/runtime/exec-env.h +++ b/be/src/runtime/exec-env.h @@ -79,6 +79,7 @@ class TCatalogRegistration; namespace io { class DiskIoMgr; + class FooterCache; } /// Execution environment for Impala daemon. 
Contains all required global structures, and diff --git a/be/src/runtime/io/CMakeLists.txt b/be/src/runtime/io/CMakeLists.txt index b3a5bd78a2..749d2a2339 100644 --- a/be/src/runtime/io/CMakeLists.txt +++ b/be/src/runtime/io/CMakeLists.txt @@ -26,6 +26,7 @@ add_library(Io disk-io-mgr.cc disk-io-mgr-stress.cc disk-file.cc + footer-cache.cc local-file-system.cc local-file-system-with-fault-injection.cc error-converter.cc @@ -44,6 +45,7 @@ add_library(IoTests STATIC data-cache-trace-test.cc disk-io-mgr-test.cc disk-file-test.cc + footer-cache-test.cc ) add_dependencies(IoTests gen-deps) @@ -62,5 +64,6 @@ target_link_libraries(data-cache-trace-replayer ${IMPALA_TEST_LINK_LIBS}) ADD_UNIFIED_BE_LSAN_TEST(disk-io-mgr-test DiskIoMgrTest.*) ADD_UNIFIED_BE_LSAN_TEST(disk-file-test DiskFileTest.*:DiskFileDeathTest.*) ADD_UNIFIED_BE_LSAN_TEST(data-cache-trace-test DataCacheTraceTest.*) +ADD_UNIFIED_BE_LSAN_TEST(footer-cache-test FooterCacheTest.*) # Exception to unified be: Custom main function (platform tests) ADD_BE_LSAN_TEST(data-cache-test) diff --git a/be/src/runtime/io/disk-io-mgr.cc b/be/src/runtime/io/disk-io-mgr.cc index 6830fe1a5f..2da01c10f2 100644 --- a/be/src/runtime/io/disk-io-mgr.cc +++ b/be/src/runtime/io/disk-io-mgr.cc @@ -27,6 +27,7 @@ #include "runtime/io/disk-io-mgr-internal.h" #include "runtime/io/error-converter.h" #include "runtime/io/file-writer.h" +#include "runtime/io/footer-cache.h" #include "runtime/io/handle-cache.inline.h" #include @@ -44,6 +45,8 @@ #include "util/histogram-metric.h" #include "util/metrics.h" #include "util/os-util.h" +#include "util/parse-util.h" +#include "util/pretty-printer.h" #include "util/test-info.h" #include "util/time.h" @@ -148,6 +151,13 @@ DEFINE_int32(num_adls_io_threads, 16, "Number of ADLS I/O threads"); // The maximum number of GCS I/O threads. TODO: choose the default empirically. 
DEFINE_int32(num_gcs_io_threads, 16, "Number of GCS I/O threads"); +// Footer cache configuration +DEFINE_string(footer_cache_capacity, "512MB", + "Specify the capacity of the footer cache. If set to 0, footer cache is disabled."); +DEFINE_int32(footer_cache_partitions, 16, + "Number of partitions in the footer cache for reducing lock contention. " + "If set to 0 or negative, footer cache is disabled."); + // The maximum number of GCS I/O threads. TODO: choose the default empirically. DEFINE_int32(num_cos_io_threads, 16, "Number of COS I/O threads"); @@ -699,6 +709,24 @@ Status DiskIoMgr::Init() { new DataCache(FLAGS_data_cache, FLAGS_data_cache_num_async_write_threads)); RETURN_IF_ERROR(remote_data_cache_->Init()); } + + // Initialize footer cache + bool is_percent = false; + int64_t footer_cache_capacity = + ParseUtil::ParseMemSpec(FLAGS_footer_cache_capacity, &is_percent, 0); + if (footer_cache_capacity > 0 && FLAGS_footer_cache_partitions > 0) { + // If footer_cache_capacity is larger than 0, the number should not be a percentage. + DCHECK(!is_percent); + footer_cache_.reset(new FooterCache()); + RETURN_IF_ERROR(footer_cache_->Init(footer_cache_capacity, + FLAGS_footer_cache_partitions)); + LOG(INFO) << "Footer Cache initialized with capacity " + << PrettyPrinter::Print(footer_cache_capacity, TUnit::BYTES) + << " and " << FLAGS_footer_cache_partitions << " partitions"; + } else { + LOG(INFO) << "Footer Cache is disabled."; + } + return Status::OK(); } diff --git a/be/src/runtime/io/disk-io-mgr.h b/be/src/runtime/io/disk-io-mgr.h index 5e3fd46ea2..0a9924041b 100644 --- a/be/src/runtime/io/disk-io-mgr.h +++ b/be/src/runtime/io/disk-io-mgr.h @@ -41,6 +41,7 @@ namespace io { class DataCache; class DiskQueue; +class FooterCache; /// Manager object that schedules IO for all queries on all disks and remote filesystems /// (such as S3). 
Each query maps to one or more RequestContext objects, each of which @@ -424,6 +425,9 @@ class DiskIoMgr : public CacheLineAligned { DataCache* remote_data_cache() { return remote_data_cache_.get(); } + FooterCache* footer_cache() const { return footer_cache_.get(); } + bool footer_cache_enabled() const { return footer_cache_ != nullptr; } + private: DISALLOW_COPY_AND_ASSIGN(DiskIoMgr); friend class DiskIoMgrTest_Buffers_Test; @@ -504,6 +508,11 @@ class DiskIoMgr : public CacheLineAligned { /// non-local reads and data read from remote data nodes will be stored in it. If not /// configured, this would be NULL. std::unique_ptr remote_data_cache_; + + /// Singleton cache for file footer metadata. If configured, it will cache parsed + /// footer metadata to avoid repeated I/O and deserialization. If not configured, + /// this would be NULL. + std::unique_ptr footer_cache_; }; } } diff --git a/be/src/runtime/io/footer-cache-test.cc b/be/src/runtime/io/footer-cache-test.cc new file mode 100644 index 0000000000..2c45467265 --- /dev/null +++ b/be/src/runtime/io/footer-cache-test.cc @@ -0,0 +1,218 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "runtime/io/footer-cache.h" + +#include +#include +#include + +#include "common/status.h" +#include "gen-cpp/parquet_types.h" +#include "testutil/gtest-util.h" +#include "util/metrics.h" + +namespace impala { +namespace io { + +class FooterCacheTest : public ::testing::Test { + protected: + void SetUp() override { + metrics_.reset(new MetricGroup("footer-cache-test")); + } + void TearDown() override {} + + std::unique_ptr metrics_; +}; + +// Test basic get/put operations with Parquet FileMetaData objects +TEST_F(FooterCacheTest, BasicGetPutParquet) { + FooterCache cache(metrics_.get()); + ASSERT_OK(cache.Init(1024 * 1024, 4)); // 1MB capacity, 4 partitions + + std::string filename = "/data/test.parquet"; + int64_t mtime = 12345; + + // Create a test FileMetaData object + auto test_metadata = std::make_shared(); + test_metadata->version = 1; + test_metadata->num_rows = 1000; + test_metadata->created_by = "test"; + + // Initially, cache should be empty + FooterCacheValue result = cache.GetFooter(filename, mtime); + EXPECT_TRUE(std::holds_alternative(result)); + + // Put Parquet footer into cache + Status status = cache.PutParquetFooter(filename, mtime, test_metadata); + ASSERT_OK(status); + + // Now we should get it back + result = cache.GetFooter(filename, mtime); + EXPECT_TRUE(std::holds_alternative>(result)); + auto cached_metadata = std::get>(result); + EXPECT_TRUE(cached_metadata != nullptr); + EXPECT_EQ(cached_metadata->version, 1); + EXPECT_EQ(cached_metadata->num_rows, 1000); + EXPECT_EQ(cached_metadata->created_by, "test"); +} + +// Test basic get/put operations with ORC serialized tail +TEST_F(FooterCacheTest, BasicGetPutOrc) { + FooterCache cache(metrics_.get()); + ASSERT_OK(cache.Init(1024 * 1024, 4)); // 1MB capacity, 4 partitions + + std::string filename = "/data/test.orc"; + int64_t mtime = 67890; + + // Create test ORC tail data + std::string test_tail = "ORC serialized tail data"; + + // Initially, cache should be empty + FooterCacheValue 
result = cache.GetFooter(filename, mtime); + EXPECT_TRUE(std::holds_alternative(result)); + + // Put ORC footer into cache + Status status = cache.PutOrcFooter(filename, mtime, test_tail); + ASSERT_OK(status); + + // Now we should get it back + result = cache.GetFooter(filename, mtime); + EXPECT_TRUE(std::holds_alternative(result)); + std::string cached_tail = std::get(result); + EXPECT_EQ(cached_tail, test_tail); +} + +// Test that mtime mismatch returns empty variant +TEST_F(FooterCacheTest, MtimeMismatch) { + FooterCache cache(metrics_.get()); + ASSERT_OK(cache.Init(1024 * 1024, 4)); + + std::string filename = "/data/test.parquet"; + int64_t mtime1 = 12345; + int64_t mtime2 = 67890; + + auto test_metadata = std::make_shared(); + test_metadata->version = 1; + test_metadata->num_rows = 500; + + // Put with mtime1 + ASSERT_OK(cache.PutParquetFooter(filename, mtime1, test_metadata)); + + // Get with mtime1 should succeed + FooterCacheValue result = cache.GetFooter(filename, mtime1); + EXPECT_TRUE(std::holds_alternative>(result)); + auto cached_metadata = std::get>(result); + EXPECT_EQ(cached_metadata->num_rows, 500); + + // Get with mtime2 should return empty variant (different mtime) + result = cache.GetFooter(filename, mtime2); + EXPECT_TRUE(std::holds_alternative(result)); +} + +// Test LRU eviction with metrics +TEST_F(FooterCacheTest, LruEviction) { + FooterCache cache(metrics_.get()); + ASSERT_OK(cache.Init(500, 1)); // Small capacity to trigger eviction, single partition + + // Get eviction counter + IntCounter* evicted = metrics_->FindMetricForTesting( + "impala-server.io-mgr.footer-cache.entries-evicted"); + ASSERT_TRUE(evicted != nullptr); + int64_t initial_evictions = evicted->GetValue(); + + // Insert entries until cache is full and eviction occurs + for (int i = 0; i < 5; ++i) { + std::string filename = "/data/file" + std::to_string(i) + ".parquet"; + auto metadata = std::make_shared(); + metadata->version = 1; + metadata->num_rows = i * 100; + 
ASSERT_OK(cache.PutParquetFooter(filename, 100, metadata)); + } + + // Verify that eviction occurred (eviction counter increased) + int64_t final_evictions = evicted->GetValue(); + EXPECT_GT(final_evictions, initial_evictions); +} +// Test metrics tracking +TEST_F(FooterCacheTest, MetricsTracking) { + FooterCache cache(metrics_.get()); + ASSERT_OK(cache.Init(1024 * 1024, 4)); + + std::string filename = "/data/test.parquet"; + int64_t mtime = 12345; + auto test_metadata = std::make_shared(); + test_metadata->version = 1; + test_metadata->num_rows = 1000; + + // Get metrics + IntCounter* hits = metrics_->FindMetricForTesting("impala-server.io-mgr.footer-cache.hits"); + IntCounter* misses = metrics_->FindMetricForTesting("impala-server.io-mgr.footer-cache.misses"); + IntGauge* entries_in_use = metrics_->FindMetricForTesting("impala-server.io-mgr.footer-cache.entries-in-use"); + + ASSERT_TRUE(hits != nullptr); + ASSERT_TRUE(misses != nullptr); + ASSERT_TRUE(entries_in_use != nullptr); + + // Initial miss + FooterCacheValue result = cache.GetFooter(filename, mtime); + EXPECT_TRUE(std::holds_alternative(result)); + EXPECT_EQ(misses->GetValue(), 1); + EXPECT_EQ(hits->GetValue(), 0); + + // Put footer + ASSERT_OK(cache.PutParquetFooter(filename, mtime, test_metadata)); + EXPECT_EQ(entries_in_use->GetValue(), 1); + + // Cache hit + result = cache.GetFooter(filename, mtime); + EXPECT_TRUE(std::holds_alternative>(result)); + auto cached_metadata = std::get>(result); + EXPECT_EQ(cached_metadata->num_rows, 1000); + EXPECT_EQ(hits->GetValue(), 1); + EXPECT_EQ(misses->GetValue(), 1); +} + +// Test partitioning (different files go to different partitions) +TEST_F(FooterCacheTest, Partitioning) { + FooterCache cache(metrics_.get()); + ASSERT_OK(cache.Init(1024 * 1024, 16)); // 1MB, 16 partitions + + EXPECT_EQ(cache.NumPartitions(), 16); + EXPECT_EQ(cache.Capacity(), 1024 * 1024); + + // Insert files - they should be distributed across partitions + for (int i = 0; i < 32; ++i) { + 
std::string filename = "/data/file" + std::to_string(i) + ".parquet"; + auto metadata = std::make_shared(); + metadata->version = 1; + metadata->num_rows = i * 100; + ASSERT_OK(cache.PutParquetFooter(filename, 100, metadata)); + } + + // All should be retrievable + for (int i = 0; i < 32; ++i) { + std::string filename = "/data/file" + std::to_string(i) + ".parquet"; + FooterCacheValue result = cache.GetFooter(filename, 100); + EXPECT_TRUE(std::holds_alternative>(result)); + auto cached_metadata = std::get>(result); + EXPECT_EQ(cached_metadata->num_rows, i * 100); + } +} + +} // namespace io +} // namespace impala diff --git a/be/src/runtime/io/footer-cache.cc b/be/src/runtime/io/footer-cache.cc new file mode 100644 index 0000000000..aa7a31fbf3 --- /dev/null +++ b/be/src/runtime/io/footer-cache.cc @@ -0,0 +1,247 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+
+#include "runtime/io/footer-cache.h"
+
+#include <memory>
+#include <sstream>
+
+#include "common/names.h"
+#include "util/debug-util.h"
+#include "util/hash-util.h"
+#include "util/metrics.h"
+#include "util/impalad-metrics.h"
+
+namespace impala {
+namespace io {
+
+// ======================== FooterCache ========================
+
+void FooterCache::EvictionCallback::EvictedEntry(kudu::Slice key, kudu::Slice value) {
+  DCHECK(key.data() != nullptr);
+  DCHECK(value.data() != nullptr);
+
+  // Explicitly destroy the FooterCacheValue (variant) stored in the cache
+  FooterCacheValue* footer_value =
+      reinterpret_cast<FooterCacheValue*>(const_cast<uint8_t*>(value.data()));
+  footer_value->~FooterCacheValue();
+
+  int64_t entry_size = key.size() + value.size();
+
+  VLOG(2) << "Evicted footer cache entry, key size=" << key.size()
+          << ", value size=" << value.size();
+
+  // Update statistics, thread-safe
+  if (footer_cache_entries_evicted_ != nullptr) {
+    footer_cache_entries_evicted_->Increment(1);
+  }
+  if (footer_cache_entries_in_use_ != nullptr) {
+    footer_cache_entries_in_use_->Increment(-1);
+  }
+  if (footer_cache_entries_in_use_bytes_ != nullptr) {
+    footer_cache_entries_in_use_bytes_->Increment(-entry_size);
+  }
+}
+
+std::string FooterCache::BuildMetadataKey(const std::string& filename, int64_t mtime) {
+  // Build cache key by concatenating filename and mtime
+  std::ostringstream oss;
+  oss << filename << mtime;
+  return oss.str();
+}
+
+FooterCache::FooterCache()
+  : total_capacity_(0),
+    footer_cache_hits_(ImpaladMetrics::IO_MGR_FOOTER_CACHE_HITS),
+    footer_cache_misses_(ImpaladMetrics::IO_MGR_FOOTER_CACHE_MISSES),
+    footer_cache_entries_evicted_(ImpaladMetrics::IO_MGR_FOOTER_CACHE_ENTRIES_EVICTED),
+    footer_cache_entries_in_use_(ImpaladMetrics::IO_MGR_FOOTER_CACHE_ENTRIES_IN_USE),
+    footer_cache_entries_in_use_bytes_(ImpaladMetrics::IO_MGR_FOOTER_CACHE_ENTRIES_IN_USE_BYTES),
+    footer_cache_entry_size_stats_(ImpaladMetrics::IO_MGR_FOOTER_CACHE_ENTRY_SIZES),
+    evict_callback_(
+        new 
FooterCache::EvictionCallback(footer_cache_entries_evicted_, + footer_cache_entries_in_use_, footer_cache_entries_in_use_bytes_)) {} + +Status FooterCache::Init(size_t capacity, size_t num_partitions) { + DCHECK_GT(capacity, 0); + DCHECK_GT(num_partitions, 0); + DCHECK(partitions_.empty()) << "FooterCache already initialized"; + + total_capacity_ = capacity; + partitions_.resize(num_partitions); + + // Distribute capacity evenly across partitions, rounding up if necessary + size_t remainder = capacity % num_partitions; + size_t base_capacity = capacity / num_partitions; + size_t partition_capacity = (remainder > 0 ? base_capacity + 1 : base_capacity); + + // Initialize each partition's cache + for (size_t i = 0; i < num_partitions; ++i) { + Status status = partitions_[i].Init(partition_capacity, i); + if (!status.ok()) { + return Status(Substitute("Failed to initialize footer cache partition $0: $1", + i, status.GetDetail())); + } + } + + VLOG(1) << "Initialized footer cache with capacity " << capacity + << " and " << num_partitions << " partitions"; + return Status::OK(); +} + +FooterCache::~FooterCache() { + ReleaseResources(); +} + +FooterCacheValue FooterCache::GetFooter( + const std::string& filename, int64_t mtime) { + // Hash the filename to get the partition index + size_t partition_idx = GetPartitionIndex(filename); + FooterCachePartition& partition = partitions_[partition_idx]; + + std::string key_str = BuildMetadataKey(filename, mtime); + kudu::Slice key(key_str); + + // Lookup in the cache (Cache class handles locking internally) + Cache::UniqueHandle handle = partition.cache->Lookup(key); + if (!handle) { + footer_cache_misses_->Increment(1); + return FooterCacheValue(); // Return empty variant + } + + footer_cache_hits_->Increment(1); + + kudu::Slice value = partition.cache->Value(handle); + // The cache stores a FooterCacheValue (variant) + FooterCacheValue* footer_value = + reinterpret_cast(const_cast(value.data())); + VLOG(2) << "Footer cache hit 
for file: " << filename << " (mtime=" << mtime << ")"; + return *footer_value; +} + +Status FooterCache::PutParquetFooter(const std::string& filename, int64_t mtime, + std::shared_ptr file_metadata) { + DCHECK(file_metadata != nullptr); + + size_t partition_idx = GetPartitionIndex(filename); + FooterCachePartition& partition = partitions_[partition_idx]; + + std::string key_str = BuildMetadataKey(filename, mtime); + kudu::Slice key(key_str); + + // Allocate space for storing the FooterCacheValue (variant) + size_t value_size = sizeof(FooterCacheValue); + Cache::UniquePendingHandle pending = partition.cache->Allocate(key, value_size); + if (!pending) { + return Status(Substitute("Failed to allocate cache space for footer: $0", filename)); + } + + // Store the variant in the cache using placement new + uint8_t* cache_value = partition.cache->MutableValue(&pending); + new (cache_value) FooterCacheValue(file_metadata); + + // Insert into the cache (this handles LRU eviction automatically) + Cache::UniqueHandle handle = partition.cache->Insert(std::move(pending), evict_callback_.get()); + if (!handle) { + return Status(Substitute("Failed to insert footer into cache: $0", filename)); + } + + // Update metrics, thread-safe + int64_t entry_size = key.size() + value_size; + footer_cache_entries_in_use_->Increment(1); + footer_cache_entries_in_use_bytes_->Increment(entry_size); + footer_cache_entry_size_stats_->Update(entry_size); + + VLOG(2) << "Cached Parquet footer for file: " << filename << " (mtime=" << mtime << ")"; + return Status::OK(); +} + +Status FooterCache::PutOrcFooter(const std::string& filename, int64_t mtime, + const std::string& serialized_tail) { + DCHECK(!serialized_tail.empty()); + + size_t partition_idx = GetPartitionIndex(filename); + FooterCachePartition& partition = partitions_[partition_idx]; + + std::string key_str = BuildMetadataKey(filename, mtime); + kudu::Slice key(key_str); + + // Allocate space for storing the FooterCacheValue (variant) + 
size_t value_size = sizeof(FooterCacheValue); + Cache::UniquePendingHandle pending = partition.cache->Allocate(key, value_size); + if (!pending) { + return Status(Substitute("Failed to allocate cache space for footer: $0", filename)); + } + + // Store the variant in the cache using placement new + uint8_t* cache_value = partition.cache->MutableValue(&pending); + new (cache_value) FooterCacheValue(serialized_tail); + + // Insert into the cache (this handles LRU eviction automatically) + Cache::UniqueHandle handle = partition.cache->Insert(std::move(pending), evict_callback_.get()); + if (!handle) { + return Status(Substitute("Failed to insert footer into cache: $0", filename)); + } + + // Update metrics + int64_t entry_size = key.size() + value_size; + footer_cache_entries_in_use_->Increment(1); + footer_cache_entries_in_use_bytes_->Increment(entry_size); + footer_cache_entry_size_stats_->Update(entry_size); + + VLOG(2) << "Cached ORC footer for file: " << filename << " (mtime=" << mtime << ")"; + return Status::OK(); +} + +void FooterCache::ReleaseResources() { + // Release all cache resources by resetting the cache pointers in each partition + for (FooterCachePartition& partition : partitions_) { + if (partition.cache != nullptr) { + partition.cache.reset(); + } + } + VLOG(1) << "Released all footer cache resources"; +} + +size_t FooterCache::GetPartitionIndex(const std::string& filename) const { + // Hash the filename and mod by number of partitions + return HashUtil::Hash(filename.data(), filename.size(), 0) % partitions_.size(); +} + +// ======================== FooterCachePartition ======================== + +Status FooterCache::FooterCachePartition::Init(size_t capacity, int partition_id) { + std::string cache_id = Substitute("footer-cache-partition-$0", partition_id); + cache.reset(NewCache(Cache::EvictionPolicy::LRU, capacity, cache_id)); + + if (cache == nullptr) { + return Status(Substitute("Failed to create cache for partition $0", partition_id)); + } + 
+ Status status = cache->Init(); + if (!status.ok()) { + return Status(Substitute("Failed to initialize cache for partition $0: $1", + partition_id, status.GetDetail())); + } + + VLOG(1) << "Initialized footer cache partition " << partition_id + << " with capacity " << capacity; + return Status::OK(); +} + +} // namespace io +} // namespace impala diff --git a/be/src/runtime/io/footer-cache.h b/be/src/runtime/io/footer-cache.h new file mode 100644 index 0000000000..44f26f705e --- /dev/null +++ b/be/src/runtime/io/footer-cache.h @@ -0,0 +1,155 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+
+#pragma once
+
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <utility>
+#include <variant>
+#include <vector>
+
+#include "common/status.h"
+#include "gen-cpp/parquet_types.h"
+#include "util/cache/cache.h"
+#include "util/container-util.h"
+#include "util/metrics-fwd.h"
+#include "util/histogram-metric.h"
+
+namespace impala {
+namespace io {
+
+/// Type alias for footer cache value - can be empty, Parquet FileMetaData or ORC tail
+using FooterCacheValue = std::variant<
+    std::monostate,                           // Empty state (cache miss)
+    std::shared_ptr<parquet::FileMetaData>,   // Parquet footer
+    std::string>;                             // ORC serialized tail
+
+/// The FooterCache is a thread-safe partitioned LRU cache for file footer metadata.
+/// It supports both Parquet and ORC file formats by storing different types of footer
+/// data. Similar to FileHandleCache, it uses multiple partitions to reduce lock
+/// contention in high-concurrency scenarios.
+///
+/// Cache Key: pair<filename, mtime>
+/// Cache Value: variant<monostate, shared_ptr<parquet::FileMetaData>, string>
+/// - Parquet: shared_ptr<parquet::FileMetaData> (parsed footer object)
+/// - ORC: string (serialized tail bytes)
+///
+/// The cache stores format-specific footer data, which provides:
+/// - Parquet: Zero deserialization overhead and complete avoidance of disk I/O
+/// - ORC: Avoidance of disk I/O for tail data
+/// - Direct use of the cached data by callers
+///
+/// The cache automatically evicts least recently used entries when capacity is reached.
+/// Each partition operates independently with its own lock and LRU list.
+class FooterCache {
+ public:
+  typedef std::pair<std::string, int64_t> CacheKey;
+
+  FooterCache();
+
+  ~FooterCache();
+
+  /// Initialization for the footer cache, including cache and metrics allocation.
+  /// Must be called before using the cache.
+  Status Init(size_t capacity, size_t num_partitions);
+
+  /// Get a footer from the cache. Returns a variant with std::monostate if not found.
+  /// This will hash the filename to determine which partition to use.
+  /// The returned variant contains either:
+  /// - std::monostate if not found (cache miss)
+  /// - shared_ptr<parquet::FileMetaData> for Parquet files
+  /// - string for ORC files (serialized tail)
+  /// Caller should use std::holds_alternative or std::get to access the value.
+  /// Thread-safe.
+  FooterCacheValue GetFooter(const std::string& filename, int64_t mtime);
+
+  /// Put a Parquet footer into the cache. Stores the parsed FileMetaData object.
+  /// This will hash the filename to determine which partition to use.
+  /// Thread-safe.
+  Status PutParquetFooter(const std::string& filename, int64_t mtime,
+      std::shared_ptr<parquet::FileMetaData> file_metadata);
+
+  /// Put an ORC footer into the cache. Stores the serialized tail bytes.
+  /// This will hash the filename to determine which partition to use.
+  /// Thread-safe.
+  Status PutOrcFooter(const std::string& filename, int64_t mtime,
+      const std::string& serialized_tail);
+
+  /// Get cache configuration
+  size_t Capacity() const { return total_capacity_; }
+  size_t NumPartitions() const { return partitions_.size(); }
+
+  /// Release all resources held by the cache. This will reset all partition caches.
+  /// After calling this method, the cache should not be used anymore.
+  /// Thread-safe.
+  void ReleaseResources();
+
+  /// EvictionCallback for the footer cache.
+  class EvictionCallback : public Cache::EvictionCallback {
+   public:
+    EvictionCallback(IntCounter* entries_evicted, IntGauge* entries_in_use,
+        IntGauge* entries_in_use_bytes)
+      : footer_cache_entries_evicted_(entries_evicted),
+        footer_cache_entries_in_use_(entries_in_use),
+        footer_cache_entries_in_use_bytes_(entries_in_use_bytes) {}
+    virtual void EvictedEntry(kudu::Slice key, kudu::Slice value) override;
+
+   private:
+    /// Metrics for the footer cache.
+    IntCounter* footer_cache_entries_evicted_;
+    IntGauge* footer_cache_entries_in_use_;
+    IntGauge* footer_cache_entries_in_use_bytes_;
+  };
+
+ private:
+  /// Each partition operates independently with its own Cache instance.
+  /// This reduces lock contention in high-concurrency scenarios.
+  struct FooterCachePartition {
+
+    std::unique_ptr<Cache> cache;
+
+    Status Init(size_t capacity, int partition_id);
+  };
+
+  /// Build a cache key from filename and mtime by concatenating them.
+  static std::string BuildMetadataKey(const std::string& filename, int64_t mtime);
+
+  /// Get the partition index for a given filename.
+  size_t GetPartitionIndex(const std::string& filename) const;
+
+  std::vector<FooterCachePartition> partitions_;
+
+  size_t total_capacity_;
+
+  /// Metrics for the footer cache.
+  IntCounter* footer_cache_hits_;
+  IntCounter* footer_cache_misses_;
+  IntCounter* footer_cache_entries_evicted_;
+  IntGauge* footer_cache_entries_in_use_;
+  IntGauge* footer_cache_entries_in_use_bytes_;
+
+  /// Statistics for the footer entry sizes.
+  HistogramMetric* footer_cache_entry_size_stats_;
+
+  /// Eviction callback function.
+  std::unique_ptr<EvictionCallback> evict_callback_;
+};
+
+} // namespace io
+} // namespace impala
diff --git a/be/src/util/impalad-metrics.cc b/be/src/util/impalad-metrics.cc
index 8ff0fe1ef4..c4e4cfd45f 100644
--- a/be/src/util/impalad-metrics.cc
+++ b/be/src/util/impalad-metrics.cc
@@ -27,6 +27,10 @@
 DECLARE_string(debug_actions);
 DECLARE_bool(use_local_catalog);
 
+// Maximum footer cache entry size for the purposes of histogram sizing in stats
+// collection. A footer entry is expected to be less than 1MB.
+static constexpr int64_t STATS_MAX_FOOTER_CACHE_ENTRY_SIZE = 1L << 20;
+
 namespace impala {
 
 // Naming convention: Components should be separated by '.' 
and words should @@ -93,6 +97,21 @@ const char* ImpaladMetricKeys::IO_MGR_REMOTE_DATA_CACHE_ASYNC_WRITES_DROPPED_ENT "impala-server.io-mgr.remote-data-cache-async-writes-dropped-entries"; const char* ImpaladMetricKeys::IO_MGR_BYTES_WRITTEN = "impala-server.io-mgr.bytes-written"; + +// Footer cache metrics +const char* ImpaladMetricKeys::IO_MGR_FOOTER_CACHE_HITS = + "impala-server.io-mgr.footer-cache.hits"; +const char* ImpaladMetricKeys::IO_MGR_FOOTER_CACHE_MISSES = + "impala-server.io-mgr.footer-cache.misses"; +const char* ImpaladMetricKeys::IO_MGR_FOOTER_CACHE_ENTRIES_EVICTED = + "impala-server.io-mgr.footer-cache.entries-evicted"; +const char* ImpaladMetricKeys::IO_MGR_FOOTER_CACHE_ENTRIES_IN_USE = + "impala-server.io-mgr.footer-cache.entries-in-use"; +const char* ImpaladMetricKeys::IO_MGR_FOOTER_CACHE_ENTRIES_IN_USE_BYTES = + "impala-server.io-mgr.footer-cache.entries-in-use-bytes"; +const char* ImpaladMetricKeys::IO_MGR_FOOTER_CACHE_ENTRY_SIZES = + "impala-server.io-mgr.footer-cache.entry-sizes"; + const char* ImpaladMetricKeys::IO_MGR_NUM_CACHED_FILE_HANDLES = "impala-server.io.mgr.num-cached-file-handles"; const char* ImpaladMetricKeys::IO_MGR_NUM_FILE_HANDLES_OUTSTANDING = @@ -218,6 +237,15 @@ IntCounter* ImpaladMetrics::IO_MGR_REMOTE_DATA_CACHE_ASYNC_WRITES_DROPPED_BYTES IntCounter* ImpaladMetrics::IO_MGR_REMOTE_DATA_CACHE_ASYNC_WRITES_DROPPED_ENTRIES = nullptr; IntCounter* ImpaladMetrics::IO_MGR_BYTES_WRITTEN = nullptr; + +// Footer cache metrics +IntCounter* ImpaladMetrics::IO_MGR_FOOTER_CACHE_HITS = nullptr; +IntCounter* ImpaladMetrics::IO_MGR_FOOTER_CACHE_MISSES = nullptr; +IntCounter* ImpaladMetrics::IO_MGR_FOOTER_CACHE_ENTRIES_EVICTED = nullptr; +IntGauge* ImpaladMetrics::IO_MGR_FOOTER_CACHE_ENTRIES_IN_USE = nullptr; +IntGauge* ImpaladMetrics::IO_MGR_FOOTER_CACHE_ENTRIES_IN_USE_BYTES = nullptr; +HistogramMetric* ImpaladMetrics::IO_MGR_FOOTER_CACHE_ENTRY_SIZES = nullptr; + IntCounter* ImpaladMetrics::IO_MGR_CACHED_FILE_HANDLES_REOPENED = nullptr; 
IntCounter* ImpaladMetrics::HEDGED_READ_OPS = nullptr; IntCounter* ImpaladMetrics::HEDGED_READ_OPS_WIN = nullptr; @@ -450,6 +478,21 @@ void ImpaladMetrics::CreateMetrics(MetricGroup* m) { IO_MGR_REMOTE_DATA_CACHE_ASYNC_WRITES_DROPPED_ENTRIES = IO_MGR_METRICS->AddCounter( ImpaladMetricKeys::IO_MGR_REMOTE_DATA_CACHE_ASYNC_WRITES_DROPPED_ENTRIES, 0); + // Footer cache metrics + IO_MGR_FOOTER_CACHE_HITS = IO_MGR_METRICS->AddCounter( + ImpaladMetricKeys::IO_MGR_FOOTER_CACHE_HITS, 0); + IO_MGR_FOOTER_CACHE_MISSES = IO_MGR_METRICS->AddCounter( + ImpaladMetricKeys::IO_MGR_FOOTER_CACHE_MISSES, 0); + IO_MGR_FOOTER_CACHE_ENTRIES_EVICTED = IO_MGR_METRICS->AddCounter( + ImpaladMetricKeys::IO_MGR_FOOTER_CACHE_ENTRIES_EVICTED, 0); + IO_MGR_FOOTER_CACHE_ENTRIES_IN_USE = IO_MGR_METRICS->AddGauge( + ImpaladMetricKeys::IO_MGR_FOOTER_CACHE_ENTRIES_IN_USE, 0); + IO_MGR_FOOTER_CACHE_ENTRIES_IN_USE_BYTES = IO_MGR_METRICS->AddGauge( + ImpaladMetricKeys::IO_MGR_FOOTER_CACHE_ENTRIES_IN_USE_BYTES, 0); + IO_MGR_FOOTER_CACHE_ENTRY_SIZES = IO_MGR_METRICS->RegisterMetric(new HistogramMetric( + MetricDefs::Get(ImpaladMetricKeys::IO_MGR_FOOTER_CACHE_ENTRY_SIZES), + STATS_MAX_FOOTER_CACHE_ENTRY_SIZE, 3)); + IO_MGR_CACHED_FILE_HANDLES_HIT_RATIO = StatsMetric::CreateAndRegister(IO_MGR_METRICS, ImpaladMetricKeys::IO_MGR_CACHED_FILE_HANDLES_HIT_RATIO); diff --git a/be/src/util/impalad-metrics.h b/be/src/util/impalad-metrics.h index a383a1fca9..abe46fa99a 100644 --- a/be/src/util/impalad-metrics.h +++ b/be/src/util/impalad-metrics.h @@ -131,6 +131,24 @@ class ImpaladMetricKeys { /// Total number of bytes written to disk by the io mgr (for spilling) static const char* IO_MGR_BYTES_WRITTEN; + /// Footer cache hits + static const char* IO_MGR_FOOTER_CACHE_HITS; + + /// Footer cache misses + static const char* IO_MGR_FOOTER_CACHE_MISSES; + + /// Footer cache entries evicted + static const char* IO_MGR_FOOTER_CACHE_ENTRIES_EVICTED; + + /// Footer cache entries currently in use + static const char* 
IO_MGR_FOOTER_CACHE_ENTRIES_IN_USE; + + /// Footer cache bytes currently in use + static const char* IO_MGR_FOOTER_CACHE_ENTRIES_IN_USE_BYTES; + + /// Footer cache entry sizes distribution + static const char* IO_MGR_FOOTER_CACHE_ENTRY_SIZES; + /// Number of unbuffered file handles cached by the io mgr static const char* IO_MGR_NUM_CACHED_FILE_HANDLES; @@ -336,6 +354,15 @@ class ImpaladMetrics { static IntCounter* HEDGED_READ_OPS; static IntCounter* HEDGED_READ_OPS_WIN; static IntCounter* CATALOG_CACHE_EVICTION_COUNT; + + // Footer cache metrics + static IntCounter* IO_MGR_FOOTER_CACHE_HITS; + static IntCounter* IO_MGR_FOOTER_CACHE_MISSES; + static IntCounter* IO_MGR_FOOTER_CACHE_ENTRIES_EVICTED; + static IntGauge* IO_MGR_FOOTER_CACHE_ENTRIES_IN_USE; + static IntGauge* IO_MGR_FOOTER_CACHE_ENTRIES_IN_USE_BYTES; + static HistogramMetric* IO_MGR_FOOTER_CACHE_ENTRY_SIZES; + static IntCounter* CATALOG_CACHE_HIT_COUNT; static IntCounter* CATALOG_CACHE_LOAD_COUNT; static IntCounter* CATALOG_CACHE_LOAD_EXCEPTION_COUNT; diff --git a/common/thrift/metrics.json b/common/thrift/metrics.json index c358b699be..4a6d2c1667 100644 --- a/common/thrift/metrics.json +++ b/common/thrift/metrics.json @@ -79,6 +79,66 @@ "kind": "HISTOGRAM", "key": "impala.codegen-cache.entry-sizes" }, + { + "description": "The total number of cache hits in the Footer Cache", + "contexts": [ + "IMPALAD" + ], + "label": "Footer Cache Hits", + "units": "UNIT", + "kind": "COUNTER", + "key": "impala-server.io-mgr.footer-cache.hits" + }, + { + "description": "The total number of cache misses in the Footer Cache", + "contexts": [ + "IMPALAD" + ], + "label": "Footer Cache Misses", + "units": "UNIT", + "kind": "COUNTER", + "key": "impala-server.io-mgr.footer-cache.misses" + }, + { + "description": "The number of in-use Footer Cache Entries", + "contexts": [ + "IMPALAD" + ], + "label": "In-use Footer Cache Entries", + "units": "UNIT", + "kind": "GAUGE", + "key": "impala-server.io-mgr.footer-cache.entries-in-use" + 
}, + { + "description": "The total bytes of in-use Footer Cache Entries", + "contexts": [ + "IMPALAD" + ], + "label": "In-use Footer Cache Entries total bytes", + "units": "BYTES", + "kind": "GAUGE", + "key": "impala-server.io-mgr.footer-cache.entries-in-use-bytes" + }, + { + "description": "The number of evicted Footer Cache Entries", + "contexts": [ + "IMPALAD" + ], + "label": "Evicted Footer Cache Entries", + "units": "UNIT", + "kind": "COUNTER", + "key": "impala-server.io-mgr.footer-cache.entries-evicted" + }, + { + "description": "Statistics for footer cache entry sizes allocated from the system.", + "contexts": [ + "IMPALAD" + ], + "label": "Footer Cache Entry Sizes.", + "units": "BYTES", + "kind": "HISTOGRAM", + "key": "impala-server.io-mgr.footer-cache.entry-sizes" + }, { "description": "The total number of cache hits in the Tuple Cache", "contexts": [ diff --git a/testdata/workloads/functional-query/queries/QueryTest/footer-cache.test b/testdata/workloads/functional-query/queries/QueryTest/footer-cache.test new file mode 100644 index 0000000000..922513a4be --- /dev/null +++ b/testdata/workloads/functional-query/queries/QueryTest/footer-cache.test @@ -0,0 +1,61 @@ +==== +---- QUERY +create table test_parquet stored as parquet as select * from tpch_parquet.lineitem; +---- RUNTIME_PROFILE +# Expect all cache misses for tpch_parquet.lineitem. +row_regex: .*ParquetFooterCacheHits: 0 \(0\).* +row_regex: .*ParquetFooterCacheMisses: 3 \(3\).* +==== +---- QUERY +select count(*) from tpch_parquet.lineitem t1, test_parquet t2 where t1.l_orderkey = t2.l_orderkey; +---- RESULTS +30012985 +---- RUNTIME_PROFILE +# Expect cache hits for t1 and cache misses for t2. 
+row_regex: .*ParquetFooterCacheHits: 3 \(3\).* +row_regex: .*ParquetFooterCacheMisses: 0 \(0\).* +row_regex: .*ParquetFooterCacheHits: 0 \(0\).* +row_regex: .*ParquetFooterCacheMisses: 1 \(1\).* +==== +---- QUERY +select count(distinct l_orderkey) from test_parquet; +---- RESULTS +1500000 +---- RUNTIME_PROFILE +# Expect all cache hits. +row_regex: .*ParquetFooterCacheHits: 1 \(1\).* +row_regex: .*ParquetFooterCacheMisses: 0 \(0\).* +==== +---- QUERY +# Overwrite temp table with subset of data. +insert overwrite test_parquet select * from tpch_parquet.lineitem where l_shipmode = 'AIR'; +==== +---- QUERY +# Verifies that stale data from the cache is not used. +select count(distinct l_orderkey) from test_parquet; +---- RESULTS +652393 +---- RUNTIME_PROFILE +# Expect all cache misses due to change in mtime. +row_regex: .*ParquetFooterCacheHits: 0 \(0\).* +row_regex: .*ParquetFooterCacheMisses: 1 \(1\).* +==== +---- QUERY +# Test footer cache with ORC format +select count(distinct l_orderkey) from tpch_orc_def.lineitem; +---- RESULTS +1500000 +---- RUNTIME_PROFILE +# Expect all cache misses. +row_regex: .*OrcFooterCacheHits: 0 \(0\).* +row_regex: .*OrcFooterCacheMisses: 4 \(4\).* +==== +---- QUERY +select count(distinct l_orderkey) from tpch_orc_def.lineitem; +---- RESULTS +1500000 +---- RUNTIME_PROFILE +# Expect all cache hits. +row_regex: .*OrcFooterCacheHits: 4 \(4\).* +row_regex: .*OrcFooterCacheMisses: 0 \(0\).* +==== diff --git a/tests/custom_cluster/test_data_cache.py b/tests/custom_cluster/test_data_cache.py index 73a868ae8c..4986462228 100644 --- a/tests/custom_cluster/test_data_cache.py +++ b/tests/custom_cluster/test_data_cache.py @@ -43,6 +43,8 @@ def setup_class(cls): def get_impalad_args(eviction_policy, high_write_concurrency=True, force_single_shard=True, keep_across_restarts=False): impalad_args = ["--always_use_data_cache=true"] + # Disable footer cache to avoid interference with data cache. 
+ impalad_args.append("--footer_cache_capacity=0") if (high_write_concurrency): impalad_args.append("--data_cache_write_concurrency=64") if (force_single_shard): diff --git a/tests/custom_cluster/test_footer_cache.py b/tests/custom_cluster/test_footer_cache.py new file mode 100644 index 0000000000..2d46d69fa5 --- /dev/null +++ b/tests/custom_cluster/test_footer_cache.py @@ -0,0 +1,157 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import absolute_import, division, print_function + +import pytest + +from tests.common.custom_cluster_test_suite import CustomClusterTestSuite +from tests.common.skip import SkipIf, SkipIfNotHdfsMinicluster + + +@SkipIfNotHdfsMinicluster.scheduling +class TestFooterCache(CustomClusterTestSuite): + """ This test enables the footer cache and verifies that cache hit and miss counts + in the runtime profile and metrics are as expected. Run on non-EC HDFS only as + this test checks the number of footer cache hit counts, which implicitly relies + on the scheduler's behavior and number of HDFS blocks. 
+ """ + @classmethod + def setup_class(cls): + super(TestFooterCache, cls).setup_class() + + def get_footer_cache_metric(self, suffix): + """Helper method to get footer cache metrics.""" + return self.get_metric('impala-server.io-mgr.footer-cache.' + suffix) + + def __test_footer_cache_deterministic(self, vector, unique_database): + """ This test creates a temporary table from another table, overwrites it with + some other data and verifies that no stale footer is read from the cache. Runs with + a single node to make it easier to verify the runtime profile. + """ + self.run_test_case('QueryTest/footer-cache', vector, unique_database) + assert self.get_footer_cache_metric('hits') > 0 + assert self.get_footer_cache_metric('misses') > 0 + assert self.get_footer_cache_metric('entries-in-use') > 0 + assert self.get_footer_cache_metric('entries-in-use-bytes') > 0 + + @pytest.mark.execute_serially + @CustomClusterTestSuite.with_args(cluster_size=1) + def test_footer_cache_deterministic(self, vector, unique_database): + """Test footer cache with deterministic queries.""" + self.__test_footer_cache_deterministic(vector, unique_database) + + def __test_footer_cache(self): + """ This test scans the same table twice and verifies the footer cache hit count + metrics are correct. + """ + QUERY = "select * from tpch_parquet.lineitem" + # Do a first run to warm up the cache. Expect no hits. + self.execute_query(QUERY) + assert self.get_footer_cache_metric('hits') == 0 + assert self.get_footer_cache_metric('misses') > 0 + assert self.get_footer_cache_metric('entries-in-use') > 0 + assert self.get_footer_cache_metric('entries-in-use-bytes') > 0 + + # Do a second run. Expect some hits. 
+ self.execute_query(QUERY) + assert self.get_footer_cache_metric('hits') > 0 + + @pytest.mark.execute_serially + @CustomClusterTestSuite.with_args(cluster_size=1, force_restart=True) + def test_footer_cache(self): + """Test basic footer cache functionality.""" + self.__test_footer_cache() + + def __test_footer_cache_disablement(self, vector): + """Verifies that the cache metrics are all zero when footer cache is disabled.""" + assert self.get_footer_cache_metric('hits') == 0 + assert self.get_footer_cache_metric('misses') == 0 + assert self.get_footer_cache_metric('entries-in-use') == 0 + assert self.get_footer_cache_metric('entries-in-use-bytes') == 0 + + # Runs a query against parquet format. + # Verifies that the metrics stay at zero when the cache is disabled. + QUERY = "select * from functional_parquet.alltypes" + self.execute_query(QUERY, vector.get_value('exec_option')) + assert self.get_footer_cache_metric('hits') == 0 + assert self.get_footer_cache_metric('misses') == 0 + assert self.get_footer_cache_metric('entries-in-use') == 0 + assert self.get_footer_cache_metric('entries-in-use-bytes') == 0 + + @pytest.mark.execute_serially + @CustomClusterTestSuite.with_args( + impalad_args="--footer_cache_capacity=0", + cluster_size=1) + def test_footer_cache_disablement(self, vector): + """Test that footer cache can be disabled.""" + self.__test_footer_cache_disablement(vector) + + def __test_footer_cache_with_orc(self): + """ This test scans ORC tables and verifies the footer cache hit count + metrics are correct for ORC format. + """ + QUERY = "select * from functional_orc_def.alltypes" + # Do a first run to warm up the cache. Expect no hits. + self.execute_query(QUERY) + assert self.get_footer_cache_metric('hits') == 0 + assert self.get_footer_cache_metric('misses') > 0 + assert self.get_footer_cache_metric('entries-in-use') > 0 + assert self.get_footer_cache_metric('entries-in-use-bytes') > 0 + + # Do a second run. Expect some hits. 
+ self.execute_query(QUERY) + assert self.get_footer_cache_metric('hits') > 0 + + @pytest.mark.execute_serially + @CustomClusterTestSuite.with_args(cluster_size=1) + def test_footer_cache_with_orc(self): + """Test footer cache with ORC format.""" + self.__test_footer_cache_with_orc() + + def __test_footer_cache_capacity(self): + """ This test verifies that footer cache respects the capacity limit. + Uses a small cache capacity to trigger evictions. + """ + # Query multiple tables to fill up the cache + queries = [ + "select * from tpch_parquet.lineitem limit 1", + "select * from tpch_parquet.orders limit 1", + "select * from tpch_parquet.customer limit 1", + "select * from tpch_parquet.part limit 1", + "select * from tpch_parquet.supplier limit 1", + "select * from tpch_parquet.partsupp limit 1", + "select * from tpch_parquet.nation limit 1", + "select * from tpch_parquet.region limit 1" + ] + + for query in queries: + self.execute_query(query) + + # Verify that cache metrics are being tracked + assert self.get_footer_cache_metric('misses') > 0 + + # With small cache, some entries might have been evicted + assert self.get_footer_cache_metric('entries-evicted') > 0 + + @pytest.mark.execute_serially + @CustomClusterTestSuite.with_args( + impalad_args="--footer_cache_capacity=10KB", + cluster_size=1) + def test_footer_cache_capacity(self): + """Test footer cache with limited capacity.""" + self.__test_footer_cache_capacity()