
Conversation


@gabrielkerr gabrielkerr commented Jan 28, 2026

Summary

This PR allows NetworkCoalesceExec to output to multiple consumer tasks. This helps build the foundation for partial aggregation trees: with multi-consumer coalesce, we can introduce intermediate “gather” stages where each task aggregates a subset of upstream tasks, and a final stage merges/aggregates again.

What changed

  • NetworkCoalesceExec now supports task_count > 1
    • Each consumer task reads a contiguous group of upstream tasks.
    • Output partitioning is sized using the maximum group size, and consumer tasks with smaller groups return empty streams for the “extra” partitions (see the sketch below).
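
For illustration, the grouping arithmetic is roughly the following (a minimal sketch, not the actual implementation; the function name and the exact handling of leftover tasks are illustrative):

// Each consumer task reads a contiguous range of upstream task indices.
// The largest group determines the output partition count; consumers with
// smaller groups expose empty streams for the surplus partitions.
fn contiguous_groups(input_tasks: usize, consumer_tasks: usize) -> Vec<std::ops::Range<usize>> {
    let max_group = input_tasks.div_ceil(consumer_tasks).max(1);
    (0..consumer_tasks)
        .map(|c| (c * max_group).min(input_tasks)..((c + 1) * max_group).min(input_tasks))
        .collect()
}

// e.g. contiguous_groups(9, 4) -> [0..3, 3..6, 6..9, 9..9]; the last consumer
// has an empty group and only returns empty partitions.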

Note

This PR was originally larger in scope, adding support for Network Coalesce Trees. Coalesce trees are not that useful on their own, so they were removed from this PR; future work may add coalesce trees in tandem with partial aggregation roll-up. Please see the discussion below for more details.


Previous Summary with Network Coalesce Tree (Outdated)

This PR adds support for hierarchical coalescing across the network by allowing NetworkCoalesceExec to output to multiple consumer tasks and adjusting the distributed planner to optionally insert a coalesce tree (multiple NetworkCoalesceExec layers) when a plan must ultimately coalesce to a single task.

The key semantic is preserved: the final coalesce boundary still ends in one task (matching the intent of CoalescePartitionsExec / SortPreservingMergeExec), but fan-in can be reduced via intermediate coalesce stages.

What changed

  • NetworkCoalesceExec now supports task_count > 1
    • Each consumer task reads a contiguous group of upstream tasks.
    • Output partitioning is sized using the maximum group size, and consumer tasks with smaller groups return empty streams for the “extra” partitions.
  • Distributed planner can insert a “coalesce tree”
    • When enabled, the planner may insert intermediate NetworkCoalesceExec stages to reduce fan-in while still converging to a single final task.
    • The intermediate task count is computed using a log-based fan-in heuristic derived from the worker count (clamped to a maximum of 16); a rough sketch follows after this list.
  • Plan/stage visualization improvements
    • Stage discovery and edge rendering account for coalesce-tree behavior, so display_plan_ascii (and related stage displays) show the expected stage structure.
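
For reference, a heuristic of that shape could look roughly like the following (hypothetical sketch only; the exact formula used in the original branch is not reproduced here):

// Hypothetical log-based fan-in heuristic: fan-in grows with log2 of the
// worker count and is clamped, and the intermediate stage gets one task per
// fan_in upstream tasks.
fn intermediate_task_count(worker_count: usize, input_tasks: usize) -> usize {
    let fan_in = ((worker_count.max(2)) as f64).log2().ceil() as usize;
    let fan_in = fan_in.clamp(2, 16);
    input_tasks.div_ceil(fan_in).max(1)
}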

Why this is useful

  • Lower fan-in cost vs a single wide coalesce
    • A coalesce tree can reduce the bottleneck of having one task directly pull from (N) upstream tasks. In practice this reduces connection pressure and can improve throughput/tail latency by spreading the work across (O(\log N)) coalesce levels instead of one wide fan-in point (workload/network dependent).
  • Hierarchical distributed aggregation
    • Avoids a single “hot” reducer when many upstream tasks must converge to one final aggregation stage.
    • Enables partial merging closer to producers, reducing bytes transferred and improving tail latency.
  • Global sorts / ORDER BY pipelines
    • Sort-preserving merge patterns often require a final single task; intermediate coalescing reduces fan-in and connection pressure before the final merge.
  • Top-K / limit with ordering
    • Intermediate coalesce stages can reduce the number of upstream streams feeding the final merge/limit stage, improving responsiveness and memory behavior.
  • Skewed workloads
    • When one stage produces many tasks (or uneven partitions), hierarchical coalescing reduces single-node bottlenecks and evens out execution.

Configuration

This PR adds two distributed planner configuration options:

  • distributed.coalesce_tree_enabled (bool, default: false)
    • Enables insertion of intermediate NetworkCoalesceExec stages (a coalesce tree) for boundaries that must ultimately coalesce to a single task.
  • distributed.coalesce_tree_min_input_tasks (usize, default: 8)
    • Minimum number of upstream tasks required before the planner will insert a coalesce tree.
    • If the upstream task count is below this threshold, the planner inserts only the final single-task NetworkCoalesceExec.

@gabotechs
Collaborator

Nice! will take a look at this one soon, thanks @gabrielkerr!

@gabotechs gabotechs self-requested a review January 28, 2026 11:13

gabrielkerr commented Jan 28, 2026

Hmm, I'll fix the failing unit test and clippy. Looks like both failures come from that same spot.

@gabrielkerr gabrielkerr force-pushed the hierarchical-networkcoalesce branch from 0e2d3fd to d906d29 Compare January 28, 2026 15:53
@gabrielkerr
Author

Rebased to absorb a changed signature.


@gabotechs gabotechs left a comment


I think if this shows some improvement, it's very likely revealing an underlying problem somewhere else in the stack.

I see that we should not be reducing any congestion in the top-level node with this; it should really just be paying the price of extra network hops without really alleviating the amount of data the top-level stage needs to pull from the bottom.

To illustrate with an example, we are going from:

                   ┌────────────────────────┐                   
                   │  NetworkCoalesceExec   │                   
                   └──▲────▲────────▲─────▲─┘                   
                      │    │        │     │                     
        ┌───1 byte────┘1 byte      1 byte └─────1 byte──┐       
        │               │               │               │       
        │               │               │               │       
        │               │               │               │       
┌──────────────┐┌──────────────┐┌──────────────┐┌──────────────┐
│   SomeExec   ││   SomeExec   ││   SomeExec   ││   SomeExec   │
└──────────────┘└──────────────┘└──────────────┘└──────────────┘

To:

                     ┌────────────────────────┐                 
                     │  NetworkCoalesceExec   │                 
                     └─────────▲────▲─────────┘                 
                    ┌─2 bytes──┘    └2 bytes──┐                 
                    │                         │                 
       ┌────────────────────────┐┌────────────────────────┐     
       │  NetworkCoalesceExec   ││  NetworkCoalesceExec   │     
       └───────────▲──────▲─────┘└───▲───────▲────────────┘     
        ┌──1 byte──┘   1 byte      1 byte    └───1 byte─┐       
        │               │               │               │       
┌──────────────┐┌──────────────┐┌──────────────┐┌──────────────┐
│   SomeExec   ││   SomeExec   ││   SomeExec   ││   SomeExec   │
└──────────────┘└──────────────┘└──────────────┘└──────────────┘

In both cases, the amount of data pulled by the top-level node is 4 bytes. Pulling data from fewer machines should not yield any improvement, as the overhead of connecting to more machines is, for practical purposes, negligible (or should be; if not, there's something wrong happening).

Running these through the remote benchmarks (cluster of 4 c5n.2xlarge machines), I see that TPC-H q3, run with npm run datafusion-bench -- --dataset tpch_sf100 --queries q3, shows a slight performance regression:

Plan with one hierarchical network coalesce
┌───── DistributedExec ── Tasks: t0:[p0] 
│ SortPreservingMergeExec: [revenue@1 DESC, o_orderdate@2 ASC NULLS LAST]
│   [Stage 6] => NetworkCoalesceExec: output_partitions=32, input_tasks=2
└──────────────────────────────────────────────────
  ┌───── Stage 6 ── Tasks: t0:[p0..p15] t1:[p0..p15] 
  │ [Stage 5] => NetworkCoalesceExec: output_partitions=16, input_tasks=4
  └──────────────────────────────────────────────────
    ┌───── Stage 5 ── Tasks: t0:[p0..p7] t1:[p0..p7] t2:[p0..p7] t3:[p0..p7] 
    │ SortExec: expr=[revenue@1 DESC, o_orderdate@2 ASC NULLS LAST], preserve_partitioning=[true]
    │   ProjectionExec: expr=[l_orderkey@0 as l_orderkey, sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)@3 as revenue, o_orderdate@1 as o_orderdate, o_shippriority@2 as o_shippriority]
    │     AggregateExec: mode=SinglePartitioned, gby=[l_orderkey@2 as l_orderkey, o_orderdate@0 as o_orderdate, o_shippriority@1 as o_shippriority], aggr=[sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]
    │       HashJoinExec: mode=Partitioned, join_type=Inner, on=[(o_orderkey@0, l_orderkey@0)], projection=[o_orderdate@1, o_shippriority@2, l_orderkey@3, l_extendedprice@4, l_discount@5]
    │         [Stage 3] => NetworkShuffleExec: output_partitions=8, input_tasks=4
    │         [Stage 4] => NetworkShuffleExec: output_partitions=8, input_tasks=4
    └──────────────────────────────────────────────────
      ┌───── Stage 3 ── Tasks: t0:[p0..p31] t1:[p0..p31] t2:[p0..p31] t3:[p0..p31] 
      │ RepartitionExec: partitioning=Hash([o_orderkey@0], 32), input_partitions=8
      │   HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c_custkey@0, o_custkey@1)], projection=[o_orderkey@1, o_orderdate@3, o_shippriority@4]
      │     [Stage 1] => NetworkShuffleExec: output_partitions=8, input_tasks=4
      │     [Stage 2] => NetworkShuffleExec: output_partitions=8, input_tasks=4
      └──────────────────────────────────────────────────
        ┌───── Stage 1 ── Tasks: t0:[p0..p31] t1:[p0..p31] t2:[p0..p31] t3:[p0..p31] 
        │ RepartitionExec: partitioning=Hash([c_custkey@0], 32), input_partitions=8
        │   FilterExec: c_mktsegment@1 = BUILDING, projection=[c_custkey@0]
        │     PartitionIsolatorExec: t0:[p0,p1,p2,p3,p4,p5,p6,p7,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__] t1:[__,__,__,__,__,__,__,__,p0,p1,p2,p3,p4,p5,p6,p7,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__] t2:[__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,p0,p1,p2,p3,p4,p5,p6,p7,__,__,__,__,__,__,__,__] t3:[__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,p0,p1,p2,p3,p4,p5,p6,p7] 
        │       DataSourceExec: file_groups={32 groups: [[tpch_sf100/customer/part-0.parquet:0..7272764, tpch_sf100/customer/part-1.parquet:0..7229287, tpch_sf100/customer/part-10.parquet:0..7188139, tpch_sf100/customer/part-100.parquet:0..7147612, tpch_sf100/customer/part-101.parquet:0..7139423], [tpch_sf100/customer/part-102.parquet:0..7146767, tpch_sf100/customer/part-103.parquet:0..7152822, tpch_sf100/customer/part-104.parquet:0..7142448, tpch_sf100/customer/part-105.parquet:0..7150181, tpch_sf100/customer/part-106.parquet:0..7151090], [tpch_sf100/customer/part-107.parquet:0..7139082, tpch_sf100/customer/part-108.parquet:0..7148213, tpch_sf100/customer/part-109.parquet:0..7155975, tpch_sf100/customer/part-11.parquet:0..7182400, tpch_sf100/customer/part-110.parquet:0..7143359], [tpch_sf100/customer/part-111.parquet:0..7146719, tpch_sf100/customer/part-112.parquet:0..222792], [tpch_sf100/customer/part-112.parquet:222792..7148154, tpch_sf100/customer/part-113.parquet:0..7140622, tpch_sf100/customer/part-114.parquet:0..7149552, tpch_sf100/customer/part-115.parquet:0..7154248, tpch_sf100/customer/part-116.parquet:0..7144685], ...]}, projection=[c_custkey, c_mktsegment], file_type=parquet, predicate=c_mktsegment@6 = BUILDING, pruning_predicate=c_mktsegment_null_count@2 != row_count@3 AND c_mktsegment_min@0 <= BUILDING AND BUILDING <= c_mktsegment_max@1, required_guarantees=[c_mktsegment in (BUILDING)]
        └──────────────────────────────────────────────────
        ┌───── Stage 2 ── Tasks: t0:[p0..p31] t1:[p0..p31] t2:[p0..p31] t3:[p0..p31] 
        │ RepartitionExec: partitioning=Hash([o_custkey@1], 32), input_partitions=8
        │   FilterExec: o_orderdate@2 < 1995-03-15
        │     PartitionIsolatorExec: t0:[p0,p1,p2,p3,p4,p5,p6,p7,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__] t1:[__,__,__,__,__,__,__,__,p0,p1,p2,p3,p4,p5,p6,p7,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__] t2:[__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,p0,p1,p2,p3,p4,p5,p6,p7,__,__,__,__,__,__,__,__] t3:[__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,p0,p1,p2,p3,p4,p5,p6,p7] 
        │       DataSourceExec: file_groups={32 groups: [[tpch_sf100/orders/part-0.parquet:0..34517849, tpch_sf100/orders/part-1.parquet:0..34457246, tpch_sf100/orders/part-10.parquet:0..34165236, tpch_sf100/orders/part-100.parquet:0..33839600], [tpch_sf100/orders/part-101.parquet:0..33858988, tpch_sf100/orders/part-102.parquet:0..33852274, tpch_sf100/orders/part-103.parquet:0..33832365, tpch_sf100/orders/part-104.parquet:0..33829587], [tpch_sf100/orders/part-105.parquet:0..33848041, tpch_sf100/orders/part-106.parquet:0..33844914, tpch_sf100/orders/part-107.parquet:0..33845152, tpch_sf100/orders/part-108.parquet:0..33855503], [tpch_sf100/orders/part-109.parquet:0..33826354, tpch_sf100/orders/part-11.parquet:0..34176019, tpch_sf100/orders/part-110.parquet:0..33844991, tpch_sf100/orders/part-111.parquet:0..32890242], [tpch_sf100/orders/part-111.parquet:32890242..33844640, tpch_sf100/orders/part-112.parquet:0..33854969, tpch_sf100/orders/part-113.parquet:0..33850398, tpch_sf100/orders/part-114.parquet:0..33841666, tpch_sf100/orders/part-115.parquet:0..33857482], ...]}, projection=[o_orderkey, o_custkey, o_orderdate, o_shippriority], file_type=parquet, predicate=o_orderdate@4 < 1995-03-15 AND DynamicFilter [ empty ], pruning_predicate=o_orderdate_null_count@1 != row_count@2 AND o_orderdate_min@0 < 1995-03-15, required_guarantees=[]
        └──────────────────────────────────────────────────
      ┌───── Stage 4 ── Tasks: t0:[p0..p31] t1:[p0..p31] t2:[p0..p31] t3:[p0..p31] 
      │ RepartitionExec: partitioning=Hash([l_orderkey@0], 32), input_partitions=8
      │   FilterExec: l_shipdate@3 > 1995-03-15, projection=[l_orderkey@0, l_extendedprice@1, l_discount@2]
      │     PartitionIsolatorExec: t0:[p0,p1,p2,p3,p4,p5,p6,p7,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__] t1:[__,__,__,__,__,__,__,__,p0,p1,p2,p3,p4,p5,p6,p7,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__] t2:[__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,p0,p1,p2,p3,p4,p5,p6,p7,__,__,__,__,__,__,__,__] t3:[__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,p0,p1,p2,p3,p4,p5,p6,p7] 
      │       DataSourceExec: file_groups={32 groups: [[tpch_sf100/lineitem/part-0.parquet:0..137644780, tpch_sf100/lineitem/part-1.parquet:0..137399890, tpch_sf100/lineitem/part-10.parquet:0..136435572, tpch_sf100/lineitem/part-100.parquet:0..135375583], [tpch_sf100/lineitem/part-101.parquet:0..135381657, tpch_sf100/lineitem/part-102.parquet:0..135400866, tpch_sf100/lineitem/part-103.parquet:0..135376406, tpch_sf100/lineitem/part-104.parquet:0..135402800], [tpch_sf100/lineitem/part-105.parquet:0..135395196, tpch_sf100/lineitem/part-106.parquet:0..135405877, tpch_sf100/lineitem/part-107.parquet:0..135388252, tpch_sf100/lineitem/part-108.parquet:0..135393161], [tpch_sf100/lineitem/part-109.parquet:0..135413301, tpch_sf100/lineitem/part-11.parquet:0..136391603, tpch_sf100/lineitem/part-110.parquet:0..135371137, tpch_sf100/lineitem/part-111.parquet:0..132132026], [tpch_sf100/lineitem/part-111.parquet:132132026..135361309, tpch_sf100/lineitem/part-112.parquet:0..135447893, tpch_sf100/lineitem/part-113.parquet:0..135411017, tpch_sf100/lineitem/part-114.parquet:0..135377126, tpch_sf100/lineitem/part-115.parquet:0..135387988], ...]}, projection=[l_orderkey, l_extendedprice, l_discount, l_shipdate], file_type=parquet, predicate=l_shipdate@10 > 1995-03-15 AND DynamicFilter [ empty ], pruning_predicate=l_shipdate_null_count@1 != row_count@2 AND l_shipdate_max@0 > 1995-03-15, required_guarantees=[]
      └──────────────────────────────────────────────────

Took 8398 ms on average

Plan with code from main
┌───── DistributedExec ── Tasks: t0:[p0] 
│ SortPreservingMergeExec: [revenue@1 DESC, o_orderdate@2 ASC NULLS LAST]
│   [Stage 5] => NetworkCoalesceExec: output_partitions=32, input_tasks=4
└──────────────────────────────────────────────────
  ┌───── Stage 5 ── Tasks: t0:[p0..p7] t1:[p0..p7] t2:[p0..p7] t3:[p0..p7] 
  │ SortExec: expr=[revenue@1 DESC, o_orderdate@2 ASC NULLS LAST], preserve_partitioning=[true]
  │   ProjectionExec: expr=[l_orderkey@0 as l_orderkey, sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)@3 as revenue, o_orderdate@1 as o_orderdate, o_shippriority@2 as o_shippriority]
  │     AggregateExec: mode=SinglePartitioned, gby=[l_orderkey@2 as l_orderkey, o_orderdate@0 as o_orderdate, o_shippriority@1 as o_shippriority], aggr=[sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]
  │       HashJoinExec: mode=Partitioned, join_type=Inner, on=[(o_orderkey@0, l_orderkey@0)], projection=[o_orderdate@1, o_shippriority@2, l_orderkey@3, l_extendedprice@4, l_discount@5]
  │         [Stage 3] => NetworkShuffleExec: output_partitions=8, input_tasks=4
  │         [Stage 4] => NetworkShuffleExec: output_partitions=8, input_tasks=4
  └──────────────────────────────────────────────────
    ┌───── Stage 3 ── Tasks: t0:[p0..p31] t1:[p0..p31] t2:[p0..p31] t3:[p0..p31] 
    │ RepartitionExec: partitioning=Hash([o_orderkey@0], 32), input_partitions=8
    │   HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c_custkey@0, o_custkey@1)], projection=[o_orderkey@1, o_orderdate@3, o_shippriority@4]
    │     [Stage 1] => NetworkShuffleExec: output_partitions=8, input_tasks=4
    │     [Stage 2] => NetworkShuffleExec: output_partitions=8, input_tasks=4
    └──────────────────────────────────────────────────
      ┌───── Stage 1 ── Tasks: t0:[p0..p31] t1:[p0..p31] t2:[p0..p31] t3:[p0..p31] 
      │ RepartitionExec: partitioning=Hash([c_custkey@0], 32), input_partitions=8
      │   FilterExec: c_mktsegment@1 = BUILDING, projection=[c_custkey@0]
      │     PartitionIsolatorExec: t0:[p0,p1,p2,p3,p4,p5,p6,p7,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__] t1:[__,__,__,__,__,__,__,__,p0,p1,p2,p3,p4,p5,p6,p7,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__] t2:[__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,p0,p1,p2,p3,p4,p5,p6,p7,__,__,__,__,__,__,__,__] t3:[__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,p0,p1,p2,p3,p4,p5,p6,p7] 
      │       DataSourceExec: file_groups={32 groups: [[tpch_sf100/customer/part-0.parquet:0..7272764, tpch_sf100/customer/part-1.parquet:0..7229287, tpch_sf100/customer/part-10.parquet:0..7188139, tpch_sf100/customer/part-100.parquet:0..7147612, tpch_sf100/customer/part-101.parquet:0..7139423], [tpch_sf100/customer/part-102.parquet:0..7146767, tpch_sf100/customer/part-103.parquet:0..7152822, tpch_sf100/customer/part-104.parquet:0..7142448, tpch_sf100/customer/part-105.parquet:0..7150181, tpch_sf100/customer/part-106.parquet:0..7151090], [tpch_sf100/customer/part-107.parquet:0..7139082, tpch_sf100/customer/part-108.parquet:0..7148213, tpch_sf100/customer/part-109.parquet:0..7155975, tpch_sf100/customer/part-11.parquet:0..7182400, tpch_sf100/customer/part-110.parquet:0..7143359], [tpch_sf100/customer/part-111.parquet:0..7146719, tpch_sf100/customer/part-112.parquet:0..222792], [tpch_sf100/customer/part-112.parquet:222792..7148154, tpch_sf100/customer/part-113.parquet:0..7140622, tpch_sf100/customer/part-114.parquet:0..7149552, tpch_sf100/customer/part-115.parquet:0..7154248, tpch_sf100/customer/part-116.parquet:0..7144685], ...]}, projection=[c_custkey, c_mktsegment], file_type=parquet, predicate=c_mktsegment@6 = BUILDING, pruning_predicate=c_mktsegment_null_count@2 != row_count@3 AND c_mktsegment_min@0 <= BUILDING AND BUILDING <= c_mktsegment_max@1, required_guarantees=[c_mktsegment in (BUILDING)]
      └──────────────────────────────────────────────────
      ┌───── Stage 2 ── Tasks: t0:[p0..p31] t1:[p0..p31] t2:[p0..p31] t3:[p0..p31] 
      │ RepartitionExec: partitioning=Hash([o_custkey@1], 32), input_partitions=8
      │   FilterExec: o_orderdate@2 < 1995-03-15
      │     PartitionIsolatorExec: t0:[p0,p1,p2,p3,p4,p5,p6,p7,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__] t1:[__,__,__,__,__,__,__,__,p0,p1,p2,p3,p4,p5,p6,p7,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__] t2:[__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,p0,p1,p2,p3,p4,p5,p6,p7,__,__,__,__,__,__,__,__] t3:[__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,p0,p1,p2,p3,p4,p5,p6,p7] 
      │       DataSourceExec: file_groups={32 groups: [[tpch_sf100/orders/part-0.parquet:0..34517849, tpch_sf100/orders/part-1.parquet:0..34457246, tpch_sf100/orders/part-10.parquet:0..34165236, tpch_sf100/orders/part-100.parquet:0..33839600], [tpch_sf100/orders/part-101.parquet:0..33858988, tpch_sf100/orders/part-102.parquet:0..33852274, tpch_sf100/orders/part-103.parquet:0..33832365, tpch_sf100/orders/part-104.parquet:0..33829587], [tpch_sf100/orders/part-105.parquet:0..33848041, tpch_sf100/orders/part-106.parquet:0..33844914, tpch_sf100/orders/part-107.parquet:0..33845152, tpch_sf100/orders/part-108.parquet:0..33855503], [tpch_sf100/orders/part-109.parquet:0..33826354, tpch_sf100/orders/part-11.parquet:0..34176019, tpch_sf100/orders/part-110.parquet:0..33844991, tpch_sf100/orders/part-111.parquet:0..32890242], [tpch_sf100/orders/part-111.parquet:32890242..33844640, tpch_sf100/orders/part-112.parquet:0..33854969, tpch_sf100/orders/part-113.parquet:0..33850398, tpch_sf100/orders/part-114.parquet:0..33841666, tpch_sf100/orders/part-115.parquet:0..33857482], ...]}, projection=[o_orderkey, o_custkey, o_orderdate, o_shippriority], file_type=parquet, predicate=o_orderdate@4 < 1995-03-15 AND DynamicFilter [ empty ], pruning_predicate=o_orderdate_null_count@1 != row_count@2 AND o_orderdate_min@0 < 1995-03-15, required_guarantees=[]
      └──────────────────────────────────────────────────
    ┌───── Stage 4 ── Tasks: t0:[p0..p31] t1:[p0..p31] t2:[p0..p31] t3:[p0..p31] 
    │ RepartitionExec: partitioning=Hash([l_orderkey@0], 32), input_partitions=8
    │   FilterExec: l_shipdate@3 > 1995-03-15, projection=[l_orderkey@0, l_extendedprice@1, l_discount@2]
    │     PartitionIsolatorExec: t0:[p0,p1,p2,p3,p4,p5,p6,p7,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__] t1:[__,__,__,__,__,__,__,__,p0,p1,p2,p3,p4,p5,p6,p7,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__] t2:[__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,p0,p1,p2,p3,p4,p5,p6,p7,__,__,__,__,__,__,__,__] t3:[__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,p0,p1,p2,p3,p4,p5,p6,p7] 
    │       DataSourceExec: file_groups={32 groups: [[tpch_sf100/lineitem/part-0.parquet:0..137644780, tpch_sf100/lineitem/part-1.parquet:0..137399890, tpch_sf100/lineitem/part-10.parquet:0..136435572, tpch_sf100/lineitem/part-100.parquet:0..135375583], [tpch_sf100/lineitem/part-101.parquet:0..135381657, tpch_sf100/lineitem/part-102.parquet:0..135400866, tpch_sf100/lineitem/part-103.parquet:0..135376406, tpch_sf100/lineitem/part-104.parquet:0..135402800], [tpch_sf100/lineitem/part-105.parquet:0..135395196, tpch_sf100/lineitem/part-106.parquet:0..135405877, tpch_sf100/lineitem/part-107.parquet:0..135388252, tpch_sf100/lineitem/part-108.parquet:0..135393161], [tpch_sf100/lineitem/part-109.parquet:0..135413301, tpch_sf100/lineitem/part-11.parquet:0..136391603, tpch_sf100/lineitem/part-110.parquet:0..135371137, tpch_sf100/lineitem/part-111.parquet:0..132132026], [tpch_sf100/lineitem/part-111.parquet:132132026..135361309, tpch_sf100/lineitem/part-112.parquet:0..135447893, tpch_sf100/lineitem/part-113.parquet:0..135411017, tpch_sf100/lineitem/part-114.parquet:0..135377126, tpch_sf100/lineitem/part-115.parquet:0..135387988], ...]}, projection=[l_orderkey, l_extendedprice, l_discount, l_shipdate], file_type=parquet, predicate=l_shipdate@10 > 1995-03-15 AND DynamicFilter [ empty ], pruning_predicate=l_shipdate_null_count@1 != row_count@2 AND l_shipdate_max@0 > 1995-03-15, required_guarantees=[]
    └──────────────────────────────────────────────────

Took 7746 ms on average.


If you see this actually speeding up queries, it's very likely that there is a deeper hidden problem that is being covered or mitigated by this; I think it would be worth exploring that.

If we wanted to move forward with this change, we would need to prove with the public benchmarks in this repository that it's actually yielding latency improvements; otherwise we might just be shipping something that papers over a real problem that is happening for a yet-unknown reason.

Comment on lines 200 to 205
│ [Stage 3] => NetworkCoalesceExec: output_partitions=48, input_tasks=4
└──────────────────────────────────────────────────
┌───── Stage 3 ── Tasks: t0:[p0..p11] t1:[p0..p11] t2:[p0..p11] t3:[p0..p11]
│ [Stage 2] => NetworkCoalesceExec: output_partitions=12, input_tasks=16
└──────────────────────────────────────────────────
┌───── Stage 2 ── Tasks: t0:[p0..p2] t1:[p0..p2] t2:[p0..p2] t3:[p0..p2] t4:[p0..p2] t5:[p0..p2] t6:[p0..p2] t7:[p0..p2] t8:[p0..p2] t9:[p0..p2] t10:[p0..p2] t11:[p0..p2] t12:[p0..p2] t13:[p0..p2] t14:[p0..p2] t15:[p0..p2]

@gabotechs gabotechs Jan 28, 2026


Interesting 🧐

In what situations do you think this setup can be beneficial? At first sight, it looks like it's adding some extra network hops that pay the price of network bandwidth + Arrow IPC serialization/deserialization without actually contributing any compute power to the query itself.

Looking at the reasons why you think this can be useful:

Lower fan-in cost vs a single wide coalesce

Looking at the topmost NetworkCoalesceExec, I see we would be pulling the same amount of bytes from the stage below as before: instead of pulling B bytes from T tasks, we would be pulling B * N bytes from T / N tasks. This implies pulling more data from fewer machines, which should not yield any improvement, as we don't pay an extra price for connecting to more machines.

Keep in mind that issuing a gRPC request to a worker does not imply establishing a new TCP connection: all the workers are constantly connected to each other, and connections are reused when it makes sense. This is fundamental to how gRPC works.

Hierarchical distributed aggregation

I think this is the main purpose for apache/datafusion#20019 upstream, which seems correlated with this point. I do imagine how engines that rely on intermediate result materialization could see some improvement from this pattern, as in those engines the alternative (shuffling) is very expensive.

However, I'm not so sure that this would yield good improvements in a zero-copy streaming distributed engine. The fact that other streaming-based engines (Trino) do not have this, while engines that rely on materialization (Spark) do, leads me to think that this assumption might be right, although I'm more than happy to be proven wrong with some benchmarks.

Global sorts / ORDER BY pipelines

I think I'm still missing how this fan-in pattern reduces connection pressure. The network overhead comes from:

  1. bandwidth occupied over the wire by the Arrow data in IPC format
  2. serialization/deserialization overhead from IPC to RecordBatch and back

I'm missing how adding these network hops would help reduce those two on the topmost single-task stage.

Top-K / limit with ordering

Network boundaries buffer data by default, so the more network boundaries a query has, the more chances there are for data to get buffered, which might lead to extra data accumulation before the Top-K cancellation kicks in. My bet is that introducing more network boundaries would actually make this slightly worse.

Skewed workloads

I imagine that to account for data skew, some redistribution of the workload would need to happen, right? However, NetworkCoalesceExec does not perform any redistribution, as it just passes data through. 🤔 Or could I be missing something here?

Author


Thank you for the in-depth review! You make some great points. Allow me to investigate further to see if I can find some concrete evidence and improve on the benchmarks.

Collaborator


There's one thing that comes to mind:

What about shipping just the changes to the NetworkCoalesceExec node for now? I do see value in them not only for this, but if at some point we want to ship something based on apache/datafusion#20019.

I imagine that having the NetworkCoalesceExec changes merged should allow you to more easily experiment and benchmark, and in the worst-case scenario, you can still use it to build your own optimizer rule.

what do you think?

Collaborator


btw, I just merged apache/datafusion#20019 upstream. Regardless of my speculations, I'd be really curious to try out tree aggregations in this project.


@gabotechs thanks so much for your engagement with this PR (and for merging the upstream change in DF).
I work on the same project as @gabrielkerr and I'll try to provide some context, but I'll defer to him for specific decisions on the PR.

I think you have a great intuition on where the real value is. Taken in isolation and applied on top of a shuffle that already performed the expensive work, there is little benefit we can add with the "coalesce tree".
The target use case for this is to be combined with a tree of partial aggregations that distribute the compute for successive partial aggregations in a tree of reducers.

In our current system we have reports that commonly fetch data from tens or hundreds of leaf executor nodes, after which all we need to do is repeatedly merge/roll up partial aggregations. There are a few reasons why we prefer this physical plan to a NetworkShuffle:

  • in one of the biggest data sources we already shard data by visitor on ingestion, so it's safe to perform rollups (e.g. count distinct users that did X and group by Y) without shuffling by visitor
  • another common report type is a typical time series where we aggregate by a time dimension (e.g. hour) and some low-to-mid cardinality metric (e.g. product category)

In both situations the partial and final results always fit on a single node, BUT the same keys will appear repeatedly in all the input partitions and the cardinality will typically be in the thousands to tens of thousands. Doing partial aggregations/rollups as I brought up here is a core part of the current system.

I think our challenge is to describe the benefits using what is currently possible in terms of customizing the distributed planning logic. The ideal state is one in which we add basic infrastructure (like you suggest here, the changes to NetworkCoalesce) + extensibility with custom distributed planning rules in which we can generate a tree of partial aggregations instead of shuffles + tree of coalesce (not that useful).

> What about shipping just the changes to the NetworkCoalesceExec node for now? I do see value in them not only for this, but if at some point we want to ship something based on apache/datafusion#20019.
> I imagine that having the NetworkCoalesceExec changes merged should allow you to more easily experiment and benchmark, and in the worst-case scenario, you can still use it to build your own optimizer rule.

To summarize - I think your proposal sounds great, but I will wait for @gabrielkerr to acknowledge the feedback and incorporate the upstream DF changes in another revision of this PR.

Author


Thank you @aditanase! This context is much appreciated.

@gabotechs I think merging the changes to NetworkCoalesceExec to start is a great idea.

@aditanase and I are very excited to experiment with the new tree-reduce aggregation that was just merged. I believe this will help us get closer to the functionality we need without using the coalesce tree as an awkward stepping stone.

I'll reduce the scope of this PR to remove the coalesce tree and keep the changes to the NetworkCoalesceExec, removing the limitation of a single output task.

Collaborator


Sounds good! thank you both

src/stage.rs Outdated
let max_child_tasks_per_consumer = child_task_count.div_ceil(consumer_tasks).max(1);
let child_partitions_per_task = output_partitions / max_child_tasks_per_consumer;

// Determine which input-task group this consumer task reads.


Wondering if we can encapsulate some of this logic in the TaskGroup struct (same comment for execute above), or at least in some function that we can test in isolation?
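
Something along these lines, for example (purely illustrative; the field names mirror what the diff already uses, but the constructor and routing method are hypothetical):

#[derive(Debug, Clone, Copy)]
struct TaskGroup {
    /// First input task index in this group.
    start: usize,
    /// Number of input tasks in this group.
    len: usize,
    /// Size of the largest group across all consumer tasks.
    max_len: usize,
}

impl TaskGroup {
    /// Contiguous group of input tasks read by `consumer` out of `consumer_tasks`.
    fn for_consumer(consumer: usize, consumer_tasks: usize, child_task_count: usize) -> Self {
        let max_len = child_task_count.div_ceil(consumer_tasks).max(1);
        let start = (consumer * max_len).min(child_task_count);
        let end = ((consumer + 1) * max_len).min(child_task_count);
        TaskGroup { start, len: end - start, max_len }
    }

    /// Maps an output partition to (input task offset, input partition), or None
    /// for the padding partitions of a smaller-than-maximum group.
    fn route(&self, partition: usize, partitions_per_task: usize) -> Option<(usize, usize)> {
        let offset = partition / partitions_per_task;
        (offset < self.len).then_some((offset, partition % partitions_per_task))
    }
}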

gkerr_adobe added 3 commits January 30, 2026 10:44
Drop coalesce-tree planner/config and demo-test changes so the branch only retains the minimum behavior needed for NetworkCoalesceExec to support multiple output tasks.
@gabrielkerr gabrielkerr force-pushed the hierarchical-networkcoalesce branch from 74ac092 to 9b8090c Compare January 30, 2026 20:56
@gabrielkerr gabrielkerr changed the title feat: Add hierarchical network coalesce support. feat: Support multiple output tasks from NetworkCoalesceExec Jan 30, 2026
/// tasks from Stage N.
/// - Output partitioning for Stage N+1 is sized based on the maximum upstream-group size. When
/// groups are uneven, consumer tasks with smaller groups return empty streams for the “extra”
/// partitions.

@aditanase aditanase Feb 1, 2026


Let's add a simple example like we had before ((1,2,3,4,5,6,7,8,9) = (1,2,3)+(4,5,6)+(7,8,9)) to help visualize this split. E.g. (1,2,3,4,5) + (6,7,8,9,0) = (1,2,3)+(4,5,6)+(7,8,9) or something that respects your algorithm from above.

Author


Added a simple example with a diagram. Although adding the diagram was not as simple as I hoped!


@aditanase aditanase left a comment


+1 with minor cosmetic comments

.max(1),
)
.unwrap_or(0);
if partitions_per_task == 0 {


cosmetic: rewrite using if let Some(partitions_per_task) { ... } else { return exec_err... } instead of using 0 as a placeholder for the error?
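
For illustration, something roughly like this (sketch only; the expression and error message are abbreviated/made up, not the PR's actual code):

let Some(partitions_per_task) = output_partitions
    .checked_div(max_child_tasks_per_consumer.max(1))
    .filter(|p| *p > 0)
else {
    return exec_err!("NetworkCoalesceExec: consumer task has no partitions to read");
};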

}

#[cfg(test)]
mod tests {


Do we have to add a test that verifies we're not breaking anything with the default value of 1 for consumer task count? Or is that well taken care of by existing tests?

Author


I believe this is well taken care of by existing tests. Multiple tests assert plans that include NetworkCoalesceExec in the common situations (e.g. ORDER BY), and those stages are forced to 1 task by the annotator.

Comment on lines 21 to 25
/// [ExecutionPlan] that coalesces partitions from multiple tasks into a single task without
#[derive(Debug, Clone, Copy)]
struct TaskGroup {
/// The first input task index in this group.
Collaborator


It would be nice to put these new structures at the bottom of this file, right above the tests. That way, when sharing this file via GitHub link, the first thing that pops up is the documentation of the execution node.

One rule of thumb we usually follow is: the things that are most likely to be of interest to readers go at the top, and the things that are less likely to be read go at the bottom.


@gabrielkerr gabrielkerr Feb 2, 2026


Thanks for this! I'll make the update and ensure to follow this convention in the future as well.

let input_task_offset = partition / partitions_per_task;
let target_partition = partition % partitions_per_task;

if input_task_offset >= group.max_len || input_task_offset >= group.len {
Collaborator


🤔 Here, I can imagine how input_task_offset >= group.len can happen, and it should be fine to return an empty stream in that case, but input_task_offset >= group.max_len? If that ever happens, isn't that reflecting a logical error in the code?

Author


You're correct. I added this to account for "shouldn't happen" scenarios and thought it would be better to return an empty stream instead of erroring out.

As long as the upstream partitioning logic holds, this case should never happen. I'll remove the >= group.max_len check for now. If you'd prefer, I can add a separate block that checks for this condition, emits a debug log, and returns an empty stream. Or we can be more aggressive and throw an exec_err!.
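
For example, the more aggressive variant could look roughly like this (sketch only; the message text is made up):

// Treat an offset beyond the maximum group size as a logic error rather than
// silently returning an empty stream.
if input_task_offset >= group.max_len {
    return exec_err!(
        "NetworkCoalesceExec: partition {partition} maps to input task offset {input_task_offset}, beyond the maximum group size {}",
        group.max_len
    );
}
// Offsets past this consumer's (smaller) group still return an empty stream,
// as before.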

Comment on lines 338 to 343
let padding_partition = group.len * partitions_per_task;
let batches = exec
.execute(padding_partition, ctx.task_ctx())?
.try_collect::<Vec<_>>()
.await?;
assert!(batches.is_empty());
Collaborator


It would be nice if the tests reflected how different configurations do not result in duplicate or missing data. It would be interesting to test:

  • 1 to N tasks
  • N to 1 tasks
  • N to M where N > M tasks
  • M to N where N > M tasks

To make the test as future-proof as possible, it would be nice if it didn't refer to implementation details private to the NetworkCoalesceExec node. For example, the current tests refer to TaskGroup, but this structure is private and therefore subject to change based on implementation details. I wouldn't overindex too much on this if you don't find a better way of testing it, but just planting the seed in case you have some ideas.
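
For example, the no-duplicates/no-gaps property could be stated on the grouping arithmetic alone (a toy sketch; the groups helper below is hypothetical and not part of this PR):

// Stand-in for the node's grouping: which upstream task indices each consumer
// task reads.
fn groups(input_tasks: usize, consumer_tasks: usize) -> Vec<Vec<usize>> {
    let max_len = input_tasks.div_ceil(consumer_tasks).max(1);
    (0..consumer_tasks)
        .map(|c| ((c * max_len).min(input_tasks)..((c + 1) * max_len).min(input_tasks)).collect())
        .collect()
}

#[test]
fn every_input_task_is_read_exactly_once() {
    // 1 -> N, N -> 1, N -> M with N > M, and M -> N with N > M.
    for (inputs, consumers) in [(1, 4), (4, 1), (9, 4), (3, 8)] {
        let flat: Vec<usize> = groups(inputs, consumers).into_iter().flatten().collect();
        assert_eq!(flat, (0..inputs).collect::<Vec<_>>());
    }
}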

Author


Great feedback. I'm looking into this.

Author


I've added some tests that verify the output partitioning is advertised as expected from NetworkCoalesceExec, and some tests that indirectly exercise the task grouping without using TaskGroup itself. If you'd like more changes to this, please let me know.
