diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc index 0194677323..900e5a45a0 100644 --- a/be/src/exec/hdfs-scan-node-base.cc +++ b/be/src/exec/hdfs-scan-node-base.cc @@ -330,7 +330,8 @@ Status HdfsScanPlanNode::ProcessScanRangesAndInitSharedState(FragmentState* stat shared_state_.use_mt_scan_node_ = tnode_->hdfs_scan_node.use_mt_scan_node; // Distribute the work evenly for issuing initial scan ranges. - DCHECK(shared_state_.use_mt_scan_node_ || instance_ctx_pbs.size() == 1) + DCHECK(shared_state_.use_mt_scan_node_ || instance_ctx_pbs.size() == 1 + || (instance_ctx_pbs.size() > 1 && state->query_options().disable_scan_node_mt)) << "Non MT scan node should only have a single instance."; auto instance_ctxs = state->instance_ctxs(); DCHECK_EQ(instance_ctxs.size(), instance_ctx_pbs.size()); @@ -664,6 +665,7 @@ Status HdfsScanNodeBase::IssueInitialScanRanges(RuntimeState* state) { for (HdfsFileDesc* file : *file_list) { if (FilePassesFilterPredicates(filter_ctxs_, file->file_format, file)) { matching_per_type_files[file->file_format].push_back(file); + num_scan_ranges_.Add(file->splits.size()); } else { SkipFile(file->file_format, file); } diff --git a/be/src/exec/hdfs-scan-node-base.h b/be/src/exec/hdfs-scan-node-base.h index 7f5643756e..fe234e0e7b 100644 --- a/be/src/exec/hdfs-scan-node-base.h +++ b/be/src/exec/hdfs-scan-node-base.h @@ -740,6 +740,9 @@ class HdfsScanNodeBase : public ScanNode { AtomicInt32 num_scanners_codegen_enabled_; AtomicInt32 num_scanners_codegen_disabled_; + /// The number of scan ranges that need to be scanned by a non-MT scan node. + AtomicInt32 num_scan_ranges_; + /// If true, counters are actively running and need to be reported in the runtime /// profile. bool counters_running_ = false; diff --git a/be/src/exec/hdfs-scan-node.cc b/be/src/exec/hdfs-scan-node.cc index d9d7985f2f..c933ca390f 100644 --- a/be/src/exec/hdfs-scan-node.cc +++ b/be/src/exec/hdfs-scan-node.cc @@ -104,8 +104,11 @@ Status HdfsScanNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos // Release the scanner threads discard_result(ranges_issued_barrier_.Notify()); + } - if (shared_state_->progress().done()) SetDone(); + // All ranges are finished or no range to read. Indicate we are done. + if (shared_state_->progress().done() || num_scan_ranges_.Load() == 0) { + SetDone(); } Status status = GetNextInternal(state, row_batch, eos); @@ -256,8 +259,8 @@ void HdfsScanNode::ThreadTokenAvailableCb(ThreadResourcePool* pool) { // promptly. However, we want to minimize that by checking a conditions. // 1. Don't start up if the ScanNode is done // 2. Don't start up if all the ranges have been taken by another thread. - // 3. Don't start up if the number of ranges left is less than the number of - // active scanner threads. + // 3. Don't start up if the number of ranges is less than the number of + // started scanner threads. // 4. Don't start up if no initial ranges have been issued (see IMPALA-1722). // 5. Don't start up a ScannerThread if the row batch queue is not full since // we are not scanner bound. @@ -296,12 +299,13 @@ void HdfsScanNode::ThreadTokenAvailableCb(ThreadResourcePool* pool) { } const int64_t num_active_scanner_threads = thread_state_.GetNumActive(); + const int64_t num_scanner_threads_started = thread_state_.GetNumStarted(); const bool first_thread = num_active_scanner_threads == 0; const int64_t est_mem = thread_state_.estimated_per_thread_mem(); const int64_t scanner_thread_reservation = resource_profile_.min_reservation; // Cases 1, 2, 3. - if (done() || all_ranges_started_ || - num_active_scanner_threads >= shared_state_->progress().remaining()) { + if (done() || all_ranges_started_ + || num_scanner_threads_started >= num_scan_ranges_.Load()) { break; } @@ -460,7 +464,7 @@ void HdfsScanNode::ScannerThread(bool first_thread, int64_t scanner_thread_reser runtime_state_->query_state()->scanner_mem_limiter()->ReleaseMemoryForScannerThread( this, thread_state_.estimated_per_thread_mem()); } - thread_state_.DecrementNumActive(); + if (thread_state_.DecrementNumActive()) SetDone(); } void HdfsScanNode::ProcessSplit(const vector& filter_ctxs, diff --git a/be/src/exec/scan-node.cc b/be/src/exec/scan-node.cc index d036d5b620..10770bce20 100644 --- a/be/src/exec/scan-node.cc +++ b/be/src/exec/scan-node.cc @@ -132,7 +132,8 @@ Status ScanPlanNode::CreateExecNode(RuntimeState* state, ExecNode** node) const *node = pool->Add(new KuduScanNodeMt(pool, *this, state->desc_tbl())); } else { DCHECK(state->query_options().mt_dop == 0 - || state->query_options().num_scanner_threads == 1); + || state->query_options().num_scanner_threads == 1 + || state->query_options().disable_scan_node_mt); *node = pool->Add(new KuduScanNode(pool, *this, state->desc_tbl())); } break; diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc index 49bb06476e..0097005bc6 100644 --- a/be/src/service/query-options.cc +++ b/be/src/service/query-options.cc @@ -1220,6 +1220,10 @@ Status impala::SetQueryOption(const string& key, const string& value, query_options->__set_test_replan(IsTrue(value)); break; } + case TImpalaQueryOptions::DISABLE_SCAN_NODE_MT: { + query_options->__set_disable_scan_node_mt(IsTrue(value)); + break; + } default: if (IsRemovedQueryOption(key)) { LOG(WARNING) << "Ignoring attempt to set removed query option '" << key << "'"; diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h index fa9691b2c4..9f050f63c7 100644 --- a/be/src/service/query-options.h +++ b/be/src/service/query-options.h @@ -50,7 +50,7 @@ typedef std::unordered_map // time we add or remove a query option to/from the enum TImpalaQueryOptions. #define QUERY_OPTS_TABLE\ DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(),\ - TImpalaQueryOptions::LOCK_MAX_WAIT_TIME_S + 1);\ + TImpalaQueryOptions::DISABLE_SCAN_NODE_MT + 1);\ REMOVED_QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED)\ QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR, TQueryOptionLevel::REGULAR)\ REMOVED_QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS)\ @@ -282,6 +282,7 @@ typedef std::unordered_map QUERY_OPT_FN(test_replan, TEST_REPLAN,\ TQueryOptionLevel::ADVANCED)\ QUERY_OPT_FN(lock_max_wait_time_s, LOCK_MAX_WAIT_TIME_S, TQueryOptionLevel::REGULAR)\ + QUERY_OPT_FN(disable_scan_node_mt, DISABLE_SCAN_NODE_MT, TQueryOptionLevel::ADVANCED)\ ; /// Enforce practical limits on some query options to avoid undesired query state. diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift index 8a118d2413..7e8a50e9c6 100644 --- a/common/thrift/ImpalaService.thrift +++ b/common/thrift/ImpalaService.thrift @@ -733,6 +733,10 @@ enum TImpalaQueryOptions { // Maximum wait time on HMS ACID lock in seconds. LOCK_MAX_WAIT_TIME_S = 145 + + // If true, use HdfsScanNode/KuduScanNode instead of HdfsScanNodeMt/KuduScanNodeMt + // in any cases even if mt_dop > 0. + DISABLE_SCAN_NODE_MT = 146; } // The summary of a DML statement. diff --git a/common/thrift/Query.thrift b/common/thrift/Query.thrift index 4d45ad1c87..47d20e4fbe 100644 --- a/common/thrift/Query.thrift +++ b/common/thrift/Query.thrift @@ -591,6 +591,9 @@ struct TQueryOptions { // See comment in ImpalaService.thrift 146: optional i32 lock_max_wait_time_s = 300 + + // See comment in ImpalaService.thrift + 147: optional bool disable_scan_node_mt = false; } // Impala currently has three types of sessions: Beeswax, HiveServer2 and external diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java index 78577769dd..c0ca161db3 100644 --- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java +++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java @@ -1911,6 +1911,16 @@ protected String getNodeExplainString(String prefix, String detailPrefix, .append(" max-scan-range-rows=") .append(PrintUtils.printEstCardinality(maxScanRangeNumRows_)) .append("\n"); + String scanNodeType; + if (useMtScanNode_) { + scanNodeType = "single-threaded"; + } else { + scanNodeType = "multiple-threaded"; + } + output.append(detailPrefix) + .append("scan node type: ") + .append(scanNodeType) + .append("\n"); if (numScanRangesNoDiskIds_ > 0) { output.append(detailPrefix) .append(String.format("missing disk ids: " @@ -1923,7 +1933,6 @@ numFilesNoDiskIds_, sumValues(totalFilesPerFs_), numScanRangesNoDiskIds_, output.append(getMinMaxOriginalConjunctsExplainString(detailPrefix, detailLevel)); // Groups the dictionary filterable conjuncts by tuple descriptor. output.append(getDictionaryConjunctsExplainString(detailPrefix, detailLevel)); - } if (detailLevel.ordinal() >= TExplainLevel.VERBOSE.ordinal()) { // Add file formats after sorting so their order is deterministic in the explain @@ -2070,7 +2079,7 @@ public void computeNodeResourceProfile(TQueryOptions queryOptions) { } // The non-MT scan node requires at least one scanner thread. - useMtScanNode_ = queryOptions.mt_dop > 0; + useMtScanNode_ = queryOptions.mt_dop > 0 && !queryOptions.disable_scan_node_mt; int requiredThreads = useMtScanNode_ ? 0 : 1; int maxScannerThreads = computeMaxNumberOfScannerThreads(queryOptions, perHostScanRanges); diff --git a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java index 6663460aeb..54affaa9d1 100644 --- a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java +++ b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java @@ -412,7 +412,7 @@ public void computeNodeResourceProfile(TQueryOptions queryOptions) { kudu_scanner_thread_max_estimated_bytes; long mem_estimate_per_thread = Math.min(getNumMaterializedSlots(desc_) * estimated_bytes_per_column_per_thread, max_estimated_bytes_per_thread); - useMtScanNode_ = queryOptions.mt_dop > 0; + useMtScanNode_ = queryOptions.mt_dop > 0 && !queryOptions.disable_scan_node_mt; nodeResourceProfile_ = new ResourceProfileBuilder() .setMemEstimateBytes(mem_estimate_per_thread * maxScannerThreads) .setThreadReservation(useMtScanNode_ ? 0 : 1).build(); @@ -446,6 +446,16 @@ protected String getNodeExplainString(String prefix, String detailPrefix, result.append(detailPrefix + "runtime filters: "); result.append(getRuntimeFilterExplainString(false, detailLevel)); } + String scanNodeType; + if (useMtScanNode_) { + scanNodeType = "single-threaded"; + } else { + scanNodeType = "multiple-threaded"; + } + result.append(detailPrefix) + .append("scan node type: ") + .append(scanNodeType) + .append("\n"); } } return result.toString(); diff --git a/testdata/workloads/functional-query/queries/QueryTest/disable-scan-node-mt.test b/testdata/workloads/functional-query/queries/QueryTest/disable-scan-node-mt.test new file mode 100644 index 0000000000..8a4282f4e3 --- /dev/null +++ b/testdata/workloads/functional-query/queries/QueryTest/disable-scan-node-mt.test @@ -0,0 +1,36 @@ +==== +---- QUERY +# DISABLE_SCAN_NODE_MT=false +set mt_dop=3; +set explain_level=3; +explain select count(*) from alltypes; +---- RESULTS: VERIFY_IS_SUBSET +row_regex: .*scan node type: single-threaded +---- TYPES +STRING +==== +---- QUERY +# DISABLE_SCAN_NODE_MT=true +set mt_dop=3; +set disable_scan_node_mt=1; +set explain_level=3; +explain select count(*) from alltypes; +---- RESULTS: VERIFY_IS_SUBSET +row_regex: .*scan node type: multiple-threaded +---- TYPES +STRING +==== +---- QUERY +# DISABLE_SCAN_NODE_MT=true, so we can get scanner thread related counters in profile. +set mt_dop=3; +set disable_scan_node_mt=1; +select count(*) from alltypes; +---- TYPES +BIGINT +---- RESULTS +7300 +---- RUNTIME_PROFILE +row_regex: .*scan node type: multiple-threaded +row_regex: .*Query Options \(set by configuration\): .*MT_DOP=3.*DISABLE_SCAN_NODE_MT=1.* +row_regex: .*AverageScannerThreadConcurrency: .* +==== diff --git a/tests/query_test/test_disable_scan_node_mt.py b/tests/query_test/test_disable_scan_node_mt.py new file mode 100644 index 0000000000..105e28a70a --- /dev/null +++ b/tests/query_test/test_disable_scan_node_mt.py @@ -0,0 +1,81 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from tests.common.impala_test_suite import ImpalaTestSuite +from tests.common.skip import (SkipIfEC, SkipIfLocal, SkipIfS3, SkipIfABFS, + SkipIfGCS, SkipIfCOS, SkipIfADLS) +from tests.common.test_dimensions import create_kudu_dimension +from tests.common.test_dimensions import create_parquet_dimension +from tests.common.test_dimensions import create_uncompressed_text_dimension + + +class TestDisableHdfsScanNodeMt(ImpalaTestSuite): + """Test disable HdfsScanNodeMt functionality.""" + + @classmethod + def get_workload(self): + return 'functional-query' + + @classmethod + def add_test_dimensions(cls): + super(TestDisableHdfsScanNodeMt, cls).add_test_dimensions() + cls.ImpalaTestMatrix.add_dimension( + create_parquet_dimension(cls.get_workload())) + + def test_disable_scan_node_mt(self, vector): + self.run_test_case('QueryTest/disable-scan-node-mt', vector) + + +class TestDisableKuduScanNodeMt(ImpalaTestSuite): + """Test disable KuduScanNodeMt functionality.""" + + @classmethod + def get_workload(self): + return 'functional-query' + + @classmethod + def add_test_dimensions(cls): + super(TestDisableKuduScanNodeMt, cls).add_test_dimensions() + cls.ImpalaTestMatrix.add_dimension( + create_kudu_dimension(cls.get_workload())) + + def test_disable_scan_node_mt(self, vector): + self.run_test_case('QueryTest/disable-scan-node-mt', vector) + + +class TestDisableHdfsScanNodeMtWithTextFile(ImpalaTestSuite): + """Test disable HdfsScanNodeMt with text file. We may get more instances than files + in some cases, test that we can execute query successfully and get right result.""" + + @classmethod + def get_workload(self): + return 'tpch' + + @classmethod + def add_test_dimensions(cls): + super(TestDisableHdfsScanNodeMtWithTextFile, cls).add_test_dimensions() + cls.ImpalaTestMatrix.add_dimension( + create_uncompressed_text_dimension(cls.get_workload())) + + def test_disable_scan_node_mt_with_text_file(self, vector): + query = "select count(*) from tpch.lineitem;" + vector.get_value('exec_option')['disable_scan_node_mt'] = 1 + vector.get_value('exec_option')['mt_dop'] = 3 + results = self.execute_query(query, vector.get_value('exec_option')) + assert results.success + assert len(results.data) == 1 + assert int(results.data[0]) == 6001215