From e9fdb2c692c5af6c2c62485a4ecafc02afc3e612 Mon Sep 17 00:00:00 2001
From: wforget <643348094@qq.com>
Date: Thu, 27 Mar 2025 20:49:07 +0800
Subject: [PATCH] feat: make parquet native scan schema case insensitive

---
 native/core/src/parquet/parquet_exec.rs    |  1 +
 native/core/src/parquet/parquet_support.rs |  4 ++
 native/core/src/parquet/schema_adapter.rs  | 51 ++++++++++++++-----
 .../comet/parquet/ParquetReadSuite.scala   | 19 +++++++
 4 files changed, 62 insertions(+), 13 deletions(-)

diff --git a/native/core/src/parquet/parquet_exec.rs b/native/core/src/parquet/parquet_exec.rs
index 85a3d023c..a39ee259f 100644
--- a/native/core/src/parquet/parquet_exec.rs
+++ b/native/core/src/parquet/parquet_exec.rs
@@ -110,6 +110,7 @@ fn get_options(session_timezone: &str) -> (TableParquetOptions, SparkParquetOpt
     let mut spark_parquet_options =
         SparkParquetOptions::new(EvalMode::Legacy, session_timezone, false);
     spark_parquet_options.allow_cast_unsigned_ints = true;
+    spark_parquet_options.case_sensitive = false;
 
     (table_parquet_options, spark_parquet_options)
 }
diff --git a/native/core/src/parquet/parquet_support.rs b/native/core/src/parquet/parquet_support.rs
index 3d09a046a..ee830b795 100644
--- a/native/core/src/parquet/parquet_support.rs
+++ b/native/core/src/parquet/parquet_support.rs
@@ -64,6 +64,8 @@ pub struct SparkParquetOptions {
     pub use_decimal_128: bool,
     /// Whether to read dates/timestamps that were written in the legacy hybrid Julian + Gregorian calendar as it is. If false, throw exceptions instead. If the spark type is TimestampNTZ, this should be true.
     pub use_legacy_date_timestamp_or_ntz: bool,
+    /// Whether schema field names are case-sensitive
+    pub case_sensitive: bool,
 }
 
 impl SparkParquetOptions {
@@ -76,6 +78,7 @@
             is_adapting_schema: false,
             use_decimal_128: false,
             use_legacy_date_timestamp_or_ntz: false,
+            case_sensitive: false,
         }
     }
 
@@ -88,6 +91,7 @@
             is_adapting_schema: false,
             use_decimal_128: false,
             use_legacy_date_timestamp_or_ntz: false,
+            case_sensitive: false,
         }
     }
 }
diff --git a/native/core/src/parquet/schema_adapter.rs b/native/core/src/parquet/schema_adapter.rs
index e9d1ff640..a387ec6f5 100644
--- a/native/core/src/parquet/schema_adapter.rs
+++ b/native/core/src/parquet/schema_adapter.rs
@@ -84,7 +84,20 @@ impl SchemaAdapter for SparkSchemaAdapter {
     /// Panics if index is not in range for the table schema
     fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize> {
         let field = self.required_schema.field(index);
-        Some(file_schema.fields.find(field.name())?.0)
+        Some(
+            file_schema
+                .fields
+                .iter()
+                .enumerate()
+                .find(|(_, b)| {
+                    if self.parquet_options.case_sensitive {
+                        b.name() == field.name()
+                    } else {
+                        b.name().to_lowercase() == field.name().to_lowercase()
+                    }
+                })?
+                .0,
+        )
     }
 
     /// Creates a `SchemaMapping` for casting or mapping the columns from the
@@ -104,8 +117,18 @@
         let mut field_mappings = vec![None; self.required_schema.fields().len()];
 
         for (file_idx, file_field) in file_schema.fields.iter().enumerate() {
-            if let Some((table_idx, _table_field)) =
-                self.required_schema.fields().find(file_field.name())
+            if let Some((table_idx, _table_field)) = self
+                .required_schema
+                .fields()
+                .iter()
+                .enumerate()
+                .find(|(_, b)| {
+                    if self.parquet_options.case_sensitive {
+                        b.name() == file_field.name()
+                    } else {
+                        b.name().to_lowercase() == file_field.name().to_lowercase()
+                    }
+                })
             {
                 field_mappings[table_idx] = Some(projection.len());
                 projection.push(file_idx);
@@ -234,26 +257,28 @@ impl SchemaMapper for SchemaMapping {
             .zip(batch_cols.iter())
             .flat_map(|(field, batch_col)| {
                 self.table_schema
-                    // try to get the same field from the table schema that we have stored in self
-                    .field_with_name(field.name())
-                    // and if we don't have it, that's fine, ignore it. This may occur when we've
-                    // created an external table whose fields are a subset of the fields in this
-                    // file, then tried to read data from the file into this table. If that is the
-                    // case here, it's fine to ignore because we don't care about this field
-                    // anyways
-                    .ok()
+                    .fields()
+                    .iter()
+                    .enumerate()
+                    .find(|(_, b)| {
+                        if self.parquet_options.case_sensitive {
+                            b.name() == field.name()
+                        } else {
+                            b.name().to_lowercase() == field.name().to_lowercase()
+                        }
+                    })
                     // but if we do have it,
-                    .map(|table_field| {
+                    .map(|(_, table_field)| {
                         // try to cast it into the correct output type. we don't want to ignore this
                         // error, though, so it's propagated.
                         spark_parquet_convert(
                             ColumnarValue::Array(Arc::clone(batch_col)),
                             table_field.data_type(),
                             &self.parquet_options,
                         )?
                         .into_array(batch_col.len())
                         // and if that works, return the field and column.
-                        .map(|new_col| (new_col, table_field.clone()))
+                        .map(|new_col| (new_col, table_field.as_ref().clone()))
                     })
             })
             .collect::<Result<Vec<_>, _>>()?
diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
index 6e9b731d4..a6526e5fe 100644
--- a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
@@ -1460,6 +1460,25 @@ class ParquetReadV1Suite extends ParquetReadSuite with AdaptiveSparkPlanHelper {
       v1 = Some("parquet"))
   }
 }
+
+  test("test V1 parquet native scan -- case insensitive") {
+    withTempPath { path =>
+      spark.range(10).toDF("a").write.parquet(path.toString)
+      Seq(CometConf.SCAN_NATIVE_DATAFUSION, CometConf.SCAN_NATIVE_ICEBERG_COMPAT).foreach(
+        scanMode => {
+          withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key -> scanMode) {
+            withTable("test") {
+              sql("create table test (A long) using parquet options (path '" + path + "')")
+              val df = sql("select A from test")
+              checkSparkAnswer(df)
+              // TODO: pushed-down filters do not use the schema adapter in DataFusion, which causes an empty result
+              // val df = sql("select * from test where A > 5")
+              // checkSparkAnswer(df)
+            }
+          }
+        })
+    }
+  }
 }
 
 class ParquetReadV2Suite extends ParquetReadSuite with AdaptiveSparkPlanHelper {
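
Note (not part of the patch): all three call sites in schema_adapter.rs now resolve a column by the same rule, comparing lowercased names when case_sensitive is false. Below is a minimal standalone sketch of that rule, assuming the arrow-schema crate as a dependency; find_field is a hypothetical helper name used only for illustration, not Comet code.

// Sketch of the case-(in)sensitive field lookup introduced above.
// Assumes `arrow-schema` (e.g. arrow-schema = "53") as a dependency;
// `find_field` is a hypothetical helper, not part of this change.
use arrow_schema::{DataType, Field, Schema};

/// Returns the index and field in `file_schema` whose name matches `name`,
/// lowercasing both sides when `case_sensitive` is false.
fn find_field<'a>(
    file_schema: &'a Schema,
    name: &str,
    case_sensitive: bool,
) -> Option<(usize, &'a Field)> {
    file_schema
        .fields()
        .iter()
        .enumerate()
        .find(|(_, f)| {
            if case_sensitive {
                f.name().as_str() == name
            } else {
                f.name().to_lowercase() == name.to_lowercase()
            }
        })
        .map(|(idx, f)| (idx, f.as_ref()))
}

fn main() {
    // The file was written with a lower-case column `a`; the test above
    // queries it through a table declared with an upper-case `A`.
    let file_schema = Schema::new(vec![Field::new("a", DataType::Int64, true)]);

    // A case-sensitive lookup misses, which is how the native scan failed
    // to map column `A` before this change.
    assert!(find_field(&file_schema, "A", true).is_none());

    // The case-insensitive lookup resolves `A` to field 0 (`a`).
    assert_eq!(find_field(&file_schema, "A", false).map(|(i, _)| i), Some(0));
}

One design caveat: find takes the first match, so a file containing both `a` and `A` would silently resolve to the first occurrence when case_sensitive is false; any ambiguity handling has to happen before the scan.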