Merged
34 commits
4198db9
feat: wrong version which did weird stuff
discord9 Dec 19, 2025
3bf503a
feat: gather filter support alias
discord9 Dec 25, 2025
f4f9b19
feat: add support for detecting unknown columns in filter pushdown & …
discord9 Dec 25, 2025
df19fc2
feat: update projection alias handling and enhance PhysicalColumnRewr…
discord9 Dec 25, 2025
85a3c12
feat: update deeply nested expression helper function and enhance tes…
discord9 Dec 25, 2025
c60bf05
chore: clippy
discord9 Dec 25, 2025
5c51005
typo
discord9 Dec 25, 2025
31df516
feat: update test assertions for filter pushdown to reflect expected …
discord9 Dec 25, 2025
ae6a63c
c
discord9 Dec 25, 2025
085e2ba
c
discord9 Dec 25, 2025
e7e3a7a
clippy
discord9 Dec 26, 2025
8b3990f
test: update sqllogic test result
discord9 Dec 26, 2025
1dbd0a4
test: more complex dyn filter
discord9 Dec 26, 2025
392ecb9
c
discord9 Dec 26, 2025
49945bf
refactor: rename function have_unknown_columns to has_unknown_columns…
discord9 Jan 4, 2026
f42c2a3
test: topk with projection
discord9 Jan 5, 2026
f6489f1
test: slt test for projection dyn filter
discord9 Jan 5, 2026
4680cfa
chore
discord9 Jan 5, 2026
747f2b9
test: ignore time
discord9 Jan 5, 2026
98afab7
chore: fmt
discord9 Jan 5, 2026
fa2ac40
test: more slt test
discord9 Jan 5, 2026
5175fd0
test: fix
discord9 Jan 7, 2026
17adf3f
test: more ignore
discord9 Jan 7, 2026
64adaae
test: more ignore&proper sql
discord9 Jan 7, 2026
94b762b
feat: unmap column not pushdown
discord9 Jan 9, 2026
3fb15e4
clippy
discord9 Jan 9, 2026
2ed6fe0
chore
discord9 Jan 9, 2026
5a43014
test: add pushdown assert
discord9 Jan 9, 2026
1839cd1
refactor: ref column map
discord9 Jan 9, 2026
d3a9259
refactor: per review
discord9 Jan 20, 2026
cbb0ab5
clippy
discord9 Jan 20, 2026
a2cebc3
what
discord9 Jan 20, 2026
6e8588b
test: slt update
discord9 Jan 20, 2026
1cb3fbb
refactor: rename per review
discord9 Jan 22, 2026
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions datafusion/core/Cargo.toml
@@ -175,6 +175,7 @@ env_logger = { workspace = true }
glob = { workspace = true }
insta = { workspace = true }
paste = { workspace = true }
pretty_assertions = "1.0"
Contributor

This appears to already be used elsewhere (this is not a net new dependency), so I think it is ok to add.

rand = { workspace = true, features = ["small_rng"] }
rand_distr = "0.5"
recursive = { workspace = true }
229 changes: 229 additions & 0 deletions datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs
@@ -59,6 +59,7 @@ use datafusion_physical_plan::{
coalesce_partitions::CoalescePartitionsExec,
collect,
filter::{FilterExec, FilterExecBuilder},
projection::ProjectionExec,
repartition::RepartitionExec,
sorts::sort::SortExec,
};
@@ -1826,6 +1827,234 @@ fn schema() -> SchemaRef {
Arc::clone(&TEST_SCHEMA)
}

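/// Inputs for one projection + TopK dynamic-filter scenario: the scan schema and
/// batches, the projection expressions (with output names), the sort expression
/// driving the TopK, and the plans expected before execution and after each polled batch.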
struct ProjectionDynFilterTestCase {
schema: SchemaRef,
batches: Vec<RecordBatch>,
projection: Vec<(Arc<dyn PhysicalExpr>, String)>,
sort_expr: PhysicalSortExpr,
expected_plans: Vec<String>,
}

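/// Builds scan -> projection -> TopK(fetch=2), applies the post-optimization
/// `FilterPushdown` pass, and asserts the optimized plan matches `expected_plans[0]`.
/// It then executes the plan with a small batch size and re-asserts the formatted
/// plan against each remaining entry in `expected_plans`, so the test can observe
/// how the TopK's dynamic filter is rewritten through the projection as rows arrive.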
async fn run_projection_dyn_filter_case(case: ProjectionDynFilterTestCase) {
let ProjectionDynFilterTestCase {
schema,
batches,
projection,
sort_expr,
expected_plans,
} = case;

let scan = TestScanBuilder::new(Arc::clone(&schema))
.with_support(true)
.with_batches(batches)
.build();

let projection_exec = Arc::new(ProjectionExec::try_new(projection, scan).unwrap());

let sort = Arc::new(
SortExec::new(LexOrdering::new(vec![sort_expr]).unwrap(), projection_exec)
.with_fetch(Some(2)),
) as Arc<dyn ExecutionPlan>;

let mut config = ConfigOptions::default();
config.execution.parquet.pushdown_filters = true;
config.optimizer.enable_dynamic_filter_pushdown = true;

let optimized_plan = FilterPushdown::new_post_optimization()
.optimize(Arc::clone(&sort), &config)
.unwrap();

pretty_assertions::assert_eq!(
format_plan_for_test(&optimized_plan).trim(),
expected_plans[0].trim()
);

let config = SessionConfig::new().with_batch_size(2);
let session_ctx = SessionContext::new_with_config(config);
session_ctx.register_object_store(
ObjectStoreUrl::parse("test://").unwrap().as_ref(),
Arc::new(InMemory::new()),
);
let state = session_ctx.state();
let task_ctx = state.task_ctx();
let mut stream = optimized_plan.execute(0, Arc::clone(&task_ctx)).unwrap();
for (idx, expected_plan) in expected_plans.iter().enumerate().skip(1) {
stream.next().await.unwrap().unwrap();
let formatted_plan = format_plan_for_test(&optimized_plan);
pretty_assertions::assert_eq!(
formatted_plan.trim(),
expected_plan.trim(),
"Mismatch at iteration {}",
idx
);
}
}

#[tokio::test]
async fn test_topk_with_projection_transformation_on_dyn_filter() {
let schema = Arc::new(Schema::new(vec![
Field::new("a", DataType::Int32, false),
Field::new("b", DataType::Utf8, false),
Field::new("c", DataType::Float64, false),
]));
let simple_abc = vec![
record_batch!(
("a", Int32, [1, 2, 3]),
("b", Utf8, ["x", "y", "z"]),
("c", Float64, [1.0, 2.0, 3.0])
)
.unwrap(),
];

// Case 1: Reordering [b, a]
run_projection_dyn_filter_case(ProjectionDynFilterTestCase {
schema: Arc::clone(&schema),
batches: simple_abc.clone(),
projection: vec![
(col("b", &schema).unwrap(), "b".to_string()),
(col("a", &schema).unwrap(), "a".to_string()),
],
sort_expr: PhysicalSortExpr::new(
Arc::new(Column::new("a", 1)),
SortOptions::default(),
),
expected_plans: vec![
r#" - SortExec: TopK(fetch=2), expr=[a@1 ASC], preserve_partitioning=[false]
- ProjectionExec: expr=[b@1 as b, a@0 as a]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ empty ]"#.to_string(),
r#" - SortExec: TopK(fetch=2), expr=[a@1 ASC], preserve_partitioning=[false], filter=[a@1 IS NULL OR a@1 < 2]
- ProjectionExec: expr=[b@1 as b, a@0 as a]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ a@0 IS NULL OR a@0 < 2 ]"#.to_string()]
})
.await;

// Case 2: Pruning [a]
run_projection_dyn_filter_case(ProjectionDynFilterTestCase {
schema: Arc::clone(&schema),
batches: simple_abc.clone(),
projection: vec![(col("a", &schema).unwrap(), "a".to_string())],
sort_expr: PhysicalSortExpr::new(
Arc::new(Column::new("a", 0)),
SortOptions::default(),
),
expected_plans: vec![
r#" - SortExec: TopK(fetch=2), expr=[a@0 ASC], preserve_partitioning=[false]
- ProjectionExec: expr=[a@0 as a]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ empty ]"#.to_string(),
r#" - SortExec: TopK(fetch=2), expr=[a@0 ASC], preserve_partitioning=[false], filter=[a@0 IS NULL OR a@0 < 2]
- ProjectionExec: expr=[a@0 as a]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ a@0 IS NULL OR a@0 < 2 ]"#.to_string(),
],
})
.await;

// Case 3: Identity [a, b]
run_projection_dyn_filter_case(ProjectionDynFilterTestCase {
schema: Arc::clone(&schema),
batches: simple_abc.clone(),
projection: vec![
(col("a", &schema).unwrap(), "a".to_string()),
(col("b", &schema).unwrap(), "b".to_string()),
],
sort_expr: PhysicalSortExpr::new(
Arc::new(Column::new("a", 0)),
SortOptions::default(),
),
expected_plans: vec![
r#" - SortExec: TopK(fetch=2), expr=[a@0 ASC], preserve_partitioning=[false]
- ProjectionExec: expr=[a@0 as a, b@1 as b]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ empty ]"#.to_string(),
r#" - SortExec: TopK(fetch=2), expr=[a@0 ASC], preserve_partitioning=[false], filter=[a@0 IS NULL OR a@0 < 2]
- ProjectionExec: expr=[a@0 as a, b@1 as b]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ a@0 IS NULL OR a@0 < 2 ]"#.to_string(),
],
})
.await;

// Case 4: Expressions [a + 1, b]
run_projection_dyn_filter_case(ProjectionDynFilterTestCase {
schema: Arc::clone(&schema),
batches: simple_abc.clone(),
projection: vec![
(
Arc::new(BinaryExpr::new(
col("a", &schema).unwrap(),
Operator::Plus,
Arc::new(Literal::new(ScalarValue::Int32(Some(1)))),
)),
"a_plus_1".to_string(),
),
(col("b", &schema).unwrap(), "b".to_string()),
],
sort_expr: PhysicalSortExpr::new(
Arc::new(Column::new("a_plus_1", 0)),
SortOptions::default(),
),
expected_plans: vec![
r#" - SortExec: TopK(fetch=2), expr=[a_plus_1@0 ASC], preserve_partitioning=[false]
- ProjectionExec: expr=[a@0 + 1 as a_plus_1, b@1 as b]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ empty ]"#.to_string(),
r#" - SortExec: TopK(fetch=2), expr=[a_plus_1@0 ASC], preserve_partitioning=[false], filter=[a_plus_1@0 IS NULL OR a_plus_1@0 < 3]
- ProjectionExec: expr=[a@0 + 1 as a_plus_1, b@1 as b]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ a@0 + 1 IS NULL OR a@0 + 1 < 3 ]"#.to_string(),
],
})
.await;

// Case 5: [a as b, b as a] (swapped columns)
run_projection_dyn_filter_case(ProjectionDynFilterTestCase {
schema: Arc::clone(&schema),
batches: simple_abc.clone(),
projection: vec![
(col("a", &schema).unwrap(), "b".to_string()),
(col("b", &schema).unwrap(), "a".to_string()),
],
sort_expr: PhysicalSortExpr::new(
Arc::new(Column::new("b", 0)),
SortOptions::default(),
),
expected_plans: vec![
r#" - SortExec: TopK(fetch=2), expr=[b@0 ASC], preserve_partitioning=[false]
- ProjectionExec: expr=[a@0 as b, b@1 as a]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ empty ]"#.to_string(),
r#" - SortExec: TopK(fetch=2), expr=[b@0 ASC], preserve_partitioning=[false], filter=[b@0 IS NULL OR b@0 < 2]
- ProjectionExec: expr=[a@0 as b, b@1 as a]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ a@0 IS NULL OR a@0 < 2 ]"#.to_string(),
],
})
.await;

// Case 6: Confusing expr [a + 1 as a, b]
run_projection_dyn_filter_case(ProjectionDynFilterTestCase {
schema: Arc::clone(&schema),
batches: simple_abc.clone(),
projection: vec![
(
Arc::new(BinaryExpr::new(
col("a", &schema).unwrap(),
Operator::Plus,
Arc::new(Literal::new(ScalarValue::Int32(Some(1)))),
)),
"a".to_string(),
),
(col("b", &schema).unwrap(), "b".to_string()),
],
sort_expr: PhysicalSortExpr::new(
Arc::new(Column::new("a", 0)),
SortOptions::default(),
),
expected_plans: vec![
r#" - SortExec: TopK(fetch=2), expr=[a@0 ASC], preserve_partitioning=[false]
- ProjectionExec: expr=[a@0 + 1 as a, b@1 as b]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ empty ]"#.to_string(),
r#" - SortExec: TopK(fetch=2), expr=[a@0 ASC], preserve_partitioning=[false], filter=[a@0 IS NULL OR a@0 < 3]
- ProjectionExec: expr=[a@0 + 1 as a, b@1 as b]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ a@0 + 1 IS NULL OR a@0 + 1 < 3 ]"#.to_string(),
],
})
.await;
}

/// Returns a predicate that is a binary expression col = lit
fn col_lit_predicate(
column_name: &str,
76 changes: 59 additions & 17 deletions datafusion/physical-plan/src/filter_pushdown.rs
@@ -37,8 +37,12 @@
use std::collections::HashSet;
use std::sync::Arc;

-use datafusion_common::Result;
-use datafusion_physical_expr::utils::{collect_columns, reassign_expr_columns};
+use arrow_schema::Schema;
+use datafusion_common::{
+    Result,
+    tree_node::{TreeNode, TreeNodeRecursion},
+};
+use datafusion_physical_expr::{expressions::Column, utils::reassign_expr_columns};
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
use itertools::Itertools;

@@ -306,6 +310,56 @@ pub struct ChildFilterDescription {
pub(crate) self_filters: Vec<Arc<dyn PhysicalExpr>>,
}

/// A utility for checking whether a filter expression can be pushed down
/// to a child node based on column availability.
///
/// This checker validates that all columns referenced in a filter expression
/// exist in the target schema. If any column in the filter is not present
/// in the schema, the filter cannot be pushed down to that child.
pub struct FilterColumnChecker<'a> {
column_names: HashSet<&'a str>,
}

impl<'a> FilterColumnChecker<'a> {
/// Creates a new [`FilterColumnChecker`] from the given schema.
///
/// Extracts all column names from the schema's fields to build
/// a lookup set for efficient column existence checks.
pub(crate) fn new(input_schema: &'a Schema) -> Self {
let column_names: HashSet<&str> = input_schema
.fields()
.iter()
.map(|f| f.name().as_str())
.collect();
Self { column_names }
}

/// Checks whether a filter expression can be pushed down to the child
/// whose schema was used to create this checker.
///
/// Returns `true` if all [`Column`] references in the filter expression
/// exist in the target schema, `false` otherwise.
///
/// This method traverses the entire expression tree, checking each
/// column reference against the available column names.
pub(crate) fn can_pushdown(&self, filter: &Arc<dyn PhysicalExpr>) -> bool {
let mut can_apply = true;
filter
.apply(|expr| {
if let Some(column) = expr.as_any().downcast_ref::<Column>()
&& !self.column_names.contains(column.name())
{
can_apply = false;
return Ok(TreeNodeRecursion::Stop);
}

Ok(TreeNodeRecursion::Continue)
})
.expect("infallible traversal");
can_apply
}
}

impl ChildFilterDescription {
/// Build a child filter description by analyzing which parent filters can be pushed to a specific child.
///
@@ -320,26 +374,14 @@
) -> Result<Self> {
let child_schema = child.schema();

-        // Get column names from child schema for quick lookup
-        let child_column_names: HashSet<&str> = child_schema
-            .fields()
-            .iter()
-            .map(|f| f.name().as_str())
-            .collect();
+        // Build a set of column names in the child schema for quick lookup
+        let checker = FilterColumnChecker::new(&child_schema);

// Analyze each parent filter
let mut child_parent_filters = Vec::with_capacity(parent_filters.len());

for filter in parent_filters {
-            // Check which columns the filter references
-            let referenced_columns = collect_columns(filter);
-
-            // Check if all referenced columns exist in the child schema
-            let all_columns_exist = referenced_columns
-                .iter()
-                .all(|col| child_column_names.contains(col.name()));
-
-            if all_columns_exist {
+            if checker.can_pushdown(filter) {
// All columns exist in child - we can push down
// Need to reassign column indices to match child schema
let reassigned_filter =
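For reference, here is a minimal standalone sketch of the column-availability check that the new FilterColumnChecker / can_pushdown pair performs. The checker itself is crate-private, so the free function, the main driver, and the example schemas below are illustrative assumptions rather than code from this PR: a filter is eligible for pushdown to a child only if every Column it references exists in that child's schema, and traversal stops at the first unknown column.

use std::collections::HashSet;
use std::sync::Arc;

use arrow_schema::{DataType, Field, Schema};
use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
use datafusion_physical_expr::expressions::{col, Column};
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;

/// Returns true only if every `Column` referenced by `filter` exists in `child_schema`.
fn all_filter_columns_exist(child_schema: &Schema, filter: &Arc<dyn PhysicalExpr>) -> bool {
    let names: HashSet<&str> = child_schema
        .fields()
        .iter()
        .map(|f| f.name().as_str())
        .collect();
    let mut ok = true;
    filter
        .apply(|expr| {
            if let Some(column) = expr.as_any().downcast_ref::<Column>() {
                if !names.contains(column.name()) {
                    // Found a column the child does not provide: stop early.
                    ok = false;
                    return Ok(TreeNodeRecursion::Stop);
                }
            }
            Ok(TreeNodeRecursion::Continue)
        })
        .expect("infallible traversal");
    ok
}

fn main() {
    // Parent schema exposes columns a and b; the child only exposes a.
    let parent = Schema::new(vec![
        Field::new("a", DataType::Int32, false),
        Field::new("b", DataType::Utf8, false),
    ]);
    let child = Schema::new(vec![Field::new("a", DataType::Int32, false)]);

    let filter_on_a = col("a", &parent).unwrap();
    let filter_on_b = col("b", &parent).unwrap();

    assert!(all_filter_columns_exist(&child, &filter_on_a)); // pushdown allowed
    assert!(!all_filter_columns_exist(&child, &filter_on_b)); // column b missing -> blocked
}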
1 change: 1 addition & 0 deletions datafusion/physical-plan/src/lib.rs
@@ -92,6 +92,7 @@ pub mod streaming;
pub mod tree_node;
pub mod union;
pub mod unnest;
pub mod util;
pub mod windows;
pub mod work_table;
pub mod udaf {