gabotechs
Collaborator

@gabotechs gabotechs commented Sep 26, 2025

Previously, once a stage was evicted, it was dropped immediately, even if its SendableRecordBatchStream was still in progress.

This is problematic because if a plan node inside the stage holds some state (like a spawned task) that the SendableRecordBatchStream needs in order to work, that state might get dropped too soon, cancelling all tasks associated with that node too early.

This does not happen in normal DataFusion: there, users can assume that references to plan nodes live at least as long as the SendableRecordBatchStream.
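
To make the failure mode concrete, here is a minimal std-only sketch of the situation described above. It is not code from this PR: `StatefulNode`, `execute`, and the two helper functions are hypothetical names, a thread stands in for the spawned task, and a channel receiver stands in for the SendableRecordBatchStream.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{sync_channel, Receiver};
use std::sync::Arc;
use std::thread;

/// Hypothetical stand-in for a plan node that owns a background task.
pub struct StatefulNode {
    cancelled: Arc<AtomicBool>,
}

impl StatefulNode {
    /// Spawns a producer thread and returns the node plus the "stream" it feeds.
    pub fn execute(total: usize) -> (Self, Receiver<usize>) {
        let cancelled = Arc::new(AtomicBool::new(false));
        let flag = Arc::clone(&cancelled);
        // Rendezvous channel: each send blocks until the consumer receives.
        let (tx, rx) = sync_channel(0);
        thread::spawn(move || {
            for i in 0..total {
                // Stop producing once the node is dropped or the consumer is gone.
                if flag.load(Ordering::SeqCst) || tx.send(i).is_err() {
                    break;
                }
            }
        });
        (Self { cancelled }, rx)
    }
}

impl Drop for StatefulNode {
    fn drop(&mut self) {
        // Dropping the node cancels its background task.
        self.cancelled.store(true, Ordering::SeqCst);
    }
}

/// Simulates evicting the stage while the stream is still in progress.
pub fn rows_when_node_dropped_early() -> usize {
    let (node, stream) = StatefulNode::execute(5);
    drop(node);
    stream.iter().count()
}

/// Simulates keeping the node alive until the stream is fully consumed.
pub fn rows_when_node_outlives_stream() -> usize {
    let (node, stream) = StatefulNode::execute(5);
    let rows = stream.iter().count();
    drop(node);
    rows
}
```

In this sketch, dropping the node first truncates the output to at most one row, while keeping it alive until the stream ends yields all five.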

@jayshrivastava
Collaborator

👋🏽 I think this is fixed by do_get.rs in my WIP PR: #160. Please let me know what you think!

@gabotechs
Collaborator Author

🤔 Yeah, I think it should solve it. To make sure, you can port tests/stateful_execution_plan.rs to your PR and double-check that it passes.

Collaborator

@NGA-TRAN NGA-TRAN left a comment

The problem is well described and the fix is very nice. I just have one suggestion: describe where exactly the bug happens in your specific test.


/// Stream that executes a callback when it is fully consumed or gets cancelled.
#[pin_project(PinnedDrop)]
pub struct CallbackStream<S, F>
Collaborator

I read through the implementation and usage. It makes sense and is a very nice fix.
My only question is about the name. Is CallBack a common name for this kind of thing? The name does not immediately convey what it does, but if it is only me, it is fine to keep the name as-is.

Collaborator Author

Yeah, I'd say "callback" is pretty common for this in the software industry. A callback is a function whose execution gets deferred until a certain event happens. In this case, when the stream finishes.
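
As a rough illustration of that idea, here is a simplified, synchronous analogue that uses only the standard library. It is a sketch under stated assumptions, not the `CallbackStream` from this PR: it wraps an `Iterator` rather than a `Stream`, and `CallbackIter`, `fire`, and the two helper functions are hypothetical names. The callback runs exactly once, either when the inner iterator is exhausted or when the wrapper is dropped early.

```rust
use std::cell::Cell;
use std::rc::Rc;

/// Hypothetical synchronous analogue of a callback stream: runs `callback`
/// exactly once, on exhaustion or on drop, whichever comes first.
pub struct CallbackIter<I, F: FnOnce()> {
    inner: I,
    callback: Option<F>,
}

impl<I, F: FnOnce()> CallbackIter<I, F> {
    pub fn new(inner: I, callback: F) -> Self {
        Self { inner, callback: Some(callback) }
    }

    /// Runs the callback if it has not run yet.
    fn fire(&mut self) {
        if let Some(callback) = self.callback.take() {
            callback();
        }
    }
}

impl<I: Iterator, F: FnOnce()> Iterator for CallbackIter<I, F> {
    type Item = I::Item;

    fn next(&mut self) -> Option<Self::Item> {
        let item = self.inner.next();
        if item.is_none() {
            // Fully consumed: run the deferred callback.
            self.fire();
        }
        item
    }
}

impl<I, F: FnOnce()> Drop for CallbackIter<I, F> {
    fn drop(&mut self) {
        // Cancelled before completion (or already fired, in which case this is a no-op).
        self.fire();
    }
}

/// Counts callback invocations when the iterator is fully consumed.
pub fn calls_after_full_consumption() -> usize {
    let calls = Rc::new(Cell::new(0));
    let counter = Rc::clone(&calls);
    let iter = CallbackIter::new(0..3, move || counter.set(counter.get() + 1));
    let _items: Vec<i32> = iter.collect();
    calls.get()
}

/// Counts callback invocations when the iterator is dropped part-way through.
pub fn calls_after_early_drop() -> usize {
    let calls = Rc::new(Cell::new(0));
    let counter = Rc::clone(&calls);
    let mut iter = CallbackIter::new(0..3, move || counter.set(counter.get() + 1));
    let _first = iter.next();
    drop(iter);
    calls.get()
}
```

Storing the callback in an `Option` and calling `take()` is what guarantees it fires only once, even though both `next` and `drop` may try to run it. In the context of this PR, the callback would be the place to release the stage so that it is not dropped while the stream is still running.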

use tokio_stream::wrappers::ReceiverStream;

#[tokio::test]
async fn stateful_execution_plan() -> Result<(), Box<dyn std::error::Error>> {
Collaborator

It is hard for me to understand what problem this test hit before the fix. Maybe add comments in the plan explaining where and why the problem happens, and why this fix solves it?

Collaborator Author

Added a comment explaining what the test is about.

@jayshrivastava
Collaborator

@gabotechs Your test is passing on my PR here: #168. However, I think you should merge this fix. Even though my TrailingFlightDataStream fixes the issue, I will eventually make it so that queries other than EXPLAIN (ANALYZE) do not use it. In that case, the callback stream will be appropriate.

@gabotechs gabotechs merged commit d3b46ab into main Sep 29, 2025
4 checks passed
@gabotechs gabotechs deleted the gabrielmusat/fix-early-drop-stateful-nodes branch September 29, 2025 19:06
3 participants