115 changes: 72 additions & 43 deletions src/distributed_planner/distributed_physical_optimizer_rule.rs
@@ -1,10 +1,8 @@
use crate::distributed_planner::distributed_config::DistributedConfig;
use crate::distributed_planner::distributed_plan_error::get_distribute_plan_err;
use crate::distributed_planner::task_estimator::TaskEstimator;
use crate::distributed_planner::{
DistributedPlanError, NetworkBoundaryExt, limit_tasks_err, non_distributable_err,
};
use crate::execution_plans::{DistributedExec, NetworkCoalesceExec};
use crate::distributed_planner::{DistributedPlanError, NetworkBoundaryExt, non_distributable_err};
use crate::execution_plans::{DistributedExec, NetworkBroadcastExec, NetworkCoalesceExec};
use crate::stage::Stage;
use crate::{ChannelResolver, NetworkShuffleExec, PartitionIsolatorExec};
use datafusion::common::plan_err;
@@ -48,7 +46,7 @@ use uuid::Uuid;
///
///
/// 2. Break down the plan into stages
///
///
/// Based on the network boundaries ([NetworkShuffleExec], [NetworkCoalesceExec], ...) placed in
/// the plan by the first step, the plan is divided into stages and tasks are assigned to each
/// stage.
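
To make the stage-splitting step described above concrete, here is a minimal, std-only sketch of the idea: walk the plan tree, and whenever a network boundary is reached, cut the tree there so the subtree below it becomes its own stage with its own task count, while the boundary node stays in the parent stage as the reader of that child stage. All names here (`Node`, `StageSketch`, `split_into_stages`) are illustrative placeholders, not the crate's actual `Stage` or planner types.

```rust
use std::sync::Arc;

enum Node {
    // A regular operator with some children.
    Operator { name: &'static str, children: Vec<Arc<Node>> },
    // A network boundary (shuffle, coalesce, broadcast, ...): the subtree below it
    // becomes its own stage, executed by its own set of tasks.
    Boundary { name: &'static str, child: Arc<Node>, tasks: usize },
}

struct StageSketch {
    id: usize,
    tasks: usize,
    root: Arc<Node>,
}

fn split_into_stages(root: Arc<Node>, tasks: usize, stages: &mut Vec<StageSketch>) {
    // Collect the boundaries reachable from `root` without descending past them:
    // everything below a boundary belongs to a child stage.
    fn find_boundaries(node: &Arc<Node>, cuts: &mut Vec<(Arc<Node>, usize)>) {
        match node.as_ref() {
            Node::Operator { children, .. } => {
                for c in children {
                    find_boundaries(c, cuts);
                }
            }
            Node::Boundary { child, tasks, .. } => cuts.push((child.clone(), *tasks)),
        }
    }

    let mut cuts = Vec::new();
    find_boundaries(&root, &mut cuts);

    // Plan child stages first so they get lower ids, like "Stage 1", "Stage 2", ...
    // in the snapshots further down.
    for (child, child_tasks) in cuts {
        split_into_stages(child, child_tasks, stages);
    }
    stages.push(StageSketch { id: stages.len() + 1, tasks, root });
}

fn root_name(node: &Node) -> &'static str {
    match node {
        Node::Operator { name, .. } | Node::Boundary { name, .. } => name,
    }
}

fn main() {
    // Roughly the shape of the first test snapshot below: a hash join whose build side
    // sits behind a 1-task broadcast boundary, while the join runs across 3 tasks.
    let left_scan = Arc::new(Node::Operator { name: "scan(MinTemp, RainToday)", children: vec![] });
    let right_scan = Arc::new(Node::Operator { name: "scan(MaxTemp, RainToday)", children: vec![] });
    let broadcast = Arc::new(Node::Boundary { name: "broadcast", child: left_scan, tasks: 1 });
    let join = Arc::new(Node::Operator { name: "hash_join", children: vec![broadcast, right_scan] });

    let mut stages = Vec::new();
    split_into_stages(join, 3, &mut stages);
    for s in &stages {
        println!("Stage {}: {} task(s), root = {}", s.id, s.tasks, root_name(&s.root));
    }
    // Prints:
    //   Stage 1: 1 task(s), root = scan(MinTemp, RainToday)
    //   Stage 2: 3 task(s), root = hash_join
}
```

Running the sketch yields a 1-task build stage feeding a 3-task join stage, which is the shape the snapshot tests below assert against the real planner.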
@@ -408,14 +406,29 @@ fn _distribute_plan_inner(
n_tasks: usize,
) -> Result<Stage, DataFusionError> {
let mut distributed = plan.clone().transform_down(|plan| {
// We cannot break down CollectLeft hash joins into more than 1 task, as these need
// a full materialized build size with all the data in it.
//
// Maybe in the future these can be broadcast joins?
if let Some(node) = plan.as_any().downcast_ref::<HashJoinExec>() {
if n_tasks > 1 && node.mode == PartitionMode::CollectLeft {
return Err(limit_tasks_err(1));
}
// Handle CollectLeft hash joins by injecting a NetworkBroadcastExec on the left (build) side.
// The small left table is broadcast to every worker, so the join can still run across multiple tasks.
if let Some(node) = plan.as_any().downcast_ref::<HashJoinExec>()
&& n_tasks > 1 && node.mode == PartitionMode::CollectLeft
{
let broadcast_left = Arc::new(NetworkBroadcastExec::new(
node.left().clone(),
1, // assumed meaning: the broadcast input runs as a single task (cf. the `(1 tasks)` label in the snapshots below)
));

// Reconstruct HashJoinExec with broadcast left side
let new_join = HashJoinExec::try_new(
broadcast_left,
node.right().clone(),
node.on().to_vec(),
node.filter().cloned(),
node.join_type(),
node.projection.clone(),
*node.partition_mode(),
node.null_equality(),
)?;

return Ok(Transformed::yes(Arc::new(new_join)));
}

// We cannot distribute [StreamingTableExec] nodes, so abort distribution.
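
As a side note on why this rewrite is sound, the sketch below simulates the broadcast idea in plain Rust with no crate APIs: the full build (left) side is handed to every task, only the probe (right) side is partitioned, and the union of the per-task join results equals the single-task result. Everything here (`hash_join`, the toy rows) is illustrative only; it is not the crate's join implementation, and it performs an inner join for brevity rather than the LEFT join used in the tests below.

```rust
use std::collections::HashMap;

// (join key, payload)
type Row = (String, i64);

// A toy build-side hash join: build a hash table from `build`, probe it with `probe`,
// and emit (build payload, probe payload) pairs for matching keys.
fn hash_join(build: &[Row], probe: &[Row]) -> Vec<(i64, i64)> {
    let mut table: HashMap<&str, Vec<i64>> = HashMap::new();
    for (key, payload) in build {
        table.entry(key.as_str()).or_default().push(*payload);
    }
    let mut out = Vec::new();
    for (key, payload) in probe {
        if let Some(matches) = table.get(key.as_str()) {
            out.extend(matches.iter().map(|b| (*b, *payload)));
        }
    }
    out
}

fn main() {
    let build: Vec<Row> = vec![("yes".to_string(), 1), ("no".to_string(), 2)];
    let probe: Vec<Row> = vec![
        ("yes".to_string(), 10),
        ("no".to_string(), 20),
        ("yes".to_string(), 30),
    ];

    // One task: the whole probe side joined against the full build side.
    let mut single_task = hash_join(&build, &probe);

    // Three "tasks": the probe side is partitioned, but each task receives a broadcast
    // copy of the full build side, so the union of per-task results is identical.
    let mut three_tasks: Vec<(i64, i64)> = probe
        .chunks(1)
        .flat_map(|partition| hash_join(&build, partition))
        .collect();

    single_task.sort();
    three_tasks.sort();
    assert_eq!(single_task, three_tasks);
    println!("broadcasting the build side preserves the join result");
}
```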
@@ -733,11 +746,21 @@ mod tests {
})
.await;
assert_snapshot!(plan, @r"
CoalesceBatchesExec: target_batch_size=8192
HashJoinExec: mode=CollectLeft, join_type=Left, on=[(RainToday@1, RainToday@1)], projection=[MinTemp@0, MaxTemp@2]
CoalescePartitionsExec
DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[MinTemp, RainToday], file_type=parquet
DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[MaxTemp, RainToday], file_type=parquet
┌───── DistributedExec ── Tasks: t0:[p0]
│ CoalescePartitionsExec
│ [Stage 2] => NetworkCoalesceExec: output_partitions=3, input_tasks=3
└──────────────────────────────────────────────────
┌───── Stage 2 ── Tasks: t0:[p0] t1:[p1] t2:[p2]
│ CoalesceBatchesExec: target_batch_size=8192
│ HashJoinExec: mode=CollectLeft, join_type=Left, on=[(RainToday@1, RainToday@1)], projection=[MinTemp@0, MaxTemp@2]
│ NetworkBroadcastExec: [Stage 1] (1 tasks)
│ PartitionIsolatorExec: t0:[p0,__,__] t1:[__,p0,__] t2:[__,__,p0]
│ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[MaxTemp, RainToday], file_type=parquet
└──────────────────────────────────────────────────
┌───── Stage 1 ── Tasks: t0:[p0]
│ CoalescePartitionsExec
│ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[MinTemp, RainToday], file_type=parquet
└──────────────────────────────────────────────────
");
}

@@ -773,39 +796,45 @@ mod tests {
assert_snapshot!(plan, @r"
┌───── DistributedExec ── Tasks: t0:[p0]
│ CoalescePartitionsExec
│ CoalesceBatchesExec: target_batch_size=8192
│ HashJoinExec: mode=CollectLeft, join_type=Left, on=[(RainTomorrow@1, RainTomorrow@1)], projection=[MinTemp@0, MaxTemp@2]
│ CoalescePartitionsExec
│ [Stage 2] => NetworkCoalesceExec: output_partitions=8, input_tasks=2
│ ProjectionExec: expr=[avg(weather.MaxTemp)@1 as MaxTemp, RainTomorrow@0 as RainTomorrow]
│ AggregateExec: mode=FinalPartitioned, gby=[RainTomorrow@0 as RainTomorrow], aggr=[avg(weather.MaxTemp)]
│ [Stage 3] => NetworkShuffleExec: output_partitions=4, input_tasks=3
│ [Stage 5] => NetworkCoalesceExec: output_partitions=8, input_tasks=2
└──────────────────────────────────────────────────
┌───── Stage 2 ── Tasks: t0:[p0..p3] t1:[p0..p3]
│ ProjectionExec: expr=[avg(weather.MinTemp)@1 as MinTemp, RainTomorrow@0 as RainTomorrow]
│ AggregateExec: mode=FinalPartitioned, gby=[RainTomorrow@0 as RainTomorrow], aggr=[avg(weather.MinTemp)]
│ [Stage 1] => NetworkShuffleExec: output_partitions=4, input_tasks=3
┌───── Stage 5 ── Tasks: t0:[p0..p3] t1:[p4..p7]
│ CoalesceBatchesExec: target_batch_size=8192
│ HashJoinExec: mode=CollectLeft, join_type=Left, on=[(RainTomorrow@1, RainTomorrow@1)], projection=[MinTemp@0, MaxTemp@2]
│ NetworkBroadcastExec: [Stage 3] (1 tasks)
│ ProjectionExec: expr=[avg(weather.MaxTemp)@1 as MaxTemp, RainTomorrow@0 as RainTomorrow]
│ AggregateExec: mode=FinalPartitioned, gby=[RainTomorrow@0 as RainTomorrow], aggr=[avg(weather.MaxTemp)]
│ [Stage 4] => NetworkShuffleExec: output_partitions=4, input_tasks=3
└──────────────────────────────────────────────────
┌───── Stage 1 ── Tasks: t0:[p0..p7] t1:[p0..p7] t2:[p0..p7]
┌───── Stage 3 ── Tasks: t0:[p0]
│ CoalescePartitionsExec
│ [Stage 2] => NetworkCoalesceExec: output_partitions=8, input_tasks=2
└──────────────────────────────────────────────────
┌───── Stage 2 ── Tasks: t0:[p0..p3] t1:[p0..p3]
│ ProjectionExec: expr=[avg(weather.MinTemp)@1 as MinTemp, RainTomorrow@0 as RainTomorrow]
│ AggregateExec: mode=FinalPartitioned, gby=[RainTomorrow@0 as RainTomorrow], aggr=[avg(weather.MinTemp)]
│ [Stage 1] => NetworkShuffleExec: output_partitions=4, input_tasks=3
└──────────────────────────────────────────────────
┌───── Stage 1 ── Tasks: t0:[p0..p7] t1:[p0..p7] t2:[p0..p7]
│ CoalesceBatchesExec: target_batch_size=8192
│ RepartitionExec: partitioning=Hash([RainTomorrow@0], 8), input_partitions=4
│ AggregateExec: mode=Partial, gby=[RainTomorrow@1 as RainTomorrow], aggr=[avg(weather.MinTemp)]
│ CoalesceBatchesExec: target_batch_size=8192
│ FilterExec: RainToday@1 = yes, projection=[MinTemp@0, RainTomorrow@2]
│ RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
│ PartitionIsolatorExec: t0:[p0,__,__] t1:[__,p0,__] t2:[__,__,p0]
│ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[MinTemp, RainToday, RainTomorrow], file_type=parquet, predicate=RainToday@1 = yes, pruning_predicate=RainToday_null_count@2 != row_count@3 AND RainToday_min@0 <= yes AND yes <= RainToday_max@1, required_guarantees=[RainToday in (yes)]
└──────────────────────────────────────────────────
┌───── Stage 4 ── Tasks: t0:[p0..p7] t1:[p0..p7] t2:[p0..p7]
│ CoalesceBatchesExec: target_batch_size=8192
│ RepartitionExec: partitioning=Hash([RainTomorrow@0], 8), input_partitions=4
│ AggregateExec: mode=Partial, gby=[RainTomorrow@1 as RainTomorrow], aggr=[avg(weather.MinTemp)]
│ AggregateExec: mode=Partial, gby=[RainTomorrow@1 as RainTomorrow], aggr=[avg(weather.MaxTemp)]
│ CoalesceBatchesExec: target_batch_size=8192
│ FilterExec: RainToday@1 = yes, projection=[MinTemp@0, RainTomorrow@2]
│ FilterExec: RainToday@1 = no, projection=[MaxTemp@0, RainTomorrow@2]
│ RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
│ PartitionIsolatorExec: t0:[p0,__,__] t1:[__,p0,__] t2:[__,__,p0]
│ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[MinTemp, RainToday, RainTomorrow], file_type=parquet, predicate=RainToday@1 = yes, pruning_predicate=RainToday_null_count@2 != row_count@3 AND RainToday_min@0 <= yes AND yes <= RainToday_max@1, required_guarantees=[RainToday in (yes)]
│ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[MaxTemp, RainToday, RainTomorrow], file_type=parquet, predicate=RainToday@1 = no, pruning_predicate=RainToday_null_count@2 != row_count@3 AND RainToday_min@0 <= no AND no <= RainToday_max@1, required_guarantees=[RainToday in (no)]
└──────────────────────────────────────────────────
┌───── Stage 3 ── Tasks: t0:[p0..p3] t1:[p0..p3] t2:[p0..p3]
│ CoalesceBatchesExec: target_batch_size=8192
│ RepartitionExec: partitioning=Hash([RainTomorrow@0], 4), input_partitions=4
│ AggregateExec: mode=Partial, gby=[RainTomorrow@1 as RainTomorrow], aggr=[avg(weather.MaxTemp)]
│ CoalesceBatchesExec: target_batch_size=8192
│ FilterExec: RainToday@1 = no, projection=[MaxTemp@0, RainTomorrow@2]
│ RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
│ PartitionIsolatorExec: t0:[p0,__,__] t1:[__,p0,__] t2:[__,__,p0]
│ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[MaxTemp, RainToday, RainTomorrow], file_type=parquet, predicate=RainToday@1 = no, pruning_predicate=RainToday_null_count@2 != row_count@3 AND RainToday_min@0 <= no AND no <= RainToday_max@1, required_guarantees=[RainToday in (no)]
└──────────────────────────────────────────────────
");
}

3 changes: 3 additions & 0 deletions src/distributed_planner/network_boundary.rs
@@ -1,3 +1,4 @@
use crate::execution_plans::NetworkBroadcastExec;
use crate::{NetworkCoalesceExec, NetworkShuffleExec, Stage};
use datafusion::common::plan_err;
use datafusion::physical_plan::ExecutionPlan;
@@ -82,6 +83,8 @@ impl NetworkBoundaryExt for dyn ExecutionPlan {
Some(node)
} else if let Some(node) = self.as_any().downcast_ref::<NetworkCoalesceExec>() {
Some(node)
} else if let Some(node) = self.as_any().downcast_ref::<NetworkBroadcastExec>() {
Some(node)
} else {
None
}
2 changes: 2 additions & 0 deletions src/execution_plans/mod.rs
@@ -1,12 +1,14 @@
mod common;
mod distributed;
mod metrics;
mod network_broadcast;
mod network_coalesce;
mod network_shuffle;
mod partition_isolator;

pub use distributed::DistributedExec;
pub(crate) use metrics::MetricsWrapperExec;
pub use network_broadcast::{NetworkBroadcastExec, NetworkBroadcastReady};
pub use network_coalesce::{NetworkCoalesceExec, NetworkCoalesceReady};
pub use network_shuffle::{NetworkShuffleExec, NetworkShuffleReadyExec};
pub use partition_isolator::PartitionIsolatorExec;