-
Notifications
You must be signed in to change notification settings - Fork 1.6k
Push down preferred sorts into TableScan
logical plan node
#17337
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Push down preferred sorts into TableScan
logical plan node
#17337
Conversation
This commit adds a new optional field `preferred_ordering` to the `TableScan` logical plan node to support sort pushdown optimizations. Changes include: - Add `preferred_ordering: Option<Vec<SortExpr>>` field to `TableScan` struct - Add `try_new_with_preferred_ordering` constructor method - Update all `TableScan` constructors throughout the codebase to include the new field - Update `Debug`, `PartialEq`, `Hash`, and `PartialOrd` implementations - Update pattern matching in optimizer and other modules The preferred_ordering field is currently not used by any optimization rules but provides the foundation for future sort pushdown implementations. This is part 2 of 2 PRs split from apache#17273 as requested in apache#17273 (comment) 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <[email protected]>
This commit adds a new optimizer rule that pushes sort expressions down into TableScan nodes as preferred_ordering, enabling table providers to potentially optimize scans based on sort requirements. Features: - PushDownSort optimizer rule that detects Sort -> TableScan patterns - Pushes down simple column-based sort expressions only - Sets TableScan.preferred_ordering field for table provider optimization - Completely eliminates Sort node when all expressions can be pushed down - Comprehensive test coverage The rule is positioned strategically in the optimizer pipeline after limit pushdown but before filter pushdown to maximize optimization opportunities. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <[email protected]>
TableScan
logical plan nodeTableScan
logical plan node
/// Optional preferred ordering for the scan | ||
pub preferred_ordering: Option<Vec<SortExpr>>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@berkaysynnada do you think this is the right information to pass down? Or is there a world where it makes sense to pass down some sort of "equivalence" information?
cc @alamb
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I also think @suremarc and @ozankabak may be interested in this
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A more future-proof API (that we could change the internal representation) might be something like
/// Preferred ordering
///
/// Preferred orderings can potentially help DataFusion optimize queries, even in cases
/// when the output does not completely follow that order. This is information passed
/// to the scan about what might help.
///
/// For example, a query with `ORDER BY time DESC LIMIT 10`, DataFusion's dynamic
/// predicates and TopK operator will work better if the data is roughly ordered by descending
/// time (more recent data first)
struct PreferredOrdering {
exprs: Vec<SortExpr>
}
And then change this API to
/// Optional preferred ordering for the scan | |
pub preferred_ordering: Option<Vec<SortExpr>>, | |
/// Optional preferred ordering for the scan | |
pub preferred_ordering: Option<PreferredOrdering>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@berkaysynnada do you think this is the right information to pass down? Or is there a world where it makes sense to pass down some sort of "equivalence" information?
cc @alamb
When we are registering the sources, we can provide multiple orderings if the table supports them. However, the requirements are singular, and I don't think there would be any meaning in ordering the table for both col_a
and col_b
simultaneously. So, I've always thought that requirements need only one ordering, but specs should be capable of having multiple orderings. So there isn't any obvious advantage of using equivalences here, IMO
TableScan
logical plan nodeTableScan
logical plan node
🤖 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this looks good but I was confused that the tests don't seem to show the preferred ordering. I think we should fix those tests before merging -- I also expect it to show that some of the pushdown isn't working quite as expected (aka pushing through a projection or filter)
I also recommend putting the prefered sort expressions in their own struct, but that is not required in my mind.
As I understand the plan, in the next few PRs, @adriangb will update the various APIs so that this preferred sort is provided to TableProvider::scan
(really via scan_with_args)
I also wonder if we should wait for the DataFusion 50 release before merging this or if it is ok to merge now.
/// Optional preferred ordering for the scan | ||
pub preferred_ordering: Option<Vec<SortExpr>>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A more future-proof API (that we could change the internal representation) might be something like
/// Preferred ordering
///
/// Preferred orderings can potentially help DataFusion optimize queries, even in cases
/// when the output does not completely follow that order. This is information passed
/// to the scan about what might help.
///
/// For example, a query with `ORDER BY time DESC LIMIT 10`, DataFusion's dynamic
/// predicates and TopK operator will work better if the data is roughly ordered by descending
/// time (more recent data first)
struct PreferredOrdering {
exprs: Vec<SortExpr>
}
And then change this API to
/// Optional preferred ordering for the scan | |
pub preferred_ordering: Option<Vec<SortExpr>>, | |
/// Optional preferred ordering for the scan | |
pub preferred_ordering: Option<PreferredOrdering>, |
|
||
/// Sets the preferred ordering for this table scan using the builder pattern. | ||
/// | ||
/// The preferred ordering serves as a hint to table providers about the desired |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
mybe we can move some of this comment to PreferredOrdering
if we go with the struct approach
} | ||
|
||
// If the table scan already has preferred ordering, don't overwrite it | ||
// This preserves any existing sort preferences from other optimizations |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
when would there be existing preferences?
/// Currently, we only support pushing down simple column references | ||
/// because table providers typically can't optimize complex expressions | ||
/// in sort pushdown. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this a fundamental limitation? I ask because @pepijnve was asking about "column only" support the other day at
/// | ||
/// # Examples | ||
/// | ||
/// ```rust |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
as much as I love examples, I am not sure this one adds much
let new_table_scan = TableScan { | ||
table_name: table_scan.table_name.clone(), | ||
source: Arc::clone(&table_scan.source), | ||
projection: table_scan.projection.clone(), | ||
projected_schema: Arc::clone(&table_scan.projected_schema), | ||
filters: table_scan.filters.clone(), | ||
fetch: table_scan.fetch, | ||
preferred_ordering: Some(sort.expr.clone()), | ||
}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this would be clearer and less error prone, if you could do something that only change the field of interest. Perhaps like this:
let new_table_scan = TableScan { | |
table_name: table_scan.table_name.clone(), | |
source: Arc::clone(&table_scan.source), | |
projection: table_scan.projection.clone(), | |
projected_schema: Arc::clone(&table_scan.projected_schema), | |
filters: table_scan.filters.clone(), | |
fetch: table_scan.fetch, | |
preferred_ordering: Some(sort.expr.clone()), | |
}; | |
let new_table_scan = table_scan.clone() | |
.with_preferred_ordering(Some(sort.expr.clone())) |
Sort: t1.a ASC NULLS LAST | ||
Inner Join: t1.a = t2.a | ||
TableScan: t1 | ||
TableScan: t2 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
these tests don't really show that the preferred ordering is pushed through. Perhaps we can update the plan to show any preferred ordering
#[derive(Default, Debug)] | ||
pub struct PushDownSort {} | ||
|
||
impl PushDownSort { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the EnforceSorting
rule already pushes sorts down in the plan -- https://docs.rs/datafusion/latest/datafusion/physical_optimizer/enforce_sorting/struct.EnforceSorting.html
Do you think we will need more sort pushdown? Or will this always just be "pass down preferred sorts" to LogicalPlans?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice. I was hoping another optimizer rule does the "hard work" so we can do just the simple thing here (only a subset of node types we need to support).
@ r" | ||
Sort: test.a ASC NULLS LAST | ||
Filter: test.a > Int32(10) | ||
TableScan: test |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
FWIW I think this table scan should have the preferred ordering passed to it, but I am not sure the current code will do so
Here is a PR that avoids some clones, which might improve performance |
🤔 this seems to have caused a massive slowdown in the sql planner benchmark somehow: Benchmarking physical_sorted_union_order_by_300: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 3942.3s, or reduce sample count to 10.
physical_sorted_union_order_by_300
time: [38.914 s 38.997 s 39.079 s]
Benchmarking logical_plan_optimize: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 17189.6s, or reduce sample count to 10. It is still running... |
I'm sure it's just a dumb mistake on my end. Let me do a round of looking at your comments and investigating, thank you for your patience 🙏🏻 |
I think we should wait until after v50 |
TableProvider::scan_with_args
to support pushdown sorting #17273 (comment).This will enable TableProvider's to produce files in an order and partitioning that optimizes query execution, e.g. to make a TopK operator stop earlier via dynamic filters or to completely optimize away a sort if the files can be ordered to do so.
Note that this is:
Because physical optimizer rules should remove unnecessary sorts later on.
This avoids complexity of negotiating sort order with the TableProvider: ExecutionPlan (the thing TableProvider returns) already has APIs to negotiate sort orders.
So TableProvider can encode into the ExecutionPlan that it returns that "the sort is completely handled in the scan" and then phyiscal optimizer rules will remove the SortExec.