Skip to content

Commit 39664ae

Browse files
committed
address some feedback
1 parent 3353b06 commit 39664ae

File tree

7 files changed

+68
-42
lines changed

7 files changed

+68
-42
lines changed

datafusion/expr/src/logical_plan/mod.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,9 @@ pub use plan::{
4141
projection_schema, Aggregate, Analyze, ColumnUnnestList, DescribeTable, Distinct,
4242
DistinctOn, EmptyRelation, Explain, ExplainFormat, ExplainOption, Extension,
4343
FetchType, Filter, Join, JoinConstraint, JoinType, Limit, LogicalPlan, Partitioning,
44-
PlanType, Projection, RecursiveQuery, Repartition, SkipType, Sort, StringifiedPlan,
45-
Subquery, SubqueryAlias, TableScan, ToStringifiedPlan, Union, Unnest, Values, Window,
44+
PlanType, Projection, RecursiveQuery, Repartition, ScanOrdering, SkipType, Sort,
45+
StringifiedPlan, Subquery, SubqueryAlias, TableScan, ToStringifiedPlan, Union,
46+
Unnest, Values, Window,
4647
};
4748
pub use statement::{
4849
Deallocate, Execute, Prepare, SetVariable, Statement, TransactionAccessMode,

datafusion/expr/src/logical_plan/plan.rs

Lines changed: 50 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -2537,6 +2537,43 @@ impl PartialOrd for Window {
25372537
}
25382538
}
25392539

2540+
#[derive(Clone, PartialEq, Eq, Hash, PartialOrd, Default)]
2541+
pub struct ScanOrdering {
2542+
/// Optional preferred ordering for the scan that matches the output order of upstream query nodes.
2543+
/// It is optional / best effort for the scan to produce this ordering.
2544+
/// If the scan produces this exact ordering and sets its properties to reflect this, upstream sorts may be optimized away.
2545+
/// Otherwise the sorts may remain in place but partial ordering may be exploited e.g. to do early stopping or reduce complexity of the sort.
2546+
/// Thus it is recommended for the scan to also do a best effort to produce partially sorted data if possible.
2547+
pub preferred_ordering: Option<Vec<SortExpr>>,
2548+
}
2549+
2550+
impl ScanOrdering {
2551+
/// Create a new ScanOrdering
2552+
pub fn with_preferred_ordering(mut self, preferred_ordering: Vec<SortExpr>) -> Self {
2553+
self.preferred_ordering = Some(preferred_ordering);
2554+
self
2555+
}
2556+
}
2557+
2558+
impl Debug for ScanOrdering {
2559+
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
2560+
let ordering_display = self
2561+
.preferred_ordering
2562+
.as_ref()
2563+
.map(|ordering| {
2564+
ordering
2565+
.iter()
2566+
.map(|e| e.to_string())
2567+
.collect::<Vec<String>>()
2568+
.join(", ")
2569+
})
2570+
.unwrap_or_else(|| "None".to_string());
2571+
f.debug_struct("ScanOrdering")
2572+
.field("preferred_ordering", &ordering_display)
2573+
.finish_non_exhaustive()
2574+
}
2575+
}
2576+
25402577
/// Produces rows from a table provider by reference or from the context
25412578
#[derive(Clone)]
25422579
pub struct TableScan {
@@ -2552,8 +2589,8 @@ pub struct TableScan {
25522589
pub filters: Vec<Expr>,
25532590
/// Optional number of rows to read
25542591
pub fetch: Option<usize>,
2555-
/// Optional preferred ordering for the scan
2556-
pub preferred_ordering: Option<Vec<SortExpr>>,
2592+
/// Ordering for the scan
2593+
pub ordering: Option<ScanOrdering>,
25572594
}
25582595

25592596
impl Debug for TableScan {
@@ -2565,7 +2602,7 @@ impl Debug for TableScan {
25652602
.field("projected_schema", &self.projected_schema)
25662603
.field("filters", &self.filters)
25672604
.field("fetch", &self.fetch)
2568-
.field("preferred_ordering", &self.preferred_ordering)
2605+
.field("ordering", &self.ordering)
25692606
.finish_non_exhaustive()
25702607
}
25712608
}
@@ -2577,7 +2614,7 @@ impl PartialEq for TableScan {
25772614
&& self.projected_schema == other.projected_schema
25782615
&& self.filters == other.filters
25792616
&& self.fetch == other.fetch
2580-
&& self.preferred_ordering == other.preferred_ordering
2617+
&& self.ordering == other.ordering
25812618
}
25822619
}
25832620

@@ -2598,21 +2635,21 @@ impl PartialOrd for TableScan {
25982635
/// Optional number of rows to read
25992636
pub fetch: &'a Option<usize>,
26002637
/// Optional preferred ordering for the scan
2601-
pub preferred_ordering: &'a Option<Vec<SortExpr>>,
2638+
pub ordering: &'a Option<ScanOrdering>,
26022639
}
26032640
let comparable_self = ComparableTableScan {
26042641
table_name: &self.table_name,
26052642
projection: &self.projection,
26062643
filters: &self.filters,
26072644
fetch: &self.fetch,
2608-
preferred_ordering: &self.preferred_ordering,
2645+
ordering: &self.ordering,
26092646
};
26102647
let comparable_other = ComparableTableScan {
26112648
table_name: &other.table_name,
26122649
projection: &other.projection,
26132650
filters: &other.filters,
26142651
fetch: &other.fetch,
2615-
preferred_ordering: &other.preferred_ordering,
2652+
ordering: &other.ordering,
26162653
};
26172654
comparable_self.partial_cmp(&comparable_other)
26182655
}
@@ -2625,7 +2662,7 @@ impl Hash for TableScan {
26252662
self.projected_schema.hash(state);
26262663
self.filters.hash(state);
26272664
self.fetch.hash(state);
2628-
self.preferred_ordering.hash(state);
2665+
self.ordering.hash(state);
26292666
}
26302667
}
26312668

@@ -2679,7 +2716,7 @@ impl TableScan {
26792716
projected_schema,
26802717
filters,
26812718
fetch,
2682-
preferred_ordering: None,
2719+
ordering: None,
26832720
})
26842721
}
26852722

@@ -2734,11 +2771,8 @@ impl TableScan {
27342771
/// This is purely an optimization hint. The table provider may choose to ignore
27352772
/// the preferred ordering if it cannot be efficiently satisfied, and the query
27362773
/// execution engine should not rely on the data being returned in this order.
2737-
pub fn with_preferred_ordering(
2738-
mut self,
2739-
preferred_ordering: Option<Vec<SortExpr>>,
2740-
) -> Self {
2741-
self.preferred_ordering = preferred_ordering;
2774+
pub fn with_ordering(mut self, ordering: ScanOrdering) -> Self {
2775+
self.ordering = Some(ordering);
27422776
self
27432777
}
27442778
}
@@ -4965,7 +4999,7 @@ mod tests {
49654999
projected_schema: Arc::clone(&schema),
49665000
filters: vec![],
49675001
fetch: None,
4968-
preferred_ordering: None,
5002+
ordering: None,
49695003
}));
49705004
let col = schema.field_names()[0].clone();
49715005

@@ -4996,7 +5030,7 @@ mod tests {
49965030
projected_schema: Arc::clone(&unique_schema),
49975031
filters: vec![],
49985032
fetch: None,
4999-
preferred_ordering: None,
5033+
ordering: None,
50005034
}));
50015035
let col = schema.field_names()[0].clone();
50025036

datafusion/expr/src/logical_plan/tree_node.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -599,7 +599,7 @@ impl LogicalPlan {
599599
projected_schema,
600600
filters,
601601
fetch,
602-
preferred_ordering,
602+
ordering,
603603
}) => filters.map_elements(f)?.update_data(|filters| {
604604
LogicalPlan::TableScan(TableScan {
605605
table_name,
@@ -608,7 +608,7 @@ impl LogicalPlan {
608608
projected_schema,
609609
filters,
610610
fetch,
611-
preferred_ordering,
611+
ordering,
612612
})
613613
}),
614614
LogicalPlan::Distinct(Distinct::On(DistinctOn {

datafusion/optimizer/src/optimize_projections/mod.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -242,7 +242,7 @@ fn optimize_projections(
242242
filters,
243243
fetch,
244244
projected_schema: _,
245-
preferred_ordering,
245+
ordering,
246246
} = table_scan;
247247

248248
// Get indices referred to in the original (schema with all fields)
@@ -258,7 +258,10 @@ fn optimize_projections(
258258
filters,
259259
fetch,
260260
)
261-
.map(|s| s.with_preferred_ordering(preferred_ordering))
261+
.map(|s| match ordering {
262+
Some(ordering) => s.with_ordering(ordering),
263+
None => s,
264+
})
262265
.map(LogicalPlan::TableScan)
263266
.map(Transformed::yes);
264267
}

datafusion/optimizer/src/push_down_filter.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3055,7 +3055,7 @@ mod tests {
30553055
projection,
30563056
source: Arc::new(test_provider),
30573057
fetch: None,
3058-
preferred_ordering: None,
3058+
ordering: None,
30593059
});
30603060

30613061
Ok(LogicalPlanBuilder::from(table_scan))

datafusion/optimizer/src/push_down_sort.rs

Lines changed: 6 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,8 @@ use crate::{OptimizerConfig, OptimizerRule};
2525

2626
use datafusion_common::tree_node::Transformed;
2727
use datafusion_common::Result;
28-
use datafusion_expr::logical_plan::{LogicalPlan, TableScan};
29-
use datafusion_expr::{Expr, SortExpr};
28+
use datafusion_expr::logical_plan::LogicalPlan;
29+
use datafusion_expr::{Expr, ScanOrdering, SortExpr};
3030

3131
/// Optimization rule that pushes sort expressions down to table scans
3232
/// when the sort can potentially be optimized by the table provider.
@@ -123,22 +123,10 @@ impl OptimizerRule for PushDownSort {
123123
return Ok(Transformed::no(plan));
124124
}
125125

126-
// If the table scan already has preferred ordering, don't overwrite it
127-
// This preserves any existing sort preferences from other optimizations
128-
if table_scan.preferred_ordering.is_some() {
129-
return Ok(Transformed::no(plan));
130-
}
131-
132126
// Create new TableScan with preferred ordering
133-
let new_table_scan = TableScan {
134-
table_name: table_scan.table_name.clone(),
135-
source: Arc::clone(&table_scan.source),
136-
projection: table_scan.projection.clone(),
137-
projected_schema: Arc::clone(&table_scan.projected_schema),
138-
filters: table_scan.filters.clone(),
139-
fetch: table_scan.fetch,
140-
preferred_ordering: Some(sort.expr.clone()),
141-
};
127+
let new_table_scan = table_scan.clone().with_ordering(
128+
ScanOrdering::default().with_preferred_ordering(sort.expr.clone()),
129+
);
142130

143131
// Preserve the Sort node as a fallback while passing the ordering
144132
// preference to the TableScan as an optimization hint
@@ -275,7 +263,7 @@ mod tests {
275263
LogicalPlan::Sort(sort) => {
276264
// Check that TableScan has an ordering set
277265
if let LogicalPlan::TableScan(ts) = sort.input.as_ref() {
278-
assert!(ts.preferred_ordering.is_some());
266+
assert!(ts.ordering.is_some());
279267
} else {
280268
panic!("Expected TableScan input");
281269
}

datafusion/proto/src/logical_plan/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -271,7 +271,7 @@ fn from_table_source(
271271
projected_schema,
272272
filters: vec![],
273273
fetch: None,
274-
preferred_ordering: None,
274+
ordering: None,
275275
});
276276

277277
LogicalPlanNode::try_from_logical_plan(&r, extension_codec)

0 commit comments

Comments
 (0)