From 2f6f105e909b5bcc18506b00640f956eb5e0391d Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Wed, 27 Aug 2025 14:16:55 -0500 Subject: [PATCH 01/10] Add preferred_ordering field to TableScan MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This commit adds a new optional field `preferred_ordering` to the `TableScan` logical plan node to support sort pushdown optimizations. Changes include: - Add `preferred_ordering: Option<Vec<SortExpr>>` field to `TableScan` struct - Add `try_new_with_preferred_ordering` constructor method - Update all `TableScan` constructors throughout the codebase to include the new field - Update `Debug`, `PartialEq`, `Hash`, and `PartialOrd` implementations - Update pattern matching in optimizer and other modules The preferred_ordering field is currently not used by any optimization rules but provides the foundation for future sort pushdown implementations. This is part 2 of 2 PRs split from #17273 as requested in https://github.com/apache/datafusion/pull/17273#issuecomment-3218814835 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- datafusion/expr/src/logical_plan/plan.rs | 25 +++++++++++++++++++ datafusion/expr/src/logical_plan/tree_node.rs | 2 ++ .../optimizer/src/optimize_projections/mod.rs | 4 ++- datafusion/optimizer/src/push_down_filter.rs | 1 + datafusion/proto/src/logical_plan/mod.rs | 1 + 5 files changed, 32 insertions(+), 1 deletion(-) diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index b4f2902cc43e..d9ff78c0a2f2 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -2552,6 +2552,8 @@ pub struct TableScan { pub filters: Vec, /// Optional number of rows to read pub fetch: Option, + /// Optional preferred ordering for the scan + pub preferred_ordering: Option>, } impl Debug for TableScan { @@ -2563,6 +2565,7 @@ impl
Debug for TableScan { .field("projected_schema", &self.projected_schema) .field("filters", &self.filters) .field("fetch", &self.fetch) + .field("preferred_ordering", &self.preferred_ordering) .finish_non_exhaustive() } } @@ -2574,6 +2577,7 @@ impl PartialEq for TableScan { && self.projected_schema == other.projected_schema && self.filters == other.filters && self.fetch == other.fetch + && self.preferred_ordering == other.preferred_ordering } } @@ -2593,18 +2597,22 @@ impl PartialOrd for TableScan { pub filters: &'a Vec, /// Optional number of rows to read pub fetch: &'a Option, + /// Optional preferred ordering for the scan + pub preferred_ordering: &'a Option>, } let comparable_self = ComparableTableScan { table_name: &self.table_name, projection: &self.projection, filters: &self.filters, fetch: &self.fetch, + preferred_ordering: &self.preferred_ordering, }; let comparable_other = ComparableTableScan { table_name: &other.table_name, projection: &other.projection, filters: &other.filters, fetch: &other.fetch, + preferred_ordering: &other.preferred_ordering, }; comparable_self.partial_cmp(&comparable_other) } @@ -2617,6 +2625,7 @@ impl Hash for TableScan { self.projected_schema.hash(state); self.filters.hash(state); self.fetch.hash(state); + self.preferred_ordering.hash(state); } } @@ -2670,8 +2679,22 @@ impl TableScan { projected_schema, filters, fetch, + preferred_ordering: None, }) } + + pub fn try_new_with_preferred_ordering( + table_name: impl Into, + table_source: Arc, + projection: Option>, + filters: Vec, + fetch: Option, + preferred_ordering: Option>, + ) -> Result { + let mut table_scan = Self::try_new(table_name, table_source, projection, filters, fetch)?; + table_scan.preferred_ordering = preferred_ordering; + Ok(table_scan) + } } // Repartition the plan based on a partitioning scheme. 
@@ -4896,6 +4919,7 @@ mod tests { projected_schema: Arc::clone(&schema), filters: vec![], fetch: None, + preferred_ordering: None, })); let col = schema.field_names()[0].clone(); @@ -4926,6 +4950,7 @@ mod tests { projected_schema: Arc::clone(&unique_schema), filters: vec![], fetch: None, + preferred_ordering: None, })); let col = schema.field_names()[0].clone(); diff --git a/datafusion/expr/src/logical_plan/tree_node.rs b/datafusion/expr/src/logical_plan/tree_node.rs index 47088370a1d9..37244ebf9437 100644 --- a/datafusion/expr/src/logical_plan/tree_node.rs +++ b/datafusion/expr/src/logical_plan/tree_node.rs @@ -599,6 +599,7 @@ impl LogicalPlan { projected_schema, filters, fetch, + preferred_ordering, }) => filters.map_elements(f)?.update_data(|filters| { LogicalPlan::TableScan(TableScan { table_name, @@ -607,6 +608,7 @@ impl LogicalPlan { projected_schema, filters, fetch, + preferred_ordering, }) }), LogicalPlan::Distinct(Distinct::On(DistinctOn { diff --git a/datafusion/optimizer/src/optimize_projections/mod.rs b/datafusion/optimizer/src/optimize_projections/mod.rs index 97402c990b83..c5527c5db5f9 100644 --- a/datafusion/optimizer/src/optimize_projections/mod.rs +++ b/datafusion/optimizer/src/optimize_projections/mod.rs @@ -242,6 +242,7 @@ fn optimize_projections( filters, fetch, projected_schema: _, + preferred_ordering, } = table_scan; // Get indices referred to in the original (schema with all fields) @@ -250,12 +251,13 @@ fn optimize_projections( Some(projection) => indices.into_mapped_indices(|idx| projection[idx]), None => indices.into_inner(), }; - return TableScan::try_new( + return TableScan::try_new_with_preferred_ordering( table_name, source, Some(projection), filters, fetch, + preferred_ordering, ) .map(LogicalPlan::TableScan) .map(Transformed::yes); diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs index 27c2499c8a26..50e7d00b7788 100644 --- a/datafusion/optimizer/src/push_down_filter.rs +++ 
b/datafusion/optimizer/src/push_down_filter.rs @@ -3055,6 +3055,7 @@ mod tests { projection, source: Arc::new(test_provider), fetch: None, + preferred_ordering: None, }); Ok(LogicalPlanBuilder::from(table_scan)) diff --git a/datafusion/proto/src/logical_plan/mod.rs b/datafusion/proto/src/logical_plan/mod.rs index cc3e805ed1df..0d6f495d392f 100644 --- a/datafusion/proto/src/logical_plan/mod.rs +++ b/datafusion/proto/src/logical_plan/mod.rs @@ -271,6 +271,7 @@ fn from_table_source( projected_schema, filters: vec![], fetch: None, + preferred_ordering: None, }); LogicalPlanNode::try_from_logical_plan(&r, extension_codec) From ac4fa4035341f2f2f7e329ebdf63b33bae2043fb Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Wed, 27 Aug 2025 14:38:24 -0500 Subject: [PATCH 02/10] Add sort pushdown optimizer rule MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This commit adds a new optimizer rule that pushes sort expressions down into TableScan nodes as preferred_ordering, enabling table providers to potentially optimize scans based on sort requirements. Features: - PushDownSort optimizer rule that detects Sort -> TableScan patterns - Pushes down simple column-based sort expressions only - Sets TableScan.preferred_ordering field for table provider optimization - Completely eliminates Sort node when all expressions can be pushed down - Comprehensive test coverage The rule is positioned strategically in the optimizer pipeline after limit pushdown but before filter pushdown to maximize optimization opportunities. 
🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- datafusion/optimizer/src/lib.rs | 1 + datafusion/optimizer/src/optimizer.rs | 3 + datafusion/optimizer/src/push_down_sort.rs | 145 +++++++++++++++++++++ 3 files changed, 149 insertions(+) create mode 100644 datafusion/optimizer/src/push_down_sort.rs diff --git a/datafusion/optimizer/src/lib.rs b/datafusion/optimizer/src/lib.rs index 280010e3d92c..8d6088cc9d91 100644 --- a/datafusion/optimizer/src/lib.rs +++ b/datafusion/optimizer/src/lib.rs @@ -58,6 +58,7 @@ pub mod optimizer; pub mod propagate_empty_relation; pub mod push_down_filter; pub mod push_down_limit; +pub mod push_down_sort; pub mod replace_distinct_aggregate; pub mod scalar_subquery_to_join; pub mod simplify_expressions; diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs index 49806d6db344..302e6f343f8a 100644 --- a/datafusion/optimizer/src/optimizer.rs +++ b/datafusion/optimizer/src/optimizer.rs @@ -51,6 +51,7 @@ use crate::plan_signature::LogicalPlanSignature; use crate::propagate_empty_relation::PropagateEmptyRelation; use crate::push_down_filter::PushDownFilter; use crate::push_down_limit::PushDownLimit; +use crate::push_down_sort::PushDownSort; use crate::replace_distinct_aggregate::ReplaceDistinctWithAggregate; use crate::scalar_subquery_to_join::ScalarSubqueryToJoin; use crate::simplify_expressions::SimplifyExpressions; @@ -242,6 +243,8 @@ impl Optimizer { Arc::new(EliminateOuterJoin::new()), // Filters can't be pushed down past Limits, we should do PushDownFilter after PushDownLimit Arc::new(PushDownLimit::new()), + // Sort pushdown should happen before filter pushdown to maximize optimization opportunities + Arc::new(PushDownSort::new()), Arc::new(PushDownFilter::new()), Arc::new(SingleDistinctToGroupBy::new()), // The previous optimizations added expressions and projections, diff --git a/datafusion/optimizer/src/push_down_sort.rs b/datafusion/optimizer/src/push_down_sort.rs 
new file mode 100644 index 000000000000..9b681123aae7 --- /dev/null +++ b/datafusion/optimizer/src/push_down_sort.rs @@ -0,0 +1,145 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! [`PushDownSort`] pushes sort expressions into table scans to enable +//! sort pushdown optimizations by table providers + +use std::sync::Arc; + +use crate::optimizer::ApplyOrder; +use crate::{OptimizerConfig, OptimizerRule}; + +use datafusion_common::tree_node::Transformed; +use datafusion_common::Result; +use datafusion_expr::logical_plan::{LogicalPlan, TableScan}; +use datafusion_expr::{Expr, SortExpr}; + +/// Optimization rule that pushes sort expressions down to table scans +/// when the sort can potentially be optimized by the table provider. +/// +/// This rule looks for `Sort -> TableScan` patterns and moves the sort +/// expressions into the `TableScan.preferred_ordering` field, allowing +/// table providers to potentially optimize the scan based on sort requirements. +#[derive(Default, Debug)] +pub struct PushDownSort {} + +impl PushDownSort { + #[allow(missing_docs)] + pub fn new() -> Self { + Self {} + } + + /// Checks if a sort expression can be pushed down to a table scan. 
+ /// + /// Currently, we only support pushing down simple column references + /// because table providers typically can't optimize complex expressions + /// in sort pushdown. + fn can_pushdown_sort_expr(expr: &SortExpr) -> bool { + // Only push down simple column references + matches!(expr.expr, Expr::Column(_)) + } + + /// Checks if all sort expressions in a list can be pushed down. + fn can_pushdown_sort_exprs(sort_exprs: &[SortExpr]) -> bool { + sort_exprs.iter().all(Self::can_pushdown_sort_expr) + } +} + +impl OptimizerRule for PushDownSort { + fn supports_rewrite(&self) -> bool { + true + } + + fn apply_order(&self) -> Option { + Some(ApplyOrder::TopDown) + } + + fn rewrite( + &self, + plan: LogicalPlan, + _config: &dyn OptimizerConfig, + ) -> Result> { + // Look for Sort -> TableScan pattern + let LogicalPlan::Sort(sort) = &plan else { + return Ok(Transformed::no(plan)); + }; + + let LogicalPlan::TableScan(table_scan) = sort.input.as_ref() else { + return Ok(Transformed::no(plan)); + }; + + // Check if we can push down the sort expressions + if !Self::can_pushdown_sort_exprs(&sort.expr) { + return Ok(Transformed::no(plan)); + } + + // If the table scan already has preferred ordering, don't overwrite it + // This preserves any existing sort preferences from other optimizations + if table_scan.preferred_ordering.is_some() { + return Ok(Transformed::no(plan)); + } + + // Create new TableScan with preferred ordering + let new_table_scan = TableScan { + table_name: table_scan.table_name.clone(), + source: Arc::clone(&table_scan.source), + projection: table_scan.projection.clone(), + projected_schema: Arc::clone(&table_scan.projected_schema), + filters: table_scan.filters.clone(), + fetch: table_scan.fetch, + preferred_ordering: Some(sort.expr.clone()), + }; + + // The sort can be completely eliminated since we've pushed it down + // The table provider may or may not be able to satisfy the ordering, + // but that's up to the table provider to decide + let new_plan 
= LogicalPlan::TableScan(new_table_scan); + + Ok(Transformed::yes(new_plan)) + } + + fn name(&self) -> &str { + "push_down_sort" + } +} + +#[cfg(test)] +mod tests { + use super::*; + use datafusion_expr::{col, SortExpr}; + + #[test] + fn test_can_pushdown_sort_expr() { + // Simple column reference should be pushable + let sort_expr = SortExpr::new(col("a"), true, false); + assert!(PushDownSort::can_pushdown_sort_expr(&sort_expr)); + + // Complex expression should not be pushable + let sort_expr = SortExpr::new( + col("a") + col("b"), + true, + false + ); + assert!(!PushDownSort::can_pushdown_sort_expr(&sort_expr)); + } + + #[test] + fn test_name() { + let rule = PushDownSort::new(); + assert_eq!(rule.name(), "push_down_sort"); + } +} \ No newline at end of file From be8dd818324585c0db26a821eedb57b8b84a380b Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Wed, 27 Aug 2025 14:41:47 -0500 Subject: [PATCH 03/10] fmt --- datafusion/expr/src/logical_plan/plan.rs | 3 ++- datafusion/optimizer/src/push_down_sort.rs | 12 ++++-------- 2 files changed, 6 insertions(+), 9 deletions(-) diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index d9ff78c0a2f2..16eca2a5540b 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -2691,7 +2691,8 @@ impl TableScan { fetch: Option, preferred_ordering: Option>, ) -> Result { - let mut table_scan = Self::try_new(table_name, table_source, projection, filters, fetch)?; + let mut table_scan = + Self::try_new(table_name, table_source, projection, filters, fetch)?; table_scan.preferred_ordering = preferred_ordering; Ok(table_scan) } diff --git a/datafusion/optimizer/src/push_down_sort.rs b/datafusion/optimizer/src/push_down_sort.rs index 9b681123aae7..f80b7795b7cc 100644 --- a/datafusion/optimizer/src/push_down_sort.rs +++ b/datafusion/optimizer/src/push_down_sort.rs @@ -44,7 +44,7 @@ impl 
PushDownSort { } /// Checks if a sort expression can be pushed down to a table scan. - /// + /// /// Currently, we only support pushing down simple column references /// because table providers typically can't optimize complex expressions /// in sort pushdown. @@ -108,7 +108,7 @@ impl OptimizerRule for PushDownSort { // The table provider may or may not be able to satisfy the ordering, // but that's up to the table provider to decide let new_plan = LogicalPlan::TableScan(new_table_scan); - + Ok(Transformed::yes(new_plan)) } @@ -129,11 +129,7 @@ mod tests { assert!(PushDownSort::can_pushdown_sort_expr(&sort_expr)); // Complex expression should not be pushable - let sort_expr = SortExpr::new( - col("a") + col("b"), - true, - false - ); + let sort_expr = SortExpr::new(col("a") + col("b"), true, false); assert!(!PushDownSort::can_pushdown_sort_expr(&sort_expr)); } @@ -142,4 +138,4 @@ mod tests { let rule = PushDownSort::new(); assert_eq!(rule.name(), "push_down_sort"); } -} \ No newline at end of file +} From 1fd80fd685188307897aed15a987abdcfcf27682 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Wed, 27 Aug 2025 15:44:45 -0500 Subject: [PATCH 04/10] add tests, fix behavior --- datafusion/optimizer/src/push_down_sort.rs | 417 ++++++++++++++++++++- 1 file changed, 410 insertions(+), 7 deletions(-) diff --git a/datafusion/optimizer/src/push_down_sort.rs b/datafusion/optimizer/src/push_down_sort.rs index f80b7795b7cc..bee0f4306405 100644 --- a/datafusion/optimizer/src/push_down_sort.rs +++ b/datafusion/optimizer/src/push_down_sort.rs @@ -104,10 +104,14 @@ impl OptimizerRule for PushDownSort { preferred_ordering: Some(sort.expr.clone()), }; - // The sort can be completely eliminated since we've pushed it down - // The table provider may or may not be able to satisfy the ordering, - // but that's up to the table provider to decide - let new_plan = LogicalPlan::TableScan(new_table_scan); + // Preserve the Sort node as 
a fallback while passing the ordering + // preference to the TableScan as an optimization hint + let new_sort = datafusion_expr::logical_plan::Sort { + expr: sort.expr.clone(), + input: Arc::new(LogicalPlan::TableScan(new_table_scan)), + fetch: sort.fetch, + }; + let new_plan = LogicalPlan::Sort(new_sort); Ok(Transformed::yes(new_plan)) } @@ -120,7 +124,27 @@ impl OptimizerRule for PushDownSort { #[cfg(test)] mod tests { use super::*; - use datafusion_expr::{col, SortExpr}; + use crate::test::test_table_scan; + use crate::{assert_optimized_plan_eq_snapshot, OptimizerContext}; + use datafusion_common::{Column, Result}; + use datafusion_expr::{col, lit, Expr, JoinType, LogicalPlanBuilder, SortExpr}; + use std::sync::Arc; + + macro_rules! assert_optimized_plan_equal { + ( + $plan:expr, + @ $expected:literal $(,)? + ) => {{ + let optimizer_ctx = OptimizerContext::new().with_max_passes(1); + let rules: Vec> = vec![Arc::new(PushDownSort::new())]; + assert_optimized_plan_eq_snapshot!( + optimizer_ctx, + rules, + $plan, + @ $expected, + ) + }}; + } #[test] fn test_can_pushdown_sort_expr() { @@ -131,11 +155,390 @@ mod tests { // Complex expression should not be pushable let sort_expr = SortExpr::new(col("a") + col("b"), true, false); assert!(!PushDownSort::can_pushdown_sort_expr(&sort_expr)); + + // Function call should not be pushable + let sort_expr = SortExpr::new(col("c").like(lit("test%")), true, false); + assert!(!PushDownSort::can_pushdown_sort_expr(&sort_expr)); + + // Literal should not be pushable + let sort_expr = SortExpr::new(lit(42), true, false); + assert!(!PushDownSort::can_pushdown_sort_expr(&sort_expr)); + } + + #[test] + fn test_can_pushdown_sort_exprs() { + // All simple columns should be pushable + let sort_exprs = vec![ + SortExpr::new(col("a"), true, false), + SortExpr::new(col("b"), false, true), + ]; + assert!(PushDownSort::can_pushdown_sort_exprs(&sort_exprs)); + + // Mix of simple and complex should not be pushable + let sort_exprs = vec![ + 
SortExpr::new(col("a"), true, false), + SortExpr::new(col("a") + col("b"), false, true), + ]; + assert!(!PushDownSort::can_pushdown_sort_exprs(&sort_exprs)); + + // Empty list should be pushable + let sort_exprs = vec![]; + assert!(PushDownSort::can_pushdown_sort_exprs(&sort_exprs)); + } + + #[test] + fn test_basic_sort_pushdown_to_table_scan() -> Result<()> { + let table_scan = test_table_scan()?; + let plan = LogicalPlanBuilder::from(table_scan) + .sort(vec![SortExpr::new(col("a"), true, false)])? + .build()?; + + // Sort node is preserved with preferred_ordering passed to TableScan + assert_optimized_plan_equal!( + plan, + @ r" + Sort: test.a ASC NULLS LAST + TableScan: test + " + ) } #[test] - fn test_name() { + fn test_multiple_column_sort_pushdown() -> Result<()> { + let table_scan = test_table_scan()?; + let plan = LogicalPlanBuilder::from(table_scan) + .sort(vec![ + SortExpr::new(col("a"), true, false), + SortExpr::new(col("b"), false, true), + ])? + .build()?; + + // Multi-column sort is preserved with preferred_ordering passed to TableScan + assert_optimized_plan_equal!( + plan, + @ r" + Sort: test.a ASC NULLS LAST, test.b DESC NULLS FIRST + TableScan: test + " + ) + } + + #[test] + fn test_sort_node_preserved_with_preferred_ordering() -> Result<()> { let rule = PushDownSort::new(); - assert_eq!(rule.name(), "push_down_sort"); + let table_scan = test_table_scan()?; + let sort_plan = LogicalPlanBuilder::from(table_scan) + .sort(vec![SortExpr::new(col("a"), true, false)])? 
+ .build()?; + + let config = &OptimizerContext::new(); + let result = rule.rewrite(sort_plan, config)?; + + // Verify Sort node is preserved + match &result.data { + LogicalPlan::Sort(sort) => { + // Check that TableScan has preferred_ordering + if let LogicalPlan::TableScan(ts) = sort.input.as_ref() { + assert!(ts.preferred_ordering.is_some()); + } else { + panic!("Expected TableScan input"); + } + } + _ => panic!("Expected Sort node to be preserved"), + } + + Ok(()) + } + + #[test] + fn test_no_pushdown_with_complex_expressions() -> Result<()> { + let table_scan = test_table_scan()?; + let plan = LogicalPlanBuilder::from(table_scan) + .sort(vec![ + SortExpr::new(col("a"), true, false), + SortExpr::new(col("a") + col("b"), false, true), // Complex expression + ])? + .build()?; + + // Sort should remain unchanged + assert_optimized_plan_equal!( + plan, + @ r" + Sort: test.a ASC NULLS LAST, test.a + test.b DESC NULLS FIRST + TableScan: test + " + ) + } + + #[test] + fn test_no_pushdown_through_projection() -> Result<()> { + let table_scan = test_table_scan()?; + let plan = LogicalPlanBuilder::from(table_scan) + .project(vec![col("a"), col("b")])? + .sort(vec![SortExpr::new(col("a"), true, false)])? + .build()?; + + // Sort should remain above projection + assert_optimized_plan_equal!( + plan, + @ r" + Sort: test.a ASC NULLS LAST + Projection: test.a, test.b + TableScan: test + " + ) + } + + #[test] + fn test_no_pushdown_through_filter() -> Result<()> { + let table_scan = test_table_scan()?; + let plan = LogicalPlanBuilder::from(table_scan) + .filter(col("a").gt(lit(10)))? + .sort(vec![SortExpr::new(col("a"), true, false)])? 
+ .build()?; + + // Sort should remain above filter + assert_optimized_plan_equal!( + plan, + @ r" + Sort: test.a ASC NULLS LAST + Filter: test.a > Int32(10) + TableScan: test + " + ) + } + + #[test] + fn test_no_pushdown_through_aggregate() -> Result<()> { + let table_scan = test_table_scan()?; + let plan = LogicalPlanBuilder::from(table_scan) + .aggregate(vec![col("a")], Vec::::new())? + .sort(vec![SortExpr::new(col("a"), true, false)])? + .build()?; + + // Sort should remain above aggregate + assert_optimized_plan_equal!( + plan, + @ r" + Sort: test.a ASC NULLS LAST + Aggregate: groupBy=[[test.a]], aggr=[[]] + TableScan: test + " + ) + } + + #[test] + fn test_no_pushdown_through_join() -> Result<()> { + let left_table = crate::test::test_table_scan_with_name("t1")?; + let right_table = crate::test::test_table_scan_with_name("t2")?; + + let plan = LogicalPlanBuilder::from(left_table) + .join( + right_table, + JoinType::Inner, + (vec![Column::from_name("a")], vec![Column::from_name("a")]), + None, + )? + .sort(vec![SortExpr::new( + Expr::Column(Column::new(Some("t1"), "a")), + true, + false, + )])? + .build()?; + + // Sort should remain above join + assert_optimized_plan_equal!( + plan, + @ r" + Sort: t1.a ASC NULLS LAST + Inner Join: t1.a = t2.a + TableScan: t1 + TableScan: t2 + " + ) + } + + #[test] + fn test_no_pushdown_through_limit() -> Result<()> { + let table_scan = test_table_scan()?; + let plan = LogicalPlanBuilder::from(table_scan) + .limit(0, Some(10))? + .sort(vec![SortExpr::new(col("a"), true, false)])? + .build()?; + + // Sort should remain above limit + assert_optimized_plan_equal!( + plan, + @ r" + Sort: test.a ASC NULLS LAST + Limit: skip=0, fetch=10 + TableScan: test + " + ) + } + + #[test] + fn test_no_pushdown_through_distinct() -> Result<()> { + let table_scan = test_table_scan()?; + let plan = LogicalPlanBuilder::from(table_scan) + .distinct()? + .sort(vec![SortExpr::new(col("a"), true, false)])? 
+ .build()?; + + // Sort should remain above distinct + assert_optimized_plan_equal!( + plan, + @ r" + Sort: test.a ASC NULLS LAST + Distinct: + TableScan: test + " + ) + } + + #[test] + fn test_no_pushdown_on_non_sort_nodes() -> Result<()> { + let table_scan = test_table_scan()?; + + // TableScan should remain unchanged + assert_optimized_plan_equal!( + table_scan, + @ r"TableScan: test" + ) + } + + // Tests for node types that currently block sort pushdown + + #[test] + fn test_potential_pushdown_through_subquery_alias() -> Result<()> { + let table_scan = test_table_scan()?; + let plan = LogicalPlanBuilder::from(table_scan) + .alias("aliased_table")? + .sort(vec![SortExpr::new(col("a"), true, false)])? + .build()?; + + // Sort remains above SubqueryAlias + assert_optimized_plan_equal!( + plan, + @ r" + Sort: aliased_table.a ASC NULLS LAST + SubqueryAlias: aliased_table + TableScan: test + " + ) + } + + #[test] + fn test_potential_pushdown_through_order_preserving_projection() -> Result<()> { + let table_scan = test_table_scan()?; + let plan = LogicalPlanBuilder::from(table_scan) + .project(vec![col("a"), col("b"), col("c")])? // Identity projection - doesn't change column order + .sort(vec![SortExpr::new(col("a"), true, false)])? + .build()?; + + // Sort remains above Projection (conservative approach) + assert_optimized_plan_equal!( + plan, + @ r" + Sort: test.a ASC NULLS LAST + Projection: test.a, test.b, test.c + TableScan: test + " + ) + } + + #[test] + fn test_potential_pushdown_through_order_preserving_filter() -> Result<()> { + let table_scan = test_table_scan()?; + let plan = LogicalPlanBuilder::from(table_scan) + .filter(col("b").gt(lit(0)))? // Filter on different column than sort + .sort(vec![SortExpr::new(col("a"), true, false)])? 
+ .build()?; + + // Currently: Sort remains above Filter (conservative approach) + // Future enhancement: Could push through filters that don't affect sort column relationships + assert_optimized_plan_equal!( + plan, + @ r" + Sort: test.a ASC NULLS LAST + Filter: test.b > Int32(0) + TableScan: test + " + ) + } + + #[test] + fn test_edge_case_empty_sort_expressions() -> Result<()> { + let table_scan = test_table_scan()?; + let plan = LogicalPlanBuilder::from(table_scan) + .sort(Vec::::new())? // Empty sort + .build()?; + + // Empty sort is preserved + assert_optimized_plan_equal!( + plan, + @ r" + Sort: + TableScan: test + " + ) + } + + #[test] + fn test_sort_with_nulls_first_last_variants() -> Result<()> { + let table_scan = test_table_scan()?; + let plan = LogicalPlanBuilder::from(table_scan) + .sort(vec![ + SortExpr::new(col("a"), true, false), // ASC NULLS LAST + SortExpr::new(col("b"), true, true), // ASC NULLS FIRST + SortExpr::new(col("c"), false, false), // DESC NULLS LAST + ])? + .build()?; + + // All variants of nulls ordering should be pushable for simple columns + assert_optimized_plan_equal!( + plan, + @ r" + Sort: test.a ASC NULLS LAST, test.b ASC NULLS FIRST, test.c DESC NULLS LAST + TableScan: test + " + ) + } + + #[test] + fn test_mixed_simple_and_qualified_columns() -> Result<()> { + let table_scan = test_table_scan()?; + let plan = LogicalPlanBuilder::from(table_scan) + .sort(vec![ + SortExpr::new(col("a"), true, false), // Simple column + SortExpr::new(Expr::Column(Column::new(Some("test"), "b")), false, true), // Qualified column + ])? 
+ .build()?; + + // Both simple and qualified column references should be pushable + assert_optimized_plan_equal!( + plan, + @ r" + Sort: test.a ASC NULLS LAST, test.b DESC NULLS FIRST + TableScan: test + " + ) + } + + #[test] + fn test_case_sensitive_column_references() -> Result<()> { + let table_scan = test_table_scan()?; + let plan = LogicalPlanBuilder::from(table_scan) + .sort(vec![SortExpr::new(col("A"), true, false)])? // Capital A + .build()?; + + // Column reference case sensitivity should be handled by the schema + assert_optimized_plan_equal!( + plan, + @ r" + Sort: test.a ASC NULLS LAST + TableScan: test + " + ) } } From d8b754b6b794de0102ce13a6f3ee152e6a427d2a Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Wed, 27 Aug 2025 15:49:49 -0500 Subject: [PATCH 05/10] Refactor --- datafusion/expr/src/logical_plan/plan.rs | 15 +++------------ .../optimizer/src/optimize_projections/mod.rs | 4 ++-- 2 files changed, 5 insertions(+), 14 deletions(-) diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 16eca2a5540b..fc86388eacfa 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -2683,18 +2683,9 @@ impl TableScan { }) } - pub fn try_new_with_preferred_ordering( - table_name: impl Into, - table_source: Arc, - projection: Option>, - filters: Vec, - fetch: Option, - preferred_ordering: Option>, - ) -> Result { - let mut table_scan = - Self::try_new(table_name, table_source, projection, filters, fetch)?; - table_scan.preferred_ordering = preferred_ordering; - Ok(table_scan) + pub fn with_preferred_ordering(mut self, preferred_ordering: Option>) -> Self { + self.preferred_ordering = preferred_ordering; + self } } diff --git a/datafusion/optimizer/src/optimize_projections/mod.rs b/datafusion/optimizer/src/optimize_projections/mod.rs index c5527c5db5f9..d99543993f65 100644 --- 
a/datafusion/optimizer/src/optimize_projections/mod.rs +++ b/datafusion/optimizer/src/optimize_projections/mod.rs @@ -251,14 +251,14 @@ fn optimize_projections( Some(projection) => indices.into_mapped_indices(|idx| projection[idx]), None => indices.into_inner(), }; - return TableScan::try_new_with_preferred_ordering( + return TableScan::try_new( table_name, source, Some(projection), filters, fetch, - preferred_ordering, ) + .map(|s| s.with_preferred_ordering(preferred_ordering)) .map(LogicalPlan::TableScan) .map(Transformed::yes); } From 849fe262a3d0e3eb9afa24bd037a3bab8a794158 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Wed, 27 Aug 2025 16:09:29 -0500 Subject: [PATCH 06/10] fmt --- datafusion/expr/src/logical_plan/plan.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index fc86388eacfa..1cbbd80057ff 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -2683,7 +2683,10 @@ impl TableScan { }) } - pub fn with_preferred_ordering(mut self, preferred_ordering: Option>) -> Self { + pub fn with_preferred_ordering( + mut self, + preferred_ordering: Option>, + ) -> Self { self.preferred_ordering = preferred_ordering; self } From b6e36db67c8072f48fc8754ec5f40a369660c737 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Wed, 27 Aug 2025 16:12:55 -0500 Subject: [PATCH 07/10] update test --- datafusion/sqllogictest/test_files/explain.slt | 2 ++ 1 file changed, 2 insertions(+) diff --git a/datafusion/sqllogictest/test_files/explain.slt b/datafusion/sqllogictest/test_files/explain.slt index 06965ebef0f7..a686795df129 100644 --- a/datafusion/sqllogictest/test_files/explain.slt +++ b/datafusion/sqllogictest/test_files/explain.slt @@ -194,6 +194,7 @@ logical_plan after eliminate_one_union SAME TEXT AS ABOVE 
logical_plan after filter_null_join_keys SAME TEXT AS ABOVE logical_plan after eliminate_outer_join SAME TEXT AS ABOVE logical_plan after push_down_limit SAME TEXT AS ABOVE +logical_plan after push_down_sort SAME TEXT AS ABOVE logical_plan after push_down_filter SAME TEXT AS ABOVE logical_plan after single_distinct_aggregation_to_group_by SAME TEXT AS ABOVE logical_plan after eliminate_group_by_constant SAME TEXT AS ABOVE @@ -216,6 +217,7 @@ logical_plan after eliminate_one_union SAME TEXT AS ABOVE logical_plan after filter_null_join_keys SAME TEXT AS ABOVE logical_plan after eliminate_outer_join SAME TEXT AS ABOVE logical_plan after push_down_limit SAME TEXT AS ABOVE +logical_plan after push_down_sort SAME TEXT AS ABOVE logical_plan after push_down_filter SAME TEXT AS ABOVE logical_plan after single_distinct_aggregation_to_group_by SAME TEXT AS ABOVE logical_plan after eliminate_group_by_constant SAME TEXT AS ABOVE From 8ba7fa7a9f09226aba0c52fe54c3c03be29bb6c3 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Wed, 27 Aug 2025 16:15:51 -0500 Subject: [PATCH 08/10] add docs --- datafusion/expr/src/logical_plan/plan.rs | 49 ++++++++++++++++++++++ datafusion/optimizer/src/push_down_sort.rs | 38 ++++++++++++++++- 2 files changed, 86 insertions(+), 1 deletion(-) diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 1cbbd80057ff..9c25b49de942 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -2683,6 +2683,55 @@ impl TableScan { }) } + /// Sets the preferred ordering for this table scan using the builder pattern. + /// + /// The preferred ordering serves as a hint to table providers about the desired + /// sort order for the data. Table providers can use this information to optimize + /// data access patterns, choose appropriate indexes, or leverage existing sort + /// orders in the underlying storage. 
+ /// + /// # Parameters + /// + /// * `preferred_ordering` - An optional vector of sort expressions representing + /// the desired ordering. `None` indicates no specific ordering preference. + /// + /// # Returns + /// + /// Returns `self` to enable method chaining in the builder pattern. + /// + /// # Examples + /// + /// ```rust + /// use datafusion_expr::{col, SortExpr}; + /// # use datafusion_expr::logical_plan::TableScan; + /// # use std::sync::Arc; + /// # use datafusion_common::TableReference; + /// + /// // Create a table scan with preferred ordering by column 'a' ascending + /// # let table_name = TableReference::bare("test"); + /// # let source = Arc::new(datafusion_expr::test::table_source(vec![])); + /// # let projection = None; + /// # let projected_schema = Arc::new(datafusion_common::DFSchema::empty()); + /// # let filters = vec![]; + /// # let fetch = None; + /// let table_scan = TableScan { + /// table_name, + /// source, + /// projection, + /// projected_schema, + /// filters, + /// fetch, + /// preferred_ordering: None, + /// }.with_preferred_ordering(Some(vec![ + /// SortExpr::new(col("a"), true, false) // ASC NULLS LAST + /// ])); + /// ``` + /// + /// # Notes + /// + /// This is purely an optimization hint. The table provider may choose to ignore + /// the preferred ordering if it cannot be efficiently satisfied, and the query + /// execution engine should not rely on the data being returned in this order. 
pub fn with_preferred_ordering( mut self, preferred_ordering: Option<Vec<SortExpr>>, diff --git a/datafusion/optimizer/src/push_down_sort.rs b/datafusion/optimizer/src/push_down_sort.rs index bee0f4306405..0d1c08f537f1 100644 --- a/datafusion/optimizer/src/push_down_sort.rs +++ b/datafusion/optimizer/src/push_down_sort.rs @@ -34,11 +34,47 @@ use datafusion_expr::{Expr, SortExpr}; /// This rule looks for `Sort -> TableScan` patterns and moves the sort /// expressions into the `TableScan.preferred_ordering` field, allowing /// table providers to potentially optimize the scan based on sort requirements. +/// +/// # Behavior +/// +/// The optimizer preserves the original `Sort` node as a fallback while passing +/// the ordering preference to the `TableScan` as an optimization hint. This ensures +/// correctness even if the table provider cannot satisfy the requested ordering. +/// +/// # Supported Sort Expressions +/// +/// Currently, only simple column references are supported for pushdown because +/// table providers typically cannot optimize complex expressions in sort operations. +/// Complex expressions like `col("a") + col("b")` or function calls are not pushed down. +/// +/// # Examples +/// +/// ```text +/// Before optimization: +/// Sort: test.a ASC NULLS LAST +/// TableScan: test +/// +/// After optimization: +/// Sort: test.a ASC NULLS LAST -- Preserved as fallback +/// TableScan: test -- Now includes preferred_ordering hint +/// ``` #[derive(Default, Debug)] pub struct PushDownSort {} impl PushDownSort { - #[allow(missing_docs)] + /// Creates a new instance of the `PushDownSort` optimizer rule. + /// + /// # Returns + /// + /// A new `PushDownSort` optimizer rule that can be added to the optimization pipeline. 
+ /// + /// # Examples + /// + /// ```rust + /// use datafusion_optimizer::push_down_sort::PushDownSort; + /// + /// let rule = PushDownSort::new(); + /// ``` pub fn new() -> Self { Self {} } From 3353b06359cfcfd1e1f2636c501c38d3b1ec3e41 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Wed, 27 Aug 2025 18:46:35 -0500 Subject: [PATCH 09/10] Fix --- datafusion/expr/src/logical_plan/plan.rs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 9c25b49de942..8a623c82cdd4 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -2703,13 +2703,15 @@ impl TableScan { /// /// ```rust /// use datafusion_expr::{col, SortExpr}; - /// # use datafusion_expr::logical_plan::TableScan; + /// # use datafusion_expr::logical_plan::{TableScan, builder::table_source}; /// # use std::sync::Arc; - /// # use datafusion_common::TableReference; + /// # use datafusion_common::{TableReference, DFSchema}; + /// # use arrow::datatypes::{Schema, Field, DataType}; /// /// // Create a table scan with preferred ordering by column 'a' ascending /// # let table_name = TableReference::bare("test"); - /// # let source = Arc::new(datafusion_expr::test::table_source(vec![])); + /// # let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]); + /// # let source = table_source(&schema); /// # let projection = None; /// # let projected_schema = Arc::new(datafusion_common::DFSchema::empty()); /// # let filters = vec![]; From 39664ae244581de336feb82ad8ffb94e06b0c745 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Sun, 7 Sep 2025 11:02:14 -0500 Subject: [PATCH 10/10] address some feedback --- datafusion/expr/src/logical_plan/mod.rs | 5 +- datafusion/expr/src/logical_plan/plan.rs | 66 ++++++++++++++----- 
datafusion/expr/src/logical_plan/tree_node.rs | 4 +- .../optimizer/src/optimize_projections/mod.rs | 7 +- datafusion/optimizer/src/push_down_filter.rs | 2 +- datafusion/optimizer/src/push_down_sort.rs | 24 ++----- datafusion/proto/src/logical_plan/mod.rs | 2 +- 7 files changed, 68 insertions(+), 42 deletions(-) diff --git a/datafusion/expr/src/logical_plan/mod.rs b/datafusion/expr/src/logical_plan/mod.rs index 4bbb9d7ada7e..7cf291e0744b 100644 --- a/datafusion/expr/src/logical_plan/mod.rs +++ b/datafusion/expr/src/logical_plan/mod.rs @@ -41,8 +41,9 @@ pub use plan::{ projection_schema, Aggregate, Analyze, ColumnUnnestList, DescribeTable, Distinct, DistinctOn, EmptyRelation, Explain, ExplainFormat, ExplainOption, Extension, FetchType, Filter, Join, JoinConstraint, JoinType, Limit, LogicalPlan, Partitioning, - PlanType, Projection, RecursiveQuery, Repartition, SkipType, Sort, StringifiedPlan, - Subquery, SubqueryAlias, TableScan, ToStringifiedPlan, Union, Unnest, Values, Window, + PlanType, Projection, RecursiveQuery, Repartition, ScanOrdering, SkipType, Sort, + StringifiedPlan, Subquery, SubqueryAlias, TableScan, ToStringifiedPlan, Union, + Unnest, Values, Window, }; pub use statement::{ Deallocate, Execute, Prepare, SetVariable, Statement, TransactionAccessMode, diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 8a623c82cdd4..4f35d25887a0 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -2537,6 +2537,43 @@ impl PartialOrd for Window { } } +#[derive(Clone, PartialEq, Eq, Hash, PartialOrd, Default)] +pub struct ScanOrdering { + /// Optional preferred ordering for the scan that matches the output order of upstream query nodes. + /// It is optional / best effort for the scan to produce this ordering. + /// If the scan produces this exact ordering and sets its properties to reflect this, upstream sorts may be optimized away. 
+ /// Otherwise the sorts may remain in place but partial ordering may be exploited e.g. to do early stopping or reduce complexity of the sort. + /// Thus it is recommended for the scan to also do a best effort to produce partially sorted data if possible. + pub preferred_ordering: Option<Vec<SortExpr>>, +} + +impl ScanOrdering { + /// Set the preferred ordering, returning the updated ScanOrdering + pub fn with_preferred_ordering(mut self, preferred_ordering: Vec<SortExpr>) -> Self { + self.preferred_ordering = Some(preferred_ordering); + self + } +} + +impl Debug for ScanOrdering { + fn fmt(&self, f: &mut Formatter) -> fmt::Result { + let ordering_display = self + .preferred_ordering + .as_ref() + .map(|ordering| { + ordering + .iter() + .map(|e| e.to_string()) + .collect::<Vec<_>>() + .join(", ") + }) + .unwrap_or_else(|| "None".to_string()); + f.debug_struct("ScanOrdering") + .field("preferred_ordering", &ordering_display) + .finish_non_exhaustive() + } +} + /// Produces rows from a table provider by reference or from the context #[derive(Clone)] pub struct TableScan { @@ -2552,8 +2589,8 @@ pub struct TableScan { pub filters: Vec<Expr>, /// Optional number of rows to read pub fetch: Option<usize>, - /// Optional preferred ordering for the scan - pub preferred_ordering: Option<Vec<SortExpr>>, + /// Ordering for the scan + pub ordering: Option<ScanOrdering>, } impl Debug for TableScan { @@ -2565,7 +2602,7 @@ impl Debug for TableScan { .field("projected_schema", &self.projected_schema) .field("filters", &self.filters) .field("fetch", &self.fetch) - .field("preferred_ordering", &self.preferred_ordering) + .field("ordering", &self.ordering) .finish_non_exhaustive() } } @@ -2577,7 +2614,7 @@ impl PartialEq for TableScan { && self.projected_schema == other.projected_schema && self.filters == other.filters && self.fetch == other.fetch - && self.preferred_ordering == other.preferred_ordering + && self.ordering == other.ordering } } @@ -2598,21 +2635,21 @@ impl PartialOrd for TableScan { /// Optional number of rows to read pub fetch: &'a Option<usize>, /// Optional preferred ordering 
for the scan - pub preferred_ordering: &'a Option<Vec<SortExpr>>, + pub ordering: &'a Option<ScanOrdering>, } let comparable_self = ComparableTableScan { table_name: &self.table_name, projection: &self.projection, filters: &self.filters, fetch: &self.fetch, - preferred_ordering: &self.preferred_ordering, + ordering: &self.ordering, }; let comparable_other = ComparableTableScan { table_name: &other.table_name, projection: &other.projection, filters: &other.filters, fetch: &other.fetch, - preferred_ordering: &other.preferred_ordering, + ordering: &other.ordering, }; comparable_self.partial_cmp(&comparable_other) } @@ -2625,7 +2662,7 @@ impl Hash for TableScan { self.projected_schema.hash(state); self.filters.hash(state); self.fetch.hash(state); - self.preferred_ordering.hash(state); + self.ordering.hash(state); } } @@ -2679,7 +2716,7 @@ impl TableScan { projected_schema, filters, fetch, - preferred_ordering: None, + ordering: None, }) } @@ -2734,11 +2771,8 @@ impl TableScan { /// This is purely an optimization hint. The table provider may choose to ignore /// the preferred ordering if it cannot be efficiently satisfied, and the query /// execution engine should not rely on the data being returned in this order. 
- pub fn with_preferred_ordering( - mut self, - preferred_ordering: Option<Vec<SortExpr>>, - ) -> Self { - self.preferred_ordering = preferred_ordering; + pub fn with_ordering(mut self, ordering: ScanOrdering) -> Self { + self.ordering = Some(ordering); self } } @@ -4965,7 +4999,7 @@ mod tests { projected_schema: Arc::clone(&schema), filters: vec![], fetch: None, - preferred_ordering: None, + ordering: None, })); let col = schema.field_names()[0].clone(); @@ -4996,7 +5030,7 @@ mod tests { projected_schema: Arc::clone(&unique_schema), filters: vec![], fetch: None, - preferred_ordering: None, + ordering: None, })); let col = schema.field_names()[0].clone(); diff --git a/datafusion/expr/src/logical_plan/tree_node.rs b/datafusion/expr/src/logical_plan/tree_node.rs index 37244ebf9437..dfc216db403f 100644 --- a/datafusion/expr/src/logical_plan/tree_node.rs +++ b/datafusion/expr/src/logical_plan/tree_node.rs @@ -599,7 +599,7 @@ impl LogicalPlan { projected_schema, filters, fetch, - preferred_ordering, + ordering, }) => filters.map_elements(f)?.update_data(|filters| { LogicalPlan::TableScan(TableScan { table_name, @@ -608,7 +608,7 @@ impl LogicalPlan { projected_schema, filters, fetch, - preferred_ordering, + ordering, }) }), LogicalPlan::Distinct(Distinct::On(DistinctOn { diff --git a/datafusion/optimizer/src/optimize_projections/mod.rs b/datafusion/optimizer/src/optimize_projections/mod.rs index d99543993f65..6b038d897498 100644 --- a/datafusion/optimizer/src/optimize_projections/mod.rs +++ b/datafusion/optimizer/src/optimize_projections/mod.rs @@ -242,7 +242,7 @@ fn optimize_projections( filters, fetch, projected_schema: _, - preferred_ordering, + ordering, } = table_scan; // Get indices referred to in the original (schema with all fields) @@ -258,7 +258,10 @@ fn optimize_projections( filters, fetch, ) - .map(|s| s.with_preferred_ordering(preferred_ordering)) + .map(|s| match ordering { + Some(ordering) => s.with_ordering(ordering), + None => s, + }) .map(LogicalPlan::TableScan) 
.map(Transformed::yes); } diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs index 50e7d00b7788..f4230154544a 100644 --- a/datafusion/optimizer/src/push_down_filter.rs +++ b/datafusion/optimizer/src/push_down_filter.rs @@ -3055,7 +3055,7 @@ mod tests { projection, source: Arc::new(test_provider), fetch: None, - preferred_ordering: None, + ordering: None, }); Ok(LogicalPlanBuilder::from(table_scan)) diff --git a/datafusion/optimizer/src/push_down_sort.rs b/datafusion/optimizer/src/push_down_sort.rs index 0d1c08f537f1..1916e21e2e72 100644 --- a/datafusion/optimizer/src/push_down_sort.rs +++ b/datafusion/optimizer/src/push_down_sort.rs @@ -25,8 +25,8 @@ use crate::{OptimizerConfig, OptimizerRule}; use datafusion_common::tree_node::Transformed; use datafusion_common::Result; -use datafusion_expr::logical_plan::{LogicalPlan, TableScan}; -use datafusion_expr::{Expr, SortExpr}; +use datafusion_expr::logical_plan::LogicalPlan; +use datafusion_expr::{Expr, ScanOrdering, SortExpr}; /// Optimization rule that pushes sort expressions down to table scans /// when the sort can potentially be optimized by the table provider. 
@@ -123,22 +123,10 @@ impl OptimizerRule for PushDownSort { return Ok(Transformed::no(plan)); } - // If the table scan already has preferred ordering, don't overwrite it - // This preserves any existing sort preferences from other optimizations - if table_scan.preferred_ordering.is_some() { - return Ok(Transformed::no(plan)); - } - // Create new TableScan with preferred ordering - let new_table_scan = TableScan { - table_name: table_scan.table_name.clone(), - source: Arc::clone(&table_scan.source), - projection: table_scan.projection.clone(), - projected_schema: Arc::clone(&table_scan.projected_schema), - filters: table_scan.filters.clone(), - fetch: table_scan.fetch, - preferred_ordering: Some(sort.expr.clone()), - }; + let new_table_scan = table_scan.clone().with_ordering( + ScanOrdering::default().with_preferred_ordering(sort.expr.clone()), + ); // Preserve the Sort node as a fallback while passing the ordering // preference to the TableScan as an optimization hint @@ -275,7 +263,7 @@ mod tests { LogicalPlan::Sort(sort) => { // Check that TableScan has preferred_ordering if let LogicalPlan::TableScan(ts) = sort.input.as_ref() { - assert!(ts.preferred_ordering.is_some()); + assert!(ts.ordering.is_some()); } else { panic!("Expected TableScan input"); } diff --git a/datafusion/proto/src/logical_plan/mod.rs b/datafusion/proto/src/logical_plan/mod.rs index 0d6f495d392f..960a3a65c3a2 100644 --- a/datafusion/proto/src/logical_plan/mod.rs +++ b/datafusion/proto/src/logical_plan/mod.rs @@ -271,7 +271,7 @@ fn from_table_source( projected_schema, filters: vec![], fetch: None, - preferred_ordering: None, + ordering: None, }); LogicalPlanNode::try_from_logical_plan(&r, extension_codec)