177 changes: 161 additions & 16 deletions src/execution_plans/network_coalesce.rs
@@ -12,17 +12,60 @@ use datafusion::error::Result;
use datafusion::execution::{SendableRecordBatchStream, TaskContext};
use datafusion::physical_expr_common::metrics::MetricsSet;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
use datafusion::physical_plan::{
DisplayAs, DisplayFormatType, EmptyRecordBatchStream, ExecutionPlan, PlanProperties,
};
use std::any::Any;
use std::fmt::{Debug, Formatter};
use std::sync::Arc;
use uuid::Uuid;

/// [ExecutionPlan] that coalesces partitions from multiple tasks into a single task without
#[derive(Debug, Clone, Copy)]
struct TaskGroup {
/// The first input task index in this group.
Collaborator:
It would be nice to put these new structures at the bottom of this file, right above the tests. That way, when sharing this file via a GitHub link, the first thing that pops up is the documentation of the execution node.

One rule of thumb we usually follow: the things that are most likely to be of interest to readers go at the top, and the things that are less likely to be read go at the bottom.

Author (@gabrielkerr, Feb 2, 2026):
Thanks for this! I'll make the update and make sure to follow this convention in the future as well.

start_task: usize,
/// The number of input tasks in this group.
len: usize,
/// The maximum possible group size across all groups.
///
/// When groups are uneven (input_task_count % task_count != 0), some groups are shorter. We still
/// size the output partitioning based on this max and return empty streams for the extra
/// partitions in smaller groups.
max_len: usize,
}

/// Returns the contiguous group of input tasks assigned to DistributedTaskContext::task_index.
fn task_group(input_task_count: usize, task_index: usize, task_count: usize) -> TaskGroup {
if task_count == 0 {
return TaskGroup {
start_task: 0,
len: 0,
max_len: 0,
};
}

// Split `input_task_count` into `task_count` contiguous groups.
// - base_tasks_per_group: floor(input_task_count / task_count)
// - groups_with_extra_task: first N groups that get one extra task (remainder)
let base_tasks_per_group = input_task_count / task_count;
let groups_with_extra_task = input_task_count % task_count;

let len = base_tasks_per_group + usize::from(task_index < groups_with_extra_task);
let start_task = (task_index * base_tasks_per_group) + task_index.min(groups_with_extra_task);
let max_len = base_tasks_per_group + usize::from(groups_with_extra_task > 0);

TaskGroup {
start_task,
len,
max_len,
}
}
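
// Illustrative sketch (not part of this diff): concrete `task_group` outputs when 5 input
// tasks are split across 2 consumer tasks. Consumer 0 reads input tasks 0..3 and consumer 1
// reads input tasks 3..5; both report max_len = 3, so output partitioning stays uniform.
#[cfg(test)]
#[test]
fn task_group_split_example() {
    let group_0 = task_group(5, 0, 2);
    assert_eq!((group_0.start_task, group_0.len, group_0.max_len), (0, 3, 3));

    let group_1 = task_group(5, 1, 2);
    assert_eq!((group_1.start_task, group_1.len, group_1.max_len), (3, 2, 3));
}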

/// [ExecutionPlan] that coalesces partitions from multiple tasks into one or more tasks without
/// performing any repartition, and maintaining the same partitioning scheme.
///
/// This is the equivalent of a [CoalescePartitionsExec] but coalescing tasks across the network
/// into one.
/// between distributed stages.
///
/// ```text
/// ┌───────────────────────────┐ ■
@@ -44,9 +87,11 @@ use uuid::Uuid;
///
/// The communication between two stages across a [NetworkCoalesceExec] has two implications:
///
/// - Stage N+1 must have exactly 1 task. The distributed planner ensures this is true.
/// - The number of partitions in the single task of Stage N+1 is equal to the total number of
/// partitions across all tasks in Stage N (e.g. (1,2,3,4,5,6,7,8,9) = (1,2,3)+(4,5,6)+(7,8,9) )
/// - Stage N+1 may have one or more tasks. Each consumer task reads a contiguous group of upstream
/// tasks from Stage N.
/// - Output partitioning for Stage N+1 is sized based on the maximum upstream-group size. When
/// groups are uneven, consumer tasks with smaller groups return empty streams for the “extra”
/// partitions.
@aditanase (Feb 1, 2026):
Let's add a simple example like we had before ((1,2,3,4,5,6,7,8,9) = (1,2,3)+(4,5,6)+(7,8,9)) to help visualize this split. E.g. (1,2,3,4,5) + (6,7,8,9,0) = (1,2,3)+(4,5,6)+(7,8,9) or something that respects your algorithm from above.

Author:
Added a simple example with a diagram, although adding the diagram was not as simple as I hoped!

///
/// This node has two variants.
/// 1. Pending: acts as a placeholder for the distributed optimization step to mark it as ready.
@@ -83,13 +128,16 @@ impl NetworkCoalesceExec {
task_count: usize,
input_task_count: usize,
) -> Result<Self> {
if task_count > 1 {
return plan_err!(
"NetworkCoalesceExec cannot be executed in more than one task, {task_count} were passed."
);
if task_count == 0 {
return plan_err!("NetworkCoalesceExec cannot be executed with task_count=0");
}

// Each output task coalesces a group of input tasks. We size the output partition count
// per output task based on the maximum group size, returning empty streams for tasks with
// smaller groups.
let max_input_task_count = input_task_count.div_ceil(task_count).max(1);
Ok(Self {
properties: scale_partitioning_props(input.properties(), |p| p * input_task_count),
properties: scale_partitioning_props(input.properties(), |p| p * max_input_task_count),
input_stage: Stage {
query_id,
num,
@@ -161,16 +209,46 @@ impl ExecutionPlan for NetworkCoalesceExec {
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
let task_context = DistributedTaskContext::from_ctx(&context);
if task_context.task_index > 0 {
return exec_err!("NetworkCoalesceExec cannot be executed in more than one task");
if task_context.task_index >= task_context.task_count {
return exec_err!(
"NetworkCoalesceExec invalid task context: task_index={} >= task_count={}",
task_context.task_index,
task_context.task_count
);
}

let partitions_per_task = self
.properties()
.partitioning
.partition_count()
.checked_div(
self.input_stage
.tasks
.len()
.div_ceil(task_context.task_count)
.max(1),
)
.unwrap_or(0);
if partitions_per_task == 0 {

Review comment:
cosmetic: rewrite using if let Some(partitions_per_task) { ... } else { return exec_err... } instead of using 0 as a placeholder for the error case?

return exec_err!("NetworkCoalesceExec has 0 partitions per input task");
}
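
// Sketch of the cosmetic rewrite suggested in the review comment above (assumed shape, not
// part of this diff): let the `Option` carry the error case instead of using 0 as a
// placeholder, then bail out with a let-else.
//
// let Some(partitions_per_task) = self
//     .properties()
//     .partitioning
//     .partition_count()
//     .checked_div(
//         self.input_stage
//             .tasks
//             .len()
//             .div_ceil(task_context.task_count)
//             .max(1),
//     )
//     .filter(|partitions| *partitions > 0)
// else {
//     return exec_err!("NetworkCoalesceExec has 0 partitions per input task");
// };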

let partitions_per_task =
self.properties().partitioning.partition_count() / self.input_stage.tasks.len();
let input_task_count = self.input_stage.tasks.len();
let group = task_group(
input_task_count,
task_context.task_index,
task_context.task_count,
);

let target_task = partition / partitions_per_task;
let input_task_offset = partition / partitions_per_task;
let target_partition = partition % partitions_per_task;

if input_task_offset >= group.max_len || input_task_offset >= group.len {
Collaborator:
🤔 Here, I can imagine how input_task_offset >= group.len can happen, and it should be fine to return an empty stream in that case, but input_task_offset >= group.max_len? If that ever happens, isn't that reflecting a logical error in the code?

Author:
You're correct. I added this to account for "shouldn't happen" scenarios and thought it would be better to return an empty stream instead of erroring out.

As long as the upstream partitioning logic holds, this case should never happen. I'll remove the >= group.max_len check for now. If you'd prefer, I can add a separate block that checks for this condition, logs a debug message, and returns an empty stream. Or we can be more aggressive and throw an exec_err!.

Collaborator:
I'd be aggressive here and error out with an internal_err! claiming that this is a bug in our code. If this were to happen, I'd like to think it would be caught by tests, so better to do that than return unexpected results.

return Ok(Box::pin(EmptyRecordBatchStream::new(self.schema())));
}
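
// Sketch of the stricter handling discussed in the thread above (assumed shape, not part of
// this diff): treat offsets beyond max_len as a planner bug and fail with internal_err!,
// keeping the empty stream only for the expected padding partitions of smaller groups.
//
// if input_task_offset >= group.max_len {
//     return internal_err!(
//         "NetworkCoalesceExec bug: partition {partition} maps to input task offset \
//          {input_task_offset}, which exceeds the maximum group size {}",
//         group.max_len
//     );
// }
// if input_task_offset >= group.len {
//     return Ok(Box::pin(EmptyRecordBatchStream::new(self.schema())));
// }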

let target_task = group.start_task + input_task_offset;

let worker_connection = self.worker_connections.get_or_init_worker_connection(
&self.input_stage,
0..partitions_per_task,
@@ -200,3 +278,70 @@ impl ExecutionPlan for NetworkCoalesceExec {
Some(self.worker_connections.metrics.clone_inner())
}
}

#[cfg(test)]
mod tests {

Review comment:
Do we have to add a test that verifies we're not breaking anything with the default value of 1 for consumer task count? Or is that well taken care of by existing tests?

Author:
I believe this is well taken care of by existing tests. Multiple tests assert plans that include NetworkCoalesceExec in common situations (e.g. ORDER BY), and those stages are forced to 1 task by the annotator.

use super::*;
use datafusion::arrow::datatypes::Schema;
use datafusion::physical_plan::empty::EmptyExec;
use datafusion::prelude::SessionContext;
use futures::TryStreamExt;

#[tokio::test]
async fn supports_multiple_output_tasks_with_padding_partitions() -> Result<()> {
const INPUT_TASK_COUNT: usize = 5;
const CONSUMER_TASK_COUNT: usize = 2;
const CONSUMER_TASK_IDX: usize = 1;
const STAGE_NUM: usize = 1;

// Child plan used only for properties/schema (we won't reach network codepaths).
let child: Arc<dyn ExecutionPlan> = Arc::new(EmptyExec::new(Arc::new(Schema::empty())));
let child_partitions = child.properties().partitioning.partition_count();

let exec = NetworkCoalesceExec::try_new(
Arc::clone(&child),
Uuid::nil(),
STAGE_NUM,
CONSUMER_TASK_COUNT,
INPUT_TASK_COUNT,
)?;

// `try_new` should scale output partitions based on the maximum group size.
let max_input_task_count = INPUT_TASK_COUNT.div_ceil(CONSUMER_TASK_COUNT).max(1);
assert_eq!(
exec.properties().partitioning.partition_count(),
child_partitions * max_input_task_count
);

// Configure this test task as consumer task 1/2, which has fewer input tasks (2) than the
// max group size (3) for INPUT_TASK_COUNT=5, CONSUMER_TASK_COUNT=2.
let ctx = SessionContext::new();
ctx.state_ref()
.write()
.config_mut()
.set_extension(Arc::new(DistributedTaskContext {
task_index: CONSUMER_TASK_IDX,
task_count: CONSUMER_TASK_COUNT,
}));

// Pick a "padding" partition (child_task_offset == group.len) that should produce an
// empty stream without attempting any network calls.
let group = task_group(INPUT_TASK_COUNT, CONSUMER_TASK_IDX, CONSUMER_TASK_COUNT);
let partitions_per_task = exec
.properties()
.partitioning
.partition_count()
.checked_div(group.max_len)
.unwrap_or(0);
assert!(partitions_per_task > 0);

let padding_partition = group.len * partitions_per_task;
let batches = exec
.execute(padding_partition, ctx.task_ctx())?
.try_collect::<Vec<_>>()
.await?;
assert!(batches.is_empty());
Collaborator:
It would be nice if the tests reflected how different configurations do not result in duplicate or missing data. It would be interesting to test:

  • 1 to N tasks
  • N to 1 tasks
  • N to M tasks where N > M
  • M to N tasks where N > M

To make the tests as future-proof as possible, it would be nice if they didn't refer to implementation details private to the NetworkCoalesceExec node. For example, the current tests refer to TaskGroup, but this structure is private and therefore subject to change based on implementation details. I wouldn't overindex too much on this if you don't find a better way of testing it, just planting the seed in case you have some ideas.

Author:
Great feedback. I'm looking into this.

Author:
I've added some tests that verify the output partitioning advertised by NetworkCoalesceExec is as expected. I've also added some tests that indirectly exercise the task grouping without using TaskGroup itself. If you'd like more changes to this, please let me know.


Ok(())
}
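
    // Sketch of the coverage tests suggested in the review (not part of this diff): for
    // 1-to-N, N-to-1, N-to-M, and M-to-N shapes, every input task should be assigned to
    // exactly one consumer task, with no gaps and no duplicates. For brevity this leans on
    // the private task_group helper, which the review suggests avoiding in the final tests.
    #[test]
    fn task_groups_cover_all_input_tasks_exactly_once() {
        for (input_task_count, task_count) in [(1, 4), (4, 1), (7, 3), (3, 7)] {
            let mut assignments = vec![0usize; input_task_count];
            for task_index in 0..task_count {
                let group = task_group(input_task_count, task_index, task_count);
                for input_task in group.start_task..group.start_task + group.len {
                    assignments[input_task] += 1;
                }
            }
            assert!(
                assignments.iter().all(|count| *count == 1),
                "every input task must be read by exactly one consumer task: {assignments:?}"
            );
        }
    }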
}