Skip to content
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/distributed_planner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@ mod distributed_physical_optimizer_rule;
mod insert_broadcast;
mod network_boundary;
mod plan_annotator;
mod stage_partitioning;
mod task_estimator;

pub(crate) use batch_coalescing_below_network_boundaries::batch_coalescing_below_network_boundaries;
pub use distributed_config::DistributedConfig;
pub use distributed_physical_optimizer_rule::DistributedPhysicalOptimizerRule;
pub use network_boundary::{NetworkBoundary, NetworkBoundaryExt};
pub(crate) use task_estimator::set_distributed_task_estimator;
pub use task_estimator::{TaskCountAnnotation, TaskEstimation, TaskEstimator};
pub use task_estimator::{StagePartitioning, TaskCountAnnotation, TaskEstimation, TaskEstimator};
65 changes: 58 additions & 7 deletions src/distributed_planner/plan_annotator.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,22 @@
use crate::TaskCountAnnotation::{Desired, Maximum};
use crate::distributed_planner::stage_partitioning::{
stage_partitioning_covers_hash, stage_partitioning_for_plan,
};
use crate::execution_plans::ChildrenIsolatorUnionExec;
use crate::{BroadcastExec, DistributedConfig, TaskCountAnnotation, TaskEstimator};
use crate::{
BroadcastExec, DistributedConfig, StagePartitioning, TaskCountAnnotation, TaskEstimator,
};
use datafusion::common::{DataFusionError, plan_datafusion_err};
use datafusion::config::ConfigOptions;
use datafusion::physical_expr::Partitioning;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_plan::aggregates::AggregateExec;
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::execution_plan::CardinalityEffect;
use datafusion::physical_plan::joins::{HashJoinExec, PartitionMode};
use datafusion::physical_plan::repartition::RepartitionExec;
use datafusion::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use datafusion::physical_plan::union::UnionExec;
use datafusion::physical_plan::{ExecutionPlan, ExecutionPlanProperties};
use std::fmt::{Debug, Formatter};
use std::sync::Arc;

Expand Down Expand Up @@ -52,6 +58,8 @@ pub(super) struct AnnotatedPlan {
// annotation fields
/// How many distributed tasks this plan should run on.
pub(super) task_count: TaskCountAnnotation,
/// Stage-scoped partitioning information for this plan node.
pub(super) stage_partitioning: StagePartitioning,
}

impl Debug for AnnotatedPlan {
Expand Down Expand Up @@ -191,6 +199,7 @@ fn _annotate_plan(
plan_or_nb: PlanOrNetworkBoundary::Plan(plan),
children: Vec::new(),
task_count: estimate.task_count.limit(n_workers),
stage_partitioning: estimate.stage_partitioning,
})
} else {
// We could not determine how many tasks this leaf node should run on, so
Expand All @@ -199,6 +208,7 @@ fn _annotate_plan(
plan_or_nb: PlanOrNetworkBoundary::Plan(plan),
children: Vec::new(),
task_count: Maximum(1),
stage_partitioning: StagePartitioning::Unspecified,
})
};
}
Expand Down Expand Up @@ -240,20 +250,59 @@ fn _annotate_plan(
task_count = task_count.limit(n_workers);

// Wrap the node with a boundary node if the parent marks it.
let child_partitionings = annotated_children
.iter()
.map(|child| child.stage_partitioning.clone())
.collect::<Vec<_>>();
let mut stage_partitioning = stage_partitioning_for_plan(&plan, &child_partitionings)?;
if task_count.as_usize() > 1 && matches!(stage_partitioning, StagePartitioning::Single) {
stage_partitioning = StagePartitioning::Unspecified;
}

let mut annotation = AnnotatedPlan {
plan_or_nb: PlanOrNetworkBoundary::Plan(Arc::clone(&plan)),
children: annotated_children,
task_count: task_count.clone(),
stage_partitioning,
};

// Upon reaching a hash repartition, we introduce a shuffle right above it, unless the stage's partitioning already covers that hash.
if let Some(r_exec) = plan.as_any().downcast_ref::<RepartitionExec>() {
if matches!(r_exec.partitioning(), Partitioning::Hash(_, _)) {
annotation = AnnotatedPlan {
plan_or_nb: PlanOrNetworkBoundary::Shuffle,
children: vec![annotation],
task_count,
};
// Subset satisfaction is safe for most ops (e.g. aggregates) but must be exact for:
// - Partitioned hash joins: both sides must align on full join keys
// - Grouping sets: requires exact hash including __grouping_id
let allow_subset = parent
.map(|plan| {
let is_partitioned_join = plan
.as_any()
.downcast_ref::<HashJoinExec>()
.is_some_and(|join| join.mode == PartitionMode::Partitioned);
let is_grouping_set = plan
.as_any()
.downcast_ref::<AggregateExec>()
.is_some_and(|aggregate| !aggregate.group_expr().is_single());
!is_partitioned_join && !is_grouping_set
})
.unwrap_or(true);
let already_partitioned = stage_partitioning_covers_hash(
&annotation.stage_partitioning,
r_exec.partitioning(),
plan.equivalence_properties(),
allow_subset,
);
if !already_partitioned {
let stage_partitioning = match r_exec.partitioning() {
Partitioning::Hash(keys, _) => StagePartitioning::Hash(keys.clone()),
_ => StagePartitioning::Unspecified,
};
annotation = AnnotatedPlan {
plan_or_nb: PlanOrNetworkBoundary::Shuffle,
children: vec![annotation],
task_count,
stage_partitioning,
};
}
}
} else if let Some(parent) = parent
// If this node is a leaf node, putting a network boundary above is a bit wasteful, so
Expand All @@ -270,12 +319,14 @@ fn _annotate_plan(
plan_or_nb: PlanOrNetworkBoundary::Broadcast,
children: vec![annotation],
task_count,
stage_partitioning: StagePartitioning::Unspecified,
};
} else {
annotation = AnnotatedPlan {
plan_or_nb: PlanOrNetworkBoundary::Coalesce,
children: vec![annotation],
task_count,
stage_partitioning: StagePartitioning::Unspecified,
};
}
}
Expand Down
Loading
Loading