Skip to content

Conversation

gengliangwang
Copy link
Member

What changes were proposed in this pull request?

Enable column pruning and predicate pushdown in DSV2 streaming.
The pushdown happens during analysis instead of relying on the optimizer. The streaming execution needs an actual V2 Scan early so we can materialize a SparkDataStream via Scan.toMicroBatchStream or Scan.toContinuousStream.

Why are the changes needed?

To reduce data read and compute in streaming queries by pushing filters and projecting only needed columns into DSv2 readers, aligning streaming with batch DSv2 capabilities.

Does this PR introduce any user-facing change?

No

How was this patch tested?

New unit tests

Was this patch authored or co-authored using generative AI tooling?

No

@gengliangwang
Copy link
Member Author

cc @jerrypeng

// yet exist at this point, which would prevent creating the stream and collecting sources
// reliably for offset tracking and recovery.
val _logicalPlan = org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown
.apply(basePlan)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because Spark’s micro-batch streaming must materialize the MicroBatchStream during the analysis phase, and DSv2 constructs it through the sequence ScanBuilder → Scan → Scan.toMicroBatchStream, the optimizer rule V2ScanRelationPushDown needs to be applied early—specifically on the analyzed plan of MicroBatchExecution.

This makes the flow somewhat tricky. Moreover, since V2ScanRelationPushDown expects all predicates to be fully combined and pushed down, applying it too early may cause the pushdown to fail. We also need to handle streaming deduplication properly.

Until we find a cleaner solution, I’ll close this PR for now.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant