Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 108 additions & 2 deletions be/src/exec/orc/hdfs-orc-scanner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "exprs/scalar-expr.h"
#include "runtime/collection-value-builder.h"
#include "runtime/exec-env.h"
#include "runtime/io/footer-cache.h"
#include "runtime/io/request-context.h"
#include "runtime/mem-tracker.h"
#include "runtime/runtime-filter.inline.h"
Expand Down Expand Up @@ -329,6 +330,10 @@ Status HdfsOrcScanner::Open(ScannerContext* context) {
num_pushed_down_runtime_filters_counter_ =
ADD_COUNTER(scan_node_->runtime_profile(), "NumPushedDownRuntimeFilters",
TUnit::UNIT);
num_footer_cache_hits_counter_ =
ADD_COUNTER(scan_node_->runtime_profile(), "NumOrcFooterCacheHits", TUnit::UNIT);
num_footer_cache_misses_counter_ =
ADD_COUNTER(scan_node_->runtime_profile(), "NumOrcFooterCacheMisses", TUnit::UNIT);

codegend_process_scratch_batch_fn_ = scan_node_->GetCodegenFn(THdfsFileFormat::ORC);
if (codegend_process_scratch_batch_fn_ == nullptr) {
Expand Down Expand Up @@ -503,7 +508,59 @@ void HdfsOrcScanner::Close(RowBatch* row_batch) {
CloseInternal();
}

Status HdfsOrcScanner::ProcessFileTail() {
Status HdfsOrcScanner::TryGetFooterFromCache(FooterCache* footer_cache,
std::string* cached_footer, bool* cache_hit) {
DCHECK(footer_cache != nullptr);
DCHECK(cached_footer != nullptr);
DCHECK(cache_hit != nullptr);

*cache_hit = false;
const HdfsFileDesc* file_desc = stream_->file_desc();
FooterCacheValue cached_value =
footer_cache->GetFooter(file_desc->filename, file_desc->mtime);

// Check if we got an ORC footer from cache
if (!std::holds_alternative<std::string>(cached_value)) {
return Status::OK();
}
*cached_footer = std::get<std::string>(cached_value);
if (cached_footer->empty()) {
return Status::OK();
}

COUNTER_ADD(num_footer_cache_hits_counter_, 1);
VLOG(2) << "Footer cache hit for ORC file: " << filename();
*cache_hit = true;
return Status::OK();
}

Status HdfsOrcScanner::CreateOrcReaderWithCachedFooter(
const std::string& cached_footer) {
try {
unique_ptr<orc::InputStream> input_stream(new ScanRangeInputStream(this));
VLOG_FILE << "Processing FileTail of ORC file: " << input_stream->getName()
<< ", file_length: " << input_stream->getLength();
reader_options_.setSerializedFileTail(cached_footer);
reader_ = orc::createReader(move(input_stream), reader_options_);

// When using cached footer, we need to position the stream as if we had read
// the footer from disk. Skip to the end of the footer range to match the state
// that would exist after reading the footer from disk.
int64_t bytes_to_skip = reader_->getSerializedFileTail().length();
if (bytes_to_skip > 0) {
Status status;
if (!stream_->SkipBytes(bytes_to_skip, &status)) {
return Status(Substitute("Failed to position stream after cached footer: $0",
status.GetDetail()));
}
VLOG(2) << Substitute("Skipped $0 bytes to position stream at footer start after "
"using cached footer", bytes_to_skip);
}
} RETURN_ON_ORC_EXCEPTION("Encountered parse error in tail of ORC file $0: $1");
return Status::OK();
}

Status HdfsOrcScanner::CreateOrcReaderFromDisk() {
try {
// ScanRangeInputStream keeps a pointer to this HdfsOrcScanner so we can hack
// async IO behind the orc::InputStream interface. The ranges of the
Expand All @@ -513,15 +570,64 @@ Status HdfsOrcScanner::ProcessFileTail() {
<< ", file_length: " << input_stream->getLength();
reader_ = orc::createReader(move(input_stream), reader_options_);
} RETURN_ON_ORC_EXCEPTION("Encountered parse error in tail of ORC file $0: $1");
return Status::OK();
}

if (reader_->getNumberOfRows() == 0) return Status::OK();
void HdfsOrcScanner::CacheOrcFooter(FooterCache* footer_cache) {
DCHECK(footer_cache != nullptr);
DCHECK(reader_ != nullptr);

const HdfsFileDesc* file_desc = stream_->file_desc();
std::string serialized_tail = reader_->getSerializedFileTail();
Status cache_status = footer_cache->PutOrcFooter(
file_desc->filename, file_desc->mtime, serialized_tail);
if (!cache_status.ok()) {
// Cache insertion failure is not fatal, just log it
VLOG(2) << "Failed to cache ORC footer for file '" << filename()
<< "': " << cache_status.GetDetail();
}
}

Status HdfsOrcScanner::ValidateOrcReader() {
DCHECK(reader_ != nullptr);

if (reader_->getNumberOfRows() == 0) return Status::OK();
if (reader_->getNumberOfStripes() == 0) {
return Status(Substitute("Invalid ORC file: $0. No stripes in this file but"
" numberOfRows in footer is $1", filename(), reader_->getNumberOfRows()));
}
return Status::OK();
}

Status HdfsOrcScanner::ProcessFileTail() {
FooterCache* footer_cache = ExecEnv::GetInstance()->disk_io_mgr()->footer_cache();

// Try to get footer from cache first if footer cache is enabled
if (footer_cache != nullptr) {
std::string cached_footer;
bool cache_hit = false;
RETURN_IF_ERROR(TryGetFooterFromCache(footer_cache, &cached_footer, &cache_hit));
if (cache_hit) {
RETURN_IF_ERROR(CreateOrcReaderWithCachedFooter(cached_footer));
return ValidateOrcReader();
}
}

// Cache miss or cache disabled - read footer from disk
if (footer_cache != nullptr) {
COUNTER_ADD(num_footer_cache_misses_counter_, 1);
}

RETURN_IF_ERROR(CreateOrcReaderFromDisk());

// Write to cache if footer cache is enabled
if (footer_cache != nullptr) {
CacheOrcFooter(footer_cache);
}

return ValidateOrcReader();
}

inline THdfsCompression::type HdfsOrcScanner::TranslateCompressionKind(
orc::CompressionKind kind) {
switch (kind) {
Expand Down
36 changes: 33 additions & 3 deletions be/src/exec/orc/hdfs-orc-scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

#include "runtime/exec-env.h"
#include "runtime/io/disk-io-mgr.h"
#include "runtime/io/footer-cache.h"
#include "runtime/runtime-state.h"
#include "exec/acid-metadata-utils.h"
#include "exec/hdfs-columnar-scanner.h"
Expand All @@ -38,6 +39,10 @@ struct HdfsFileDesc;
class OrcStructReader;
class OrcComplexColumnReader;

namespace io {
class FooterCache;
}

/// This scanner leverage the ORC library to parse ORC files located in HDFS. Data is
/// transformed into Impala in-memory representation (i.e. Tuples, RowBatches) by
/// different kinds of OrcColumnReaders.
Expand Down Expand Up @@ -151,13 +156,13 @@ class HdfsOrcScanner : public HdfsColumnarScanner {
return filename_;
}

/// Default read implementation for non async IO.
Status readRandom(void* buf, uint64_t length, uint64_t offset);

private:
HdfsOrcScanner* scanner_;
const HdfsFileDesc* file_desc_;
std::string filename_;

/// Default read implementation for non async IO.
Status readRandom(void* buf, uint64_t length, uint64_t offset);
};

HdfsOrcScanner(HdfsScanNodeBase* scan_node, RuntimeState* state);
Expand Down Expand Up @@ -307,6 +312,12 @@ class HdfsOrcScanner : public HdfsColumnarScanner {
/// Number of runtime filters that are pushed down to the ORC reader.
RuntimeProfile::Counter* num_pushed_down_runtime_filters_counter_ = nullptr;

/// Number of footer cache hits.
RuntimeProfile::Counter* num_footer_cache_hits_counter_ = nullptr;

/// Number of footer cache misses.
RuntimeProfile::Counter* num_footer_cache_misses_counter_ = nullptr;

/// Number of arrived runtime IN-list filters that can be pushed down.
/// Used in ShouldUpdateSearchArgument(). Init to -1 so the check can pass at first.
int num_pushable_in_list_filters_ = -1;
Expand Down Expand Up @@ -359,6 +370,25 @@ class HdfsOrcScanner : public HdfsColumnarScanner {
/// last ORC_FOOTER_SIZE bytes in context_.
Status ProcessFileTail() WARN_UNUSED_RESULT;

/// Helper methods for ProcessFileTail() to reduce complexity:

/// Try to get ORC footer from cache. Sets cache_hit to true if cache hit,
/// false if cache miss. Returns error status on failure.
Status TryGetFooterFromCache(io::FooterCache* footer_cache,
std::string* cached_footer, bool* cache_hit) WARN_UNUSED_RESULT;
/// Create ORC reader using cached footer.
Status CreateOrcReaderWithCachedFooter(const std::string& cached_footer)
WARN_UNUSED_RESULT;

/// Create ORC reader by reading footer from disk.
Status CreateOrcReaderFromDisk() WARN_UNUSED_RESULT;

/// Cache the ORC footer after reading from disk.
void CacheOrcFooter(io::FooterCache* footer_cache);

/// Validate the ORC reader after creation.
Status ValidateOrcReader() WARN_UNUSED_RESULT;

/// Resolve SchemaPath in TupleDescriptors and translate them to ORC type ids into
/// 'selected_nodes'. Track the position slots by pre-order traversal in the
/// descriptors and push them to a stack as 'pos_slots'.
Expand Down
Loading