Skip to content

Commit a61a9c2

Browse files
authored
fix: Ensure ListingTable partitions are pruned when filters are not used (#17958)
* fix: Prune partitions when no filters are defined
* fix: Formatting
* chore: Cargo fmt
* chore: Clippy
1 parent 424b73d commit a61a9c2

File tree

2 files changed

+66
-4
lines changed

2 files changed

+66
-4
lines changed

datafusion/catalog-listing/src/helpers.rs

Lines changed: 19 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -156,6 +156,7 @@ pub fn split_files(
156156
chunks
157157
}
158158

159+
#[derive(Debug)]
159160
pub struct Partition {
160161
/// The path to the partition, including the table prefix
161162
path: Path,
@@ -245,7 +246,16 @@ async fn prune_partitions(
245246
partition_cols: &[(String, DataType)],
246247
) -> Result<Vec<Partition>> {
247248
if filters.is_empty() {
248-
return Ok(partitions);
249+
// prune partitions which don't contain the partition columns
250+
return Ok(partitions
251+
.into_iter()
252+
.filter(|p| {
253+
let cols = partition_cols.iter().map(|x| x.0.as_str());
254+
!parse_partitions_for_path(table_path, &p.path, cols)
255+
.unwrap_or_default()
256+
.is_empty()
257+
})
258+
.collect());
249259
}
250260

251261
let mut builders: Vec<_> = (0..partition_cols.len())
@@ -432,6 +442,7 @@ pub async fn pruned_partition_list<'a>(
432442
}
433443

434444
let partition_prefix = evaluate_partition_prefix(partition_cols, filters);
445+
435446
let partitions =
436447
list_partitions(store, table_path, partition_cols.len(), partition_prefix)
437448
.await?;
@@ -502,12 +513,12 @@ where
502513
let subpath = table_path.strip_prefix(file_path)?;
503514

504515
let mut part_values = vec![];
505-
for (part, pn) in subpath.zip(table_partition_cols) {
516+
for (part, expected_partition) in subpath.zip(table_partition_cols) {
506517
match part.split_once('=') {
507-
Some((name, val)) if name == pn => part_values.push(val),
518+
Some((name, val)) if name == expected_partition => part_values.push(val),
508519
_ => {
509520
debug!(
510-
"Ignoring file: file_path='{file_path}', table_path='{table_path}', part='{part}', partition_col='{pn}'",
521+
"Ignoring file: file_path='{file_path}', table_path='{table_path}', part='{part}', partition_col='{expected_partition}'",
511522
);
512523
return None;
513524
}
@@ -594,6 +605,8 @@ mod tests {
594605
("tablepath/mypartition=val1/notparquetfile", 100),
595606
("tablepath/mypartition=val1/ignoresemptyfile.parquet", 0),
596607
("tablepath/file.parquet", 100),
608+
("tablepath/notapartition/file.parquet", 100),
609+
("tablepath/notmypartition=val1/file.parquet", 100),
597610
]);
598611
let filter = Expr::eq(col("mypartition"), lit("val1"));
599612
let pruned = pruned_partition_list(
@@ -619,6 +632,8 @@ mod tests {
619632
("tablepath/mypartition=val2/file.parquet", 100),
620633
("tablepath/mypartition=val1/ignoresemptyfile.parquet", 0),
621634
("tablepath/mypartition=val1/other=val3/file.parquet", 100),
635+
("tablepath/notapartition/file.parquet", 100),
636+
("tablepath/notmypartition=val1/file.parquet", 100),
622637
]);
623638
let filter = Expr::eq(col("mypartition"), lit("val1"));
624639
let pruned = pruned_partition_list(

datafusion/core/src/datasource/listing/table.rs

Lines changed: 47 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -2732,6 +2732,52 @@ mod tests {
27322732
Ok(())
27332733
}
27342734

2735+
#[tokio::test]
2736+
async fn test_listing_table_prunes_extra_files_in_hive() -> Result<()> {
2737+
let files = [
2738+
"bucket/test/pid=1/file1",
2739+
"bucket/test/pid=1/file2",
2740+
"bucket/test/pid=2/file3",
2741+
"bucket/test/pid=2/file4",
2742+
"bucket/test/other/file5",
2743+
];
2744+
2745+
let ctx = SessionContext::new();
2746+
register_test_store(&ctx, &files.iter().map(|f| (*f, 10)).collect::<Vec<_>>());
2747+
2748+
let opt = ListingOptions::new(Arc::new(JsonFormat::default()))
2749+
.with_file_extension_opt(Some(""))
2750+
.with_table_partition_cols(vec![("pid".to_string(), DataType::Int32)]);
2751+
2752+
let table_path = ListingTableUrl::parse("test:///bucket/test/").unwrap();
2753+
let schema = Schema::new(vec![Field::new("a", DataType::Boolean, false)]);
2754+
let config = ListingTableConfig::new(table_path)
2755+
.with_listing_options(opt)
2756+
.with_schema(Arc::new(schema));
2757+
2758+
let table = ListingTable::try_new(config)?;
2759+
2760+
let (file_list, _) = table.list_files_for_scan(&ctx.state(), &[], None).await?;
2761+
assert_eq!(file_list.len(), 1);
2762+
2763+
let files = file_list[0].clone();
2764+
2765+
assert_eq!(
2766+
files
2767+
.iter()
2768+
.map(|f| f.path().to_string())
2769+
.collect::<Vec<_>>(),
2770+
vec![
2771+
"bucket/test/pid=1/file1",
2772+
"bucket/test/pid=1/file2",
2773+
"bucket/test/pid=2/file3",
2774+
"bucket/test/pid=2/file4",
2775+
]
2776+
);
2777+
2778+
Ok(())
2779+
}
2780+
27352781
#[cfg(feature = "parquet")]
27362782
#[tokio::test]
27372783
async fn test_table_stats_behaviors() -> Result<()> {
@@ -2750,6 +2796,7 @@ mod tests {
27502796
let config_default = ListingTableConfig::new(table_path.clone())
27512797
.with_listing_options(opt_default)
27522798
.with_schema(schema_default);
2799+
27532800
let table_default = ListingTable::try_new(config_default)?;
27542801

27552802
let exec_default = table_default.scan(&state, None, &[], None).await?;

0 commit comments

Comments (0)