diff --git a/be/src/scheduling/cluster-membership-mgr-test.cc b/be/src/scheduling/cluster-membership-mgr-test.cc index fd86eead4f..2c6dfe19d7 100644 --- a/be/src/scheduling/cluster-membership-mgr-test.cc +++ b/be/src/scheduling/cluster-membership-mgr-test.cc @@ -779,15 +779,58 @@ TEST(ClusterMembershipMgrUnitTest, PopulateExecutorMembershipRequest) { exec_group2.AddExecutor(MakeBackendDescriptor(1, exec_group2, 1)); exec_group2.AddExecutor(MakeBackendDescriptor(2, exec_group2, 2)); snapshot_ptr->executor_groups.insert({exec_group2.name(), exec_group2}); + EXPECT_EQ(snapshot_ptr->executor_groups.size(), 2); ClusterMembershipMgr::SnapshotPtr ptr = snapshot_ptr; PopulateExecutorMembershipRequest(ptr, populated_exec_group_sets, update_req); - EXPECT_EQ(update_req.exec_group_sets.size(), 2); + // The executor group that does not match to any executor group sets is not included + // in the update request. + EXPECT_EQ(update_req.exec_group_sets.size(), 1); EXPECT_EQ(update_req.exec_group_sets[0].curr_num_executors, 1); EXPECT_EQ(update_req.exec_group_sets[0].expected_num_executors, 2); EXPECT_EQ(update_req.exec_group_sets[0].exec_group_name_prefix, "foo"); - EXPECT_EQ(update_req.exec_group_sets[1].curr_num_executors, 0); - EXPECT_EQ(update_req.exec_group_sets[1].expected_num_executors, 10); - EXPECT_EQ(update_req.exec_group_sets[1].exec_group_name_prefix, "bar"); + snapshot_ptr->executor_groups.clear(); + } + + // Case 2e: Using executor groups, expected_exec_group_sets is non-empty + // and one default executor group has more number of executors. + { + ExecutorGroup exec_group("foo-group1", 1); + exec_group.AddExecutor(MakeBackendDescriptor(1, exec_group, 0)); + snapshot_ptr->executor_groups.insert({exec_group.name(), exec_group}); + // Adding a default exec group. 
+ ExecutorGroup exec_group2(ImpalaServer::DEFAULT_EXECUTOR_GROUP_NAME, 1); + exec_group2.AddExecutor(MakeBackendDescriptor(1, exec_group2, 1)); + exec_group2.AddExecutor(MakeBackendDescriptor(2, exec_group2, 2)); + snapshot_ptr->executor_groups.insert({exec_group2.name(), exec_group2}); + ClusterMembershipMgr::SnapshotPtr ptr = snapshot_ptr; + PopulateExecutorMembershipRequest(ptr, populated_exec_group_sets, update_req); + EXPECT_EQ(update_req.exec_group_sets.size(), 2); + EXPECT_EQ(update_req.exec_group_sets[0].curr_num_executors, 2); + EXPECT_EQ(update_req.exec_group_sets[0].expected_num_executors, 20); + EXPECT_EQ(update_req.exec_group_sets[0].exec_group_name_prefix, ""); + EXPECT_EQ(update_req.exec_group_sets[1].curr_num_executors, 1); + EXPECT_EQ(update_req.exec_group_sets[1].expected_num_executors, 2); + EXPECT_EQ(update_req.exec_group_sets[1].exec_group_name_prefix, "foo"); + snapshot_ptr->executor_groups.clear(); + } + + // Case 2f: Using executor groups, expected_exec_group_sets is non-empty + // and no executor group match to the configured executor group sets. + { + ExecutorGroup exec_group("unmatch-group1", 1); + exec_group.AddExecutor(MakeBackendDescriptor(1, exec_group, 0)); + snapshot_ptr->executor_groups.insert({exec_group.name(), exec_group}); + // Adding a default exec group. 
+ ExecutorGroup exec_group2(ImpalaServer::DEFAULT_EXECUTOR_GROUP_NAME, 1); + exec_group2.AddExecutor(MakeBackendDescriptor(1, exec_group2, 1)); + exec_group2.AddExecutor(MakeBackendDescriptor(2, exec_group2, 2)); + snapshot_ptr->executor_groups.insert({exec_group2.name(), exec_group2}); + ClusterMembershipMgr::SnapshotPtr ptr = snapshot_ptr; + PopulateExecutorMembershipRequest(ptr, populated_exec_group_sets, update_req); + EXPECT_EQ(update_req.exec_group_sets.size(), 1); + EXPECT_EQ(update_req.exec_group_sets[0].curr_num_executors, 2); + EXPECT_EQ(update_req.exec_group_sets[0].expected_num_executors, 20); + EXPECT_EQ(update_req.exec_group_sets[0].exec_group_name_prefix, ""); snapshot_ptr->executor_groups.clear(); } } diff --git a/be/src/scheduling/cluster-membership-mgr.cc b/be/src/scheduling/cluster-membership-mgr.cc index 426931a544..35a14997ac 100644 --- a/be/src/scheduling/cluster-membership-mgr.cc +++ b/be/src/scheduling/cluster-membership-mgr.cc @@ -665,48 +665,55 @@ void PopulateExecutorMembershipRequest(ClusterMembershipMgr::SnapshotPtr& snapsh exec_group_sets.back().curr_num_executors++; } } + } + + if (exec_group_sets.empty() && expected_exec_group_sets.empty()) { + // Add a default exec group set if no expected group sets were specified. + exec_group_sets.emplace_back(); + exec_group_sets.back().__set_expected_num_executors(FLAGS_num_expected_executors); } else { - if (expected_exec_group_sets.empty()) { - // Add a default exec group set if no expected group sets were specified. 
- exec_group_sets.emplace_back(); - exec_group_sets.back().__set_expected_num_executors(FLAGS_num_expected_executors); - } else { - exec_group_sets.insert(exec_group_sets.begin(), expected_exec_group_sets.begin(), - expected_exec_group_sets.end()); - } - int matching_exec_groups_found = 0; - for (auto& set : exec_group_sets) { - int max_num_executors = -1; - StringPiece prefix(set.exec_group_name_prefix); - DCHECK(!prefix.empty() || exec_group_sets.size() == 1) - << "An empty group set prefix can only exist if no executor group sets are " - "specified"; - for (const auto& it : snapshot->executor_groups) { - StringPiece name(it.first); - if (!prefix.empty() && !name.starts_with(prefix)) continue; - matching_exec_groups_found++; - if (!it.second.IsHealthy()) continue; - int num_executors = it.second.NumExecutors(); - if (num_executors > max_num_executors) { - max_num_executors = num_executors; - set.curr_num_executors = num_executors; - } + exec_group_sets.insert(exec_group_sets.end(), expected_exec_group_sets.begin(), + expected_exec_group_sets.end()); + } + int matching_exec_groups_found = 0; + for (auto& set : exec_group_sets) { + // Ignore the processed default executor group. 
+ if (set.exec_group_name_prefix.empty() && set.curr_num_executors > 0) continue; + int max_num_executors = -1; + StringPiece prefix(set.exec_group_name_prefix); + for (const auto& it : snapshot->executor_groups) { + if (it.first == ImpalaServer::DEFAULT_EXECUTOR_GROUP_NAME) continue; + StringPiece name(it.first); + if (!prefix.empty() && !name.starts_with(prefix)) continue; + matching_exec_groups_found++; + if (!it.second.IsHealthy()) continue; + int num_executors = it.second.NumExecutors(); + if (num_executors > max_num_executors) { + max_num_executors = num_executors; + set.curr_num_executors = num_executors; } } - if (matching_exec_groups_found != snapshot->executor_groups.size()) { - vector group_sets; - group_sets.reserve(exec_group_sets.size()); - for (const auto& set : exec_group_sets) { - group_sets.push_back(set.exec_group_name_prefix); - } - vector group_names; - for (const auto& it : snapshot->executor_groups) { - group_names.push_back(it.first); + } + if (matching_exec_groups_found != snapshot->executor_groups.size()) { + vector<string> group_sets; + for (const auto& set : exec_group_sets) { + group_sets.push_back(set.exec_group_name_prefix); + } + vector<string> group_names; + for (const auto& it : snapshot->executor_groups) { + group_names.push_back(it.first); + } + LOG(WARNING) << "Some executor groups either do not match expected group sets or " + "match to more than one set. Expected group sets: " << + boost::algorithm::join(group_sets, ",") << " Current executor groups: " << + boost::algorithm::join(group_names, ","); + // Drop sets with no matched executors; erase() returns the next valid iterator. + for (auto it = exec_group_sets.begin(); it != exec_group_sets.end();) { + if (it->curr_num_executors == 0) { + it = exec_group_sets.erase(it); + } else { + ++it; } - LOG(WARNING) << "Some executor groups either do not match expected group sets or " - "match to more than one set. 
Expected group sets: " - << boost::algorithm::join(group_sets, ",") << " Current executor groups: " - << boost::algorithm::join(group_names, ","); } } update_req.__set_exec_group_sets(exec_group_sets); diff --git a/be/src/scheduling/cluster-membership-mgr.h b/be/src/scheduling/cluster-membership-mgr.h index ff698f1b5a..341abbd28a 100644 --- a/be/src/scheduling/cluster-membership-mgr.h +++ b/be/src/scheduling/cluster-membership-mgr.h @@ -289,6 +289,7 @@ class ClusterMembershipMgr { /// The frontend uses cluster membership information to determine whether it expects the /// scheduler to assign local or remote reads. It also uses the number of executors to /// determine the join type (partitioned vs broadcast). +/// Only non-empty executor group sets will be included in the 'update_req'. void PopulateExecutorMembershipRequest(ClusterMembershipMgr::SnapshotPtr& snapshot, const std::vector& expected_exec_group_sets, TUpdateExecutorMembershipRequest& update_req); diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java index 794a65a798..5f1ead248e 100644 --- a/fe/src/main/java/org/apache/impala/service/Frontend.java +++ b/fe/src/main/java/org/apache/impala/service/Frontend.java @@ -1888,11 +1888,11 @@ private void markTimelineRetries(int numRetries, String msg, EventSequence timel */ public static List setupThresholdsForExecutorGroupSets( List executorGroupSets, String request_pool, - boolean default_executor_group, boolean test_replan) throws ImpalaException { + boolean default_executor_group_only, boolean test_replan) throws ImpalaException { RequestPoolService poolService = RequestPoolService.getInstance(); List result = Lists.newArrayList(); - if (default_executor_group) { + if (default_executor_group_only) { TExecutorGroupSet e = executorGroupSets.get(0); if (e.getCurr_num_executors() == 0) { // The default group has 0 executors. 
Return one with the number of default @@ -1969,8 +1969,17 @@ public static List setupThresholdsForExecutorGroupSets( } if (executorGroupSets.size() > 0 && result.size() == 0 && StringUtils.isNotEmpty(request_pool)) { - throw new AnalysisException("Request pool: " + request_pool - + " does not map to any known executor group set."); + // If the default group exists, fall back to the default group. + TExecutorGroupSet e = executorGroupSets.get(0); + if (isDefaultExecGroupSet(e)) { + result.add(new TExecutorGroupSet(e)); + result.get(0).setMax_mem_limit(Long.MAX_VALUE); + result.get(0).setNum_cores_per_executor(Integer.MAX_VALUE); + } + else { + throw new AnalysisException("Request pool: " + request_pool + + " does not map to any known executor group set."); + } } // Sort 'executorGroupSets' by @@ -2013,6 +2022,10 @@ private static int expectedTotalCores(TExecutorGroupSet execGroupSet) { expectedNumExecutor(execGroupSet), execGroupSet.getNum_cores_per_executor()); } + private static boolean isDefaultExecGroupSet(TExecutorGroupSet execGroupSet) { + return StringUtils.isEmpty(execGroupSet.getExec_group_name_prefix()); + } + private TExecRequest getTExecRequest(PlanCtx planCtx, EventSequence timeline) throws ImpalaException { TQueryCtx queryCtx = planCtx.getQueryContext(); @@ -2031,15 +2044,14 @@ private TExecRequest getTExecRequest(PlanCtx planCtx, EventSequence timeline) LOG.info("The original executor group sets from executor membership snapshot: " + originalExecutorGroupSets); - boolean default_executor_group = false; + boolean default_executor_group_only = false; if (originalExecutorGroupSets.size() == 1) { TExecutorGroupSet e = originalExecutorGroupSets.get(0); - default_executor_group = e.getExec_group_name_prefix() == null - || e.getExec_group_name_prefix().isEmpty(); + default_executor_group_only = isDefaultExecGroupSet(e); } List executorGroupSetsToUse = Frontend.setupThresholdsForExecutorGroupSets(originalExecutorGroupSets, - queryOptions.getRequest_pool(), 
default_executor_group, + queryOptions.getRequest_pool(), default_executor_group_only, enable_replan && (RuntimeEnv.INSTANCE.isTestEnv() || queryOptions.isTest_replan())); @@ -2137,7 +2149,8 @@ private TExecRequest getTExecRequest(PlanCtx planCtx, EventSequence timeline) } if (notScalable) { - setGroupNamePrefix(default_executor_group, clientSetRequestPool, req, group_set); + setGroupNamePrefix(default_executor_group_only, + clientSetRequestPool, req, group_set); addInfoString( groupSetProfile, VERDICT, "Assign to first group because " + reason); FrontendProfile.getCurrent().addChildrenProfile(groupSetProfile); @@ -2194,7 +2207,8 @@ private TExecRequest getTExecRequest(PlanCtx planCtx, EventSequence timeline) boolean matchFound = false; if (clientSetRequestPool) { - if (!default_executor_group) { + if (!default_executor_group_only && + !isDefaultExecGroupSet(group_set)) { Preconditions.checkState(group_set.getExec_group_name_prefix().endsWith( queryOptions.getRequest_pool())); } @@ -2222,7 +2236,8 @@ private TExecRequest getTExecRequest(PlanCtx planCtx, EventSequence timeline) FrontendProfile.getCurrent().addChildrenProfile(groupSetProfile); if (matchFound) { - setGroupNamePrefix(default_executor_group, clientSetRequestPool, req, group_set); + setGroupNamePrefix(default_executor_group_only, + clientSetRequestPool, req, group_set); break; } @@ -2273,11 +2288,12 @@ private TExecRequest getTExecRequest(PlanCtx planCtx, EventSequence timeline) return req; } - private static void setGroupNamePrefix(boolean default_executor_group, + private static void setGroupNamePrefix(boolean default_executor_group_only, boolean clientSetRequestPool, TExecRequest req, TExecutorGroupSet group_set) { // Set the group name prefix in both the returned query options and // the query context for non default group setup. 
- if (!default_executor_group) { + if (!default_executor_group_only && + !isDefaultExecGroupSet(group_set)) { String namePrefix = group_set.getExec_group_name_prefix(); req.query_options.setRequest_pool(namePrefix); req.setRequest_pool_set_by_frontend(!clientSetRequestPool); diff --git a/tests/custom_cluster/test_executor_groups.py b/tests/custom_cluster/test_executor_groups.py index af94e68f01..d6e5d3f57d 100644 --- a/tests/custom_cluster/test_executor_groups.py +++ b/tests/custom_cluster/test_executor_groups.py @@ -1542,3 +1542,47 @@ def test_expected_executors_no_healthy_groups(self): assert "Request Pool: root.large" in profile, profile self.client.close_query(handle) + + @pytest.mark.execute_serially + def test_query_with_default_and_non_default_executor_groups(self): + """Test query planning and execution when there are default and non-default executor + groups in a cluster.""" + + # The path to resources directory which contains the admission control config files. + RESOURCES_DIR = os.path.join(os.environ['IMPALA_HOME'], "fe", "src", "test", + "resources") + fs_allocation_path = os.path.join(RESOURCES_DIR, "fair-scheduler-allocation.xml") + llama_site_path = os.path.join(RESOURCES_DIR, "llama-site-empty.xml") + # Start with a regular admission config with multiple pools and no resource limits. + self._restart_coordinators(num_coordinators=1, + extra_args="-expected_executor_group_sets=root.queue1:2,root.queue2:1 " + "-fair_scheduler_allocation_path %s " + "-llama_site_path %s" % ( + fs_allocation_path, llama_site_path)) + + # Set up a default group and a non-default group. 
+ self._add_executor_group("", min_size=1, num_executors=1) + self._add_executor_group("group", min_size=2, num_executors=2, + resource_pool="root.queue1") + self._wait_for_num_executor_groups(2, only_healthy=True) + + # Create fresh client + self.create_impala_clients() + result = self.execute_query_expect_success(self.client, TEST_QUERY) + # Planner assumes that the query run on the default group. + assert "F00:PLAN FRAGMENT [RANDOM] hosts=1 instances=1" in str(result.runtime_profile) + # Expect to run the query on the default group. + assert "Executor Group: default" in str(result.runtime_profile) + + result = self.execute_query_expect_success(self.client, TEST_QUERY, + query_options={'request_pool': 'queue1'}) + # Planner assumes that the query run on root.queue1-group. + assert "F00:PLAN FRAGMENT [RANDOM] hosts=2 instances=2" in str(result.runtime_profile) + # Expect to run the query on root.queue1-group. + assert "Executor Group: root.queue1-group" in str(result.runtime_profile) + + result = self.execute_query_expect_success(self.client, TEST_QUERY, + query_options={'request_pool': 'queue2'}) + # The request pool does not exist, fall back to the default group. + assert "F00:PLAN FRAGMENT [RANDOM] hosts=1 instances=1" in str(result.runtime_profile) + assert "Executor Group: default" in str(result.runtime_profile)