
Conversation

@gabotechs
Collaborator

I don't fully understand why this happens, but as seen in #228, deadlocks where queries hang forever happen very often when dealing with TPCH benchmarks in a real environment (a cluster of EC2 machines and Parquet files in S3).

By trial and error, I saw that this PR fixes them, but I don't fully understand why; I think what we were doing before should be completely fine.

There's likely something else going on in either the Arrow Flight client or server networking code, something like an async task not waking up when it should, that's causing this.

As this fixes the error, I propose shipping it and continuing to investigate why this happens.

@geoffreyclaude
Collaborator

I threw Claude Opus 4 at it and it came up with a pretty in-depth analysis, copy-pasted below. Its TL;DR is that your "trial and error" fix is absolutely correct.

Claude Opus 4 Analysis

Network Shuffle Deadlock Analysis

Summary

This document analyzes a deadlock that occurs when using futures::stream::select_all to consume multiple Arrow Flight gRPC streams that share the same HTTP/2 connection. The fix replaces select_all with a custom spawn_select_all implementation that spawns separate tokio tasks for each stream.


Root Cause: HTTP/2 Connection-Level Flow Control

Background: HTTP/2 Flow Control

HTTP/2 implements flow control at two levels (RFC 9113 §5.2):

  1. Stream-level: Each stream has its own window (default 65,535 bytes)
  2. Connection-level: A shared window across ALL streams on the same TCP connection (default 65,535 bytes)

The h2 crate documentation explicitly states this:

An HTTP/2 client or server may not send unlimited data to the peer. When a stream is initiated, both the client and the server are provided with an initial window size for that stream. [...] There is also a connection level window governing data sent across all streams.

h2 crate documentation

Key Finding: Capacity Release is Explicit

The h2 library requires explicit capacity release via FlowControl::release_capacity(). This is documented in h2's share.rs:

A handle to release window capacity to a remote stream. [...] The caller is expected to call release_capacity after dropping data frames.

Hyper calls release_capacity() only when data is consumed (polled), not when received:

// From hyper-1.7.0/src/body/incoming.rs, lines 243-248
// https://github.com/hyperium/hyper/blob/v1.7.0/src/body/incoming.rs#L243-L248
match ready!(h2.poll_data(cx)) {
    Some(Ok(bytes)) => {
        let _ = h2.flow_control().release_capacity(bytes.len());
        // ...
    }
}

This means WINDOW_UPDATE frames (which tell the server it can send more data) are only sent after the client reads and processes data from a stream.

The Deadlock Mechanism

When multiple Flight streams target the same server, they share a single HTTP/2 connection (due to channel caching in ChannelResolver implementations).

┌─────────────────────────────────────────────────────────────────┐
│                     Single Tokio Task                           │
│  ┌─────────────────────────────────────────────────────────┐    │
│  │              futures::select_all                        │    │
│  │  ┌─────────┐  ┌─────────┐  ┌─────────┐                  │    │
│  │  │ Stream1 │  │ Stream2 │  │ Stream3 │  ... (N streams) │    │
│  │  └────┬────┘  └────┬────┘  └────┬────┘                  │    │
│  └───────┼───────────┼───────────┼─────────────────────────┘    │
│          └───────────┼───────────┘                              │
│                      ▼                                          │
│         ┌────────────────────────┐                              │
│         │  Shared HTTP/2 Conn    │ ← Connection window: 65KB    │
│         │  (to same server)      │   (shared across ALL streams)│
│         └────────────────────────┘                              │
└─────────────────────────────────────────────────────────────────┘

The deadlock occurs through this sequence:

  1. Server sends data on multiple streams → connection-level window fills up (65KB total)
  2. Server blocks ALL streams until client sends WINDOW_UPDATE frames
  3. WINDOW_UPDATE is only sent when the client consumes data (calls poll_data() and processes it)
  4. With select_all, if processing one stream is slow (e.g., Arrow deserialization), it delays polling other streams (see the sketch after this list)
  5. Connection window remains saturated → server remains blocked → starvation/deadlock
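
To make this concrete, here is a rough sketch of the pre-fix consumption shape (the function and names are illustrative, not the PR's actual code). The important part is that polling and per-item processing all happen inside one task:

```rust
use futures::stream::{self, StreamExt};

// Illustrative sketch of the problematic pattern: merge N streams with
// select_all and consume them from a single task.
async fn consume_merged<S, T>(streams: Vec<S>) -> Vec<T>
where
    S: stream::Stream<Item = T> + Unpin,
{
    let mut merged = stream::select_all(streams);
    let mut out = Vec::new();
    while let Some(item) = merged.next().await {
        // Any time spent here (e.g. Arrow IPC decoding) is time during which
        // *no* stream is polled, so HTTP/2 data is not drained, no
        // WINDOW_UPDATE is sent, and the shared connection window stays full.
        out.push(item);
    }
    out
}
```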

Why Separate Tasks Fix This

With separate tasks, tokio's scheduler ensures fair CPU time allocation to all stream-polling tasks. No single slow stream can starve others:

┌──────────────┐  ┌──────────────┐  ┌──────────────┐
│   Task 1     │  │   Task 2     │  │   Task 3     │
│  ┌────────┐  │  │  ┌────────┐  │  │  ┌────────┐  │
│  │Stream1 │  │  │  │Stream2 │  │  │  │Stream3 │  │
│  └───┬────┘  │  │  └───┬────┘  │  │  └───┬────┘  │
│      ▼       │  │      ▼       │  │      ▼       │
│  unbounded   │  │  unbounded   │  │  unbounded   │
│   channel    │  │   channel    │  │   channel    │
└──────┬───────┘  └──────┬───────┘  └──────┬───────┘
       └─────────────────┼─────────────────┘
                         ▼
               ┌────────────────────┐
               │  Consumer Stream   │
               └────────────────────┘

Benefits:

  • Each stream is polled independently by the tokio runtime
  • HTTP/2 data is continuously drained, releasing flow control
  • No stream can block another's progress

Why Unbounded Channels Are Necessary

Bounded channels would reintroduce the deadlock:

  1. Producer task blocks on channel.send() when channel is full
  2. While blocked, it stops polling its HTTP/2 stream
  3. No polling = no data consumption = no release_capacity() call
  4. No capacity release = server stays blocked → deadlock persists

Unbounded channels ensure producers never block, allowing continuous HTTP/2 consumption.
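
To make the failure mode concrete, here is a hedged sketch (not the PR's code) of what the producer side would look like with a bounded channel; the blocking `send().await` is exactly where polling of the HTTP/2 stream stops:

```rust
use futures::{Stream, StreamExt};
use tokio::sync::mpsc;

// Sketch only: a per-stream producer feeding a *bounded* channel. `T` stands
// in for whatever the Flight stream yields (e.g. decoded record batches).
fn spawn_bounded_producer<T: Send + 'static>(
    mut input: impl Stream<Item = T> + Send + Unpin + 'static,
) -> mpsc::Receiver<T> {
    let (tx, rx) = mpsc::channel(16);
    tokio::spawn(async move {
        while let Some(item) = input.next().await {
            // When the channel is full, this await parks the task. While it is
            // parked, `input` is not polled, so hyper never calls
            // release_capacity() and no WINDOW_UPDATE is sent: the server stays
            // blocked on the shared connection window and the deadlock returns.
            if tx.send(item).await.is_err() {
                return; // receiver dropped; stop polling the stream
            }
        }
    });
    rx
}
```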

Memory safety is ensured by integrating with DataFusion's MemoryPool, which tracks memory usage and can apply backpressure at the query level (spilling, OOM errors).
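
As a rough illustration of that query-level backpressure, DataFusion's memory pool API can be used along these lines (the pool size and consumer name below are arbitrary, not taken from the PR):

```rust
use std::sync::Arc;
use datafusion::execution::memory_pool::{GreedyMemoryPool, MemoryConsumer, MemoryPool};

fn main() {
    // A pool capped at 64 MiB (illustrative value).
    let pool: Arc<dyn MemoryPool> = Arc::new(GreedyMemoryPool::new(64 * 1024 * 1024));

    let mut reservation = MemoryConsumer::new("NetworkBoundary").register(&pool);

    // grow() records usage unconditionally; try_grow() fails once the pool
    // limit would be exceeded, surfacing as a query-level error instead of
    // letting buffered data grow without bound.
    reservation.grow(1024 * 1024);
    if let Err(e) = reservation.try_grow(128 * 1024 * 1024) {
        println!("backpressure kicks in: {e}");
    }
}
```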


The Fix: spawn_select_all

The implementation spawns a separate task for each input stream:

pub(super) fn spawn_select_all<T, El, Err>(
    inner: Vec<T>,
    pool: Arc<dyn MemoryPool>,
) -> impl Stream<Item = Result<El, Err>>
where
    T: Stream<Item = Result<El, Err>> + Send + Unpin + 'static,
    El: MemoryFootPrint + Send + 'static,
    Err: Send + 'static,
{
    let (tx, rx) = tokio::sync::mpsc::unbounded_channel();

    let mut tasks = vec![];
    for mut t in inner {
        let tx = tx.clone();
        let pool = Arc::clone(&pool);
        let consumer = MemoryConsumer::new("NetworkBoundary");

        // One dedicated task per input stream: each stream is polled
        // independently, so slow processing elsewhere cannot stall HTTP/2 reads.
        tasks.push(SpawnedTask::spawn(async move {
            while let Some(msg) = t.next().await {
                // Account for the buffered message in DataFusion's MemoryPool so
                // the unbounded channel's buffering is visible to query-level
                // memory accounting.
                let mut reservation = consumer.clone_with_new_id().register(&pool);
                if let Ok(msg) = &msg {
                    reservation.grow(msg.get_memory_size());
                }

                // The receiver was dropped: stop polling this stream.
                if tx.send((msg, reservation)).is_err() {
                    return;
                }
            }
        }))
    }

    UnboundedReceiverStream::new(rx).map(move |(msg, _reservation)| {
        // keep the tasks alive as long as the stream lives
        let _ = &tasks;
        msg
    })
}
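
A hedged usage sketch (the surrounding function and names are illustrative, not from the PR): the Flight streams of one network boundary are merged through spawn_select_all, backed by the session's memory pool, and the result is consumed like any other stream.

```rust
use std::sync::Arc;
use datafusion::execution::memory_pool::MemoryPool;
use futures::{Stream, StreamExt};

// Illustrative only: drain the merged stream produced by spawn_select_all.
async fn drain_boundary<T, El, Err>(
    flight_streams: Vec<T>,
    pool: Arc<dyn MemoryPool>,
) -> Result<Vec<El>, Err>
where
    T: Stream<Item = Result<El, Err>> + Send + Unpin + 'static,
    El: MemoryFootPrint + Send + 'static,
    Err: Send + 'static,
{
    let merged = spawn_select_all(flight_streams, pool);
    futures::pin_mut!(merged); // the opaque `impl Stream` is not known to be Unpin

    let mut out = Vec::new();
    while let Some(item) = merged.next().await {
        // Each item's memory reservation is released as the item is yielded.
        out.push(item?);
    }
    Ok(out)
}
```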

Properties

| Property | Purpose |
| --- | --- |
| Separate tasks | Tokio's scheduler ensures fair polling of all streams |
| Unbounded channels | Producers never block, ensuring continuous HTTP/2 consumption |
| Memory reservations | DataFusion's MemoryPool provides backpressure at the query level |
| Task lifecycle | Tasks are kept alive via closure capture, with proper cleanup on drop |

Relevant Dependency Code

  • h2 (HTTP/2 implementation)
  • hyper (HTTP client/server)
  • tonic (gRPC implementation)


Configuration Notes

Default HTTP/2 Window Sizes

| Setting | Default value | Source |
| --- | --- | --- |
| Initial stream window | 65,535 bytes | RFC 9113 §6.9.2 |
| Initial connection window | 65,535 bytes | RFC 9113 §6.9.2 |

Tonic Configuration Options

Tonic allows configuring window sizes via Endpoint:

Channel::from_static("http://example.com")
    .initial_stream_window_size(some_value)
    .initial_connection_window_size(some_value)

However, increasing window sizes only delays the problem—it doesn't eliminate the fundamental fairness issue with select_all.


Conclusion

The spawn_select_all implementation is the correct solution, not a workaround. It follows the established pattern of isolating network I/O into independent tasks to prevent flow control issues and ensure fair scheduling.

Key Takeaways

  1. HTTP/2 connection-level flow control creates implicit dependencies between streams on the same connection
  2. The h2/hyper stack requires explicit capacity release, which only happens when data is polled
  3. futures::stream::select_all is unsuitable for HTTP/2 streams that share a connection due to potential starvation
  4. Separate tasks + unbounded channels (with memory tracking) is the robust solution

@gabotechs gabotechs merged commit 94c5d77 into main Dec 8, 2025
4 checks passed
@gabotechs gabotechs deleted the gabrielmusat/fix-network-shuffle-deadlock branch December 8, 2025 10:28