Allow struct field access projections to be pushed down into scans #19538
Conversation
| // Check whether `expr` is trivial; i.e. it doesn't imply any computation. | ||
| fn is_expr_trivial(expr: &Expr) -> bool { | ||
| matches!(expr, Expr::Column(_) | Expr::Literal(_, _)) | ||
| } |
As evidenced by the existing functions for both logical and physical expressions this was already a concept and implementation within the codebase, so all this PR is really doing is allowing arbitrary functions / expressions to declare themselves as trivial.
alamb
left a comment
Thank you @adriangb -- I think this PR is really nice
I don't think we should push constants all the way down into the scan -- doing so will require passing (and filtering) Arrays of constant values through the plan. This will be expensive.
Or put another way, I don't think Literal is a trivial physical expression as we have defined it. Maybe we can add special literal handling for the ordering optimizations.
The only thing I think this PR needs is some more slt / explain tests that show it working (I added comments below). I think it would be ok to add these tests as a follow on PR too.
I think your formalization of the is_trivial API is a beautiful way to encapsulate the existing concept that was in the code
datafusion/expr/src/udf.rs
Outdated
| None | ||
| } | ||
| /// Returns true if this function is trivial (cheap to evaluate). |
I suggest that a good rule of thumb here is that the function takes constant time per RecordBatch (aka it doesn't depend on the number of rows in the batch). Struct field access and column references have this property, but other functions don't.
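For intuition on why struct field access is constant time per batch, here is a minimal arrow-rs illustration (the helper name is ours): extracting a field from a StructArray is just an Arc clone of the child array, independent of row count.

```rust
use std::sync::Arc;
use arrow::array::{ArrayRef, StructArray};

// O(1) per batch: a StructArray stores each field as its own ArrayRef, so
// extraction clones the Arc pointer rather than copying any rows.
fn get_field_a(struct_col: &StructArray) -> ArrayRef {
    Arc::clone(struct_col.column_by_name("a").expect("field `a` exists"))
}
```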
@lwwmanning and the Vortex team had a suggestion here that I like:
```rust
pub trait PhysicalExpr {
    fn cost(&self) -> ExpressionCost;
    ...
}

#[non_exhaustive]
enum ExpressionCost {
    /// Does not depend on the size or amount of data.
    /// Examples of this are metadata only operations:
    /// - `get_field`: extracts a field from a struct array, a cheap array clone
    /// - `arrow_typeof`: gets the type of an expression
    /// - `count()` (without distinct): gets the number of rows (a metadata only check)
    Constant,
    /// Depends on the size of the data in some way.
    /// Examples of this are:
    /// - Literal values (which get expanded / broadcast to the size of the data)
    /// - Operations applied to each row of the input
    RowOriented,
}
```

The idea being that in the future we could do something like add a `Cost(f64)` variant (not proposing this specifically) or otherwise expand / alter this with fewer breaking changes.
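A minimal standalone sketch of how an optimizer rule might consume such an API (the trait, enum, and function names here are illustrative stand-ins, not DataFusion's actual types):

```rust
// Illustrative only: a cut-down version of the suggested cost() API and how
// a pushdown rule could gate on it.
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ExpressionCost {
    Constant,
    RowOriented,
}

pub trait CostedExpr {
    fn cost(&self) -> ExpressionCost;
}

/// Only push a projection below a filter if every expression is metadata-only,
/// so it cannot end up doing per-row work on the (larger) pre-filter input.
pub fn safe_to_push_below_filter(exprs: &[Box<dyn CostedExpr>]) -> bool {
    exprs.iter().all(|e| matches!(e.cost(), ExpressionCost::Constant))
}
```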
| self.doc() | ||
| } | ||
| fn is_trivial(&self) -> bool { |
I recommend some comments explaining the rationale -- namely to allow these accesses to be pushed down into scans
| /// - Nested combinations of field accessors (e.g., `col['a']['b']`) | ||
| /// | ||
| /// This is used to identify expressions that are cheap to duplicate or | ||
| /// don't benefit from caching/partitioning optimizations. |
Maybe also include that they will be pushed below filters so if they do per-row work, setting is_trivial may slow things down
| // If expressions are all trivial (columns, literals, or field accessors), | ||
| // then all computations in this projection are reorder or rename, | ||
| // and projection would not benefit from the repartition. | ||
| vec![!self.projection_expr().is_trivial()] |
this is a very nice simplification and a good illustration of the power of the is_trivial API
| DataSourceExec: file_groups={1 group: [[TMP_DIR/hierarchy.parquet]]}, projection=[id], file_type=parquet, predicate=id@0 = 1, pruning_predicate=id_null_count@2 != row_count@3 AND id_min@0 <= 1 AND 1 <= id_max@1, required_guarantees=[id in (1)] | ||
| FilterExec: id@0 = level@1 | ||
| RepartitionExec: partitioning=RoundRobinBatch(NUM_CORES), input_partitions=1 | ||
| DataSourceExec: file_groups={1 group: [[TMP_DIR/hierarchy.parquet]]}, projection=[id, 1 as level], file_type=parquet, predicate=id@0 = 1, pruning_predicate=id_null_count@2 != row_count@3 AND id_min@0 <= 1 AND 1 <= id_max@1, required_guarantees=[id in (1)] |
this is somewhat interesting that it materializes the constant in the scan. This is probably ok, but it does mean that the constant may now get carried as a constant record batch up through many nodes of the plan 🤔
| 01)ProjectionExec: expr=[foo@0 as foo] | ||
| 02)--SortPreservingMergeExec: [part_key@1 ASC NULLS LAST], fetch=1 | ||
| 03)----SortExec: TopK(fetch=1), expr=[part_key@1 ASC NULLS LAST], preserve_partitioning=[true] | ||
| 04)------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/limit/test_limit_with_partitions/part-0.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/limit/test_limit_with_partitions/part-1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/limit/test_limit_with_partitions/part-2.parquet]]}, projection=[1 as foo, part_key], file_type=parquet, predicate=DynamicFilter [ empty ] |
@alamb one issue I've run into is that it's hard to encode the logic of "don't push down literals if they're on their own or with just columns, but do push them down if they're arguments to a scalar function that will not create a RecordBatch from them". I.e. `select 1 as foo, get_field(struct, 'another literal') ...`.
Put another way: we want a scalar function to be able to look at its arguments to decide if the whole expression is trivial or not. A literal alone is not trivial. A literal as an argument to get_field is trivial. A call to get_field with two columnar arguments is not trivial (e.g. if that indicated getting a different key for each row based on the second argument).
I think the right thing to do here is to let get_field handle the "recursion". I.e. instead of our current logic of:

```rust
// In ScalarFunctionExpr
fn is_trivial(&self) -> bool {
    if !self.fun.is_trivial() {
        return false;
    }
    self.args.iter().all(|arg| arg.is_trivial())
}
```

into something like:

```rust
// In ScalarFunctionExpr: let the function inspect its own arguments
fn is_trivial(&self) -> bool {
    self.fun.is_trivial(&self.args)
}
```

Or something like that. Realistically only the function knows if what it's going to do with the arguments is efficient or not.
But there are two issues with this:

- We need two methods on ScalarFunctionUDFImpl, one for the logical layer and one for the physical layer (`is_trivial_logical(args: &[Expr])` and `is_trivial_physical(args: &[Arc<dyn PhysicalExpr>])`, or something like that).
- ScalarFunctionUDFImpl can't even reference PhysicalExpr because of crate dependency cycles 😢

Any thoughts?
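For reference, the shape the PR ultimately lands on (the ArgTriviality enum quoted further down this thread) resolves exactly this tension: both layers reduce their arguments to a shared enum before consulting the UDF, so the UDF never has to name either expression type. A minimal standalone sketch — the trait and method here are illustrative stand-ins, not the PR's verbatim signatures:

```rust
// Both Expr and PhysicalExpr classify their arguments into this shared enum
// (mirroring the PR's ArgTriviality), so the UDF trait below depends on
// neither expression type and the crate dependency cycle disappears.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ArgTriviality {
    Literal,
    Column,
    TrivialExpr,
    NonTrivial,
}

// Illustrative stand-in for the ScalarUDFImpl hook added by this PR.
pub trait ScalarUdfLike {
    fn triviality(&self, args: &[ArgTriviality]) -> ArgTriviality;
}

pub struct GetField;

impl ScalarUdfLike for GetField {
    fn triviality(&self, args: &[ArgTriviality]) -> ArgTriviality {
        use ArgTriviality::*;
        match args {
            // A trivial base (column, or e.g. a nested get_field) plus a
            // literal key is a metadata-only array clone per batch.
            [Column | TrivialExpr, Literal] => TrivialExpr,
            // Anything else (e.g. a per-row key column) implies real work.
            _ => NonTrivial,
        }
    }
}
```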
I am sorry for not reviewing this sooner -- I have been caught up with the DataFusion 52 release stuff. I hope to get to it shortly.
@alamb apologies I haven't gotten this in as good of a state as I wanted. I had some ideas that I couldn't get across the line due to responding to some near-miss prod incidents at work.
Pull request overview
This PR generalizes the notion of “trivial” expressions (e.g., columns and get_field) so projection pushdown can treat struct field access like plain columns and push it into scans, including across more physical operators.
Changes:
- Introduces ArgTriviality and plumbs "expression triviality" through logical/physical expressions and scalar UDFs.
- Updates physical projection pushdown and swapping rules (Sort/Filter/Repartition/SortPreservingMerge/etc.) to leverage triviality and enable deeper pushdown of get_field.
- Adds extensive sqllogictest + Rust test coverage for projection pushdown behavior (notably for get_field) and updates expected plans.
Reviewed changes
Copilot reviewed 28 out of 31 changed files in this pull request and generated 7 comments.
| File | Description |
|---|---|
| datafusion/sqllogictest/test_files/unnest.slt | Updates expected physical plan ordering after new pushdown behavior. |
| datafusion/sqllogictest/test_files/projection_pushdown.slt | New SLT suite covering get_field pushdown across operators and partitioning. |
| datafusion/sqllogictest/test_files/limit_single_row_batches.slt | Minor expected output formatting/line numbering adjustment. |
| datafusion/sqllogictest/test_files/limit.slt | Minor expected output formatting/line numbering adjustment. |
| datafusion/physical-plan/src/sorts/sort_preserving_merge.rs | Alters projection swapping rules for SPM, adds trivial/narrow guard for fetch case. |
| datafusion/physical-plan/src/sorts/sort.rs | Uses trivial/narrow guard for projection swapping; adds unit tests for TopK/Sort swap behavior. |
| datafusion/physical-plan/src/repartition/mod.rs | Allows more projections to swap with RepartitionExec (no longer limited to column-only). |
| datafusion/physical-plan/src/projection.rs | Adds is_trivial_or_narrows_schema; updates “benefits from partitioning” logic to use triviality. |
| datafusion/physical-plan/src/filter.rs | Allows pushing “trivial or narrowing” projections through FilterExec. |
| datafusion/physical-plan/src/coalesce_partitions.rs | Removes schema-narrowing guard when swapping projections with CoalescePartitionsExec. |
| datafusion/physical-optimizer/src/projection_pushdown.rs | Adds projection splitting/extraction logic to push trivial subexpressions (e.g. get_field) down. |
| datafusion/physical-optimizer/src/output_requirements.rs | Adjusts projection swapping logic to rely on ordering updates without schema-narrowing precheck. |
| datafusion/physical-optimizer/Cargo.toml | Adds indexmap dependency for new extraction bookkeeping. |
| datafusion/physical-expr/src/scalar_function.rs | Implements PhysicalExpr::triviality() for scalar function expressions via UDF arg triviality. |
| datafusion/physical-expr/src/projection.rs | Enhances update_expr rewriting (two-pass rewrite + collision regression tests). |
| datafusion/physical-expr/src/expressions/literal.rs | Marks literals as ArgTriviality::Literal. |
| datafusion/physical-expr/src/expressions/column.rs | Marks columns as ArgTriviality::Column. |
| datafusion/physical-expr-common/src/physical_expr.rs | Adds PhysicalExpr::triviality() API with default NonTrivial. |
| datafusion/optimizer/src/optimize_projections/mod.rs | Uses expression triviality (vs hardcoded Column/Literal) when merging projections. |
| datafusion/functions/src/core/getfield.rs | Defines get_field UDF triviality based on argument triviality; adds tests. |
| datafusion/expr/src/udf.rs | Adds triviality API to ScalarUDFImpl and plumbing through ScalarUDF. |
| datafusion/expr/src/lib.rs | Re-exports ArgTriviality. |
| datafusion/expr/src/expr.rs | Adds Expr::triviality() for logical expressions. |
| datafusion/expr-common/src/triviality.rs | New ArgTriviality enum definition. |
| datafusion/expr-common/src/lib.rs | Exposes triviality module and re-export. |
| datafusion/core/tests/physical_optimizer/pushdown_utils.rs | Updates test datasource to support expression projections (ProjectionExprs) and pushdown merging. |
| datafusion/core/tests/physical_optimizer/projection_pushdown.rs | Adds regression test ensuring get_field projection can push down through Repartition+Filter. |
| datafusion/core/tests/physical_optimizer/mod.rs | Reorganizes shared optimizer test utilities module inclusion. |
| datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs | Switches filter-pushdown tests to shared utilities. |
| datafusion/common/src/column.rs | Comment capitalization tweak. |
| Cargo.lock | Adds indexmap entry for physical-optimizer. |
adriangb
left a comment
Self review: some things to double check or consider clarifying.
Note for @alamb: diff is large but a lot of it is new tests and refactors to existing testing infrastructure (TestScan, etc.). I'm sorry about the large diff but I hope this is still reviewable.
| #[derive(Debug, Clone, Copy, PartialEq, Eq)] | ||
| pub enum ArgTriviality { | ||
| /// Argument is a literal constant value or an expression that can be | ||
| /// evaluated to a constant at planning time. | ||
| Literal, | ||
| /// Argument is a simple column reference. | ||
| Column, | ||
| /// Argument is a complex expression that declares itself trivial. | ||
| /// For example, if `get_field(struct_col, 'field_name')` is implemented as a | ||
| /// trivial expression, then it would return this variant. | ||
| /// Then `other_trivial_function(get_field(...), 42)` could also be classified as | ||
| /// a trivial expression using the knowledge that `get_field(...)` is trivial. | ||
| TrivialExpr, | ||
| /// Argument is a complex expression that declares itself non-trivial. | ||
| /// For example, `min(col1 + col2)` is non-trivial because it requires per-row computation. | ||
| NonTrivial, | ||
| } |
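To make the classification concrete, here is roughly how a few sample expressions would map under the doc comments above (the composite cases are decided by each expression's own triviality implementation):

```rust
// 1, 'foo'             => ArgTriviality::Literal     (constant at planning time)
// c                    => ArgTriviality::Column      (simple column reference)
// get_field(c, 'a')    => ArgTriviality::TrivialExpr (declares itself trivial)
// c + 1, min(c1 + c2)  => ArgTriviality::NonTrivial  (per-row computation)
```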
I think the main thing we need to figure out is if these are the right terms / wording for this system.
I don't love "trivial" but it does work and it is the existing terminology.
I don't like it for 2 reasons:
- It's confusing. Claude constantly got confused thinking that a column or literal expression is trivial.
- It's a non-specific term. It doesn't imply anything concrete.
I think we should lean into the properties / what will be done with this classification
Maybe we could highlight that the point of this pushdown is to push expressions that can be efficiently evaluated in table scans? If so, perhaps something like "PushToScan" would make the use case clearest?
If we are only planning to push down column extraction, we could use a term like "FieldAccess" or "SubfieldAccess" (though what about Casts?)
| // Track Arc pointers of columns created by pass 1. | ||
| // These should not be modified by pass 2. | ||
| // We use Arc pointer addresses (not name/index) to distinguish pass-1-created columns | ||
| // from original columns that happen to have the same name and index. | ||
| let mut pass1_created: HashSet<usize> = HashSet::new(); |
This makes sense but I do want to review in a bit more detail to see if there's other alternative ways of doing it, using pointer tracking is always a bit funky.
| #[derive(Debug, PartialEq)] | ||
| #[derive(PartialEq)] |
Let's try to revert this (keep Debug implemented)
| self.fun.signature().volatility == Volatility::Volatile | ||
| } | ||
| fn triviality(&self) -> ArgTriviality { |
The logical and physical representations of a function go down the same path by abstracting the arguments behind ArgTriviality
| // If the projection does not narrow the schema, we should not try to push it down: | ||
| let proj_exprs = projection.expr(); | ||
| if proj_exprs.len() >= projection.input().schema().fields().len() { | ||
| return Ok(None); | ||
| } |
This same code was sort of sprinkled around various expressions. I'm not sure how deeply each one was evaluated for what behavior it should have. It seems to me that OutputRequirementExec is a temporary marker used by the physical optimizer and should not have any bearing on where projections are placed.
> It seems to me that OutputRequirementExec is a temporary marker used by the physical optimizer and should not have any bearing on where projections are placed

Yes, I think that is correct. I think OutputRequirementExec is always supposed to be the root node of a query plan and is used to communicate what the output requirements are (for example, for `INSERT ... AS SELECT (...)`, the desired distribution for the insert is communicated by the OutputRequirementExec).
| /// Tries to split a projection to extract beneficial sub-expressions for pushdown. | ||
| /// | ||
| /// This function walks each expression in the projection and extracts beneficial | ||
| /// sub-expressions (like `get_field`) from within larger non-beneficial expressions. | ||
| /// For example: | ||
| /// - Input: `get_field(col, 'foo') + 1` | ||
| /// - Output: Inner projection: `get_field(col, 'foo') AS __extracted_0`, Outer: `__extracted_0 + 1` | ||
| /// | ||
| /// This enables the beneficial parts to be pushed down while keeping non-beneficial | ||
| /// expressions (like literals and computations) above. | ||
| fn try_split_projection( |
This is a meaty bit worth reviewing and considering
I couldn't help but think this is very similar to the common sub-expression elimination rewrite (though the conditions for which sub-expressions get rewritten are different).
Maybe you could make this more discoverable by abstracting it away into a structure like "SubExpressionRewriter" that does the mechanics of rewriting expressions, and is parameterized by some function that decides whether to do the rewrite or not.
It might also be worth adding a diagram showing the plan nodes that are created as part of this rewrite:
```text
(projection get_field(col, 'foo') + 1)
(rest of plan)
```
To
```text
(projection __extracted_0 + 1)
(rest of plan, produces __extracted_0)
```
or something 🤔
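A toy illustration of the mechanics under discussion; the types are simplified stand-ins (not DataFusion's), and `extract` is a hypothetical helper:

```rust
// Walk an expression, hoist each "beneficial" subtree (here: get_field) into
// an inner projection under a generated name, and reference it from the
// rewritten outer expression.
#[derive(Clone, Debug)]
enum Expr {
    Column(String),
    Literal(i64),
    GetField(Box<Expr>, String),
    Add(Box<Expr>, Box<Expr>),
}

fn extract(expr: Expr, inner: &mut Vec<(String, Expr)>) -> Expr {
    match expr {
        // Beneficial subtree: push into the inner projection and keep only a
        // column reference in the outer expression.
        e @ Expr::GetField(..) => {
            let name = format!("__extracted_{}", inner.len());
            inner.push((name.clone(), e));
            Expr::Column(name)
        }
        Expr::Add(l, r) => Expr::Add(
            Box::new(extract(*l, inner)),
            Box::new(extract(*r, inner)),
        ),
        // Columns and literals stay where they are.
        other => other,
    }
}

// For `get_field(col, 'foo') + 1` this produces the outer expression
// `__extracted_0 + 1` plus one inner entry `("__extracted_0", get_field(...))`,
// matching the example in try_split_projection's doc comment.
```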
Yes I would love to do that... but this is already quite complex on its own. Given that these are internal refactors, maybe we can implement independently and then in a followup try to unite the efforts?
| if projection.benefits_from_input_partitioning()[0] | ||
| || !all_columns(projection.expr()) | ||
| { | ||
| if projection.benefits_from_input_partitioning()[0] { |
Maybe this should also check is_trivial_or_narrows_schema?
It seems like the idea was that only reductions in columns would be beneficial to push down -- because otherwise you might push an expensive computation into a single partition part of the plan (e.g. a scan from a single file) rather than allowing it to be done in parallel
It would make sense (and be consistent with the rest of the changes in this PR)
Alas I tried to unify with the other checks and it produces a lot of slt changes. I reviewed some of them and I think the new plans were better, but even if we want to make that change I'd suggest we do it in a followup so we can review that independently. For this PR I'd like to keep the SLT changes to a minimum.
| .all(|p| !matches!(p.expr.triviality(), ArgTriviality::NonTrivial)), | ||
| ] |
This is only called from RepartitionExec as far as I can tell. We should consider this implementation together with https://github.com/apache/datafusion/pull/19538/changes#r2717491453
| // Push if projection narrows schema (drops columns) - saves memory | ||
| let narrows_schema = exprs.len() < projection.input().schema().fields().len(); |
Should this also add the condition that all expressions are columns or trivial expressions?
| // Don't merge if: | ||
| // 1. A non-trivial expression is referenced more than once (caching benefit) | ||
| // See discussion in: https://github.com/apache/datafusion/issues/8296 | ||
| // 2. The child projection has TrivialExpr (like get_field) that should be pushed | ||
| // down to the data source separately | ||
| for (column, count) in column_ref_map.iter() { | ||
| let triviality = child.expr()[column.index()].expr.triviality(); | ||
| // Don't merge if multi-referenced non-trivial (caching) | ||
| if (*count > 1 && matches!(triviality, ArgTriviality::NonTrivial)) | ||
| // Don't merge if child has TrivialExpr (should push to source) | ||
| || matches!(triviality, ArgTriviality::TrivialExpr) | ||
| { | ||
| return Ok(None); | ||
| } |
This is an awkward interaction between optimizer rules. Need to evaluate and see if there's a better way to do this.
starting to review this in detail
alamb
left a comment
Thank you @adriangb
Test Questions
I didn't have time to go through all the .slt tests, but I had several initial questions (specifically it looks to me like too many expressions may be being pushed into the scans).
API commentary
I feel like trying to get one API to express the multiple different use cases may be making this more awkward than necessary.
Since there seem to be multiple distinct uses for these APIs, what do you think about making multiple APIs, like:
```rust
impl Expr {
    /// Should DataFusion try to push this expression into scans
    /// (set to true if TableProviders can more efficiently evaluate this Expr)
    fn push_to_scans(&self) -> bool { .. }

    /// Return true if this expression doesn't require any computation
    fn requires_computation(&self) -> bool { .. }
}
```

And then add the corresponding methods to ScalarFunctionImpl / PhysicalExpr (with the args passed to the expression).
Breaking it up
I also wonder if there is some way we can break this PR up into smaller pieces for easier review. Some thoughts:
- Pull the expression rewriting / refactoring structure into its own PR (with its own API, etc)
- Pull the .slt tests into their own PR (so we can see the effect of repartitioning on the before / after of the tests)
| ); | ||
| let batches = vec![ | ||
| RecordBatch::try_new( | ||
| Arc::new(Schema::new(vec![Field::new( |
I think one thing that would make the PR easier to review would be to reduce some duplication -- for example, the schema is created twice here, so it takes some cognitive load to figure out what, if anything, is different between the two schemas (nothing, I don't think).
Maybe defining the schema above like
```rust
let struct_fields = Fields::from(vec![Field::new("a", DataType::Int32, false)]);
let schema = Arc::new(Schema::new(vec![Field::new(
    "struct",
    DataType::Struct(struct_fields),
    true,
)]));
```

would make it easier to follow.
| .with_support(true) | ||
| .with_batches(batches) | ||
| .build(); | ||
| let scan_schema = scan.schema(); |
this is also the same schema, right?
| ) | ||
| .unwrap(), | ||
| ]; | ||
| let build_side_schema = Arc::new(Schema::new(vec![Field::new( |
what is the meaning of "build side" ? I don't think this test has a join in it
| // Projection should be pushed all the way down to the DataSource, and | ||
| // filter predicate should be rewritten to reference projection's output column | ||
| assert_snapshot!( |
Not for this PR, but I noticed there is a lot of duplication of this "verify and optimize" step with the other tests in this PR (though this new test is consistent)
Maybe we can simplify them like
datafusion/datafusion/core/tests/physical_optimizer/limit_pushdown.rs
Lines 186 to 208 in e8196f4
| let initial = get_plan_string(&global_limit); | |
| let expected_initial = [ | |
| "GlobalLimitExec: skip=0, fetch=5", | |
| " SortPreservingMergeExec: [c1@0 ASC]", | |
| " SortExec: expr=[c1@0 ASC], preserve_partitioning=[false]", | |
| " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", | |
| " ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c3@2 as c3]", | |
| " StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true", | |
| ]; | |
| assert_eq!(initial, expected_initial); | |
| let after_optimize = | |
| LimitPushdown::new().optimize(global_limit, &ConfigOptions::new())?; | |
| let expected = [ | |
| "SortPreservingMergeExec: [c1@0 ASC], fetch=5", | |
| " SortExec: TopK(fetch=5), expr=[c1@0 ASC], preserve_partitioning=[false]", | |
| " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", | |
| " ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c3@2 as c3]", | |
| " StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true", | |
| ]; | |
| assert_eq!(get_plan_string(&after_optimize), expected); |
| /// | ||
| /// This enum is used by [`ScalarUDFImpl::triviality`] to allow | ||
| /// functions to make context-dependent decisions about whether they are | ||
| /// trivial based on the nature of their arguments. |
Maybe we can also mention that "trivial expressions" are pushed down by the optimizer and may be evaluated more times than the original expression.
For column references or field access this is not slower, but for other expressions it may be.
| @@ -0,0 +1,1001 @@ | |||
| # Licensed to the Apache Software Foundation (ASF) under one | |||
this file seems to duplicate datafusion/sqllogictest/test_files/parquet_pushdown.slt right? They both test pushdown in parquet with only a single target_partition... Why have both?
I had started to comment about multi partition test coverage in parquet_pushdown but then I saw this file
I'll look into unifying.
| ### | ||
| query TT | ||
| EXPLAIN SELECT id, s['value'] FROM multi_struct ORDER BY id; |
can you please also add a literal expression as well (like 'Foo' for example) to show it does not get pushed into the scan?
I think it's worth clarifying: pushing literals into a scan is not a problem (maybe a good thing). The issue is pushing through FilterExec, RepartitionExec, etc.
I think it is a problem if RepartitionExec expands out each constant, for example 🤔 We would have to check, but I suspect right now a lot more work happens if we materialize a constant low down in the plan and then carry it up through the rest of the operators.
| 5 251 | ||
| ### | ||
| # Test 2.5: s['label'] || '_suffix' - pushed (directly above scan) |
it looks to me like the optimizer has pushed down both the field access and the || operator
I would have expected that it only pushed the get_field down 🤔
| 01)AggregateExec: mode=FinalPartitioned, gby=[multi_struct.s[label]@0 as multi_struct.s[label]], aggr=[sum(multi_struct.s[value])] | ||
| 02)--RepartitionExec: partitioning=Hash([multi_struct.s[label]@0], 4), input_partitions=3 | ||
| 03)----AggregateExec: mode=Partial, gby=[get_field(s@0, label) as multi_struct.s[label]], aggr=[sum(multi_struct.s[value])] | ||
| 04)------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/multi/part1.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/multi/part2.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/multi/part3.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/multi/part4.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/multi/part5.parquet]]}, projection=[s], file_type=parquet |
why are s['label'] and s['value'] NOT pushed into the scan here? it seems like they could be?
| 03)----ProjectionExec: expr=[get_field(__unnest_placeholder(recursive_unnest_table.column3,depth=1)@0, c1) as __unnest_placeholder(UNNEST(recursive_unnest_table.column3)[c1]), column3@1 as column3] | ||
| 04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 | ||
| 03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 | ||
| 04)------ProjectionExec: expr=[get_field(__unnest_placeholder(recursive_unnest_table.column3,depth=1)@0, c1) as __unnest_placeholder(UNNEST(recursive_unnest_table.column3)[c1]), column3@1 as column3] |
this seems like a good change -- the fields are extracted prior to RepartitionExec
Allow TrivialExpr as a valid base for get_field triviality check, enabling nested field access like get_field(get_field(col, 'a'), 'b') to be considered trivial. Literal base is explicitly not considered trivial since it would be constant-folded anyway. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- Fix typo "expressions" -> "expression" in triviality.rs
- Add missing period in coalesce_partitions.rs comment
- Fix typo "expressions" -> "expression" in projection_pushdown.rs
- Clarify comment in projection.rs about beneficial expressions

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

When SortExec or SortPreservingMergeExec has a fetch (TopK behavior), they act as filters reducing rows. Pushing non-trivial projections (like literals) through them causes the expression to be evaluated on all input rows instead of just the filtered output rows. Added is_trivial_or_narrows_schema check for sort operators with fetch to prevent pushing literals and computations below TopK operators. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

Add comprehensive sqllogictest coverage for projection pushdown optimization with struct field access (get_field) expressions. Test coverage includes:
- Basic get_field pushdown into DataSourceExec
- Nested struct access (s['outer']['inner'])
- Projection through Filter, Sort, and TopK operators
- Multi-partition scenarios with SortPreservingMergeExec
- Edge cases: nullable structs, common subexpressions, literals

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

- Remove "literals" from filter.rs comment (not treated as trivial)
- Apply is_trivial_or_narrows_schema check in SortPreservingMerge regardless of fetch
- Fix test comments that incorrectly said "NOT pushed"

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

The function was too permissive - it returned true for pure column projections even when they didn't narrow the schema. This caused projections to be pushed through FilterExec when there was no benefit. Now the function only returns true when pushing is beneficial:
- Schema narrows (drops columns) - saves memory
- Has TrivialExpr (like get_field) - beneficial computation pushdown

Pure Column refs that don't narrow the schema are no longer pushed, as they just rearrange the plan without any gain. Also refactored the sort test to verify both behaviors:
- Column-narrowing projections ARE pushed through Sort
- Non-narrowing column projections are NOT pushed through Sort

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

Apply the same projection pushdown guard to CoalescePartitionsExec that Sort, SPM, and Filter use. This ensures consistent behavior: projections are only pushed through when beneficial (schema narrowing or TrivialExpr). Without this check, projections with computed columns (like CAST) were being pushed through CoalescePartitionsExec unnecessarily, causing TPCH query plan changes. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

- Fix misleading comment in parquet_pushdown.slt: literals are absorbed into DataSourceExec (acceptable when projection is directly above scan)
- Add performance note to triviality() docs: TrivialExpr may be pushed below filters, so per-row work could slow things down

Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
The idea is to elevate the existing concept of a "trivial" expression from hardcoded in various places around the codebase to dynamically definable by expressions / functions. By default columns, literals and field accessor functions are considered trivial, but users can define their own (e.g. for variant).
This helps #19387 because then field accessors are treated the same as columns and thus get pushed down through projections, with the caveat of #19550.
My plan is to get as far as I can here, at least getting the physical optimizer side of things, and in particular nodes that exist only in the physical layer like RepartitionExec, to behave well.
I think we'll need a somewhat complex logical optimizer rule (a mix of filter pushdown and CSE) to solve #19550. I'll do that as a follow up.
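To illustrate the "users can define their own (e.g. for variant)" point above, here is a hypothetical variant accessor opting in; ArgTriviality mirrors the PR's enum, while the trait is an illustrative stand-in for the ScalarUDFImpl hook rather than its real signature:

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ArgTriviality { Literal, Column, TrivialExpr, NonTrivial }

// Illustrative stand-in for the UDF triviality hook added by this PR.
pub trait ScalarUdfLike {
    fn triviality(&self, args: &[ArgTriviality]) -> ArgTriviality;
}

// Hypothetical accessor, e.g. variant_get(variant_col, '$.path').
pub struct VariantGet;

impl ScalarUdfLike for VariantGet {
    fn triviality(&self, args: &[ArgTriviality]) -> ArgTriviality {
        use ArgTriviality::*;
        match args {
            // Variant column (or nested trivial accessor) + literal path:
            // declare it trivial so it can ride pushdown toward the scan.
            [Column | TrivialExpr, Literal] => TrivialExpr,
            // A per-row path argument implies real computation.
            _ => NonTrivial,
        }
    }
}
```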