From 8621aac0623aedb8480d9e5f16aff1cf59c8d923 Mon Sep 17 00:00:00 2001 From: zhangyifan27 Date: Fri, 22 Sep 2023 20:48:07 +0800 Subject: [PATCH] IMPALA-12312: Using correct executor group set info for planning Prior to this patch, planner always selects the default group if there is a default group in an impala cluster. When a client sets a non-default request pool, planner still assumes the query run on the default group and calculates the wrong number of nodes and instances. This patch fixes it by including both default and non-default groups in the update message sent from BE to FE, so planner can generate a plan based on correct executor group set info. Besides, if no matched executor group is found, planner falls back to using the default group for planning, which is consistent with BE's behavior in GetExecutorGroupsForQuery. Tests: - Add new test cases to ClusterMembershipMgrUnitTest. - Add e2e test to verify the new behavior of planner. Change-Id: Ia13fb40558441d4dcc0b3e7910d3746bb61e6b80 --- .../scheduling/cluster-membership-mgr-test.cc | 51 +++++++++++- be/src/scheduling/cluster-membership-mgr.cc | 83 ++++++++++--------- be/src/scheduling/cluster-membership-mgr.h | 1 + .../org/apache/impala/service/Frontend.java | 42 +++++++--- tests/custom_cluster/test_executor_groups.py | 44 ++++++++++ 5 files changed, 166 insertions(+), 55 deletions(-) diff --git a/be/src/scheduling/cluster-membership-mgr-test.cc b/be/src/scheduling/cluster-membership-mgr-test.cc index fd86eead4f..2c6dfe19d7 100644 --- a/be/src/scheduling/cluster-membership-mgr-test.cc +++ b/be/src/scheduling/cluster-membership-mgr-test.cc @@ -779,15 +779,58 @@ TEST(ClusterMembershipMgrUnitTest, PopulateExecutorMembershipRequest) { exec_group2.AddExecutor(MakeBackendDescriptor(1, exec_group2, 1)); exec_group2.AddExecutor(MakeBackendDescriptor(2, exec_group2, 2)); snapshot_ptr->executor_groups.insert({exec_group2.name(), exec_group2}); + EXPECT_EQ(snapshot_ptr->executor_groups.size(), 2); ClusterMembershipMgr::SnapshotPtr ptr = snapshot_ptr; PopulateExecutorMembershipRequest(ptr, populated_exec_group_sets, update_req); - EXPECT_EQ(update_req.exec_group_sets.size(), 2); + // The executor group that does not match to any executor group sets is not included + // in the update request. + EXPECT_EQ(update_req.exec_group_sets.size(), 1); EXPECT_EQ(update_req.exec_group_sets[0].curr_num_executors, 1); EXPECT_EQ(update_req.exec_group_sets[0].expected_num_executors, 2); EXPECT_EQ(update_req.exec_group_sets[0].exec_group_name_prefix, "foo"); - EXPECT_EQ(update_req.exec_group_sets[1].curr_num_executors, 0); - EXPECT_EQ(update_req.exec_group_sets[1].expected_num_executors, 10); - EXPECT_EQ(update_req.exec_group_sets[1].exec_group_name_prefix, "bar"); + snapshot_ptr->executor_groups.clear(); + } + + // Case 2e: Using executor groups, expected_exec_group_sets is non-empty + // and one default executor group has more number of executors. + { + ExecutorGroup exec_group("foo-group1", 1); + exec_group.AddExecutor(MakeBackendDescriptor(1, exec_group, 0)); + snapshot_ptr->executor_groups.insert({exec_group.name(), exec_group}); + // Adding a default exec group. + ExecutorGroup exec_group2(ImpalaServer::DEFAULT_EXECUTOR_GROUP_NAME, 1); + exec_group2.AddExecutor(MakeBackendDescriptor(1, exec_group2, 1)); + exec_group2.AddExecutor(MakeBackendDescriptor(2, exec_group2, 2)); + snapshot_ptr->executor_groups.insert({exec_group2.name(), exec_group2}); + ClusterMembershipMgr::SnapshotPtr ptr = snapshot_ptr; + PopulateExecutorMembershipRequest(ptr, populated_exec_group_sets, update_req); + EXPECT_EQ(update_req.exec_group_sets.size(), 2); + EXPECT_EQ(update_req.exec_group_sets[0].curr_num_executors, 2); + EXPECT_EQ(update_req.exec_group_sets[0].expected_num_executors, 20); + EXPECT_EQ(update_req.exec_group_sets[0].exec_group_name_prefix, ""); + EXPECT_EQ(update_req.exec_group_sets[1].curr_num_executors, 1); + EXPECT_EQ(update_req.exec_group_sets[1].expected_num_executors, 2); + EXPECT_EQ(update_req.exec_group_sets[1].exec_group_name_prefix, "foo"); + snapshot_ptr->executor_groups.clear(); + } + + // Case 2f: Using executor groups, expected_exec_group_sets is non-empty + // and no executor group match to the configured executor group sets. + { + ExecutorGroup exec_group("unmatch-group1", 1); + exec_group.AddExecutor(MakeBackendDescriptor(1, exec_group, 0)); + snapshot_ptr->executor_groups.insert({exec_group.name(), exec_group}); + // Adding a default exec group. + ExecutorGroup exec_group2(ImpalaServer::DEFAULT_EXECUTOR_GROUP_NAME, 1); + exec_group2.AddExecutor(MakeBackendDescriptor(1, exec_group2, 1)); + exec_group2.AddExecutor(MakeBackendDescriptor(2, exec_group2, 2)); + snapshot_ptr->executor_groups.insert({exec_group2.name(), exec_group2}); + ClusterMembershipMgr::SnapshotPtr ptr = snapshot_ptr; + PopulateExecutorMembershipRequest(ptr, populated_exec_group_sets, update_req); + EXPECT_EQ(update_req.exec_group_sets.size(), 1); + EXPECT_EQ(update_req.exec_group_sets[0].curr_num_executors, 2); + EXPECT_EQ(update_req.exec_group_sets[0].expected_num_executors, 20); + EXPECT_EQ(update_req.exec_group_sets[0].exec_group_name_prefix, ""); snapshot_ptr->executor_groups.clear(); } } diff --git a/be/src/scheduling/cluster-membership-mgr.cc b/be/src/scheduling/cluster-membership-mgr.cc index 426931a544..35a14997ac 100644 --- a/be/src/scheduling/cluster-membership-mgr.cc +++ b/be/src/scheduling/cluster-membership-mgr.cc @@ -665,48 +665,55 @@ void PopulateExecutorMembershipRequest(ClusterMembershipMgr::SnapshotPtr& snapsh exec_group_sets.back().curr_num_executors++; } } + } + + if (exec_group_sets.empty() && expected_exec_group_sets.empty()) { + // Add a default exec group set if no expected group sets were specified. + exec_group_sets.emplace_back(); + exec_group_sets.back().__set_expected_num_executors(FLAGS_num_expected_executors); } else { - if (expected_exec_group_sets.empty()) { - // Add a default exec group set if no expected group sets were specified. - exec_group_sets.emplace_back(); - exec_group_sets.back().__set_expected_num_executors(FLAGS_num_expected_executors); - } else { - exec_group_sets.insert(exec_group_sets.begin(), expected_exec_group_sets.begin(), - expected_exec_group_sets.end()); - } - int matching_exec_groups_found = 0; - for (auto& set : exec_group_sets) { - int max_num_executors = -1; - StringPiece prefix(set.exec_group_name_prefix); - DCHECK(!prefix.empty() || exec_group_sets.size() == 1) - << "An empty group set prefix can only exist if no executor group sets are " - "specified"; - for (const auto& it : snapshot->executor_groups) { - StringPiece name(it.first); - if (!prefix.empty() && !name.starts_with(prefix)) continue; - matching_exec_groups_found++; - if (!it.second.IsHealthy()) continue; - int num_executors = it.second.NumExecutors(); - if (num_executors > max_num_executors) { - max_num_executors = num_executors; - set.curr_num_executors = num_executors; - } + exec_group_sets.insert(exec_group_sets.end(), expected_exec_group_sets.begin(), + expected_exec_group_sets.end()); + } + int matching_exec_groups_found = 0; + for (auto& set : exec_group_sets) { + // Ignore the processed default executor group. + if (set.exec_group_name_prefix.empty() && set.curr_num_executors > 0) continue; + int max_num_executors = -1; + StringPiece prefix(set.exec_group_name_prefix); + for (const auto& it : snapshot->executor_groups) { + if (it.first == ImpalaServer::DEFAULT_EXECUTOR_GROUP_NAME) continue; + StringPiece name(it.first); + if (!prefix.empty() && !name.starts_with(prefix)) continue; + matching_exec_groups_found++; + if (!it.second.IsHealthy()) continue; + int num_executors = it.second.NumExecutors(); + if (num_executors > max_num_executors) { + max_num_executors = num_executors; + set.curr_num_executors = num_executors; } } - if (matching_exec_groups_found != snapshot->executor_groups.size()) { - vector group_sets; - group_sets.reserve(exec_group_sets.size()); - for (const auto& set : exec_group_sets) { - group_sets.push_back(set.exec_group_name_prefix); - } - vector group_names; - for (const auto& it : snapshot->executor_groups) { - group_names.push_back(it.first); + } + if (matching_exec_groups_found != snapshot->executor_groups.size()) { + vector group_sets(exec_group_sets.size()); + for (const auto& set : exec_group_sets) { + group_sets.push_back(set.exec_group_name_prefix); + } + vector group_names(snapshot->executor_groups.size()); + for (const auto& it : snapshot->executor_groups) { + group_names.push_back(it.first); + } + LOG(WARNING) << "Some executor groups either do not match expected group sets or " + "match to more than one set. Expected group sets: " << + boost::algorithm::join(group_sets, ",") << " Current executor groups: " << + boost::algorithm::join(group_names, ","); + // Remove executor group sets that do not have any matched executors + for (auto it = exec_group_sets.begin(); it != exec_group_sets.end();) { + if (it->curr_num_executors == 0) { + exec_group_sets.erase(it); + } else { + it++; } - LOG(WARNING) << "Some executor groups either do not match expected group sets or " - "match to more than one set. Expected group sets: " - << boost::algorithm::join(group_sets, ",") << " Current executor groups: " - << boost::algorithm::join(group_names, ","); } } update_req.__set_exec_group_sets(exec_group_sets); diff --git a/be/src/scheduling/cluster-membership-mgr.h b/be/src/scheduling/cluster-membership-mgr.h index ff698f1b5a..341abbd28a 100644 --- a/be/src/scheduling/cluster-membership-mgr.h +++ b/be/src/scheduling/cluster-membership-mgr.h @@ -289,6 +289,7 @@ class ClusterMembershipMgr { /// The frontend uses cluster membership information to determine whether it expects the /// scheduler to assign local or remote reads. It also uses the number of executors to /// determine the join type (partitioned vs broadcast). +/// Only non-empty executor group sets will be included in the 'update_req'. void PopulateExecutorMembershipRequest(ClusterMembershipMgr::SnapshotPtr& snapshot, const std::vector& expected_exec_group_sets, TUpdateExecutorMembershipRequest& update_req); diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java index 794a65a798..5f1ead248e 100644 --- a/fe/src/main/java/org/apache/impala/service/Frontend.java +++ b/fe/src/main/java/org/apache/impala/service/Frontend.java @@ -1888,11 +1888,11 @@ private void markTimelineRetries(int numRetries, String msg, EventSequence timel */ public static List setupThresholdsForExecutorGroupSets( List executorGroupSets, String request_pool, - boolean default_executor_group, boolean test_replan) throws ImpalaException { + boolean default_executor_group_only, boolean test_replan) throws ImpalaException { RequestPoolService poolService = RequestPoolService.getInstance(); List result = Lists.newArrayList(); - if (default_executor_group) { + if (default_executor_group_only) { TExecutorGroupSet e = executorGroupSets.get(0); if (e.getCurr_num_executors() == 0) { // The default group has 0 executors. Return one with the number of default @@ -1969,8 +1969,17 @@ public static List setupThresholdsForExecutorGroupSets( } if (executorGroupSets.size() > 0 && result.size() == 0 && StringUtils.isNotEmpty(request_pool)) { - throw new AnalysisException("Request pool: " + request_pool - + " does not map to any known executor group set."); + // If the default group exists, fall back to the default group. + TExecutorGroupSet e = executorGroupSets.get(0); + if (isDefaultExecGroupSet(e)) { + result.add(new TExecutorGroupSet(e)); + result.get(0).setMax_mem_limit(Long.MAX_VALUE); + result.get(0).setNum_cores_per_executor(Integer.MAX_VALUE); + } + else { + throw new AnalysisException("Request pool: " + request_pool + + " does not map to any known executor group set."); + } } // Sort 'executorGroupSets' by @@ -2013,6 +2022,10 @@ private static int expectedTotalCores(TExecutorGroupSet execGroupSet) { expectedNumExecutor(execGroupSet), execGroupSet.getNum_cores_per_executor()); } + private static boolean isDefaultExecGroupSet(TExecutorGroupSet execGroupSet) { + return StringUtils.isEmpty(execGroupSet.getExec_group_name_prefix()); + } + private TExecRequest getTExecRequest(PlanCtx planCtx, EventSequence timeline) throws ImpalaException { TQueryCtx queryCtx = planCtx.getQueryContext(); @@ -2031,15 +2044,14 @@ private TExecRequest getTExecRequest(PlanCtx planCtx, EventSequence timeline) LOG.info("The original executor group sets from executor membership snapshot: " + originalExecutorGroupSets); - boolean default_executor_group = false; + boolean default_executor_group_only = false; if (originalExecutorGroupSets.size() == 1) { TExecutorGroupSet e = originalExecutorGroupSets.get(0); - default_executor_group = e.getExec_group_name_prefix() == null - || e.getExec_group_name_prefix().isEmpty(); + default_executor_group_only = isDefaultExecGroupSet(e); } List executorGroupSetsToUse = Frontend.setupThresholdsForExecutorGroupSets(originalExecutorGroupSets, - queryOptions.getRequest_pool(), default_executor_group, + queryOptions.getRequest_pool(), default_executor_group_only, enable_replan && (RuntimeEnv.INSTANCE.isTestEnv() || queryOptions.isTest_replan())); @@ -2137,7 +2149,8 @@ private TExecRequest getTExecRequest(PlanCtx planCtx, EventSequence timeline) } if (notScalable) { - setGroupNamePrefix(default_executor_group, clientSetRequestPool, req, group_set); + setGroupNamePrefix(default_executor_group_only, + clientSetRequestPool, req, group_set); addInfoString( groupSetProfile, VERDICT, "Assign to first group because " + reason); FrontendProfile.getCurrent().addChildrenProfile(groupSetProfile); @@ -2194,7 +2207,8 @@ private TExecRequest getTExecRequest(PlanCtx planCtx, EventSequence timeline) boolean matchFound = false; if (clientSetRequestPool) { - if (!default_executor_group) { + if (!default_executor_group_only && + !group_set.getExec_group_name_prefix().isEmpty()) { Preconditions.checkState(group_set.getExec_group_name_prefix().endsWith( queryOptions.getRequest_pool())); } @@ -2222,7 +2236,8 @@ private TExecRequest getTExecRequest(PlanCtx planCtx, EventSequence timeline) FrontendProfile.getCurrent().addChildrenProfile(groupSetProfile); if (matchFound) { - setGroupNamePrefix(default_executor_group, clientSetRequestPool, req, group_set); + setGroupNamePrefix(default_executor_group_only, + clientSetRequestPool, req, group_set); break; } @@ -2273,11 +2288,12 @@ private TExecRequest getTExecRequest(PlanCtx planCtx, EventSequence timeline) return req; } - private static void setGroupNamePrefix(boolean default_executor_group, + private static void setGroupNamePrefix(boolean default_executor_group_only, boolean clientSetRequestPool, TExecRequest req, TExecutorGroupSet group_set) { // Set the group name prefix in both the returned query options and // the query context for non default group setup. - if (!default_executor_group) { + if (!default_executor_group_only && + !group_set.getExec_group_name_prefix().isEmpty()) { String namePrefix = group_set.getExec_group_name_prefix(); req.query_options.setRequest_pool(namePrefix); req.setRequest_pool_set_by_frontend(!clientSetRequestPool); diff --git a/tests/custom_cluster/test_executor_groups.py b/tests/custom_cluster/test_executor_groups.py index af94e68f01..d6e5d3f57d 100644 --- a/tests/custom_cluster/test_executor_groups.py +++ b/tests/custom_cluster/test_executor_groups.py @@ -1542,3 +1542,47 @@ def test_expected_executors_no_healthy_groups(self): assert "Request Pool: root.large" in profile, profile self.client.close_query(handle) + + @pytest.mark.execute_serially + def test_query_with_default_and_non_default_executor_groups(self): + """Test query planning and execution when there are default and non-default executor + groups in a cluster.""" + + # The path to resources directory which contains the admission control config files. + RESOURCES_DIR = os.path.join(os.environ['IMPALA_HOME'], "fe", "src", "test", + "resources") + fs_allocation_path = os.path.join(RESOURCES_DIR, "fair-scheduler-allocation.xml") + llama_site_path = os.path.join(RESOURCES_DIR, "llama-site-empty.xml") + # Start with a regular admission config with multiple pools and no resource limits. + self._restart_coordinators(num_coordinators=1, + extra_args="-expected_executor_group_sets=root.queue1:2,root.queue2:1 " + "-fair_scheduler_allocation_path %s " + "-llama_site_path %s" % ( + fs_allocation_path, llama_site_path)) + + # Set up a default group and a non-default group. + self._add_executor_group("", min_size=1, num_executors=1) + self._add_executor_group("group", min_size=2, num_executors=2, + resource_pool="root.queue1") + self._wait_for_num_executor_groups(2, only_healthy=True) + + # Create fresh client + self.create_impala_clients() + result = self.execute_query_expect_success(self.client, TEST_QUERY) + # Planner assumes that the query run on the default group. + assert "F00:PLAN FRAGMENT [RANDOM] hosts=1 instances=1" in str(result.runtime_profile) + # Expect to run the query on the default group. + assert "Executor Group: default" in str(result.runtime_profile) + + result = self.execute_query_expect_success(self.client, TEST_QUERY, + query_options={'request_pool': 'queue1'}) + # Planner assumes that the query run on root.queue1-group. + assert "F00:PLAN FRAGMENT [RANDOM] hosts=2 instances=2" in str(result.runtime_profile) + # Expect to run the query on root.queue1-group. + assert "Executor Group: root.queue1-group" in str(result.runtime_profile) + + result = self.execute_query_expect_success(self.client, TEST_QUERY, + query_options={'request_pool': 'queue2'}) + # The request pool does not exist, fall back to the default group. + assert "F00:PLAN FRAGMENT [RANDOM] hosts=1 instances=1" in str(result.runtime_profile) + assert "Executor Group: default" in str(result.runtime_profile)