diff --git a/Cargo.lock b/Cargo.lock
index 518e2ee919cef..5386046e92039 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1951,6 +1951,7 @@ dependencies = [
  "itertools 0.14.0",
  "log",
  "object_store",
+ "percent-encoding",
 ]
 
 [[package]]
diff --git a/Cargo.toml b/Cargo.toml
index b9d8b1a69ef61..54c211edb1bae 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -166,6 +166,7 @@ log = "^0.4"
 num-traits = { version = "0.2" }
 object_store = { version = "0.12.4", default-features = false }
 parking_lot = "0.12"
+percent-encoding = "2.3"
 parquet = { version = "57.1.0", default-features = false, features = [
     "arrow",
     "async",
diff --git a/datafusion/catalog-listing/Cargo.toml b/datafusion/catalog-listing/Cargo.toml
index be1374b371485..a32045c7586ca 100644
--- a/datafusion/catalog-listing/Cargo.toml
+++ b/datafusion/catalog-listing/Cargo.toml
@@ -46,6 +46,7 @@ futures = { workspace = true }
 itertools = { workspace = true }
 log = { workspace = true }
 object_store = { workspace = true }
+percent-encoding = { workspace = true }
 
 [dev-dependencies]
 datafusion-datasource-parquet = { workspace = true }
diff --git a/datafusion/catalog-listing/src/helpers.rs b/datafusion/catalog-listing/src/helpers.rs
index ea016015cebd3..752b9945a0691 100644
--- a/datafusion/catalog-listing/src/helpers.rs
+++ b/datafusion/catalog-listing/src/helpers.rs
@@ -42,6 +42,7 @@ use datafusion_expr::{Expr, Volatility};
 use datafusion_physical_expr::create_physical_expr;
 use object_store::path::Path;
 use object_store::{ObjectMeta, ObjectStore};
+use percent_encoding::percent_decode_str;
 
 /// Check whether the given expression can be resolved using only the columns `col_names`.
 /// This means that if this function returns true:
@@ -417,12 +418,15 @@ pub async fn pruned_partition_list<'a>(
 }
 
 /// Extract the partition values for the given `file_path` (in the given `table_path`)
-/// associated to the partitions defined by `table_partition_cols`
+/// associated to the partitions defined by `table_partition_cols`.
+///
+/// Partition values are URL-decoded, since object stores like S3 encode special
+/// characters (e.g., `/` becomes `%2F`) in path segments.
 pub fn parse_partitions_for_path<'a, I>(
     table_path: &ListingTableUrl,
     file_path: &'a Path,
     table_partition_cols: I,
-) -> Option<Vec<&'a str>>
+) -> Option<Vec<String>>
 where
     I: IntoIterator<Item = &'a str>,
 {
@@ -431,7 +435,10 @@ where
     let mut part_values = vec![];
     for (part, expected_partition) in subpath.zip(table_partition_cols) {
         match part.split_once('=') {
-            Some((name, val)) if name == expected_partition => part_values.push(val),
+            Some((name, val)) if name == expected_partition => {
+                let decoded = percent_decode_str(val).decode_utf8().ok()?;
+                part_values.push(decoded.into_owned());
+            }
             _ => {
                 debug!(
                     "Ignoring file: file_path='{file_path}', table_path='{table_path}', part='{part}', partition_col='{expected_partition}'",
@@ -507,7 +514,7 @@ mod tests {
     #[test]
     fn test_parse_partitions_for_path() {
         assert_eq!(
-            Some(vec![]),
+            Some(vec![] as Vec<String>),
             parse_partitions_for_path(
                 &ListingTableUrl::parse("file:///bucket/mytable").unwrap(),
                 &Path::from("bucket/mytable/file.csv"),
@@ -531,15 +538,25 @@
             )
         );
         assert_eq!(
-            Some(vec!["v1"]),
+            Some(vec!["v1".to_string()]),
             parse_partitions_for_path(
                 &ListingTableUrl::parse("file:///bucket/mytable").unwrap(),
                 &Path::from("bucket/mytable/mypartition=v1/file.csv"),
                 vec!["mypartition"]
             )
         );
+        // URL-encoded partition values should be decoded
+        // Use Path::parse to avoid double-encoding (Path::from encodes % as %25)
         assert_eq!(
-            Some(vec!["v1"]),
+            Some(vec!["v/1".to_string()]),
+            parse_partitions_for_path(
+                &ListingTableUrl::parse("file:///bucket/mytable").unwrap(),
+                &Path::parse("bucket/mytable/mypartition=v%2F1/file.csv").unwrap(),
+                vec!["mypartition"]
+            )
+        );
+        assert_eq!(
+            Some(vec!["v1".to_string()]),
             parse_partitions_for_path(
                 &ListingTableUrl::parse("file:///bucket/mytable/").unwrap(),
                 &Path::from("bucket/mytable/mypartition=v1/file.csv"),
@@ -556,7 +573,7 @@
             )
         );
         assert_eq!(
-            Some(vec!["v1", "v2"]),
+            Some(vec!["v1".to_string(), "v2".to_string()]),
             parse_partitions_for_path(
                 &ListingTableUrl::parse("file:///bucket/mytable").unwrap(),
                 &Path::from("bucket/mytable/mypartition=v1/otherpartition=v2/file.csv"),
                 vec!["mypartition", "otherpartition"]
@@ -564,13 +581,53 @@
             )
         );
         assert_eq!(
-            Some(vec!["v1"]),
+            Some(vec!["v1".to_string()]),
             parse_partitions_for_path(
                 &ListingTableUrl::parse("file:///bucket/mytable").unwrap(),
                 &Path::from("bucket/mytable/mypartition=v1/otherpartition=v2/file.csv"),
                 vec!["mypartition"]
             )
         );
+        assert_eq!(
+            Some(vec!["John Doe".to_string()]),
+            parse_partitions_for_path(
+                &ListingTableUrl::parse("file:///bucket/mytable").unwrap(),
+                &Path::parse("bucket/mytable/name=John%20Doe/file.csv").unwrap(),
+                vec!["name"]
+            )
+        );
+        assert_eq!(
+            Some(vec!["a/b".to_string(), "c d".to_string()]),
+            parse_partitions_for_path(
+                &ListingTableUrl::parse("file:///bucket/mytable").unwrap(),
+                &Path::parse("bucket/mytable/p1=a%2Fb/p2=c%20d/file.csv").unwrap(),
+                vec!["p1", "p2"]
+            )
+        );
+        assert_eq!(
+            Some(vec!["Müller".to_string()]),
+            parse_partitions_for_path(
+                &ListingTableUrl::parse("file:///bucket/mytable").unwrap(),
+                &Path::parse("bucket/mytable/name=M%C3%BCller/file.csv").unwrap(),
+                vec!["name"]
+            )
+        );
+        assert_eq!(
+            Some(vec!["invalid%XX".to_string()]),
+            parse_partitions_for_path(
+                &ListingTableUrl::parse("file:///bucket/mytable").unwrap(),
+                &Path::parse("bucket/mytable/p1=invalid%XX/file.csv").unwrap(),
+                vec!["p1"]
+            )
+        );
+        assert_eq!(
+            None,
+            parse_partitions_for_path(
+                &ListingTableUrl::parse("file:///bucket/mytable").unwrap(),
+                &Path::parse("bucket/mytable/p1=%FF/file.csv").unwrap(),
+                vec!["p1"]
+            )
+        );
     }
 
     #[test]
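A minimal standalone sketch (not part of the patch) of how percent_encoding::percent_decode_str handles the kinds of values the new tests exercise; decode_partition_value is a hypothetical helper that mirrors the patched match arm, and the inputs are illustrative only.

// Illustrative sketch, not part of the diff above.
use percent_encoding::percent_decode_str;

// Mirrors the patched match arm: decode the raw segment value, then
// require the result to be valid UTF-8 (otherwise reject the file).
fn decode_partition_value(raw: &str) -> Option<String> {
    percent_decode_str(raw)
        .decode_utf8()
        .ok()
        .map(|cow| cow.into_owned())
}

fn main() {
    // "%2F" decodes to "/", "%20" to a space.
    assert_eq!(decode_partition_value("v%2F1"), Some("v/1".to_string()));
    assert_eq!(decode_partition_value("John%20Doe"), Some("John Doe".to_string()));
    // An invalid escape like "%XX" is passed through unchanged by the crate.
    assert_eq!(decode_partition_value("invalid%XX"), Some("invalid%XX".to_string()));
    // "%FF" decodes to a byte that is not valid UTF-8, so the value is rejected.
    assert_eq!(decode_partition_value("%FF"), None);
}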