Skip to content

Commit aeb68ce

Browse files
committed
IMPALA-4568: Support FooterCache for parquet and orc files
Change-Id: Ie3f0ecc082745a811fb35e347597b82c198d4e65
1 parent 3dac013 commit aeb68ce

21 files changed

Lines changed: 1356 additions & 86 deletions

be/src/exec/orc/hdfs-orc-scanner.cc

Lines changed: 108 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
#include "exprs/scalar-expr.h"
2828
#include "runtime/collection-value-builder.h"
2929
#include "runtime/exec-env.h"
30+
#include "runtime/io/footer-cache.h"
3031
#include "runtime/io/request-context.h"
3132
#include "runtime/mem-tracker.h"
3233
#include "runtime/runtime-filter.inline.h"
@@ -329,6 +330,10 @@ Status HdfsOrcScanner::Open(ScannerContext* context) {
329330
num_pushed_down_runtime_filters_counter_ =
330331
ADD_COUNTER(scan_node_->runtime_profile(), "NumPushedDownRuntimeFilters",
331332
TUnit::UNIT);
333+
num_footer_cache_hits_counter_ =
334+
ADD_COUNTER(scan_node_->runtime_profile(), "NumOrcFooterCacheHits", TUnit::UNIT);
335+
num_footer_cache_misses_counter_ =
336+
ADD_COUNTER(scan_node_->runtime_profile(), "NumOrcFooterCacheMisses", TUnit::UNIT);
332337

333338
codegend_process_scratch_batch_fn_ = scan_node_->GetCodegenFn(THdfsFileFormat::ORC);
334339
if (codegend_process_scratch_batch_fn_ == nullptr) {
@@ -503,7 +508,59 @@ void HdfsOrcScanner::Close(RowBatch* row_batch) {
503508
CloseInternal();
504509
}
505510

506-
Status HdfsOrcScanner::ProcessFileTail() {
511+
Status HdfsOrcScanner::TryGetFooterFromCache(FooterCache* footer_cache,
512+
std::string* cached_footer, bool* cache_hit) {
513+
DCHECK(footer_cache != nullptr);
514+
DCHECK(cached_footer != nullptr);
515+
DCHECK(cache_hit != nullptr);
516+
517+
*cache_hit = false;
518+
const HdfsFileDesc* file_desc = stream_->file_desc();
519+
FooterCacheValue cached_value =
520+
footer_cache->GetFooter(file_desc->filename, file_desc->mtime);
521+
522+
// Check if we got an ORC footer from cache
523+
if (!std::holds_alternative<std::string>(cached_value)) {
524+
return Status::OK();
525+
}
526+
*cached_footer = std::get<std::string>(cached_value);
527+
if (cached_footer->empty()) {
528+
return Status::OK();
529+
}
530+
531+
COUNTER_ADD(num_footer_cache_hits_counter_, 1);
532+
VLOG(2) << "Footer cache hit for ORC file: " << filename();
533+
*cache_hit = true;
534+
return Status::OK();
535+
}
536+
537+
Status HdfsOrcScanner::CreateOrcReaderWithCachedFooter(
538+
const std::string& cached_footer) {
539+
try {
540+
unique_ptr<orc::InputStream> input_stream(new ScanRangeInputStream(this));
541+
VLOG_FILE << "Processing FileTail of ORC file: " << input_stream->getName()
542+
<< ", file_length: " << input_stream->getLength();
543+
reader_options_.setSerializedFileTail(cached_footer);
544+
reader_ = orc::createReader(move(input_stream), reader_options_);
545+
546+
// When using cached footer, we need to position the stream as if we had read
547+
// the footer from disk. Skip to the end of the footer range to match the state
548+
// that would exist after reading the footer from disk.
549+
int64_t bytes_to_skip = reader_->getSerializedFileTail().length();
550+
if (bytes_to_skip > 0) {
551+
Status status;
552+
if (!stream_->SkipBytes(bytes_to_skip, &status)) {
553+
return Status(Substitute("Failed to position stream after cached footer: $0",
554+
status.GetDetail()));
555+
}
556+
VLOG(2) << Substitute("Skipped $0 bytes to position stream at footer start after "
557+
"using cached footer", bytes_to_skip);
558+
}
559+
} RETURN_ON_ORC_EXCEPTION("Encountered parse error in tail of ORC file $0: $1");
560+
return Status::OK();
561+
}
562+
563+
Status HdfsOrcScanner::CreateOrcReaderFromDisk() {
507564
try {
508565
// ScanRangeInputStream keeps a pointer to this HdfsOrcScanner so we can hack
509566
// async IO behind the orc::InputStream interface. The ranges of the
@@ -513,15 +570,64 @@ Status HdfsOrcScanner::ProcessFileTail() {
513570
<< ", file_length: " << input_stream->getLength();
514571
reader_ = orc::createReader(move(input_stream), reader_options_);
515572
} RETURN_ON_ORC_EXCEPTION("Encountered parse error in tail of ORC file $0: $1");
573+
return Status::OK();
574+
}
516575

517-
if (reader_->getNumberOfRows() == 0) return Status::OK();
576+
void HdfsOrcScanner::CacheOrcFooter(FooterCache* footer_cache) {
577+
DCHECK(footer_cache != nullptr);
578+
DCHECK(reader_ != nullptr);
579+
580+
const HdfsFileDesc* file_desc = stream_->file_desc();
581+
std::string serialized_tail = reader_->getSerializedFileTail();
582+
Status cache_status = footer_cache->PutOrcFooter(
583+
file_desc->filename, file_desc->mtime, serialized_tail);
584+
if (!cache_status.ok()) {
585+
// Cache insertion failure is not fatal, just log it
586+
VLOG(2) << "Failed to cache ORC footer for file '" << filename()
587+
<< "': " << cache_status.GetDetail();
588+
}
589+
}
590+
591+
Status HdfsOrcScanner::ValidateOrcReader() {
592+
DCHECK(reader_ != nullptr);
593+
594+
if (reader_->getNumberOfRows() == 0) return Status::OK();
518595
if (reader_->getNumberOfStripes() == 0) {
519596
return Status(Substitute("Invalid ORC file: $0. No stripes in this file but"
520597
" numberOfRows in footer is $1", filename(), reader_->getNumberOfRows()));
521598
}
522599
return Status::OK();
523600
}
524601

602+
Status HdfsOrcScanner::ProcessFileTail() {
603+
FooterCache* footer_cache = ExecEnv::GetInstance()->disk_io_mgr()->footer_cache();
604+
605+
// Try to get footer from cache first if footer cache is enabled
606+
if (footer_cache != nullptr) {
607+
std::string cached_footer;
608+
bool cache_hit = false;
609+
RETURN_IF_ERROR(TryGetFooterFromCache(footer_cache, &cached_footer, &cache_hit));
610+
if (cache_hit) {
611+
RETURN_IF_ERROR(CreateOrcReaderWithCachedFooter(cached_footer));
612+
return ValidateOrcReader();
613+
}
614+
}
615+
616+
// Cache miss or cache disabled - read footer from disk
617+
if (footer_cache != nullptr) {
618+
COUNTER_ADD(num_footer_cache_misses_counter_, 1);
619+
}
620+
621+
RETURN_IF_ERROR(CreateOrcReaderFromDisk());
622+
623+
// Write to cache if footer cache is enabled
624+
if (footer_cache != nullptr) {
625+
CacheOrcFooter(footer_cache);
626+
}
627+
628+
return ValidateOrcReader();
629+
}
630+
525631
inline THdfsCompression::type HdfsOrcScanner::TranslateCompressionKind(
526632
orc::CompressionKind kind) {
527633
switch (kind) {

be/src/exec/orc/hdfs-orc-scanner.h

Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525

2626
#include "runtime/exec-env.h"
2727
#include "runtime/io/disk-io-mgr.h"
28+
#include "runtime/io/footer-cache.h"
2829
#include "runtime/runtime-state.h"
2930
#include "exec/acid-metadata-utils.h"
3031
#include "exec/hdfs-columnar-scanner.h"
@@ -38,6 +39,10 @@ struct HdfsFileDesc;
3839
class OrcStructReader;
3940
class OrcComplexColumnReader;
4041

42+
namespace io {
43+
class FooterCache;
44+
}
45+
4146
/// This scanner leverage the ORC library to parse ORC files located in HDFS. Data is
4247
/// transformed into Impala in-memory representation (i.e. Tuples, RowBatches) by
4348
/// different kinds of OrcColumnReaders.
@@ -151,13 +156,13 @@ class HdfsOrcScanner : public HdfsColumnarScanner {
151156
return filename_;
152157
}
153158

159+
/// Default read implementation for non async IO.
160+
Status readRandom(void* buf, uint64_t length, uint64_t offset);
161+
154162
private:
155163
HdfsOrcScanner* scanner_;
156164
const HdfsFileDesc* file_desc_;
157165
std::string filename_;
158-
159-
/// Default read implementation for non async IO.
160-
Status readRandom(void* buf, uint64_t length, uint64_t offset);
161166
};
162167

163168
HdfsOrcScanner(HdfsScanNodeBase* scan_node, RuntimeState* state);
@@ -307,6 +312,12 @@ class HdfsOrcScanner : public HdfsColumnarScanner {
307312
/// Number of runtime filters that are pushed down to the ORC reader.
308313
RuntimeProfile::Counter* num_pushed_down_runtime_filters_counter_ = nullptr;
309314

315+
/// Number of footer cache hits.
316+
RuntimeProfile::Counter* num_footer_cache_hits_counter_ = nullptr;
317+
318+
/// Number of footer cache misses.
319+
RuntimeProfile::Counter* num_footer_cache_misses_counter_ = nullptr;
320+
310321
/// Number of arrived runtime IN-list filters that can be pushed down.
311322
/// Used in ShouldUpdateSearchArgument(). Init to -1 so the check can pass at first.
312323
int num_pushable_in_list_filters_ = -1;
@@ -359,6 +370,25 @@ class HdfsOrcScanner : public HdfsColumnarScanner {
359370
/// last ORC_FOOTER_SIZE bytes in context_.
360371
Status ProcessFileTail() WARN_UNUSED_RESULT;
361372

373+
/// Helper methods for ProcessFileTail() to reduce complexity:
374+
375+
/// Try to get ORC footer from cache. Sets cache_hit to true if cache hit,
376+
/// false if cache miss. Returns error status on failure.
377+
Status TryGetFooterFromCache(io::FooterCache* footer_cache,
378+
std::string* cached_footer, bool* cache_hit) WARN_UNUSED_RESULT;
379+
/// Create ORC reader using cached footer.
380+
Status CreateOrcReaderWithCachedFooter(const std::string& cached_footer)
381+
WARN_UNUSED_RESULT;
382+
383+
/// Create ORC reader by reading footer from disk.
384+
Status CreateOrcReaderFromDisk() WARN_UNUSED_RESULT;
385+
386+
/// Cache the ORC footer after reading from disk.
387+
void CacheOrcFooter(io::FooterCache* footer_cache);
388+
389+
/// Validate the ORC reader after creation.
390+
Status ValidateOrcReader() WARN_UNUSED_RESULT;
391+
362392
/// Resolve SchemaPath in TupleDescriptors and translate them to ORC type ids into
363393
/// 'selected_nodes'. Track the position slots by pre-order traversal in the
364394
/// descriptors and push them to a stack as 'pos_slots'.

0 commit comments

Comments
 (0)