Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion be/src/exec/hdfs-scan-node-base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,8 @@ Status HdfsScanPlanNode::ProcessScanRangesAndInitSharedState(FragmentState* stat
shared_state_.use_mt_scan_node_ = tnode_->hdfs_scan_node.use_mt_scan_node;

// Distribute the work evenly for issuing initial scan ranges.
DCHECK(shared_state_.use_mt_scan_node_ || instance_ctx_pbs.size() == 1)
DCHECK(shared_state_.use_mt_scan_node_ || instance_ctx_pbs.size() == 1
|| (instance_ctx_pbs.size() > 1 && state->query_options().disable_scan_node_mt))
<< "Non MT scan node should only have a single instance.";
auto instance_ctxs = state->instance_ctxs();
DCHECK_EQ(instance_ctxs.size(), instance_ctx_pbs.size());
Expand Down Expand Up @@ -664,6 +665,7 @@ Status HdfsScanNodeBase::IssueInitialScanRanges(RuntimeState* state) {
for (HdfsFileDesc* file : *file_list) {
if (FilePassesFilterPredicates(filter_ctxs_, file->file_format, file)) {
matching_per_type_files[file->file_format].push_back(file);
num_scan_ranges_.Add(file->splits.size());
} else {
SkipFile(file->file_format, file);
}
Expand Down
3 changes: 3 additions & 0 deletions be/src/exec/hdfs-scan-node-base.h
Original file line number Diff line number Diff line change
Expand Up @@ -740,6 +740,9 @@ class HdfsScanNodeBase : public ScanNode {
AtomicInt32 num_scanners_codegen_enabled_;
AtomicInt32 num_scanners_codegen_disabled_;

/// The number of scan ranges that need to be scanned by a non-MT scan node.
AtomicInt32 num_scan_ranges_;

/// If true, counters are actively running and need to be reported in the runtime
/// profile.
bool counters_running_ = false;
Expand Down
16 changes: 10 additions & 6 deletions be/src/exec/hdfs-scan-node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,11 @@ Status HdfsScanNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos

// Release the scanner threads
discard_result(ranges_issued_barrier_.Notify());
}

if (shared_state_->progress().done()) SetDone();
// All ranges are finished or no range to read. Indicate we are done.
if (shared_state_->progress().done() || num_scan_ranges_.Load() == 0) {
SetDone();
}

Status status = GetNextInternal(state, row_batch, eos);
Expand Down Expand Up @@ -256,8 +259,8 @@ void HdfsScanNode::ThreadTokenAvailableCb(ThreadResourcePool* pool) {
// promptly. However, we want to minimize that by checking a few conditions.
// 1. Don't start up if the ScanNode is done
// 2. Don't start up if all the ranges have been taken by another thread.
// 3. Don't start up if the number of ranges left is less than the number of
// active scanner threads.
// 3. Don't start up if the number of ranges is less than the number of
// started scanner threads.
// 4. Don't start up if no initial ranges have been issued (see IMPALA-1722).
// 5. Don't start up a ScannerThread if the row batch queue is not full since
// we are not scanner bound.
Expand Down Expand Up @@ -296,12 +299,13 @@ void HdfsScanNode::ThreadTokenAvailableCb(ThreadResourcePool* pool) {
}

const int64_t num_active_scanner_threads = thread_state_.GetNumActive();
const int64_t num_scanner_threads_started = thread_state_.GetNumStarted();
const bool first_thread = num_active_scanner_threads == 0;
const int64_t est_mem = thread_state_.estimated_per_thread_mem();
const int64_t scanner_thread_reservation = resource_profile_.min_reservation;
// Cases 1, 2, 3.
if (done() || all_ranges_started_ ||
num_active_scanner_threads >= shared_state_->progress().remaining()) {
if (done() || all_ranges_started_
|| num_scanner_threads_started >= num_scan_ranges_.Load()) {
break;
}

Expand Down Expand Up @@ -460,7 +464,7 @@ void HdfsScanNode::ScannerThread(bool first_thread, int64_t scanner_thread_reser
runtime_state_->query_state()->scanner_mem_limiter()->ReleaseMemoryForScannerThread(
this, thread_state_.estimated_per_thread_mem());
}
thread_state_.DecrementNumActive();
if (thread_state_.DecrementNumActive()) SetDone();
}

void HdfsScanNode::ProcessSplit(const vector<FilterContext>& filter_ctxs,
Expand Down
3 changes: 2 additions & 1 deletion be/src/exec/scan-node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,8 @@ Status ScanPlanNode::CreateExecNode(RuntimeState* state, ExecNode** node) const
*node = pool->Add(new KuduScanNodeMt(pool, *this, state->desc_tbl()));
} else {
DCHECK(state->query_options().mt_dop == 0
|| state->query_options().num_scanner_threads == 1);
|| state->query_options().num_scanner_threads == 1
|| state->query_options().disable_scan_node_mt);
*node = pool->Add(new KuduScanNode(pool, *this, state->desc_tbl()));
}
break;
Expand Down
4 changes: 4 additions & 0 deletions be/src/service/query-options.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1220,6 +1220,10 @@ Status impala::SetQueryOption(const string& key, const string& value,
query_options->__set_test_replan(IsTrue(value));
break;
}
case TImpalaQueryOptions::DISABLE_SCAN_NODE_MT: {
query_options->__set_disable_scan_node_mt(IsTrue(value));
break;
}
default:
if (IsRemovedQueryOption(key)) {
LOG(WARNING) << "Ignoring attempt to set removed query option '" << key << "'";
Expand Down
3 changes: 2 additions & 1 deletion be/src/service/query-options.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
// time we add or remove a query option to/from the enum TImpalaQueryOptions.
#define QUERY_OPTS_TABLE\
DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(),\
TImpalaQueryOptions::LOCK_MAX_WAIT_TIME_S + 1);\
TImpalaQueryOptions::DISABLE_SCAN_NODE_MT + 1);\
REMOVED_QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED)\
QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR, TQueryOptionLevel::REGULAR)\
REMOVED_QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS)\
Expand Down Expand Up @@ -282,6 +282,7 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
QUERY_OPT_FN(test_replan, TEST_REPLAN,\
TQueryOptionLevel::ADVANCED)\
QUERY_OPT_FN(lock_max_wait_time_s, LOCK_MAX_WAIT_TIME_S, TQueryOptionLevel::REGULAR)\
QUERY_OPT_FN(disable_scan_node_mt, DISABLE_SCAN_NODE_MT, TQueryOptionLevel::ADVANCED)\
;

/// Enforce practical limits on some query options to avoid undesired query state.
Expand Down
4 changes: 4 additions & 0 deletions common/thrift/ImpalaService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -733,6 +733,10 @@ enum TImpalaQueryOptions {

// Maximum wait time on HMS ACID lock in seconds.
LOCK_MAX_WAIT_TIME_S = 145

// If true, use HdfsScanNode/KuduScanNode instead of HdfsScanNodeMt/KuduScanNodeMt
// in all cases, even if mt_dop > 0.
DISABLE_SCAN_NODE_MT = 146;
}

// The summary of a DML statement.
Expand Down
3 changes: 3 additions & 0 deletions common/thrift/Query.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -591,6 +591,9 @@ struct TQueryOptions {

// See comment in ImpalaService.thrift
146: optional i32 lock_max_wait_time_s = 300

// See comment in ImpalaService.thrift
147: optional bool disable_scan_node_mt = false;
}

// Impala currently has three types of sessions: Beeswax, HiveServer2 and external
Expand Down
13 changes: 11 additions & 2 deletions fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -1911,6 +1911,16 @@ protected String getNodeExplainString(String prefix, String detailPrefix,
.append(" max-scan-range-rows=")
.append(PrintUtils.printEstCardinality(maxScanRangeNumRows_))
.append("\n");
String scanNodeType;
if (useMtScanNode_) {
scanNodeType = "single-threaded";
} else {
scanNodeType = "multiple-threaded";
}
output.append(detailPrefix)
.append("scan node type: ")
.append(scanNodeType)
.append("\n");
if (numScanRangesNoDiskIds_ > 0) {
output.append(detailPrefix)
.append(String.format("missing disk ids: "
Expand All @@ -1923,7 +1933,6 @@ numFilesNoDiskIds_, sumValues(totalFilesPerFs_), numScanRangesNoDiskIds_,
output.append(getMinMaxOriginalConjunctsExplainString(detailPrefix, detailLevel));
// Groups the dictionary filterable conjuncts by tuple descriptor.
output.append(getDictionaryConjunctsExplainString(detailPrefix, detailLevel));

}
if (detailLevel.ordinal() >= TExplainLevel.VERBOSE.ordinal()) {
// Add file formats after sorting so their order is deterministic in the explain
Expand Down Expand Up @@ -2070,7 +2079,7 @@ public void computeNodeResourceProfile(TQueryOptions queryOptions) {
}

// The non-MT scan node requires at least one scanner thread.
useMtScanNode_ = queryOptions.mt_dop > 0;
useMtScanNode_ = queryOptions.mt_dop > 0 && !queryOptions.disable_scan_node_mt;
int requiredThreads = useMtScanNode_ ? 0 : 1;
int maxScannerThreads = computeMaxNumberOfScannerThreads(queryOptions,
perHostScanRanges);
Expand Down
12 changes: 11 additions & 1 deletion fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,7 @@ public void computeNodeResourceProfile(TQueryOptions queryOptions) {
kudu_scanner_thread_max_estimated_bytes;
long mem_estimate_per_thread = Math.min(getNumMaterializedSlots(desc_) *
estimated_bytes_per_column_per_thread, max_estimated_bytes_per_thread);
useMtScanNode_ = queryOptions.mt_dop > 0;
useMtScanNode_ = queryOptions.mt_dop > 0 && !queryOptions.disable_scan_node_mt;
nodeResourceProfile_ = new ResourceProfileBuilder()
.setMemEstimateBytes(mem_estimate_per_thread * maxScannerThreads)
.setThreadReservation(useMtScanNode_ ? 0 : 1).build();
Expand Down Expand Up @@ -446,6 +446,16 @@ protected String getNodeExplainString(String prefix, String detailPrefix,
result.append(detailPrefix + "runtime filters: ");
result.append(getRuntimeFilterExplainString(false, detailLevel));
}
String scanNodeType;
if (useMtScanNode_) {
scanNodeType = "single-threaded";
} else {
scanNodeType = "multiple-threaded";
}
result.append(detailPrefix)
.append("scan node type: ")
.append(scanNodeType)
.append("\n");
}
}
return result.toString();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
====
---- QUERY
# DISABLE_SCAN_NODE_MT=false
set mt_dop=3;
set explain_level=3;
explain select count(*) from alltypes;
---- RESULTS: VERIFY_IS_SUBSET
row_regex: .*scan node type: single-threaded
---- TYPES
STRING
====
---- QUERY
# DISABLE_SCAN_NODE_MT=true
set mt_dop=3;
set disable_scan_node_mt=1;
set explain_level=3;
explain select count(*) from alltypes;
---- RESULTS: VERIFY_IS_SUBSET
row_regex: .*scan node type: multiple-threaded
---- TYPES
STRING
====
---- QUERY
# DISABLE_SCAN_NODE_MT=true, so we can get scanner thread related counters in profile.
set mt_dop=3;
set disable_scan_node_mt=1;
select count(*) from alltypes;
---- TYPES
BIGINT
---- RESULTS
7300
---- RUNTIME_PROFILE
row_regex: .*scan node type: multiple-threaded
row_regex: .*Query Options \(set by configuration\): .*MT_DOP=3.*DISABLE_SCAN_NODE_MT=1.*
row_regex: .*AverageScannerThreadConcurrency: .*
====
81 changes: 81 additions & 0 deletions tests/query_test/test_disable_scan_node_mt.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from tests.common.impala_test_suite import ImpalaTestSuite
from tests.common.skip import (SkipIfEC, SkipIfLocal, SkipIfS3, SkipIfABFS,
SkipIfGCS, SkipIfCOS, SkipIfADLS)
from tests.common.test_dimensions import create_kudu_dimension
from tests.common.test_dimensions import create_parquet_dimension
from tests.common.test_dimensions import create_uncompressed_text_dimension


class TestDisableHdfsScanNodeMt(ImpalaTestSuite):
  """Tests that HdfsScanNodeMt can be disabled via the DISABLE_SCAN_NODE_MT query
  option, forcing the multi-threaded HdfsScanNode even when mt_dop > 0. Runs the
  shared disable-scan-node-mt test file against Parquet tables."""

  @classmethod
  def get_workload(cls):
    # Workload that contains the 'alltypes' tables used by the test file.
    return 'functional-query'

  @classmethod
  def add_test_dimensions(cls):
    super(TestDisableHdfsScanNodeMt, cls).add_test_dimensions()
    # Restrict to Parquet so the HDFS scan node (not Kudu) is exercised.
    cls.ImpalaTestMatrix.add_dimension(
        create_parquet_dimension(cls.get_workload()))

  def test_disable_scan_node_mt(self, vector):
    self.run_test_case('QueryTest/disable-scan-node-mt', vector)


class TestDisableKuduScanNodeMt(ImpalaTestSuite):
  """Tests that KuduScanNodeMt can be disabled via the DISABLE_SCAN_NODE_MT query
  option, forcing the multi-threaded KuduScanNode even when mt_dop > 0. Runs the
  shared disable-scan-node-mt test file against Kudu tables."""

  @classmethod
  def get_workload(cls):
    # Workload that contains the 'alltypes' tables used by the test file.
    return 'functional-query'

  @classmethod
  def add_test_dimensions(cls):
    super(TestDisableKuduScanNodeMt, cls).add_test_dimensions()
    # Restrict to Kudu so the Kudu scan node (not HDFS) is exercised.
    cls.ImpalaTestMatrix.add_dimension(
        create_kudu_dimension(cls.get_workload()))

  def test_disable_scan_node_mt(self, vector):
    self.run_test_case('QueryTest/disable-scan-node-mt', vector)


class TestDisableHdfsScanNodeMtWithTextFile(ImpalaTestSuite):
  """Tests disabling HdfsScanNodeMt on text files. With mt_dop > 0 we may get more
  fragment instances than files in some cases; verify that the query still executes
  successfully and returns the correct result."""

  @classmethod
  def get_workload(cls):
    # tpch.lineitem is large enough to be split across multiple instances.
    return 'tpch'

  @classmethod
  def add_test_dimensions(cls):
    super(TestDisableHdfsScanNodeMtWithTextFile, cls).add_test_dimensions()
    # Restrict to uncompressed text, the format where instances can outnumber files.
    cls.ImpalaTestMatrix.add_dimension(
        create_uncompressed_text_dimension(cls.get_workload()))

  def test_disable_scan_node_mt_with_text_file(self, vector):
    query = "select count(*) from tpch.lineitem;"
    # Force the multi-threaded HdfsScanNode even though mt_dop > 0.
    vector.get_value('exec_option')['disable_scan_node_mt'] = 1
    vector.get_value('exec_option')['mt_dop'] = 3
    results = self.execute_query(query, vector.get_value('exec_option'))
    assert results.success
    assert len(results.data) == 1
    # Expected row count of tpch.lineitem at scale factor 1.
    assert int(results.data[0]) == 6001215