From a01e36a7b163d1c87f0ea9a863465771f48a3488 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Sun, 30 Mar 2025 15:46:14 -0500 Subject: [PATCH 1/3] Add mechanism for pushing down filters through ExecutionPlan & implement dynamic filter pushdown for TopK operator --- datafusion/common/src/config.rs | 7 + .../src/datasource/physical_plan/parquet.rs | 365 +++++++++++++++-- datafusion/core/tests/fuzz_cases/mod.rs | 2 + .../tests/fuzz_cases/topk_filter_pushdown.rs | 324 +++++++++++++++ datafusion/datasource-parquet/src/opener.rs | 69 +++- datafusion/datasource-parquet/src/source.rs | 54 +-- datafusion/datasource/src/file.rs | 9 +- datafusion/datasource/src/file_scan_config.rs | 16 +- datafusion/datasource/src/source.rs | 23 +- .../physical-expr-common/src/physical_expr.rs | 69 ++++ datafusion/physical-expr/src/lib.rs | 6 +- datafusion/physical-expr/src/utils/mod.rs | 16 + datafusion/physical-optimizer/src/pruning.rs | 5 +- .../physical-plan/src/coalesce_batches.rs | 15 + .../physical-plan/src/dynamic_filters.rs | 320 +++++++++++++++ .../physical-plan/src/execution_plan.rs | 7 + datafusion/physical-plan/src/filter.rs | 20 +- datafusion/physical-plan/src/lib.rs | 3 +- datafusion/physical-plan/src/projection.rs | 14 + .../physical-plan/src/repartition/mod.rs | 14 + datafusion/physical-plan/src/sorts/sort.rs | 73 +++- datafusion/physical-plan/src/topk/mod.rs | 372 +++++++++++++++++- .../proto/src/physical_plan/to_proto.rs | 7 +- .../test_files/information_schema.slt | 2 + docs/source/user-guide/configs.md | 1 + 25 files changed, 1712 insertions(+), 101 deletions(-) create mode 100644 datafusion/core/tests/fuzz_cases/topk_filter_pushdown.rs create mode 100644 datafusion/physical-plan/src/dynamic_filters.rs diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index b0f17630c910..6c0a329f6bf3 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -590,6 +590,13 @@ config_namespace! { /// during aggregations, if possible pub enable_topk_aggregation: bool, default = true + /// When set to true, attempts to push down dynamic filters generated by operators into the file scan phase. + /// For example, for a query such as `SELECT * FROM t ORDER BY timestamp DESC LIMIT 10`, the optimizer + /// will attempt to push down the current top 10 timestamps that the TopK operator references into the file scans. + /// This means that if we already have 10 timestamps in the year 2025, + /// any files that only have timestamps in the year 2024 can be skipped / pruned at various stages in the scan. + pub enable_dynamic_filter_pushdown: bool, default = true + /// When set to true, the optimizer will insert filters before a join between /// a nullable and non-nullable column to filter out nulls on the nullable side.
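Not part of the patch itself, but a minimal sketch of how the new option could be toggled programmatically: the key `datafusion.optimizer.enable_dynamic_filter_pushdown` is the one added above, `SessionConfig::set_bool` and `SessionContext::new_with_config` are existing DataFusion APIs (the fuzz test added later in this patch drives the option the same way), and the helper name is made up.

```rust
use datafusion::prelude::{SessionConfig, SessionContext};

// Hypothetical helper: build a context with dynamic filter pushdown explicitly
// enabled or disabled (it defaults to `true`), e.g. to compare `EXPLAIN ANALYZE`
// output for `SELECT * FROM t ORDER BY timestamp DESC LIMIT 10` with and
// without the extra pruning.
fn context_with_dynamic_pushdown(enabled: bool) -> SessionContext {
    let cfg = SessionConfig::new().set_bool(
        "datafusion.optimizer.enable_dynamic_filter_pushdown",
        enabled,
    );
    SessionContext::new_with_config(cfg)
}
```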
This /// filter can add additional overhead when the file format does not fully support diff --git a/datafusion/core/src/datasource/physical_plan/parquet.rs b/datafusion/core/src/datasource/physical_plan/parquet.rs index 9e1b2822e854..2ad00637e8bf 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet.rs @@ -43,11 +43,12 @@ mod tests { }; use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaBuilder}; use arrow::record_batch::RecordBatch; + use arrow::util::pretty::pretty_format_batches; use arrow_schema::SchemaRef; use bytes::{BufMut, BytesMut}; use datafusion_common::config::TableParquetOptions; use datafusion_common::test_util::{batches_to_sort_string, batches_to_string}; - use datafusion_common::{assert_contains, Result, ScalarValue}; + use datafusion_common::{assert_batches_eq, assert_contains, Result, ScalarValue}; use datafusion_datasource::file_format::FileFormat; use datafusion_datasource::file_meta::FileMeta; use datafusion_datasource::file_scan_config::FileScanConfigBuilder; @@ -1455,6 +1456,7 @@ mod tests { .await; // should have a pruning predicate + #[expect(deprecated)] let pruning_predicate = rt.parquet_source.pruning_predicate(); assert!(pruning_predicate.is_some()); @@ -1496,6 +1498,7 @@ mod tests { .round_trip(vec![batches.clone()]) .await; + #[expect(deprecated)] let pruning_predicate = rt0.parquet_source.pruning_predicate(); assert!(pruning_predicate.is_some()); @@ -1538,6 +1541,7 @@ mod tests { .await; // should have a pruning predicate + #[expect(deprecated)] let pruning_predicate = rt1.parquet_source.pruning_predicate(); assert!(pruning_predicate.is_some()); let pruning_predicate = rt2.parquet_source.predicate(); @@ -1581,6 +1585,7 @@ mod tests { .await; // Should not contain a pruning predicate (since nothing can be pruned) + #[expect(deprecated)] let pruning_predicate = rt.parquet_source.pruning_predicate(); assert!( pruning_predicate.is_none(), @@ -1616,6 +1621,7 @@ mod tests { .await; // Should have a pruning predicate + #[expect(deprecated)] let pruning_predicate = rt.parquet_source.pruning_predicate(); assert!(pruning_predicate.is_some()); } @@ -1769,13 +1775,13 @@ mod tests { let sql = "select * from base_table where name='test02'"; let batch = ctx.sql(sql).await.unwrap().collect().await.unwrap(); assert_eq!(batch.len(), 1); - insta::assert_snapshot!(batches_to_string(&batch),@r###" - +---------------------+----+--------+ - | struct | id | name | - +---------------------+----+--------+ - | {id: 4, name: aaa2} | 2 | test02 | - +---------------------+----+--------+ - "###); + insta::assert_snapshot!(batches_to_string(&batch),@r" + +--------------------+----+--------+ + | struct | id | name | + +--------------------+----+--------+ + | {id: 3, name: zzz} | 2 | test02 | + +--------------------+----+--------+ + "); Ok(()) } @@ -1798,13 +1804,13 @@ mod tests { let sql = "select * from base_table where name='test02'"; let batch = ctx.sql(sql).await.unwrap().collect().await.unwrap(); assert_eq!(batch.len(), 1); - insta::assert_snapshot!(batches_to_string(&batch),@r###" - +---------------------+----+--------+ - | struct | id | name | - +---------------------+----+--------+ - | {id: 4, name: aaa2} | 2 | test02 | - +---------------------+----+--------+ - "###); + insta::assert_snapshot!(batches_to_string(&batch),@r" + +--------------------+----+--------+ + | struct | id | name | + +--------------------+----+--------+ + | {id: 3, name: zzz} | 2 | test02 | + 
+--------------------+----+--------+ + "); Ok(()) } @@ -1818,14 +1824,14 @@ mod tests { Field::new("id", DataType::Int64, true), Field::new("name", DataType::Utf8, false), ]); - let id_array = Int64Array::from(vec![Some(1), Some(2)]); + let id_array = Int64Array::from(vec![Some(2), Some(1)]); let columns = vec![ Arc::new(Int64Array::from(vec![3, 4])) as _, - Arc::new(StringArray::from(vec!["aaa1", "aaa2"])) as _, + Arc::new(StringArray::from(vec!["zzz", "aaa"])) as _, ]; let struct_array = StructArray::new(struct_fields, columns, None); - let name_array = StringArray::from(vec![Some("test01"), Some("test02")]); + let name_array = StringArray::from(vec![Some("test02"), Some("test01")]); let schema = Arc::new(schema); let batch = RecordBatch::try_new( @@ -1837,12 +1843,53 @@ mod tests { ], ) .unwrap(); - let file = File::create(file).unwrap(); - let w_opt = WriterProperties::builder().build(); - let mut writer = ArrowWriter::try_new(file, schema, Some(w_opt)).unwrap(); - writer.write(&batch).unwrap(); - writer.flush().unwrap(); - writer.close().unwrap(); + write_record_batch(file, batch).unwrap(); + } + + fn write_file_with_non_null_ids(file: &String, value: i64) { + let schema = Schema::new(vec![ + Field::new("id", DataType::Int64, true), + Field::new("name", DataType::Utf8, false), + ]); + let id_array = Int64Array::from(vec![Some(value)]); + let name_array = StringArray::from(vec![Some("test")]); + let schema = Arc::new(schema); + + let batch = RecordBatch::try_new( + schema.clone(), + vec![Arc::new(id_array), Arc::new(name_array)], + ) + .unwrap(); + write_record_batch(file, batch).unwrap(); + } + + fn write_file_with_null_ids(file: &String) { + let schema = Schema::new(vec![ + Field::new("id", DataType::Int64, true), + Field::new("name", DataType::Utf8, false), + ]); + let id_array = Int64Array::from(vec![None]); + let name_array = StringArray::from(vec![Some(format!("test{:02}", "null"))]); + let schema = Arc::new(schema); + + let batch = RecordBatch::try_new( + schema.clone(), + vec![Arc::new(id_array), Arc::new(name_array)], + ) + .unwrap(); + write_record_batch(file, batch).unwrap(); + } + + fn write_record_batch(file: &String, batch: RecordBatch) -> Result<()> { + let file = File::create(file)?; + let w_opt = WriterProperties::builder() + .set_max_row_group_size(1) + .build(); + let mut writer = ArrowWriter::try_new(file, batch.schema(), Some(w_opt))?; + writer.write(&batch)?; + writer.flush()?; + writer.close()?; + Ok(()) } /// Write out a batch to a parquet file and return the total size of the file @@ -1904,6 +1951,49 @@ mod tests { } } + struct DynamicFilterTestCase { + query: String, + path: String, + } + + impl DynamicFilterTestCase { + fn new(query: String, path: String) -> Self { + Self { query, path } + } + + async fn _run_query(&self, query: &str) -> Vec { + // Force 1 partition and 1 rg per partition because if we widen the plan + // and read all batches at once we won't get any dynamic pushdown. 
+ let mut cfg = SessionConfig::new(); + cfg = cfg.set_u64("datafusion.execution.parquet.max_row_group_size", 1); + let ctx = SessionContext::new_with_config(cfg); + + let mut pq_options = TableParquetOptions::default(); + pq_options.global.max_row_group_size = 1; + pq_options.global.pushdown_filters = true; + let fmt = ParquetFormat::default().with_options(pq_options); + let opt = ListingOptions::new(Arc::new(fmt)).with_target_partitions(1); + ctx.register_listing_table("base_table", &self.path, opt, None, None) + .await + .unwrap(); + + ctx.sql(query).await.unwrap().collect().await.unwrap() + } + + async fn results(&self) -> Vec { + self._run_query(&self.query).await + } + + async fn explain_plan(&self) -> String { + let query = format!("EXPLAIN ANALYZE {}", self.query); + let batches = self._run_query(&query).await; + + pretty_format_batches(&batches) + .map(|s| format!("{}", s)) + .unwrap_or_else(|_| "No explain plan generated".to_string()) + } + } + /// Test passing `metadata_size_hint` to either a single file or the whole exec #[tokio::test] async fn test_metadata_size_hint() { @@ -1976,4 +2066,231 @@ mod tests { assert_eq!(calls.len(), 2); assert_eq!(calls, vec![Some(123), Some(456)]); } + + #[tokio::test] + async fn test_topk_predicate_pushdown() { + let tmp_dir = TempDir::new().unwrap(); + let path = tmp_dir.path().to_str().unwrap().to_string(); + + for file in 0..5 { + // write 2 files so that one is processed before the other + let name = format!("test{:02}.parquet", file); + write_file(&format!("{path}/{name}")); + } + + let query = "select name from base_table order by id desc limit 3"; + + let test_case = DynamicFilterTestCase::new(query.to_string(), path); + + let batches = test_case.results().await; + #[rustfmt::skip] + let expected = [ + "+--------+", + "| name |", + "+--------+", + "| test02 |", + "| test02 |", + "| test02 |", + "+--------+", + ]; + assert_batches_eq!(expected, &batches); + + let plan = test_case.explain_plan().await; + assert_contains!(&plan, "row_groups_pruned_statistics=3"); + } + + #[tokio::test] + async fn test_topk_predicate_pushdown_nulls_first() { + let tmp_dir = TempDir::new().unwrap(); + let path = tmp_dir.path().to_str().unwrap().to_string(); + + for file in 0..5 { + // write multiple files to ensure we get pushdown of dynamic filters from one file to another + let name = format!("test{:02}.parquet", file); + write_file(&format!("{path}/{name}")); + } + + let name = format!("test{:02}.parquet", 100); + write_file_with_null_ids(&format!("{path}/{name}")); + + // nulls first by default + let query = "select name from base_table order by id desc limit 3"; + let test_case = DynamicFilterTestCase::new(query.to_string(), path); + + let batches = test_case.results().await; + #[rustfmt::skip] + let expected = [ + "+----------+", + "| name |", + "+----------+", + "| testnull |", + "| test02 |", + "| test02 |", + "+----------+", + ]; + assert_batches_eq!(expected, &batches); + + let plan = test_case.explain_plan().await; + assert_contains!(&plan, "row_groups_pruned_statistics=3"); + } + + #[tokio::test] + async fn test_topk_predicate_pushdown_multi_key() { + let tmp_dir = TempDir::new().unwrap(); + let path = tmp_dir.path().to_str().unwrap().to_string(); + for file in 0..5 { + // write multiple files to ensure we get pushdown of dynamic filters from one file to another + // Ensure files are read in order + let name = format!("test{:02}.parquet", file); + write_file_with_non_null_ids(&format!("{path}/{name}"), file); + } + + let query = "select id from 
base_table order by name desc, id limit 3"; + let test_case = DynamicFilterTestCase::new(query.to_string(), path.clone()); + + let batches = test_case.results().await; + #[rustfmt::skip] + let expected = [ + "+----+", + "| id |", + "+----+", + "| 0 |", + "| 1 |", + "| 2 |", + "+----+", + ]; + assert_batches_eq!(expected, &batches); + + let plan = test_case.explain_plan().await; + assert_contains!(&plan, "row_groups_pruned_statistics=1"); + + let query1 = "select id from base_table order by name desc, id desc limit 3"; + let test_case = DynamicFilterTestCase::new(query1.to_string(), path); + + let batches = test_case.results().await; + #[rustfmt::skip] + let expected = [ + "+----+", + "| id |", + "+----+", + "| 4 |", + "| 3 |", + "| 2 |", + "+----+", + ]; + assert_batches_eq!(expected, &batches); + + let plan = test_case.explain_plan().await; + assert_contains!(&plan, "row_groups_pruned_statistics=0"); + } + + #[tokio::test] + async fn test_topk_predicate_pushdown_nulls_last() { + let tmp_dir = TempDir::new().unwrap(); + let path = tmp_dir.path().to_str().unwrap().to_string(); + + for file in 0..5 { + let name = format!("test{:02}.parquet", file); + write_file(&format!("{path}/{name}")); + } + let name = format!("test{:02}.parquet", 100); + write_file_with_null_ids(&format!("{path}/{name}")); + + let query = "select name from base_table order by id desc nulls last limit 3"; + let test_case = DynamicFilterTestCase::new(query.to_string(), path); + + let batches = test_case.results().await; + #[rustfmt::skip] + let expected = [ + "+--------+", + "| name |", + "+--------+", + "| test02 |", + "| test02 |", + "| test02 |", + "+--------+", + ]; + assert_batches_eq!(expected, &batches); + + let plan = test_case.explain_plan().await; + assert_contains!(&plan, "row_groups_pruned_statistics=4"); + } + + #[tokio::test] + async fn test_topk_predicate_pushdown_single_file() { + let tmp_dir = TempDir::new().unwrap(); + let path = tmp_dir.path().to_str().unwrap().to_string(); + + write_file(&format!("{path}/test.parquet")); + + let query = "select name from base_table order by id desc nulls last limit 1"; + let test_case = DynamicFilterTestCase::new(query.to_string(), path); + + let batches = test_case.results().await; + #[rustfmt::skip] + let expected = [ + "+--------+", + "| name |", + "+--------+", + "| test02 |", + "+--------+", + ]; + assert_batches_eq!(expected, &batches); + + let plan = test_case.explain_plan().await; + assert_contains!(&plan, "pushdown_rows_pruned=1"); + } + + #[tokio::test] + async fn test_topk_predicate_pushdown_ignores_partition_columns() { + // The TopK operator will try to push down predicates on `file_id`. + // But since `file_id` is a partition column and not part of the file itself + // we cannot actually do any filtering on it at the file level. + // Thus it has to be ignored by `ParquetSource`. + // This test only shows that this does not result in any errors or panics, + // it is expected that "nothing exciting" happens here. + // I do think in the future it would be interesting to re-design how partition columns + // get handled, in particular by pushing them into SchemaAdapter so that the table schema == file schema + // and we can do predicate pushdown on them as well without relying on each TableProvider to + // do special handling of partition columns. 
+ + let ctx = SessionContext::new(); + let opt = ListingOptions::new(Arc::new(ParquetFormat::default())) + .with_table_partition_cols(vec![("file_id".to_string(), DataType::UInt32)]) + // We need to force 1 partition because TopK predicate pushdown happens on a per-partition basis + // If we had 1 file per partition (as an example) no pushdown would happen + .with_target_partitions(1); + + let tmp_dir = TempDir::new().unwrap(); + let path = tmp_dir.path().to_str().unwrap().to_string(); + for file in 0..5 { + // crete a directory for the partition + fs::create_dir_all(format!("{path}/file_id={file}")).unwrap(); + let name = format!("file_id={file}/test.parquet"); + write_file(&format!("{path}/{name}")); + } + ctx.register_listing_table("base_table", path, opt, None, None) + .await + .unwrap(); + + let query = "select file_id from base_table order by file_id asc limit 3"; + + let batches = ctx.sql(query).await.unwrap().collect().await.unwrap(); + #[rustfmt::skip] + let expected = [ + "+---------+", + "| file_id |", + "+---------+", + "| 0 |", + "| 0 |", + "| 1 |", + "+---------+", + ]; + assert_batches_eq!(expected, &batches); + + let sql = format!("explain analyze {query}"); + let batches = ctx.sql(&sql).await.unwrap().collect().await.unwrap(); + let explain_plan = format!("{}", pretty_format_batches(&batches).unwrap()); + assert_contains!(explain_plan, "row_groups_pruned_statistics=0"); // just documenting current behavior + } } diff --git a/datafusion/core/tests/fuzz_cases/mod.rs b/datafusion/core/tests/fuzz_cases/mod.rs index d5511e2970f4..11bf29431e90 100644 --- a/datafusion/core/tests/fuzz_cases/mod.rs +++ b/datafusion/core/tests/fuzz_cases/mod.rs @@ -29,3 +29,5 @@ mod pruning; mod limit_fuzz; mod sort_preserving_repartition_fuzz; mod window_fuzz; + +mod topk_filter_pushdown; diff --git a/datafusion/core/tests/fuzz_cases/topk_filter_pushdown.rs b/datafusion/core/tests/fuzz_cases/topk_filter_pushdown.rs new file mode 100644 index 000000000000..aafb38a5d542 --- /dev/null +++ b/datafusion/core/tests/fuzz_cases/topk_filter_pushdown.rs @@ -0,0 +1,324 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use std::collections::HashMap; +use std::sync::{Arc, LazyLock}; + +use arrow::array::{Int32Array, StringArray, StringDictionaryBuilder}; +use arrow::datatypes::Int32Type; +use arrow::record_batch::RecordBatch; +use arrow_schema::{DataType, Field, Schema}; +use datafusion::datasource::listing::{ListingOptions, ListingTable, ListingTableConfig}; +use datafusion::prelude::{SessionConfig, SessionContext}; +use datafusion_datasource::ListingTableUrl; +use datafusion_datasource_parquet::ParquetFormat; +use datafusion_execution::object_store::ObjectStoreUrl; +use itertools::Itertools; +use object_store::memory::InMemory; +use object_store::path::Path; +use object_store::{ObjectStore, PutPayload}; +use parquet::arrow::ArrowWriter; +use rand::rngs::StdRng; +use rand::{Rng, SeedableRng}; +use tokio::sync::Mutex; +use tokio::task::JoinSet; + +#[derive(Clone)] +struct TestDataSet { + store: Arc, + schema: Arc, +} + +/// List of in memory parquet files with UTF8 data +// Use a mutex rather than LazyLock to allow for async initialization +static TESTFILES: LazyLock>> = + LazyLock::new(|| Mutex::new(vec![])); + +async fn test_files() -> Vec { + let files_mutex = &TESTFILES; + let mut files = files_mutex.lock().await; + if !files.is_empty() { + return (*files).clone(); + } + + let mut rng = StdRng::seed_from_u64(0); + + for nulls_in_ids in [false, true] { + for nulls_in_names in [false, true] { + for nulls_in_departments in [false, true] { + let store = Arc::new(InMemory::new()); + + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, nulls_in_ids), + Field::new("name", DataType::Utf8, nulls_in_names), + Field::new( + "department", + DataType::Dictionary( + Box::new(DataType::Int32), + Box::new(DataType::Utf8), + ), + nulls_in_departments, + ), + ])); + + let name_choices = if nulls_in_names { + [Some("Alice"), Some("Bob"), None, Some("David"), None] + } else { + [ + Some("Alice"), + Some("Bob"), + Some("Charlie"), + Some("David"), + Some("Eve"), + ] + }; + + let department_choices = if nulls_in_departments { + [ + Some("Theater"), + Some("Engineering"), + None, + Some("Arts"), + None, + ] + } else { + [ + Some("Theater"), + Some("Engineering"), + Some("Healthcare"), + Some("Arts"), + Some("Music"), + ] + }; + + // Generate 5 files, some with overlapping or repeated ids some without + for i in 0..5 { + let num_batches = rng.gen_range(1..3); + let mut batches = Vec::with_capacity(num_batches); + for _ in 0..num_batches { + let num_rows = 25; + let ids = Int32Array::from_iter((0..num_rows).map(|file| { + if nulls_in_ids { + if rng.gen_bool(1.0 / 10.0) { + None + } else { + Some(rng.gen_range(file..file + 5)) + } + } else { + Some(rng.gen_range(file..file + 5)) + } + })); + let names = StringArray::from_iter((0..num_rows).map(|_| { + // randomly select a name + let idx = rng.gen_range(0..name_choices.len()); + name_choices[idx].map(|s| s.to_string()) + })); + let mut departments = StringDictionaryBuilder::::new(); + for _ in 0..num_rows { + // randomly select a department + let idx = rng.gen_range(0..department_choices.len()); + departments.append_option(department_choices[idx].as_ref()); + } + let batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(ids), + Arc::new(names), + Arc::new(departments.finish()), + ], + ) + .unwrap(); + batches.push(batch); + } + let mut buf = vec![]; + { + let mut writer = + ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap(); + for batch in batches { + writer.write(&batch).unwrap(); + writer.flush().unwrap(); + } + 
writer.flush().unwrap(); + writer.finish().unwrap(); + } + let payload = PutPayload::from(buf); + let path = Path::from(format!("file_{i}.parquet")); + store.put(&path, payload).await.unwrap(); + } + files.push(TestDataSet { store, schema }); + } + } + } + (*files).clone() +} + +async fn run_query_with_config( + query: &str, + config: SessionConfig, + dataset: TestDataSet, +) -> Vec { + let store = dataset.store; + let schema = dataset.schema; + let ctx = SessionContext::new_with_config(config); + let url = ObjectStoreUrl::parse("memory://").unwrap(); + ctx.register_object_store(url.as_ref(), store.clone()); + + let format = Arc::new( + ParquetFormat::default() + .with_options(ctx.state().table_options().parquet.clone()), + ); + let options = ListingOptions::new(format); + let table_path = ListingTableUrl::parse("memory:///").unwrap(); + let config = ListingTableConfig::new(table_path) + .with_listing_options(options) + .with_schema(schema); + let table = Arc::new(ListingTable::try_new(config).unwrap()); + + ctx.register_table("test_table", table).unwrap(); + + ctx.sql(query).await.unwrap().collect().await.unwrap() +} + +#[derive(Debug)] +struct RunQueryResult { + query: String, + result: Vec, + expected: Vec, +} + +async fn run_query( + query: String, + cfg: SessionConfig, + dataset: TestDataSet, +) -> RunQueryResult { + let cfg_with_dynamic_filters = cfg + .clone() + .set_bool("datafusion.optimizer.enable_dynamic_filter_pushdown", true); + let cfg_without_dynamic_filters = cfg + .clone() + .set_bool("datafusion.optimizer.enable_dynamic_filter_pushdown", false); + + let expected_result = + run_query_with_config(&query, cfg_without_dynamic_filters, dataset.clone()).await; + let result = + run_query_with_config(&query, cfg_with_dynamic_filters, dataset.clone()).await; + + RunQueryResult { + query: query.to_string(), + result, + expected: expected_result, + } +} + +struct TestCase { + query: String, + cfg: SessionConfig, + dataset: TestDataSet, +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_fuzz_topk_filter_pushdown() { + let order_columns = ["id", "name", "department"]; + let order_directions = ["ASC", "DESC"]; + let null_orders = ["NULLS FIRST", "NULLS LAST"]; + + let start = datafusion_common::instant::Instant::now(); + let mut orders: HashMap> = HashMap::new(); + for order_column in &order_columns { + for order_direction in &order_directions { + for null_order in &null_orders { + // if there is a vec for this column insert the order, otherwise create a new vec + let ordering = + format!("{} {} {}", order_column, order_direction, null_order); + match orders.get_mut(*order_column) { + Some(order_vec) => { + order_vec.push(ordering); + } + None => { + orders.insert(order_column.to_string(), vec![ordering]); + } + } + } + } + } + + let mut queries = vec![]; + + for limit in [1, 10] { + for num_order_by_columns in [1, 2, 3] { + for order_columns in ["id", "name", "department"] + .iter() + .combinations(num_order_by_columns) + { + for orderings in order_columns + .iter() + .map(|col| orders.get(**col).unwrap()) + .multi_cartesian_product() + { + let query = format!( + "SELECT * FROM test_table ORDER BY {} LIMIT {}", + orderings.into_iter().join(", "), + limit + ); + queries.push(query); + } + } + } + } + + queries.sort_unstable(); + println!( + "Generated {} queries in {:?}", + queries.len(), + start.elapsed() + ); + + let start = datafusion_common::instant::Instant::now(); + let datasets = test_files().await; + println!("Generated test files in {:?}", start.elapsed()); + + let 
mut test_cases = vec![]; + for enable_filter_pushdown in [true, false] { + for query in &queries { + for dataset in &datasets { + let mut cfg = SessionConfig::new(); + cfg = cfg.set_bool( + "datafusion.optimizer.enable_dynamic_filter_pushdown", + enable_filter_pushdown, + ); + test_cases.push(TestCase { + query: query.to_string(), + cfg, + dataset: dataset.clone(), + }); + } + } + } + + let start = datafusion_common::instant::Instant::now(); + let mut join_set = JoinSet::new(); + for tc in test_cases { + join_set.spawn(run_query(tc.query, tc.cfg, tc.dataset)); + } + let mut results = join_set.join_all().await; + results.sort_unstable_by(|a, b| a.query.cmp(&b.query)); + println!("Ran {} test cases in {:?}", results.len(), start.elapsed()); + + for result in results { + assert_eq!(result.result, result.expected, "Query: {}", result.query); + } +} diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index 732fef47d5a7..4752aaadee1d 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -34,7 +34,7 @@ use arrow::error::ArrowError; use datafusion_common::{exec_err, Result}; use datafusion_physical_expr_common::physical_expr::PhysicalExpr; use datafusion_physical_optimizer::pruning::PruningPredicate; -use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; +use datafusion_physical_plan::metrics::{Count, ExecutionPlanMetricsSet, MetricBuilder}; use futures::{StreamExt, TryStreamExt}; use log::debug; @@ -54,10 +54,6 @@ pub(super) struct ParquetOpener { pub limit: Option, /// Optional predicate to apply during the scan pub predicate: Option>, - /// Optional pruning predicate applied to row group statistics - pub pruning_predicate: Option>, - /// Optional pruning predicate applied to data page statistics - pub page_pruning_predicate: Option>, /// Schema of the output table pub table_schema: SchemaRef, /// Optional hint for how large the initial request to read parquet metadata @@ -109,18 +105,32 @@ impl FileOpener for ParquetOpener { .schema_adapter_factory .create(projected_schema, Arc::clone(&self.table_schema)); let predicate = self.predicate.clone(); - let pruning_predicate = self.pruning_predicate.clone(); - let page_pruning_predicate = self.page_pruning_predicate.clone(); let table_schema = Arc::clone(&self.table_schema); let reorder_predicates = self.reorder_filters; let pushdown_filters = self.pushdown_filters; - let enable_page_index = should_enable_page_index( - self.enable_page_index, - &self.page_pruning_predicate, - ); let enable_bloom_filter = self.enable_bloom_filter; let limit = self.limit; + let predicate_creation_errors = MetricBuilder::new(&self.metrics) + .global_counter("num_predicate_creation_errors"); + + let (pruning_predicate, page_pruning_predicate) = + if let Some(predicate) = &predicate { + let pruning_predicate = build_pruning_predicate( + Arc::clone(predicate), + &table_schema, + &predicate_creation_errors, + ); + let page_pruning_predicate = + build_page_pruning_predicate(predicate, &table_schema); + (pruning_predicate, Some(page_pruning_predicate)) + } else { + (None, None) + }; + + let enable_page_index = + should_enable_page_index(self.enable_page_index, &page_pruning_predicate); + Ok(Box::pin(async move { let options = ArrowReaderOptions::new().with_page_index(enable_page_index); @@ -295,3 +305,40 @@ fn create_initial_plan( // default to scanning all row groups Ok(ParquetAccessPlan::new_all(row_group_count)) } + +/// Build a pruning predicate from an 
optional predicate expression. +/// If the predicate is None or the predicate cannot be converted to a pruning +/// predicate, return None. +/// If there is an error creating the pruning predicate it is recorded by incrementing +/// the `predicate_creation_errors` counter. +pub(crate) fn build_pruning_predicate( + predicate: Arc, + file_schema: &SchemaRef, + predicate_creation_errors: &Count, +) -> Option> { + match PruningPredicate::try_new(predicate, Arc::clone(file_schema)) { + Ok(pruning_predicate) => { + if !pruning_predicate.always_true() { + return Some(Arc::new(pruning_predicate)); + } + } + Err(e) => { + debug!("Could not create pruning predicate for: {e}"); + predicate_creation_errors.add(1); + } + } + None +} + +/// Build a page pruning predicate from an optional predicate expression. +/// If the predicate is None or the predicate cannot be converted to a page pruning +/// predicate, return None. +pub(crate) fn build_page_pruning_predicate( + predicate: &Arc, + file_schema: &SchemaRef, +) -> Arc { + Arc::new(PagePruningAccessPlanFilter::new( + predicate, + Arc::clone(file_schema), + )) +} diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs index 66d4d313d5a6..8331c0074e44 100644 --- a/datafusion/datasource-parquet/src/source.rs +++ b/datafusion/datasource-parquet/src/source.rs @@ -20,6 +20,8 @@ use std::any::Any; use std::fmt::Formatter; use std::sync::Arc; +use crate::opener::build_page_pruning_predicate; +use crate::opener::build_pruning_predicate; use crate::opener::ParquetOpener; use crate::page_filter::PagePruningAccessPlanFilter; use crate::DefaultParquetFileReaderFactory; @@ -34,6 +36,7 @@ use datafusion_common::config::TableParquetOptions; use datafusion_common::Statistics; use datafusion_datasource::file::FileSource; use datafusion_datasource::file_scan_config::FileScanConfig; +use datafusion_physical_expr::conjunction; use datafusion_physical_expr_common::physical_expr::fmt_sql; use datafusion_physical_expr_common::physical_expr::PhysicalExpr; use datafusion_physical_optimizer::pruning::PruningPredicate; @@ -41,7 +44,6 @@ use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder}; use datafusion_physical_plan::DisplayFormatType; use itertools::Itertools; -use log::debug; use object_store::ObjectStore; /// Execution plan for reading one or more Parquet files. 
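As a hypothetical illustration (not in the patch) of the behavior documented for `build_pruning_predicate`: a predicate that cannot prune anything collapses to `true` and yields `None`, while a comparison against a column yields a `PruningPredicate`. The helper is `pub(crate)`, so a snippet like this could only live inside the crate (e.g. as a unit test); the schema and expressions are made up.

```rust
use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion_common::Result;
use datafusion_expr::Operator;
use datafusion_physical_expr::expressions::{col, lit, BinaryExpr};
use datafusion_physical_expr::PhysicalExpr;
use datafusion_physical_plan::metrics::Count;

fn pruning_predicate_examples() -> Result<()> {
    let schema: SchemaRef =
        Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, true)]));
    let errors = Count::new();

    // A predicate that can never prune anything collapses to `true`, so no
    // `PruningPredicate` is returned (and this is not counted as an error).
    assert!(build_pruning_predicate(lit(true), &schema, &errors).is_none());

    // `id > 5` can be checked against row group statistics, so a
    // `PruningPredicate` is produced.
    let gt: Arc<dyn PhysicalExpr> =
        Arc::new(BinaryExpr::new(col("id", &schema)?, Operator::Gt, lit(5i64)));
    assert!(build_pruning_predicate(gt, &schema, &errors).is_some());

    Ok(())
}
```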
@@ -316,24 +318,10 @@ impl ParquetSource { conf = conf.with_metrics(metrics); conf.predicate = Some(Arc::clone(&predicate)); - match PruningPredicate::try_new(Arc::clone(&predicate), Arc::clone(&file_schema)) - { - Ok(pruning_predicate) => { - if !pruning_predicate.always_true() { - conf.pruning_predicate = Some(Arc::new(pruning_predicate)); - } - } - Err(e) => { - debug!("Could not create pruning predicate for: {e}"); - predicate_creation_errors.add(1); - } - }; - - let page_pruning_predicate = Arc::new(PagePruningAccessPlanFilter::new( - &predicate, - Arc::clone(&file_schema), - )); - conf.page_pruning_predicate = Some(page_pruning_predicate); + conf.page_pruning_predicate = + Some(build_page_pruning_predicate(&predicate, &file_schema)); + conf.pruning_predicate = + build_pruning_predicate(predicate, &file_schema, &predicate_creation_errors); conf } @@ -349,11 +337,13 @@ impl ParquetSource { } /// Optional reference to this parquet scan's pruning predicate + #[deprecated(note = "ParquetDataSource no longer constructs a PruningPredicate.")] pub fn pruning_predicate(&self) -> Option<&Arc> { self.pruning_predicate.as_ref() } /// Optional reference to this parquet scan's page pruning predicate + #[deprecated(note = "ParquetDataSource no longer constructs a PruningPredicate.")] pub fn page_pruning_predicate(&self) -> Option<&Arc> { self.page_pruning_predicate.as_ref() } @@ -488,8 +478,6 @@ impl FileSource for ParquetSource { .expect("Batch size must set before creating ParquetOpener"), limit: base_config.limit, predicate: self.predicate.clone(), - pruning_predicate: self.pruning_predicate.clone(), - page_pruning_predicate: self.page_pruning_predicate.clone(), table_schema: Arc::clone(&base_config.file_schema), metadata_size_hint: self.metadata_size_hint, metrics: self.metrics().clone(), @@ -537,11 +525,10 @@ impl FileSource for ParquetSource { .expect("projected_statistics must be set"); // When filters are pushed down, we have no way of knowing the exact statistics. // Note that pruning predicate is also a kind of filter pushdown. - // (bloom filters use `pruning_predicate` too) - if self.pruning_predicate().is_some() - || self.page_pruning_predicate().is_some() - || (self.predicate().is_some() && self.pushdown_filters()) - { + // (bloom filters use `pruning_predicate` too). + // Because filter pushdown may happen dynamically as long as there is a predicate + // if we have *any* predicate applied, we can't guarantee the statistics are exact. 
+ if self.predicate().is_some() { Ok(statistics.to_inexact()) } else { Ok(statistics) @@ -559,6 +546,7 @@ impl FileSource for ParquetSource { .predicate() .map(|p| format!(", predicate={p}")) .unwrap_or_default(); + #[expect(deprecated)] let pruning_predicate_string = self .pruning_predicate() .map(|pre| { @@ -586,4 +574,18 @@ impl FileSource for ParquetSource { } } } + + fn push_down_filter( + &self, + expr: Arc, + ) -> datafusion_common::Result>> { + let mut conf = self.clone(); + conf.predicate = match self.predicate.as_ref() { + Some(existing_predicate) => { + Some(conjunction([Arc::clone(existing_predicate), expr])) + } + None => Some(expr), + }; + Ok(Some(Arc::new(conf))) + } } diff --git a/datafusion/datasource/src/file.rs b/datafusion/datasource/src/file.rs index 0066f39801a1..b0b9fe4ce905 100644 --- a/datafusion/datasource/src/file.rs +++ b/datafusion/datasource/src/file.rs @@ -27,7 +27,7 @@ use crate::file_scan_config::FileScanConfig; use crate::file_stream::FileOpener; use arrow::datatypes::SchemaRef; use datafusion_common::Statistics; -use datafusion_physical_expr::LexOrdering; +use datafusion_physical_expr::{LexOrdering, PhysicalExpr}; use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; use datafusion_physical_plan::DisplayFormatType; @@ -93,4 +93,11 @@ pub trait FileSource: Send + Sync { } Ok(None) } + + fn push_down_filter( + &self, + _expr: Arc, + ) -> datafusion_common::Result>> { + Ok(None) + } } diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index 5172dafb1f91..aab89c46bec0 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -37,7 +37,7 @@ use datafusion_execution::{ object_store::ObjectStoreUrl, SendableRecordBatchStream, TaskContext, }; use datafusion_physical_expr::{ - expressions::Column, EquivalenceProperties, LexOrdering, Partitioning, + expressions::Column, EquivalenceProperties, LexOrdering, Partitioning, PhysicalExpr, PhysicalSortExpr, }; use datafusion_physical_plan::{ @@ -584,6 +584,20 @@ impl DataSource for FileScanConfig { ) as _ })) } + + fn push_down_filter( + &self, + expr: Arc, + ) -> Result>> { + // Try to push down to the file source + if let Some(file_source) = self.file_source.push_down_filter(expr)? { + return Ok(Some(Arc::new(Self { + file_source, + ..self.clone() + }))); + } + Ok(None) + } } impl FileScanConfig { diff --git a/datafusion/datasource/src/source.rs b/datafusion/datasource/src/source.rs index 6c9122ce1ac1..a3bbc153c535 100644 --- a/datafusion/datasource/src/source.rs +++ b/datafusion/datasource/src/source.rs @@ -33,7 +33,7 @@ use crate::file_scan_config::FileScanConfig; use datafusion_common::config::ConfigOptions; use datafusion_common::{Constraints, Statistics}; use datafusion_execution::{SendableRecordBatchStream, TaskContext}; -use datafusion_physical_expr::{EquivalenceProperties, Partitioning}; +use datafusion_physical_expr::{EquivalenceProperties, Partitioning, PhysicalExpr}; use datafusion_physical_expr_common::sort_expr::LexOrdering; /// Common behaviors in Data Sources for both from Files and Memory. 
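A sketch (not part of the patch) of how a custom source could opt into this new hook, following the same pattern `ParquetSource::push_down_filter` uses above: AND the incoming expression onto any existing predicate via the new `conjunction` helper and return `Some` to signal the filter was absorbed. `MyFileSource` is hypothetical and heavily simplified; the real `FileSource::push_down_filter` returns `Option<Arc<dyn FileSource>>` and defaults to `Ok(None)`, i.e. "cannot push down here".

```rust
use std::sync::Arc;

use datafusion_common::Result;
use datafusion_physical_expr::{conjunction, PhysicalExpr};

// Hypothetical file source that keeps an optional scan predicate.
#[derive(Clone)]
struct MyFileSource {
    predicate: Option<Arc<dyn PhysicalExpr>>,
    // ... other fields elided
}

impl MyFileSource {
    // Accept a pushed-down filter by AND-ing it onto the existing predicate.
    fn push_down_filter(
        &self,
        expr: Arc<dyn PhysicalExpr>,
    ) -> Result<Option<Arc<Self>>> {
        let mut new_source = self.clone();
        new_source.predicate = match &self.predicate {
            Some(existing) => Some(conjunction([Arc::clone(existing), expr])),
            None => Some(expr),
        };
        Ok(Some(Arc::new(new_source)))
    }
}
```

Operators that cannot use the filter themselves (`CoalesceBatchesExec`, `ProjectionExec`, `RepartitionExec`, and `SortExec` later in this patch) simply forward the call to their input and re-wrap the returned child, which is how the filter produced by a `TopK` eventually reaches the scan.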
@@ -79,6 +79,13 @@ pub trait DataSource: Send + Sync + Debug { &self, _projection: &ProjectionExec, ) -> datafusion_common::Result>>; + + fn push_down_filter( + &self, + _expr: Arc, + ) -> datafusion_common::Result>> { + Ok(None) + } } /// [`ExecutionPlan`] handles different file formats like JSON, CSV, AVRO, ARROW, PARQUET @@ -192,6 +199,20 @@ impl ExecutionPlan for DataSourceExec { ) -> datafusion_common::Result>> { self.data_source.try_swapping_with_projection(projection) } + + fn push_down_filter( + &self, + expr: Arc, + ) -> datafusion_common::Result>> { + // Try to push down to the data source + if let Some(data_source) = self.data_source.push_down_filter(expr)? { + return Ok(Some(Arc::new(Self { + data_source, + ..self.clone() + }))); + } + Ok(None) + } } impl DataSourceExec { diff --git a/datafusion/physical-expr-common/src/physical_expr.rs b/datafusion/physical-expr-common/src/physical_expr.rs index 43f214607f9f..22f671d349e2 100644 --- a/datafusion/physical-expr-common/src/physical_expr.rs +++ b/datafusion/physical-expr-common/src/physical_expr.rs @@ -27,6 +27,7 @@ use arrow::array::BooleanArray; use arrow::compute::filter_record_batch; use arrow::datatypes::{DataType, Schema}; use arrow::record_batch::RecordBatch; +use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion_common::{internal_err, not_impl_err, Result, ScalarValue}; use datafusion_expr_common::columnar_value::ColumnarValue; use datafusion_expr_common::interval_arithmetic::Interval; @@ -283,6 +284,47 @@ pub trait PhysicalExpr: Send + Sync + Display + Debug + DynEq + DynHash { /// See the [`fmt_sql`] function for an example of printing `PhysicalExpr`s as SQL. /// fn fmt_sql(&self, f: &mut Formatter<'_>) -> fmt::Result; + + /// Take a snapshot of this `PhysicalExpr` if it is dynamic. + /// This is used to capture the current state of `PhysicalExpr`s that may contain + /// dynamic references to other operators in order to serialize it over the wire + /// or treat it via downcast matching. + /// + /// You should not call this method directly as it does not handle recursion. + /// Instead use `shapshot_physical_expr` to handle recursion and capture the + /// full state of the `PhysicalExpr`. + /// + /// This is expected to return "simple" expressions that do not have mutable state + /// and are composed of DataFusion's built-in `PhysicalExpr` implementations. + /// Callers however should *not* assume anything about the returned expressions + /// since callers and implementers may not agree on what "simple" or "built-in" + /// means. + /// In other words, if you need to searlize a `PhysicalExpr` across the wire + /// you should call this method and then try to serialize the result, + /// but you should handle unknown or unexpected `PhysicalExpr` implementations gracefully + /// just as if you had not called this method at all. + /// + /// In particular, consider: + /// * A `PhysicalExpr` that references the current state of a `datafusion::physical_plan::TopK` + /// that is involved in a query with `SELECT * FROM t1 ORDER BY a LIMIT 10`. + /// This function may return something like `a >= 12`. + /// * A `PhysicalExpr` that references the current state of a `datafusion::physical_plan::joins::HashJoinExec` + /// from a query such as `SELECT * FROM t1 JOIN t2 ON t1.a = t2.b`. + /// This function may return something like `t2.b IN (1, 5, 7)`. 
+ /// + /// A system or function that can only deal with a hardcoded set of `PhysicalExpr` implementations + /// or needs to serialize this state to bytes may not be able to handle these dynamic references. + /// In such cases, we should return a simplified version of the `PhysicalExpr` that does not + /// contain these dynamic references. + /// + /// Note for implementers: this method should *not* handle recursion. + /// Recursion is handled in `shapshot_physical_expr`. + fn snapshot(&self) -> Result>> { + // By default, we return None to indicate that this PhysicalExpr does not + // have any dynamic references or state. + // This is a safe default behavior. + Ok(None) + } } /// [`PhysicalExpr`] can't be constrained by [`Eq`] directly because it must remain object @@ -446,3 +488,30 @@ pub fn fmt_sql(expr: &dyn PhysicalExpr) -> impl Display + '_ { Wrapper { expr } } + +/// Take a snapshot of the given `PhysicalExpr` if it is dynamic. +/// +/// Take a snapshot of this `PhysicalExpr` if it is dynamic. +/// This is used to capture the current state of `PhysicalExpr`s that may contain +/// dynamic references to other operators in order to serialize it over the wire +/// or treat it via downcast matching. +/// +/// See the documentation of [`PhysicalExpr::snapshot`] for more details. +/// +/// # Returns +/// +/// Returns an `Option>` which is the snapshot of the +/// `PhysicalExpr` if it is dynamic. If the `PhysicalExpr` does not have +/// any dynamic references or state, it returns `None`. +pub fn snasphot_physical_expr( + expr: Arc, +) -> Result> { + expr.transform_up(|e| { + if let Some(snapshot) = e.snapshot()? { + Ok(Transformed::yes(snapshot)) + } else { + Ok(Transformed::no(Arc::clone(&e))) + } + }) + .data() +} diff --git a/datafusion/physical-expr/src/lib.rs b/datafusion/physical-expr/src/lib.rs index 93ced2eb628d..551ee97a8783 100644 --- a/datafusion/physical-expr/src/lib.rs +++ b/datafusion/physical-expr/src/lib.rs @@ -59,7 +59,9 @@ pub use physical_expr::{ PhysicalExprRef, }; -pub use datafusion_physical_expr_common::physical_expr::PhysicalExpr; +pub use datafusion_physical_expr_common::physical_expr::{ + snasphot_physical_expr, PhysicalExpr, +}; pub use datafusion_physical_expr_common::sort_expr::{ LexOrdering, LexRequirement, PhysicalSortExpr, PhysicalSortRequirement, }; @@ -68,7 +70,7 @@ pub use planner::{create_physical_expr, create_physical_exprs}; pub use scalar_function::ScalarFunctionExpr; pub use datafusion_physical_expr_common::utils::reverse_order_bys; -pub use utils::split_conjunction; +pub use utils::{conjunction, split_conjunction}; // For backwards compatibility pub mod tree_node { diff --git a/datafusion/physical-expr/src/utils/mod.rs b/datafusion/physical-expr/src/utils/mod.rs index 7e4c7f0e10ba..21496c5edef5 100644 --- a/datafusion/physical-expr/src/utils/mod.rs +++ b/datafusion/physical-expr/src/utils/mod.rs @@ -47,6 +47,22 @@ pub fn split_conjunction( split_impl(Operator::And, predicate, vec![]) } +/// Create a conjunction of the given predicates. +/// If the input is empty, return a literal true. +/// If the input contains a single predicate, return the predicate. +/// Otherwise, return a conjunction of the predicates (e.g. `a AND b AND c`). 
+pub fn conjunction( + predicates: impl IntoIterator>, +) -> Arc { + predicates + .into_iter() + .fold(None, |acc, predicate| match acc { + None => Some(predicate), + Some(acc) => Some(Arc::new(BinaryExpr::new(acc, Operator::And, predicate))), + }) + .unwrap_or_else(|| crate::expressions::lit(true)) +} + /// Assume the predicate is in the form of DNF, split the predicate to a Vec of PhysicalExprs. /// /// For example, split "a1 = a2 OR b1 <= b2 OR c1 != c2" into ["a1 = a2", "b1 <= b2", "c1 != c2"] diff --git a/datafusion/physical-optimizer/src/pruning.rs b/datafusion/physical-optimizer/src/pruning.rs index b5287f3d33f3..8084772b90de 100644 --- a/datafusion/physical-optimizer/src/pruning.rs +++ b/datafusion/physical-optimizer/src/pruning.rs @@ -40,7 +40,9 @@ use datafusion_common::{ use datafusion_common::{Column, DFSchema}; use datafusion_expr_common::operator::Operator; use datafusion_physical_expr::utils::{collect_columns, Guarantee, LiteralGuarantee}; -use datafusion_physical_expr::{expressions as phys_expr, PhysicalExprRef}; +use datafusion_physical_expr::{ + expressions as phys_expr, snasphot_physical_expr, PhysicalExprRef, +}; use datafusion_physical_plan::{ColumnarValue, PhysicalExpr}; /// A source of runtime statistical information to [`PruningPredicate`]s. @@ -527,6 +529,7 @@ impl PruningPredicate { /// See the struct level documentation on [`PruningPredicate`] for more /// details. pub fn try_new(expr: Arc, schema: SchemaRef) -> Result { + let expr = snasphot_physical_expr(expr)?; let unhandled_hook = Arc::new(ConstantUnhandledPredicateHook::default()) as _; // build predicate expression once diff --git a/datafusion/physical-plan/src/coalesce_batches.rs b/datafusion/physical-plan/src/coalesce_batches.rs index 5244038b9ae2..f2e91dbbbe4e 100644 --- a/datafusion/physical-plan/src/coalesce_batches.rs +++ b/datafusion/physical-plan/src/coalesce_batches.rs @@ -32,6 +32,7 @@ use arrow::datatypes::SchemaRef; use arrow::record_batch::RecordBatch; use datafusion_common::Result; use datafusion_execution::TaskContext; +use datafusion_physical_expr::PhysicalExpr; use crate::coalesce::{BatchCoalescer, CoalescerState}; use crate::execution_plan::CardinalityEffect; @@ -212,6 +213,20 @@ impl ExecutionPlan for CoalesceBatchesExec { fn cardinality_effect(&self) -> CardinalityEffect { CardinalityEffect::Equal } + + fn push_down_filter( + &self, + expr: Arc, + ) -> Result>> { + // Try to push down to the input + if let Some(input) = self.input.push_down_filter(expr)? { + return Ok(Some(Arc::new(Self { + input, + ..self.clone() + }))); + } + Ok(None) + } } /// Stream for [`CoalesceBatchesExec`]. See [`CoalesceBatchesExec`] for more details. diff --git a/datafusion/physical-plan/src/dynamic_filters.rs b/datafusion/physical-plan/src/dynamic_filters.rs new file mode 100644 index 000000000000..4bfad498788e --- /dev/null +++ b/datafusion/physical-plan/src/dynamic_filters.rs @@ -0,0 +1,320 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::{ + any::Any, + hash::Hash, + sync::{Arc, RwLock}, +}; + +use datafusion_common::{ + tree_node::{Transformed, TransformedResult, TreeNode}, + Result, +}; +use datafusion_expr::ColumnarValue; +use datafusion_physical_expr::{expressions::lit, utils::conjunction, PhysicalExpr}; + +/// A source of dynamic runtime filters. +/// +/// During query execution, operators implementing this trait can provide +/// filter expressions that other operators can use to dynamically prune data. +/// +/// See `TopKDynamicFilterSource` in datafusion/physical-plan/src/topk/mod.rs for examples. +pub trait DynamicFilterSource: Send + Sync + std::fmt::Debug + 'static { + /// Take a snapshot of the current state of filtering, returning a non-dynamic PhysicalExpr. + /// This is used to e.g. serialize dynamic filters across the wire or to pass them into systems + /// that won't use the `PhysicalExpr` API (e.g. matching on the concrete types of the expressions like `PruningPredicate` does). + /// For example, it is expected that this returns a relatively simple expression such as `col1 > 5` for a TopK operator or + /// `col2 IN (1, 2, ... N)` for a HashJoin operator. + fn snapshot_current_filters(&self) -> Result>>; +} + +#[derive(Debug)] +pub struct DynamicFilterPhysicalExpr { + /// The children of this expression. + /// In particular, it is important that if the dynamic expression will reference any columns + /// those columns be marked as children of this expression so that the expression can be properly + /// bound to the schema. + children: Vec>, + /// Remapped children, if `PhysicalExpr::with_new_children` was called. + /// This is used to ensure that the children of the expression are always the same + /// as the children of the dynamic filter source. + remapped_children: Option>>, + /// The source of dynamic filters. + inner: Arc, + /// For testing purposes track the data type and nullability to make sure they don't change. + /// If they do, there's a bug in the implementation. + /// But this can have overhead in production, so it's only included in tests. 
+ data_type: Arc>>, + nullable: Arc>>, +} + +impl std::fmt::Display for DynamicFilterPhysicalExpr { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "DynamicFilterPhysicalExpr") + } +} + +// Manually derive PartialEq and Hash to work around https://github.com/rust-lang/rust/issues/78808 +impl PartialEq for DynamicFilterPhysicalExpr { + fn eq(&self, other: &Self) -> bool { + self.current().eq(&other.current()) + } +} + +impl Eq for DynamicFilterPhysicalExpr {} + +impl Hash for DynamicFilterPhysicalExpr { + fn hash(&self, state: &mut H) { + self.current().hash(state) + } +} + +impl DynamicFilterPhysicalExpr { + pub fn new( + children: Vec>, + inner: Arc, + ) -> Self { + Self { + children, + remapped_children: None, + inner, + data_type: Arc::new(RwLock::new(None)), + nullable: Arc::new(RwLock::new(None)), + } + } + + fn current(&self) -> Arc { + let current = if let Ok(current) = self.inner.snapshot_current_filters() { + conjunction(current) + } else { + lit(false) + }; + if let Some(remapped_children) = &self.remapped_children { + // Remap children to the current children + // of the expression. + current + .transform_up(|expr| { + // Check if this is any of our original children + if let Some(pos) = self + .children + .iter() + .position(|c| c.as_ref() == expr.as_ref()) + { + // If so, remap it to the current children + // of the expression. + let new_child = Arc::clone(&remapped_children[pos]); + Ok(Transformed::yes(new_child)) + } else { + // Otherwise, just return the expression + Ok(Transformed::no(expr)) + } + }) + .data() + .expect("transformation is infallible") + } else { + current + } + } +} + +impl PhysicalExpr for DynamicFilterPhysicalExpr { + fn as_any(&self) -> &dyn Any { + self + } + + fn children(&self) -> Vec<&Arc> { + self.remapped_children + .as_ref() + .unwrap_or(&self.children) + .iter() + .collect() + } + + fn with_new_children( + self: Arc, + children: Vec>, + ) -> Result> { + Ok(Arc::new(Self { + children: self.children.clone(), + remapped_children: Some(children), + inner: Arc::clone(&self.inner), + data_type: Arc::clone(&self.data_type), + nullable: Arc::clone(&self.nullable), + })) + } + + fn data_type( + &self, + input_schema: &arrow::datatypes::Schema, + ) -> Result { + let res = self.current().data_type(input_schema)?; + #[cfg(test)] + { + use datafusion_common::internal_err; + // Check if the data type has changed. + let mut data_type_lock = self + .data_type + .write() + .expect("Failed to acquire write lock for data_type"); + if let Some(existing) = &*data_type_lock { + if existing != &res { + // If the data type has changed, we have a bug. + return internal_err!( + "DynamicFilterPhysicalExpr data type has changed unexpectedly. \ + Expected: {existing:?}, Actual: {res:?}" + ); + } + } else { + *data_type_lock = Some(res.clone()); + } + } + Ok(res) + } + + fn nullable(&self, input_schema: &arrow::datatypes::Schema) -> Result { + let res = self.current().nullable(input_schema)?; + #[cfg(test)] + { + use datafusion_common::internal_err; + // Check if the nullability has changed. + let mut nullable_lock = self + .nullable + .write() + .expect("Failed to acquire write lock for nullable"); + if let Some(existing) = *nullable_lock { + if existing != res { + // If the nullability has changed, we have a bug. + return internal_err!( + "DynamicFilterPhysicalExpr nullability has changed unexpectedly. 
\ + Expected: {existing}, Actual: {res}" + ); + } + } else { + *nullable_lock = Some(res); + } + } + Ok(res) + } + + fn evaluate( + &self, + batch: &arrow::record_batch::RecordBatch, + ) -> Result { + let current = self.current(); + #[cfg(test)] + { + // Ensure that we are not evaluating after the expression has changed. + let schema = batch.schema(); + self.nullable(&schema)?; + self.data_type(&schema)?; + }; + current.evaluate(batch) + } + + fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + if let Ok(inner) = self.inner.snapshot_current_filters() { + conjunction(inner).fmt_sql(f) + } else { + write!(f, "dynamic_filter_expr()") // What do we want to do here? + } + } + + fn snapshot(&self) -> Result>> { + // Return the current expression as a snapshot. + Ok(Some(self.current())) + } +} + +#[cfg(test)] +mod test { + use arrow::array::RecordBatch; + use datafusion_common::ScalarValue; + + use super::*; + + #[test] + fn test_dynamic_filter_physical_expr_misbehaves_data_type_nullable() { + #[derive(Debug)] + struct MockDynamicFilterSource { + current_expr: Arc>>, + } + + impl DynamicFilterSource for MockDynamicFilterSource { + fn snapshot_current_filters(&self) -> Result>> { + let expr = self.current_expr.read().unwrap().clone(); + Ok(vec![expr]) + } + } + + let source = Arc::new(MockDynamicFilterSource { + current_expr: Arc::new(RwLock::new(lit(42) as Arc)), + }); + let dynamic_filter = DynamicFilterPhysicalExpr::new( + vec![], + Arc::clone(&source) as Arc, + ); + + // First call to data_type and nullable should set the initial values. + let initial_data_type = dynamic_filter + .data_type(&arrow::datatypes::Schema::empty()) + .unwrap(); + let initial_nullable = dynamic_filter + .nullable(&arrow::datatypes::Schema::empty()) + .unwrap(); + + // Call again and expect no change. + let second_data_type = dynamic_filter + .data_type(&arrow::datatypes::Schema::empty()) + .unwrap(); + let second_nullable = dynamic_filter + .nullable(&arrow::datatypes::Schema::empty()) + .unwrap(); + assert_eq!( + initial_data_type, second_data_type, + "Data type should not change on second call." + ); + assert_eq!( + initial_nullable, second_nullable, + "Nullability should not change on second call." + ); + + // Now change the current expression to something else. + { + let mut current = source.current_expr.write().unwrap(); + *current = lit(ScalarValue::Utf8(None)) as Arc; + } + // Check that we error if we call data_type, nullable or evaluate after changing the expression. + assert!( + dynamic_filter + .data_type(&arrow::datatypes::Schema::empty()) + .is_err(), + "Expected err when data_type is called after changing the expression." + ); + assert!( + dynamic_filter + .nullable(&arrow::datatypes::Schema::empty()) + .is_err(), + "Expected err when nullable is called after changing the expression." + ); + let batch = RecordBatch::new_empty(Arc::new(arrow::datatypes::Schema::empty())); + assert!( + dynamic_filter.evaluate(&batch).is_err(), + "Expected err when evaluate is called after changing the expression." 
+ ); + } +} diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs index 2bc5706ee0e1..20becb330737 100644 --- a/datafusion/physical-plan/src/execution_plan.rs +++ b/datafusion/physical-plan/src/execution_plan.rs @@ -467,6 +467,13 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { ) -> Result>> { Ok(None) } + + fn push_down_filter( + &self, + _expr: Arc, + ) -> Result>> { + Ok(None) + } } /// [`ExecutionPlan`] Invariant Level diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index a8a9973ea043..e60c70e3d3d3 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -50,8 +50,8 @@ use datafusion_physical_expr::expressions::BinaryExpr; use datafusion_physical_expr::intervals::utils::check_support; use datafusion_physical_expr::utils::collect_columns; use datafusion_physical_expr::{ - analyze, split_conjunction, AcrossPartitions, AnalysisContext, ConstExpr, - ExprBoundaries, PhysicalExpr, + analyze, conjunction, split_conjunction, AcrossPartitions, AnalysisContext, + ConstExpr, ExprBoundaries, PhysicalExpr, }; use datafusion_physical_expr_common::physical_expr::fmt_sql; @@ -433,6 +433,22 @@ impl ExecutionPlan for FilterExec { } try_embed_projection(projection, self) } + + fn push_down_filter( + &self, + expr: Arc, + ) -> Result>> { + let mut input = Arc::clone(&self.input); + if let Some(new_input) = input.push_down_filter(Arc::clone(&expr))? { + input = new_input; + } + let new_predicate = conjunction([Arc::clone(&self.predicate), expr]); + Ok(Some(Arc::new(Self { + input, + predicate: Arc::clone(&new_predicate), + ..self.clone() + }))) + } } impl EmbeddedProjection for FilterExec { diff --git a/datafusion/physical-plan/src/lib.rs b/datafusion/physical-plan/src/lib.rs index 04fbd06fabcd..2ccfe0b73ed6 100644 --- a/datafusion/physical-plan/src/lib.rs +++ b/datafusion/physical-plan/src/lib.rs @@ -36,7 +36,7 @@ pub use datafusion_expr::{Accumulator, ColumnarValue}; pub use datafusion_physical_expr::window::WindowExpr; use datafusion_physical_expr::PhysicalSortExpr; pub use datafusion_physical_expr::{ - expressions, Distribution, Partitioning, PhysicalExpr, + expressions, snasphot_physical_expr, Distribution, Partitioning, PhysicalExpr, }; pub use crate::display::{DefaultDisplay, DisplayAs, DisplayFormatType, VerboseDisplay}; @@ -51,6 +51,7 @@ pub use crate::stream::EmptyRecordBatchStream; pub use crate::topk::TopK; pub use crate::visitor::{accept, visit_execution_plan, ExecutionPlanVisitor}; +mod dynamic_filters; mod ordering; mod render_tree; mod topk; diff --git a/datafusion/physical-plan/src/projection.rs b/datafusion/physical-plan/src/projection.rs index 1d3e23ea9097..9a9e0db9a3fc 100644 --- a/datafusion/physical-plan/src/projection.rs +++ b/datafusion/physical-plan/src/projection.rs @@ -271,6 +271,20 @@ impl ExecutionPlan for ProjectionExec { Ok(Some(Arc::new(projection.clone()))) } } + + fn push_down_filter( + &self, + expr: Arc, + ) -> Result>> { + // Try to push down to the input + if let Some(input) = self.input.push_down_filter(expr)? 
{ + return Ok(Some(Arc::new(Self { + input, + ..self.clone() + }))); + } + Ok(None) + } } /// If 'e' is a direct column reference, returns the field level diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index ebc751201378..6e7d885f58e1 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -723,6 +723,20 @@ impl ExecutionPlan for RepartitionExec { new_partitioning, )?))) } + + fn push_down_filter( + &self, + expr: Arc, + ) -> Result>> { + // Try to push down to the input + if let Some(input) = self.input.push_down_filter(expr)? { + return Ok(Some(Arc::new(Self { + input, + ..self.clone() + }))); + } + Ok(None) + } } impl RepartitionExec { diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index 1072e9abf437..56262aed6e0f 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -57,7 +57,7 @@ use datafusion_execution::disk_manager::RefCountedTempFile; use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation}; use datafusion_execution::runtime_env::RuntimeEnv; use datafusion_execution::TaskContext; -use datafusion_physical_expr::LexOrdering; +use datafusion_physical_expr::{LexOrdering, PhysicalExpr}; use datafusion_physical_expr_common::sort_expr::LexRequirement; use futures::{StreamExt, TryStreamExt}; @@ -1197,35 +1197,55 @@ impl ExecutionPlan for SortExec { ) -> Result { trace!("Start SortExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id()); - let mut input = self.input.execute(partition, Arc::clone(&context))?; - - let execution_options = &context.session_config().options().execution; - - trace!("End SortExec's input.execute for partition: {}", partition); - let sort_satisfied = self .input .equivalence_properties() .ordering_satisfy_requirement(&LexRequirement::from(self.expr.clone())); + let input_exec = Arc::clone(&self.input); + + let execution_options = &context.session_config().options().execution; + + trace!("End SortExec's input.execute for partition: {}", partition); + match (sort_satisfied, self.fetch.as_ref()) { - (true, Some(fetch)) => Ok(Box::pin(LimitStream::new( - input, - 0, - Some(*fetch), - BaselineMetrics::new(&self.metrics_set, partition), - ))), - (true, None) => Ok(input), + (true, Some(fetch)) => { + let input = input_exec.execute(partition, Arc::clone(&context))?; + Ok(Box::pin(LimitStream::new( + input, + 0, + Some(*fetch), + BaselineMetrics::new(&self.metrics_set, partition), + ))) + } + (true, None) => self.input.execute(partition, Arc::clone(&context)), (false, Some(fetch)) => { + let schema = input_exec.schema(); let mut topk = TopK::try_new( partition, - input.schema(), + schema, self.expr.clone(), *fetch, context.session_config().batch_size(), context.runtime_env(), &self.metrics_set, )?; + let input_exec = if context + .session_config() + .options() + .optimizer + .enable_dynamic_filter_pushdown + { + // Try to push down the dynamic filter. If the execution plan doesn't + // support it, push_down_filter will return None and we'll + // keep the original input_exec. + input_exec + .push_down_filter(topk.dynamic_filter_source())? 
+ .unwrap_or(input_exec) + } else { + input_exec + }; + let mut input = input_exec.execute(partition, Arc::clone(&context))?; Ok(Box::pin(RecordBatchStreamAdapter::new( self.schema(), futures::stream::once(async move { @@ -1239,6 +1259,7 @@ impl ExecutionPlan for SortExec { ))) } (false, None) => { + let mut input = input_exec.execute(partition, Arc::clone(&context))?; let mut sorter = ExternalSorter::new( partition, input.schema(), @@ -1319,6 +1340,28 @@ impl ExecutionPlan for SortExec { .with_preserve_partitioning(self.preserve_partitioning()), ))) } + + // Pass though filter pushdown. + // This often happens in partitioned plans with a TopK because we end up with 1 TopK per partition + a final TopK at the end. + // Implementing this pass-through allows global/top/final TopK to push down filters to the partitions. + fn push_down_filter( + &self, + expr: Arc, + ) -> Result>> { + let new_input = self.input.push_down_filter(expr)?; + if let Some(new_input) = new_input { + Ok(Some(Arc::new(SortExec { + input: new_input, + expr: self.expr.clone(), + metrics_set: self.metrics_set.clone(), + preserve_partitioning: self.preserve_partitioning, + fetch: self.fetch, + cache: self.cache.clone(), + }))) + } else { + Ok(None) + } + } } #[cfg(test)] diff --git a/datafusion/physical-plan/src/topk/mod.rs b/datafusion/physical-plan/src/topk/mod.rs index 85de1eefce2e..3d45b5277ecf 100644 --- a/datafusion/physical-plan/src/topk/mod.rs +++ b/datafusion/physical-plan/src/topk/mod.rs @@ -17,27 +17,34 @@ //! TopK: Combination of Sort / LIMIT -use arrow::{ - compute::interleave, - row::{RowConverter, Rows, SortField}, -}; use std::mem::size_of; -use std::{cmp::Ordering, collections::BinaryHeap, sync::Arc}; +use std::sync::{Arc, RwLock}; +use std::{cmp::Ordering, collections::BinaryHeap}; -use super::metrics::{BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder}; -use crate::spill::get_record_batch_memory_size; -use crate::{stream::RecordBatchStreamAdapter, SendableRecordBatchStream}; use arrow::array::{Array, ArrayRef, RecordBatch}; use arrow::datatypes::SchemaRef; -use datafusion_common::HashMap; +use arrow::{ + compute::interleave, + row::{RowConverter, Rows, SortField}, +}; +use arrow_schema::SortOptions; use datafusion_common::Result; +use datafusion_common::{internal_err, DataFusionError, HashMap}; use datafusion_execution::{ memory_pool::{MemoryConsumer, MemoryReservation}, runtime_env::RuntimeEnv, }; -use datafusion_physical_expr::PhysicalSortExpr; +use datafusion_expr::ColumnarValue; +use datafusion_expr::Operator; +use datafusion_physical_expr::expressions::{is_not_null, is_null, lit, BinaryExpr}; +use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr}; use datafusion_physical_expr_common::sort_expr::LexOrdering; +use super::metrics::{BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder}; +use crate::dynamic_filters::{DynamicFilterPhysicalExpr, DynamicFilterSource}; +use crate::spill::get_record_batch_memory_size; +use crate::{stream::RecordBatchStreamAdapter, SendableRecordBatchStream}; + /// Global TopK /// /// # Background @@ -90,6 +97,18 @@ pub struct TopK { scratch_rows: Rows, /// stores the top k values and their sort key values, in order heap: TopKHeap, + /// stores the current filters derived from this TopK that can be pushed down + filters: Option>, +} + +impl std::fmt::Debug for TopK { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("TopK") + .field("schema", &self.schema) + .field("batch_size", &self.batch_size) + 
.field("expr", &self.expr) + .finish() + } } impl TopK { @@ -137,9 +156,26 @@ impl TopK { row_converter, scratch_rows, heap: TopKHeap::new(k, batch_size, schema), + filters: None, }) } + pub(crate) fn dynamic_filter_source(&mut self) -> Arc { + match self.filters { + Some(ref filters) => filters.as_dynamic_physical_expr(), + None => { + let children = self + .expr + .iter() + .map(|e| Arc::clone(&e.expr)) + .collect::>(); + let filters = Arc::new(TopKDynamicFilterSource::new(children)); + self.filters = Some(Arc::clone(&filters)); + filters.as_dynamic_physical_expr() + } + } + } + /// Insert `batch`, remembering if any of its values are among /// the top k seen so far. pub fn insert_batch(&mut self, batch: RecordBatch) -> Result<()> { @@ -164,6 +200,7 @@ impl TopK { // Idea: filter out rows >= self.heap.max() early (before passing to `RowConverter`) // this avoids some work and also might be better vectorizable. let mut batch_entry = self.heap.register_batch(batch); + let mut need_to_update_dynamic_filters = false; for (index, row) in rows.iter().enumerate() { match self.heap.max() { // heap has k items, and the new row is greater than the @@ -173,11 +210,23 @@ impl TopK { None | Some(_) => { self.heap.add(&mut batch_entry, row, index); self.metrics.row_replacements.add(1); + need_to_update_dynamic_filters = true; } } } self.heap.insert_batch_entry(batch_entry); + if need_to_update_dynamic_filters { + if let Some(filters) = self.filters.as_ref() { + if let Some(threasholds) = self.heap.get_threshold_values(&self.expr)? { + if let Some(predicate) = Self::calculate_dynamic_filters(threasholds)? + { + filters.update_filters(predicate)?; + } + } + } + } + // conserve memory self.heap.maybe_compact()?; @@ -186,6 +235,90 @@ impl TopK { Ok(()) } + fn calculate_dynamic_filters( + thresholds: Vec, + ) -> Result>> { + // Create filter expressions for each threshold + let mut filters: Vec> = + Vec::with_capacity(thresholds.len()); + + let mut prev_sort_expr: Option> = None; + for threshold in thresholds { + // Create the appropriate operator based on sort order + let op = if threshold.sort_options.descending { + // For descending sort, we want col > threshold (exclude smaller values) + Operator::Gt + } else { + // For ascending sort, we want col < threshold (exclude larger values) + Operator::Lt + }; + + let value_null = threshold.value.is_null(); + + let comparison = Arc::new(BinaryExpr::new( + Arc::clone(&threshold.expr), + op, + lit(threshold.value.clone()), + )); + + let comparison_with_null = + match (threshold.sort_options.nulls_first, value_null) { + // For nulls first, transform to (threshold.value is not null) and (threshold.expr is null or comparison) + (true, true) => lit(false), + (true, false) => Arc::new(BinaryExpr::new( + is_null(Arc::clone(&threshold.expr))?, + Operator::Or, + comparison, + )), + // For nulls last, transform to (threshold.value is null and threshold.expr is not null) + // or (threshold.value is not null and comparison) + (false, true) => is_not_null(Arc::clone(&threshold.expr))?, + (false, false) => comparison, + }; + + let mut eq_expr = Arc::new(BinaryExpr::new( + Arc::clone(&threshold.expr), + Operator::Eq, + lit(threshold.value.clone()), + )); + + if value_null { + eq_expr = Arc::new(BinaryExpr::new( + is_null(Arc::clone(&threshold.expr))?, + Operator::Or, + eq_expr, + )); + } + + // For a query like order by a, b, the filter for column `b` is only applied if + // the condition a = threshold.value (considering null equality) is met. 
+ // Therefore, we add equality predicates for all preceding fields to the filter logic of the current field, + // and include the current field's equality predicate in `prev_sort_expr` for use with subsequent fields. + match prev_sort_expr.take() { + None => { + prev_sort_expr = Some(eq_expr); + filters.push(comparison_with_null); + } + Some(p) => { + filters.push(Arc::new(BinaryExpr::new( + Arc::clone(&p), + Operator::And, + comparison_with_null, + ))); + + prev_sort_expr = + Some(Arc::new(BinaryExpr::new(p, Operator::And, eq_expr))); + } + } + } + + let dynamic_predicate = filters + .into_iter() + .reduce(|a, b| Arc::new(BinaryExpr::new(a, Operator::Or, b))); + + Ok(dynamic_predicate) + } + /// Returns the top k results broken into `batch_size` [`RecordBatch`]es, consuming the heap pub fn emit(self) -> Result { let Self { @@ -197,6 +330,7 @@ impl TopK { row_converter: _, scratch_rows: _, mut heap, + filters: _, } = self; let _timer = metrics.baseline.elapsed_compute().timer(); // time updated on drop @@ -270,8 +404,18 @@ struct TopKHeap { owned_bytes: usize, } +/// Holds threshold value and sort order information for a column +struct ColumnThreshold { + /// The column expression + pub expr: Arc, + /// The threshold value + pub value: datafusion_common::ScalarValue, + /// Sort options + pub sort_options: SortOptions, +} + impl TopKHeap { - fn new(k: usize, batch_size: usize, schema: SchemaRef) -> Self { + pub fn new(k: usize, batch_size: usize, schema: SchemaRef) -> Self { assert!(k > 0); Self { k, @@ -282,6 +426,54 @@ impl TopKHeap { } } + /// Get threshold values for all columns in the given sort expressions. + /// If the heap does not yet have k items, returns None. + /// Otherwise, returns the threshold values from the max row in the heap. + pub fn get_threshold_values( + &self, + sort_exprs: &[PhysicalSortExpr], + ) -> Result>> { + // If the heap doesn't have k elements yet, we can't create thresholds + let max_row = match self.max() { + Some(row) => row, + None => return Ok(None), + }; + + // Get the batch that contains the max row + let batch_entry = match self.store.get(max_row.batch_id) { + Some(entry) => entry, + None => return internal_err!("Invalid batch ID in TopKRow"), + }; + + // Extract threshold values for each sort expression + let mut thresholds = Vec::with_capacity(sort_exprs.len()); + for sort_expr in sort_exprs { + // Extract the value for this column from the max row + let expr = Arc::clone(&sort_expr.expr); + let value = expr.evaluate(&batch_entry.batch.slice(max_row.index, 1))?; + + // Convert to scalar value - should be a single value since we're evaluating on a single row batch + let scalar = match value { + ColumnarValue::Scalar(scalar) => scalar, + ColumnarValue::Array(array) if array.len() == 1 => { + // Extract the first (and only) value from the array + datafusion_common::ScalarValue::try_from_array(&array, 0)? + } + array => { + return internal_err!("Expected a scalar value, got {:?}", array) + } + }; + + thresholds.push(ColumnThreshold { + expr, + value: scalar, + sort_options: sort_expr.options, + }); + } + + Ok(Some(thresholds)) + } + /// Register a [`RecordBatch`] with the heap, returning the /// appropriate entry pub fn register_batch(&mut self, batch: RecordBatch) -> RecordBatchEntry { @@ -297,7 +489,7 @@ impl TopKHeap { /// Returns the largest value stored by the heap if there are k /// items, otherwise returns None. 
Remember this structure is
    /// keeping the "smallest" k values
-    fn max(&self) -> Option<&TopKRow> {
+    pub fn max(&self) -> Option<&TopKRow> {
         if self.inner.len() < self.k {
             None
         } else {
@@ -509,7 +701,7 @@ impl TopKRow {
     }
 
     /// Returns a slice to the owned row value
-    fn row(&self) -> &[u8] {
+    pub fn row(&self) -> &[u8] {
         self.row.as_slice()
     }
 }
@@ -529,7 +721,7 @@ impl Ord for TopKRow {
 }
 
 #[derive(Debug)]
-struct RecordBatchEntry {
+pub struct RecordBatchEntry {
     id: u32,
     batch: RecordBatch,
     // for this batch, how many times has it been used
@@ -644,10 +836,101 @@ impl RecordBatchStore {
     }
 }
+/// Pushdown of dynamic filters from TopK operators is used to speed up queries
+/// such as `SELECT * FROM table ORDER BY col DESC LIMIT 10` by pushing down the
+/// threshold values for the sort columns to the data source.
+/// That is, the TopK operator will keep track of the top 10 values for the sort
+/// and before a new file is opened its statistics will be checked against the
+/// threshold values to determine if the file can be skipped and predicate pushdown
+/// will use these to skip rows during the scan.
+///
+/// For example, imagine this data gets created when multiple sources with clock skews,
+/// network delays, etc. are writing data and you don't do anything fancy to guarantee
+/// perfect sorting by `timestamp` (i.e. you naively write out the data to Parquet, maybe do some compaction, etc.).
+/// The point is that 99% of yesterday's files have a `timestamp` smaller than 99% of today's files
+/// but there may be a couple seconds of overlap between files.
+/// To be concrete, let's say this is our data:
+//
+// | file | min | max |
+// |------|-----|-----|
+// | 1    | 1   | 10  |
+// | 2    | 9   | 19  |
+// | 3    | 20  | 31  |
+// | 4    | 30  | 35  |
+//
+// Ideally a [`TableProvider`] is able to use file level stats or other methods to roughly order the files
+// within each partition / file group such that we start with the newest / largest `timestamp`s.
+// If this is not possible the optimization still works but is less efficient and harder to visualize,
+// so for this example let's assume that we process 1 file at a time and we started with file 4.
+// After processing file 4 let's say we have 10 values in our TopK heap, the smallest of which is 30.
+// The TopK operator will then push the filter `timestamp > 30` down the tree of [`ExecutionPlan`]s
+// and if the data source supports dynamic filter pushdown it will accept a reference to this [`TopKDynamicFilterSource`]
+// and when it goes to open file 3 it will ask the [`TopKDynamicFilterSource`] for the current filters.
+// Since file 3 may contain values larger than 30 we cannot skip it entirely,
+// but scanning it may still be more efficient due to page pruning and other optimizations.
+// Once we get to file 2 however we can skip it entirely because we know that all values in file 2 are smaller than 30.
+// The same goes for file 1.
+// So this optimization just saved us 50% of the work of scanning the data.
+#[derive(Debug, Clone)]
+struct TopKDynamicFilterSource {
+    /// The children of the dynamic filters produced by this TopK.
+    /// In particular, these are the columns that are being sorted, derived from the sorting expressions.
+ children: Vec>, + /// The current filters derived from this TopK + predicate: Arc>>, +} + +impl TopKDynamicFilterSource { + fn new(children: Vec>) -> Self { + Self { + children, + predicate: Arc::new(RwLock::new(lit(true))), + } + } + + fn update_filters(&self, predicate: Arc) -> Result<()> { + let mut current_predicate = self.predicate.write().map_err(|_| { + DataFusionError::Internal( + "Failed to acquire write lock on TopKDynamicPhysicalExprSource" + .to_string(), + ) + })?; + *current_predicate = predicate; + Ok(()) + } +} + +impl TopKDynamicFilterSource { + fn as_dynamic_physical_expr(&self) -> Arc { + let new = self.clone(); + // Transform the sort expresions into referenced columns + let children = self.children.clone(); + Arc::new(DynamicFilterPhysicalExpr::new(children, Arc::new(new))) + } +} + +impl DynamicFilterSource for TopKDynamicFilterSource { + fn snapshot_current_filters(&self) -> Result>> { + let predicate = self + .predicate + .read() + .map_err(|_| { + DataFusionError::Internal( + "Failed to acquire read lock on TopKDynamicPhysicalExprSource" + .to_string(), + ) + })? + .clone(); + Ok(vec![predicate]) + } +} + #[cfg(test)] mod tests { use super::*; + use crate::expressions::col; use arrow::array::{Float64Array, Int32Array, RecordBatch}; + use arrow::compute::SortOptions; use arrow::datatypes::{DataType, Field, Schema}; /// This test ensures the size calculation is correct for RecordBatches with multiple columns. @@ -681,4 +964,65 @@ mod tests { record_batch_store.unuse(0); assert_eq!(record_batch_store.batches_size, 0); } + + #[test] + fn test_topk_as_dynamic_filter_source() { + let schema = Arc::new(Schema::new(vec![ + Field::new("col1", DataType::Int32, true), + Field::new("col2", DataType::Float64, false), + ])); + + let runtime = Arc::new(RuntimeEnv::default()); + let metrics = ExecutionPlanMetricsSet::new(); + + // Create a TopK with descending sort on col2 + let sort_expr = vec![PhysicalSortExpr { + expr: Arc::new(datafusion_physical_expr::expressions::Column::new( + "col2", 1, + )), + options: SortOptions { + descending: true, + nulls_first: false, + }, + }]; + + let mut topk = TopK::try_new( + 0, + Arc::clone(&schema), + sort_expr.into(), + 5, // k=5 + 100, // batch_size + runtime, + &metrics, + ) + .unwrap(); + + // Initially there should be no filters (empty heap) + let filter = topk.dynamic_filter_source().snapshot().unwrap().unwrap(); + assert!(filter.eq(&lit(true)), "{filter:?}"); + + // Insert some data to fill the heap + let col1 = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]); + let col2 = + Float64Array::from(vec![10.0, 9.0, 8.0, 7.0, 6.0, 5.0, 4.0, 3.0, 2.0, 1.0]); + let batch = RecordBatch::try_new( + Arc::clone(&schema), + vec![Arc::new(col1), Arc::new(col2)], + ) + .unwrap(); + + // Insert the data into TopK + topk.insert_batch(batch).unwrap(); + + // Now there should be a filter + let filter = topk.dynamic_filter_source().snapshot().unwrap().unwrap(); + + // We expect a filter for col2 > 6.0 (since we're doing descending sort and have 5 values) + let expected = Arc::new(BinaryExpr::new( + col("col2", &schema).unwrap(), + Operator::Gt, + lit(6.0), + )) as Arc; + assert!(filter.eq(&expected), "{filter:?}"); + } } diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs index c196595eeed4..8eccf32fa3c7 100644 --- a/datafusion/proto/src/physical_plan/to_proto.rs +++ b/datafusion/proto/src/physical_plan/to_proto.rs @@ -28,7 +28,9 @@ use datafusion::physical_plan::expressions::{ }; use 
datafusion::physical_plan::udaf::AggregateFunctionExpr; use datafusion::physical_plan::windows::{PlainAggregateWindowExpr, WindowUDFExpr}; -use datafusion::physical_plan::{Partitioning, PhysicalExpr, WindowExpr}; +use datafusion::physical_plan::{ + snasphot_physical_expr, Partitioning, PhysicalExpr, WindowExpr, +}; use datafusion::{ datasource::{ file_format::{csv::CsvSink, json::JsonSink}, @@ -210,6 +212,7 @@ pub fn serialize_physical_expr( value: &Arc, codec: &dyn PhysicalExtensionCodec, ) -> Result { + let value = snasphot_physical_expr(value.clone())?; let expr = value.as_any(); if let Some(expr) = expr.downcast_ref::() { @@ -368,7 +371,7 @@ pub fn serialize_physical_expr( }) } else { let mut buf: Vec = vec![]; - match codec.try_encode_expr(value, &mut buf) { + match codec.try_encode_expr(&value, &mut buf) { Ok(_) => { let inputs: Vec = value .children() diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index 496f24abf6ed..f709c3875a9a 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -241,6 +241,7 @@ datafusion.explain.show_statistics false datafusion.optimizer.allow_symmetric_joins_without_pruning true datafusion.optimizer.default_filter_selectivity 20 datafusion.optimizer.enable_distinct_aggregation_soft_limit true +datafusion.optimizer.enable_dynamic_filter_pushdown true datafusion.optimizer.enable_round_robin_repartition true datafusion.optimizer.enable_topk_aggregation true datafusion.optimizer.expand_views_at_output false @@ -340,6 +341,7 @@ datafusion.explain.show_statistics false When set to true, the explain statement datafusion.optimizer.allow_symmetric_joins_without_pruning true Should DataFusion allow symmetric hash joins for unbounded data sources even when its inputs do not have any ordering or filtering If the flag is not enabled, the SymmetricHashJoin operator will be unable to prune its internal buffers, resulting in certain join types - such as Full, Left, LeftAnti, LeftSemi, Right, RightAnti, and RightSemi - being produced only at the end of the execution. This is not typical in stream processing. Additionally, without proper design for long runner execution, all types of joins may encounter out-of-memory errors. datafusion.optimizer.default_filter_selectivity 20 The default filter selectivity used by Filter Statistics when an exact selectivity cannot be determined. Valid values are between 0 (no selectivity) and 100 (all rows are selected). datafusion.optimizer.enable_distinct_aggregation_soft_limit true When set to true, the optimizer will push a limit operation into grouped aggregations which have no aggregate expressions, as a soft limit, emitting groups once the limit is reached, before all rows in the group are read. +datafusion.optimizer.enable_dynamic_filter_pushdown true When set to true attempts to push down dynamic filters generated by operators into the file scan phase. For example, for a query such as `SELECT * FROM t ORDER BY timestamp DESC LIMIT 10`, the optimizer will attempt to push down the current top 10 timestamps that the TopK operator references into the file scans. This means that if we already have 10 timestamps in the year 2025 any files that only have timestamps in the year 2024 can be skipped / pruned at various stages in the scan. 
datafusion.optimizer.enable_round_robin_repartition true When set to true, the physical plan optimizer will try to add round robin repartitioning to increase parallelism to leverage more CPU cores datafusion.optimizer.enable_topk_aggregation true When set to true, the optimizer will attempt to perform limit operations during aggregations, if possible datafusion.optimizer.expand_views_at_output false When set to true, if the returned type is a view type then the output will be coerced to a non-view. Coerces `Utf8View` to `LargeUtf8`, and `BinaryView` to `LargeBinary`. diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 68e21183938b..ea18318dd699 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -99,6 +99,7 @@ Environment variables are read during `SessionConfig` initialisation so they mus | datafusion.optimizer.enable_distinct_aggregation_soft_limit | true | When set to true, the optimizer will push a limit operation into grouped aggregations which have no aggregate expressions, as a soft limit, emitting groups once the limit is reached, before all rows in the group are read. | | datafusion.optimizer.enable_round_robin_repartition | true | When set to true, the physical plan optimizer will try to add round robin repartitioning to increase parallelism to leverage more CPU cores | | datafusion.optimizer.enable_topk_aggregation | true | When set to true, the optimizer will attempt to perform limit operations during aggregations, if possible | +| datafusion.optimizer.enable_dynamic_filter_pushdown | true | When set to true attempts to push down dynamic filters generated by operators into the file scan phase. For example, for a query such as `SELECT * FROM t ORDER BY timestamp DESC LIMIT 10`, the optimizer will attempt to push down the current top 10 timestamps that the TopK operator references into the file scans. This means that if we already have 10 timestamps in the year 2025 any files that only have timestamps in the year 2024 can be skipped / pruned at various stages in the scan. | | datafusion.optimizer.filter_null_join_keys | false | When set to true, the optimizer will insert filters before a join between a nullable and non-nullable column to filter out nulls on the nullable side. This filter can add additional overhead when the file format does not fully support predicate push down. | | datafusion.optimizer.repartition_aggregations | true | Should DataFusion repartition data using the aggregate keys to execute aggregates in parallel using the provided `target_partitions` level | | datafusion.optimizer.repartition_file_min_size | 10485760 | Minimum total files size in bytes to perform file scan repartitioning. 
| From 74813118d229922b948672c876fbbb94861649a7 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Sun, 30 Mar 2025 16:13:21 -0500 Subject: [PATCH 2/3] lint --- datafusion/proto/src/physical_plan/to_proto.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs index 8eccf32fa3c7..1e5a27ec4eb6 100644 --- a/datafusion/proto/src/physical_plan/to_proto.rs +++ b/datafusion/proto/src/physical_plan/to_proto.rs @@ -212,7 +212,7 @@ pub fn serialize_physical_expr( value: &Arc, codec: &dyn PhysicalExtensionCodec, ) -> Result { - let value = snasphot_physical_expr(value.clone())?; + let value = snasphot_physical_expr(Arc::clone(value))?; let expr = value.as_any(); if let Some(expr) = expr.downcast_ref::() { From 204479372c1c2d4f43da5924bd13e14d768a1f98 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Mon, 31 Mar 2025 21:06:52 -0500 Subject: [PATCH 3/3] wip --- datafusion/physical-optimizer/src/lib.rs | 1 + .../physical-plan/src/coalesce_batches.rs | 16 +---- .../physical-plan/src/execution_plan.rs | 54 +++++++++++++++- datafusion/physical-plan/src/filter.rs | 61 +++++++++++++++---- datafusion/physical-plan/src/projection.rs | 14 ----- .../physical-plan/src/repartition/mod.rs | 14 +---- 6 files changed, 105 insertions(+), 55 deletions(-) diff --git a/datafusion/physical-optimizer/src/lib.rs b/datafusion/physical-optimizer/src/lib.rs index 35503f3b0b5f..5a43d7118d63 100644 --- a/datafusion/physical-optimizer/src/lib.rs +++ b/datafusion/physical-optimizer/src/lib.rs @@ -29,6 +29,7 @@ pub mod coalesce_batches; pub mod combine_partial_final_agg; pub mod enforce_distribution; pub mod enforce_sorting; +pub mod filter_pushdown; pub mod join_selection; pub mod limit_pushdown; pub mod limited_distinct_aggregation; diff --git a/datafusion/physical-plan/src/coalesce_batches.rs b/datafusion/physical-plan/src/coalesce_batches.rs index f2e91dbbbe4e..e639d170d1fd 100644 --- a/datafusion/physical-plan/src/coalesce_batches.rs +++ b/datafusion/physical-plan/src/coalesce_batches.rs @@ -35,7 +35,7 @@ use datafusion_execution::TaskContext; use datafusion_physical_expr::PhysicalExpr; use crate::coalesce::{BatchCoalescer, CoalescerState}; -use crate::execution_plan::CardinalityEffect; +use crate::execution_plan::{CardinalityEffect, ExecutionPlanFilterPushdownResult, FilterPushdownResult}; use futures::ready; use futures::stream::{Stream, StreamExt}; @@ -214,18 +214,8 @@ impl ExecutionPlan for CoalesceBatchesExec { CardinalityEffect::Equal } - fn push_down_filter( - &self, - expr: Arc, - ) -> Result>> { - // Try to push down to the input - if let Some(input) = self.input.push_down_filter(expr)? { - return Ok(Some(Arc::new(Self { - input, - ..self.clone() - }))); - } - Ok(None) + fn supports_filter_pushdown(&self) -> bool { + true } } diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs index 20becb330737..7aa53f874015 100644 --- a/datafusion/physical-plan/src/execution_plan.rs +++ b/datafusion/physical-plan/src/execution_plan.rs @@ -468,14 +468,62 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { Ok(None) } - fn push_down_filter( - &self, - _expr: Arc, + /// Returns a set of filters that this operator owns but would like to be pushed down. 
+    /// For example, a `TopK` operator may produce dynamic filters that reference its current state,
+    /// while a `FilterExec` will just hand off the filters it has as is.
+    /// The default implementation returns an empty vector.
+    fn filters_for_pushdown(&self) -> Result<Vec<Arc<dyn PhysicalExpr>>> {
+        Ok(Vec::new())
+    }
+
+    /// After we've attempted to push down filters into this node's children,
+    /// this will be called with the result for each filter that this node gave in `filters_for_pushdown`.
+    /// The node should update itself to possibly drop filters that were pushed down as `Exact`.
+    fn with_filter_pushdown_result(
+        self: Arc<Self>,
+        _pushdown: &[FilterPushdownResult]
     ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
         Ok(None)
     }
+
+    /// Push down the given filters into this `ExecutionPlan`.
+    /// This is called after `with_filter_pushdown_result`.
+    /// Operators can accept filters from their parents, either as Exact or Unsupported.
+    /// If the operator accepts a filter as Exact, it should return a new `ExecutionPlan` with the filter applied
+    /// and the parent that generated the filter might not apply it anymore.
+    fn push_down_filters_from_parents(
+        &self,
+        _filters: &[&Arc<dyn PhysicalExpr>],
+    ) -> Result<Option<ExecutionPlanFilterPushdownResult>> {
+        Ok(None)
+    }
+
+    /// Returns `true` if this `ExecutionPlan` allows filter pushdown to flow through it and `false` otherwise.
+    /// Nodes such as aggregations cannot have filters pushed down through them, so they return `false`.
+    /// On the other hand nodes such as repartitions can have filters pushed down through them, so they return `true`.
+    /// The default implementation returns `false`.
+    fn supports_filter_pushdown(&self) -> bool {
+        false
+    }
+}
+
+
+pub struct FilterPushdownResult {
+    /// The new execution plan after the filter pushdown
+    pub plan: Arc<dyn ExecutionPlan>,
+    /// The filter pushdown result for each filter
+    pub pushdown: Vec<FilterPushdownSupport>,
+}
+
+pub enum FilterPushdownSupport {
+    /// A node such as an aggregation that does not accept filter pushdown.
+    Inexact,
+    /// A node such as a repartition that accepts filter pushdown.
+    /// This includes a new execution plan and a filter pushdown result for each filter.
+    Exact,
 }
+
 /// [`ExecutionPlan`] Invariant Level
 ///
 /// What set of assertions ([Invariant]s) holds for a particular `ExecutionPlan`
diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs
index e60c70e3d3d3..6cd7f8dab52f 100644
--- a/datafusion/physical-plan/src/filter.rs
+++ b/datafusion/physical-plan/src/filter.rs
@@ -25,7 +25,7 @@ use super::{
     RecordBatchStream, SendableRecordBatchStream, Statistics,
 };
 use crate::common::can_project;
-use crate::execution_plan::CardinalityEffect;
+use crate::execution_plan::{CardinalityEffect, ExecutionPlanFilterPushdownResult, FilterPushdownResult};
 use crate::projection::{
     make_with_child, try_embed_projection, update_expr, EmbeddedProjection,
     ProjectionExec,
@@ -434,20 +434,55 @@ impl ExecutionPlan for FilterExec {
         try_embed_projection(projection, self)
     }
 
-    fn push_down_filter(
-        &self,
-        expr: Arc<dyn PhysicalExpr>,
+    fn filters_for_pushdown(&self) -> Result<Vec<Arc<dyn PhysicalExpr>>> {
+        Ok(split_conjunction(self.predicate()).iter().map(|f| Arc::clone(f)).collect())
+    }
+
+    fn with_filter_pushdown_result(
+        self: Arc<Self>,
+        pushdown: &[FilterPushdownResult]
     ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
-        let mut input = Arc::clone(&self.input);
-        if let Some(new_input) = input.push_down_filter(Arc::clone(&expr))?
{ - input = new_input; + // Only keep filters who's index maps to the pushdown result Unsupported + let new_filters = self.filters_for_pushdown()?.iter().zip(pushdown.iter()).filter_map(|(f, p)| { + if let FilterPushdownResult::Unsupported = p { + Some(Arc::clone(&f)) + } else { + None + } + }).collect::>(); + + if new_filters.is_empty() { + return Ok(Some(Arc::clone(self.input()))) } - let new_predicate = conjunction([Arc::clone(&self.predicate), expr]); - Ok(Some(Arc::new(Self { - input, - predicate: Arc::clone(&new_predicate), - ..self.clone() - }))) + + let predicate = conjunction(new_filters.into_iter()); + + let new = FilterExec::try_new(predicate, Arc::clone(self.input())) + .and_then(|e| { + let selectivity = e.default_selectivity(); + e.with_default_selectivity(selectivity) + }) + .and_then(|e| e.with_projection(self.projection().cloned())) + .map(|e| Arc::new(e) as _)?; + Ok(Some(new)) + } + + fn push_down_filters( + &self, + filters: &[&Arc], + ) -> Result> { + let new_predicates = conjunction(std::iter::once(Arc::clone(&self.predicate)).chain(filters.iter().map(|f| Arc::clone(f)))); + Ok( + Some( + ExecutionPlanFilterPushdownResult { + plan: Arc::new(Self { + predicate: new_predicates, + ..self.clone() + }), + pushdown: vec![FilterPushdownResult::Exact; filters.len()], + } + ) + ) } } diff --git a/datafusion/physical-plan/src/projection.rs b/datafusion/physical-plan/src/projection.rs index 9a9e0db9a3fc..1d3e23ea9097 100644 --- a/datafusion/physical-plan/src/projection.rs +++ b/datafusion/physical-plan/src/projection.rs @@ -271,20 +271,6 @@ impl ExecutionPlan for ProjectionExec { Ok(Some(Arc::new(projection.clone()))) } } - - fn push_down_filter( - &self, - expr: Arc, - ) -> Result>> { - // Try to push down to the input - if let Some(input) = self.input.push_down_filter(expr)? { - return Ok(Some(Arc::new(Self { - input, - ..self.clone() - }))); - } - Ok(None) - } } /// If 'e' is a direct column reference, returns the field level diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index 6e7d885f58e1..54cd2bd538e4 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -724,18 +724,8 @@ impl ExecutionPlan for RepartitionExec { )?))) } - fn push_down_filter( - &self, - expr: Arc, - ) -> Result>> { - // Try to push down to the input - if let Some(input) = self.input.push_down_filter(expr)? { - return Ok(Some(Arc::new(Self { - input, - ..self.clone() - }))); - } - Ok(None) + fn supports_filter_pushdown(&self) -> bool { + true } }
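To make the threshold logic in `calculate_dynamic_filters` concrete: for an ascending `ORDER BY a, b` whose heap currently tops out at `(a0, b0)`, the pushed-down predicate has the shape `a < a0 OR (a = a0 AND b < b0)`. The sketch below builds that shape by hand with the public `col`/`lit`/`BinaryExpr` helpers; the two-column schema and the thresholds `10` and `5` are invented for illustration, and the null handling from the real code is deliberately omitted.

```rust
use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema};
use datafusion_common::Result;
use datafusion_expr::Operator;
use datafusion_physical_expr::expressions::{col, lit, BinaryExpr};
use datafusion_physical_expr::PhysicalExpr;

/// Build `a < a0 OR (a = a0 AND b < b0)`, the lexicographic threshold
/// predicate for an ascending `ORDER BY a, b` (nulls ignored for brevity).
fn lexicographic_threshold(
    schema: &Schema,
    a0: i32,
    b0: i32,
) -> Result<Arc<dyn PhysicalExpr>> {
    let a = col("a", schema)?;
    let b = col("b", schema)?;

    // Rows with a strictly smaller `a` can still make it into the top K ...
    let a_lt = Arc::new(BinaryExpr::new(Arc::clone(&a), Operator::Lt, lit(a0)));
    // ... and ties on `a` are decided by `b`.
    let a_eq = Arc::new(BinaryExpr::new(a, Operator::Eq, lit(a0)));
    let b_lt = Arc::new(BinaryExpr::new(b, Operator::Lt, lit(b0)));
    let tie_break = Arc::new(BinaryExpr::new(a_eq, Operator::And, b_lt));

    Ok(Arc::new(BinaryExpr::new(a_lt, Operator::Or, tie_break)))
}

fn main() -> Result<()> {
    let schema = Schema::new(vec![
        Field::new("a", DataType::Int32, false),
        Field::new("b", DataType::Int32, false),
    ]);
    let predicate = lexicographic_threshold(&schema, 10, 5)?;
    // Prints something like: a@0 < 10 OR a@0 = 10 AND b@1 < 5
    println!("{predicate}");
    Ok(())
}
```

The operator itself extends this with per-column null handling and with descending variants that flip `Lt` to `Gt`, as shown in the diff above.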
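The file-skipping walkthrough in the `TopKDynamicFilterSource` docs comes down to comparing each file's max statistic against the smallest timestamp the heap is currently retaining. The toy sketch below uses only the example's made-up statistics (no DataFusion types) to show which files can be skipped once that threshold reaches 30 for a descending sort.

```rust
/// Per-file min/max statistics from the walkthrough above (made-up numbers).
struct FileStats {
    file: u32,
    min: i64,
    max: i64,
}

/// With a descending sort the TopK pushes down `timestamp > threshold`,
/// so a file whose max is at or below the threshold cannot contribute rows.
fn can_skip(stats: &FileStats, threshold: i64) -> bool {
    stats.max <= threshold
}

fn main() {
    let files = [
        FileStats { file: 1, min: 1, max: 10 },
        FileStats { file: 2, min: 9, max: 19 },
        FileStats { file: 3, min: 20, max: 31 },
        FileStats { file: 4, min: 30, max: 35 },
    ];
    // After fully processing file 4, the smallest of the top 10 timestamps is 30.
    let threshold = 30;
    for f in &files {
        println!(
            "file {} [{}, {}]: skip = {}",
            f.file,
            f.min,
            f.max,
            can_skip(f, threshold)
        );
    }
    // Files 1 and 2 are skipped; files 3 and 4 still need to be scanned.
}
```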
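Finally, because `enable_dynamic_filter_pushdown` is registered like any other optimizer option, it can be toggled through `SessionConfig` once this patch is applied. A rough usage sketch, assuming the standard `datafusion` and `tokio` crates and a placeholder Parquet path:

```rust
use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    // The option defaults to true; it is set explicitly here only to show the knob.
    let config = SessionConfig::new()
        .set_bool("datafusion.optimizer.enable_dynamic_filter_pushdown", true);
    let ctx = SessionContext::new_with_config(config);

    // "data/" is a placeholder path for this sketch.
    ctx.register_parquet("t", "data/", ParquetReadOptions::default())
        .await?;

    // A TopK-shaped query: the current top-10 threshold can now be pushed into the scan.
    ctx.sql("SELECT * FROM t ORDER BY timestamp DESC LIMIT 10")
        .await?
        .show()
        .await?;
    Ok(())
}
```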