Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 47 additions & 4 deletions be/src/scheduling/cluster-membership-mgr-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -779,15 +779,58 @@ TEST(ClusterMembershipMgrUnitTest, PopulateExecutorMembershipRequest) {
exec_group2.AddExecutor(MakeBackendDescriptor(1, exec_group2, 1));
exec_group2.AddExecutor(MakeBackendDescriptor(2, exec_group2, 2));
snapshot_ptr->executor_groups.insert({exec_group2.name(), exec_group2});
EXPECT_EQ(snapshot_ptr->executor_groups.size(), 2);
ClusterMembershipMgr::SnapshotPtr ptr = snapshot_ptr;
PopulateExecutorMembershipRequest(ptr, populated_exec_group_sets, update_req);
EXPECT_EQ(update_req.exec_group_sets.size(), 2);
// The executor group that does not match to any executor group sets is not included
// in the update request.
EXPECT_EQ(update_req.exec_group_sets.size(), 1);
EXPECT_EQ(update_req.exec_group_sets[0].curr_num_executors, 1);
EXPECT_EQ(update_req.exec_group_sets[0].expected_num_executors, 2);
EXPECT_EQ(update_req.exec_group_sets[0].exec_group_name_prefix, "foo");
EXPECT_EQ(update_req.exec_group_sets[1].curr_num_executors, 0);
EXPECT_EQ(update_req.exec_group_sets[1].expected_num_executors, 10);
EXPECT_EQ(update_req.exec_group_sets[1].exec_group_name_prefix, "bar");
snapshot_ptr->executor_groups.clear();
}

// Case 2e: Using executor groups, expected_exec_group_sets is non-empty
// and one default executor group has more number of executors.
{
ExecutorGroup exec_group("foo-group1", 1);
exec_group.AddExecutor(MakeBackendDescriptor(1, exec_group, 0));
snapshot_ptr->executor_groups.insert({exec_group.name(), exec_group});
// Adding a default exec group.
ExecutorGroup exec_group2(ImpalaServer::DEFAULT_EXECUTOR_GROUP_NAME, 1);
exec_group2.AddExecutor(MakeBackendDescriptor(1, exec_group2, 1));
exec_group2.AddExecutor(MakeBackendDescriptor(2, exec_group2, 2));
snapshot_ptr->executor_groups.insert({exec_group2.name(), exec_group2});
ClusterMembershipMgr::SnapshotPtr ptr = snapshot_ptr;
PopulateExecutorMembershipRequest(ptr, populated_exec_group_sets, update_req);
EXPECT_EQ(update_req.exec_group_sets.size(), 2);
EXPECT_EQ(update_req.exec_group_sets[0].curr_num_executors, 2);
EXPECT_EQ(update_req.exec_group_sets[0].expected_num_executors, 20);
EXPECT_EQ(update_req.exec_group_sets[0].exec_group_name_prefix, "");
EXPECT_EQ(update_req.exec_group_sets[1].curr_num_executors, 1);
EXPECT_EQ(update_req.exec_group_sets[1].expected_num_executors, 2);
EXPECT_EQ(update_req.exec_group_sets[1].exec_group_name_prefix, "foo");
snapshot_ptr->executor_groups.clear();
}

// Case 2f: Using executor groups, expected_exec_group_sets is non-empty
// and no executor group match to the configured executor group sets.
{
ExecutorGroup exec_group("unmatch-group1", 1);
exec_group.AddExecutor(MakeBackendDescriptor(1, exec_group, 0));
snapshot_ptr->executor_groups.insert({exec_group.name(), exec_group});
// Adding a default exec group.
ExecutorGroup exec_group2(ImpalaServer::DEFAULT_EXECUTOR_GROUP_NAME, 1);
exec_group2.AddExecutor(MakeBackendDescriptor(1, exec_group2, 1));
exec_group2.AddExecutor(MakeBackendDescriptor(2, exec_group2, 2));
snapshot_ptr->executor_groups.insert({exec_group2.name(), exec_group2});
ClusterMembershipMgr::SnapshotPtr ptr = snapshot_ptr;
PopulateExecutorMembershipRequest(ptr, populated_exec_group_sets, update_req);
EXPECT_EQ(update_req.exec_group_sets.size(), 1);
EXPECT_EQ(update_req.exec_group_sets[0].curr_num_executors, 2);
EXPECT_EQ(update_req.exec_group_sets[0].expected_num_executors, 20);
EXPECT_EQ(update_req.exec_group_sets[0].exec_group_name_prefix, "");
snapshot_ptr->executor_groups.clear();
}
}
Expand Down
83 changes: 45 additions & 38 deletions be/src/scheduling/cluster-membership-mgr.cc
Original file line number Diff line number Diff line change
Expand Up @@ -665,48 +665,55 @@ void PopulateExecutorMembershipRequest(ClusterMembershipMgr::SnapshotPtr& snapsh
exec_group_sets.back().curr_num_executors++;
}
}
}

if (exec_group_sets.empty() && expected_exec_group_sets.empty()) {
// Add a default exec group set if no expected group sets were specified.
exec_group_sets.emplace_back();
exec_group_sets.back().__set_expected_num_executors(FLAGS_num_expected_executors);
} else {
if (expected_exec_group_sets.empty()) {
// Add a default exec group set if no expected group sets were specified.
exec_group_sets.emplace_back();
exec_group_sets.back().__set_expected_num_executors(FLAGS_num_expected_executors);
} else {
exec_group_sets.insert(exec_group_sets.begin(), expected_exec_group_sets.begin(),
expected_exec_group_sets.end());
}
int matching_exec_groups_found = 0;
for (auto& set : exec_group_sets) {
int max_num_executors = -1;
StringPiece prefix(set.exec_group_name_prefix);
DCHECK(!prefix.empty() || exec_group_sets.size() == 1)
<< "An empty group set prefix can only exist if no executor group sets are "
"specified";
for (const auto& it : snapshot->executor_groups) {
StringPiece name(it.first);
if (!prefix.empty() && !name.starts_with(prefix)) continue;
matching_exec_groups_found++;
if (!it.second.IsHealthy()) continue;
int num_executors = it.second.NumExecutors();
if (num_executors > max_num_executors) {
max_num_executors = num_executors;
set.curr_num_executors = num_executors;
}
exec_group_sets.insert(exec_group_sets.end(), expected_exec_group_sets.begin(),
expected_exec_group_sets.end());
}
int matching_exec_groups_found = 0;
for (auto& set : exec_group_sets) {
// Ignore the processed default executor group.
if (set.exec_group_name_prefix.empty() && set.curr_num_executors > 0) continue;
int max_num_executors = -1;
StringPiece prefix(set.exec_group_name_prefix);
for (const auto& it : snapshot->executor_groups) {
if (it.first == ImpalaServer::DEFAULT_EXECUTOR_GROUP_NAME) continue;
StringPiece name(it.first);
if (!prefix.empty() && !name.starts_with(prefix)) continue;
matching_exec_groups_found++;
if (!it.second.IsHealthy()) continue;
int num_executors = it.second.NumExecutors();
if (num_executors > max_num_executors) {
max_num_executors = num_executors;
set.curr_num_executors = num_executors;
}
}
if (matching_exec_groups_found != snapshot->executor_groups.size()) {
vector<string> group_sets;
group_sets.reserve(exec_group_sets.size());
for (const auto& set : exec_group_sets) {
group_sets.push_back(set.exec_group_name_prefix);
}
vector<string> group_names;
for (const auto& it : snapshot->executor_groups) {
group_names.push_back(it.first);
}
if (matching_exec_groups_found != snapshot->executor_groups.size()) {
vector<string> group_sets(exec_group_sets.size());
for (const auto& set : exec_group_sets) {
group_sets.push_back(set.exec_group_name_prefix);
}
vector<string> group_names(snapshot->executor_groups.size());
for (const auto& it : snapshot->executor_groups) {
group_names.push_back(it.first);
}
LOG(WARNING) << "Some executor groups either do not match expected group sets or "
"match to more than one set. Expected group sets: " <<
boost::algorithm::join(group_sets, ",") << " Current executor groups: " <<
boost::algorithm::join(group_names, ",");
// Remove executor group sets that do not have any matched executors
for (auto it = exec_group_sets.begin(); it != exec_group_sets.end();) {
if (it->curr_num_executors == 0) {
exec_group_sets.erase(it);
} else {
it++;
}
LOG(WARNING) << "Some executor groups either do not match expected group sets or "
"match to more than one set. Expected group sets: "
<< boost::algorithm::join(group_sets, ",") << " Current executor groups: "
<< boost::algorithm::join(group_names, ",");
}
}
update_req.__set_exec_group_sets(exec_group_sets);
Expand Down
1 change: 1 addition & 0 deletions be/src/scheduling/cluster-membership-mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,7 @@ class ClusterMembershipMgr {
/// The frontend uses cluster membership information to determine whether it expects the
/// scheduler to assign local or remote reads. It also uses the number of executors to
/// determine the join type (partitioned vs broadcast).
/// Only non-empty executor group sets will be included in the 'update_req'.
void PopulateExecutorMembershipRequest(ClusterMembershipMgr::SnapshotPtr& snapshot,
const std::vector<TExecutorGroupSet>& expected_exec_group_sets,
TUpdateExecutorMembershipRequest& update_req);
Expand Down
42 changes: 29 additions & 13 deletions fe/src/main/java/org/apache/impala/service/Frontend.java
Original file line number Diff line number Diff line change
Expand Up @@ -1888,11 +1888,11 @@ private void markTimelineRetries(int numRetries, String msg, EventSequence timel
*/
public static List<TExecutorGroupSet> setupThresholdsForExecutorGroupSets(
List<TExecutorGroupSet> executorGroupSets, String request_pool,
boolean default_executor_group, boolean test_replan) throws ImpalaException {
boolean default_executor_group_only, boolean test_replan) throws ImpalaException {
RequestPoolService poolService = RequestPoolService.getInstance();

List<TExecutorGroupSet> result = Lists.newArrayList();
if (default_executor_group) {
if (default_executor_group_only) {
TExecutorGroupSet e = executorGroupSets.get(0);
if (e.getCurr_num_executors() == 0) {
// The default group has 0 executors. Return one with the number of default
Expand Down Expand Up @@ -1969,8 +1969,17 @@ public static List<TExecutorGroupSet> setupThresholdsForExecutorGroupSets(
}
if (executorGroupSets.size() > 0 && result.size() == 0
&& StringUtils.isNotEmpty(request_pool)) {
throw new AnalysisException("Request pool: " + request_pool
+ " does not map to any known executor group set.");
// If the default group exists, fall back to the default group.
TExecutorGroupSet e = executorGroupSets.get(0);
if (isDefaultExecGroupSet(e)) {
result.add(new TExecutorGroupSet(e));
result.get(0).setMax_mem_limit(Long.MAX_VALUE);
result.get(0).setNum_cores_per_executor(Integer.MAX_VALUE);
}
else {
throw new AnalysisException("Request pool: " + request_pool
+ " does not map to any known executor group set.");
}
}

// Sort 'executorGroupSets' by
Expand Down Expand Up @@ -2013,6 +2022,10 @@ private static int expectedTotalCores(TExecutorGroupSet execGroupSet) {
expectedNumExecutor(execGroupSet), execGroupSet.getNum_cores_per_executor());
}

private static boolean isDefaultExecGroupSet(TExecutorGroupSet execGroupSet) {
return StringUtils.isEmpty(execGroupSet.getExec_group_name_prefix());
}

private TExecRequest getTExecRequest(PlanCtx planCtx, EventSequence timeline)
throws ImpalaException {
TQueryCtx queryCtx = planCtx.getQueryContext();
Expand All @@ -2031,15 +2044,14 @@ private TExecRequest getTExecRequest(PlanCtx planCtx, EventSequence timeline)
LOG.info("The original executor group sets from executor membership snapshot: "
+ originalExecutorGroupSets);

boolean default_executor_group = false;
boolean default_executor_group_only = false;
if (originalExecutorGroupSets.size() == 1) {
TExecutorGroupSet e = originalExecutorGroupSets.get(0);
default_executor_group = e.getExec_group_name_prefix() == null
|| e.getExec_group_name_prefix().isEmpty();
default_executor_group_only = isDefaultExecGroupSet(e);
}
List<TExecutorGroupSet> executorGroupSetsToUse =
Frontend.setupThresholdsForExecutorGroupSets(originalExecutorGroupSets,
queryOptions.getRequest_pool(), default_executor_group,
queryOptions.getRequest_pool(), default_executor_group_only,
enable_replan
&& (RuntimeEnv.INSTANCE.isTestEnv() || queryOptions.isTest_replan()));

Expand Down Expand Up @@ -2137,7 +2149,8 @@ private TExecRequest getTExecRequest(PlanCtx planCtx, EventSequence timeline)
}

if (notScalable) {
setGroupNamePrefix(default_executor_group, clientSetRequestPool, req, group_set);
setGroupNamePrefix(default_executor_group_only,
clientSetRequestPool, req, group_set);
addInfoString(
groupSetProfile, VERDICT, "Assign to first group because " + reason);
FrontendProfile.getCurrent().addChildrenProfile(groupSetProfile);
Expand Down Expand Up @@ -2194,7 +2207,8 @@ private TExecRequest getTExecRequest(PlanCtx planCtx, EventSequence timeline)

boolean matchFound = false;
if (clientSetRequestPool) {
if (!default_executor_group) {
if (!default_executor_group_only &&
!group_set.getExec_group_name_prefix().isEmpty()) {
Preconditions.checkState(group_set.getExec_group_name_prefix().endsWith(
queryOptions.getRequest_pool()));
}
Expand Down Expand Up @@ -2222,7 +2236,8 @@ private TExecRequest getTExecRequest(PlanCtx planCtx, EventSequence timeline)
FrontendProfile.getCurrent().addChildrenProfile(groupSetProfile);

if (matchFound) {
setGroupNamePrefix(default_executor_group, clientSetRequestPool, req, group_set);
setGroupNamePrefix(default_executor_group_only,
clientSetRequestPool, req, group_set);
break;
}

Expand Down Expand Up @@ -2273,11 +2288,12 @@ private TExecRequest getTExecRequest(PlanCtx planCtx, EventSequence timeline)
return req;
}

private static void setGroupNamePrefix(boolean default_executor_group,
private static void setGroupNamePrefix(boolean default_executor_group_only,
boolean clientSetRequestPool, TExecRequest req, TExecutorGroupSet group_set) {
// Set the group name prefix in both the returned query options and
// the query context for non default group setup.
if (!default_executor_group) {
if (!default_executor_group_only &&
!group_set.getExec_group_name_prefix().isEmpty()) {
String namePrefix = group_set.getExec_group_name_prefix();
req.query_options.setRequest_pool(namePrefix);
req.setRequest_pool_set_by_frontend(!clientSetRequestPool);
Expand Down
44 changes: 44 additions & 0 deletions tests/custom_cluster/test_executor_groups.py
Original file line number Diff line number Diff line change
Expand Up @@ -1542,3 +1542,47 @@ def test_expected_executors_no_healthy_groups(self):
assert "Request Pool: root.large" in profile, profile

self.client.close_query(handle)

@pytest.mark.execute_serially
def test_query_with_default_and_non_default_executor_groups(self):
"""Test query planning and execution when there are default and non-default executor
groups in a cluster."""

# The path to resources directory which contains the admission control config files.
RESOURCES_DIR = os.path.join(os.environ['IMPALA_HOME'], "fe", "src", "test",
"resources")
fs_allocation_path = os.path.join(RESOURCES_DIR, "fair-scheduler-allocation.xml")
llama_site_path = os.path.join(RESOURCES_DIR, "llama-site-empty.xml")
# Start with a regular admission config with multiple pools and no resource limits.
self._restart_coordinators(num_coordinators=1,
extra_args="-expected_executor_group_sets=root.queue1:2,root.queue2:1 "
"-fair_scheduler_allocation_path %s "
"-llama_site_path %s" % (
fs_allocation_path, llama_site_path))

# Set up a default group and a non-default group.
self._add_executor_group("", min_size=1, num_executors=1)
self._add_executor_group("group", min_size=2, num_executors=2,
resource_pool="root.queue1")
self._wait_for_num_executor_groups(2, only_healthy=True)

# Create fresh client
self.create_impala_clients()
result = self.execute_query_expect_success(self.client, TEST_QUERY)
# Planner assumes that the query run on the default group.
assert "F00:PLAN FRAGMENT [RANDOM] hosts=1 instances=1" in str(result.runtime_profile)
# Expect to run the query on the default group.
assert "Executor Group: default" in str(result.runtime_profile)

result = self.execute_query_expect_success(self.client, TEST_QUERY,
query_options={'request_pool': 'queue1'})
# Planner assumes that the query run on root.queue1-group.
assert "F00:PLAN FRAGMENT [RANDOM] hosts=2 instances=2" in str(result.runtime_profile)
# Expect to run the query on root.queue1-group.
assert "Executor Group: root.queue1-group" in str(result.runtime_profile)

result = self.execute_query_expect_success(self.client, TEST_QUERY,
query_options={'request_pool': 'queue2'})
# The request pool does not exist, fall back to the default group.
assert "F00:PLAN FRAGMENT [RANDOM] hosts=1 instances=1" in str(result.runtime_profile)
assert "Executor Group: default" in str(result.runtime_profile)