feat: make parquet native scan schema case insensitive #1575


Merged · 1 commit · Mar 28, 2025
1 change: 1 addition & 0 deletions native/core/src/parquet/parquet_exec.rs
@@ -110,6 +110,7 @@ fn get_options(session_timezone: &str) -> (TableParquetOptions, SparkParquetOptions) {
     let mut spark_parquet_options =
         SparkParquetOptions::new(EvalMode::Legacy, session_timezone, false);
     spark_parquet_options.allow_cast_unsigned_ints = true;
+    spark_parquet_options.case_sensitive = false;
Contributor
Not sure if it makes sense to take the value from spark.sql.caseSensitive, although this is an internal config and false by default.

Member Author

I noticed there is a TODO comment above; maybe we can make them configurable in the future:

// TODO: Maybe these are configs?
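
For illustration only, a sketch of what making this configurable could look like. The `session_case_sensitive` parameter is hypothetical (it would have to be plumbed in from the JVM side alongside the existing `session_timezone`), and the default construction of `TableParquetOptions` is assumed; imports match parquet_exec.rs:

/// Hypothetical variant of get_options that takes the session's
/// spark.sql.caseSensitive value instead of hardcoding false.
fn get_options_with_case_sensitivity(
    session_timezone: &str,
    session_case_sensitive: bool, // hypothetical: forwarded from spark.sql.caseSensitive
) -> (TableParquetOptions, SparkParquetOptions) {
    // Assumed default construction; the real get_options sets more options here.
    let table_parquet_options = TableParquetOptions::default();
    let mut spark_parquet_options =
        SparkParquetOptions::new(EvalMode::Legacy, session_timezone, false);
    spark_parquet_options.allow_cast_unsigned_ints = true;
    spark_parquet_options.case_sensitive = session_case_sensitive;
    (table_parquet_options, spark_parquet_options)
}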

     (table_parquet_options, spark_parquet_options)
 }

4 changes: 4 additions & 0 deletions native/core/src/parquet/parquet_support.rs
@@ -64,6 +64,8 @@ pub struct SparkParquetOptions {
     pub use_decimal_128: bool,
     /// Whether to read dates/timestamps that were written in the legacy hybrid Julian + Gregorian calendar as it is. If false, throw exceptions instead. If the spark type is TimestampNTZ, this should be true.
     pub use_legacy_date_timestamp_or_ntz: bool,
+    /// Whether schema field names are case sensitive
+    pub case_sensitive: bool,
 }

impl SparkParquetOptions {
@@ -76,6 +78,7 @@ impl SparkParquetOptions {
             is_adapting_schema: false,
             use_decimal_128: false,
             use_legacy_date_timestamp_or_ntz: false,
+            case_sensitive: false,
         }
     }

@@ -88,6 +91,7 @@ impl SparkParquetOptions {
             is_adapting_schema: false,
             use_decimal_128: false,
             use_legacy_date_timestamp_or_ntz: false,
+            case_sensitive: false,
         }
     }
 }
51 changes: 38 additions & 13 deletions native/core/src/parquet/schema_adapter.rs
@@ -84,7 +84,20 @@ impl SchemaAdapter for SparkSchemaAdapter {
     /// Panics if index is not in range for the table schema
     fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize> {
         let field = self.required_schema.field(index);
-        Some(file_schema.fields.find(field.name())?.0)
+        Some(
+            file_schema
+                .fields
+                .iter()
+                .enumerate()
+                .find(|(_, b)| {
+                    if self.parquet_options.case_sensitive {
+                        b.name() == field.name()
+                    } else {
+                        b.name().to_lowercase() == field.name().to_lowercase()
+                    }
+                })?
+                .0,
+        )
     }
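
Standalone, the new lookup is just a case-normalized scan over the file schema's fields. A minimal runnable sketch of the same technique using the arrow crate (the column names and helper name here are illustrative, not part of the PR):

use arrow::datatypes::{DataType, Field, Schema};

/// Find the index of `name` in `schema`, optionally ignoring case,
/// mirroring the matching logic added to map_column_index above.
fn find_field_index(schema: &Schema, name: &str, case_sensitive: bool) -> Option<usize> {
    schema
        .fields()
        .iter()
        .enumerate()
        .find(|(_, f)| {
            if case_sensitive {
                f.name().as_str() == name
            } else {
                f.name().to_lowercase() == name.to_lowercase()
            }
        })
        .map(|(idx, _)| idx)
}

fn main() {
    // A file written with an upper-case column name "A".
    let file_schema = Schema::new(vec![Field::new("A", DataType::Int64, false)]);
    // Spark's default (case insensitive) resolves "a" to column 0 ...
    assert_eq!(find_field_index(&file_schema, "a", false), Some(0));
    // ... while a case sensitive lookup finds nothing.
    assert_eq!(find_field_index(&file_schema, "a", true), None);
}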

     /// Creates a `SchemaMapping` for casting or mapping the columns from the
@@ -104,8 +117,18 @@ impl SchemaAdapter for SparkSchemaAdapter {
         let mut field_mappings = vec![None; self.required_schema.fields().len()];

         for (file_idx, file_field) in file_schema.fields.iter().enumerate() {
-            if let Some((table_idx, _table_field)) =
-                self.required_schema.fields().find(file_field.name())
+            if let Some((table_idx, _table_field)) = self
+                .required_schema
+                .fields()
+                .iter()
+                .enumerate()
+                .find(|(_, b)| {
+                    if self.parquet_options.case_sensitive {
+                        b.name() == file_field.name()
+                    } else {
+                        b.name().to_lowercase() == file_field.name().to_lowercase()
+                    }
+                })
             {
                 field_mappings[table_idx] = Some(projection.len());
                 projection.push(file_idx);
@@ -234,16 +257,18 @@ impl SchemaMapper for SchemaMapping {
             .zip(batch_cols.iter())
             .flat_map(|(field, batch_col)| {
                 self.table_schema
-                    // try to get the same field from the table schema that we have stored in self
-                    .field_with_name(field.name())
-                    // and if we don't have it, that's fine, ignore it. This may occur when we've
-                    // created an external table whose fields are a subset of the fields in this
-                    // file, then tried to read data from the file into this table. If that is the
-                    // case here, it's fine to ignore because we don't care about this field
-                    // anyways
-                    .ok()
+                    .fields()
+                    .iter()
+                    .enumerate()
+                    .find(|(_, b)| {
+                        if self.parquet_options.case_sensitive {
+                            b.name() == field.name()
+                        } else {
+                            b.name().to_lowercase() == field.name().to_lowercase()
+                        }
+                    })
                     // but if we do have it,
-                    .map(|table_field| {
+                    .map(|(_, table_field)| {
                         // try to cast it into the correct output type. we don't want to ignore this
                         // error, though, so it's propagated.
                         spark_parquet_convert(
@@ -253,7 +278,7 @@ impl SchemaMapper for SchemaMapping {
                         )?
                         .into_array(batch_col.len())
                         // and if that works, return the field and column.
-                        .map(|new_col| (new_col, table_field.clone()))
+                        .map(|new_col| (new_col, table_field.as_ref().clone()))
                     })
             })
             .collect::<Result<Vec<_>, _>>()?
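
The same name-matching closure now appears at three call sites. A possible follow-up (not part of this PR; the helper name is hypothetical) would be to factor it out:

/// Hypothetical shared helper for the three call sites above.
fn names_match(a: &str, b: &str, case_sensitive: bool) -> bool {
    if case_sensitive {
        a == b
    } else {
        // Allocates two lowercased copies per comparison; a hot path could
        // pre-lowercase one side, or use eq_ignore_ascii_case for ASCII-only names.
        a.to_lowercase() == b.to_lowercase()
    }
}

Each `find(|(_, b)| ...)` would then collapse to `find(|(_, b)| names_match(b.name(), field.name(), self.parquet_options.case_sensitive))`.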
@@ -1460,6 +1460,25 @@ class ParquetReadV1Suite extends ParquetReadSuite with AdaptiveSparkPlanHelper {
       v1 = Some("parquet"))
     }
   }
+
+  test("test V1 parquet native scan -- case insensitive") {
+    withTempPath { path =>
+      spark.range(10).toDF("a").write.parquet(path.toString)
+      Seq(CometConf.SCAN_NATIVE_DATAFUSION, CometConf.SCAN_NATIVE_ICEBERG_COMPAT).foreach(
+        scanMode => {
+          withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key -> scanMode) {
+            withTable("test") {
+              sql("create table test (A long) using parquet options (path '" + path + "')")
+              val df = sql("select A from test")
+              checkSparkAnswer(df)
+              // TODO: pushed-down filters do not use the schema adapter in DataFusion, which causes an empty result
Contributor
I don't think we should fix this in the Parquet reader (Parquet by itself does not specify whether field names are case sensitive or insensitive).
Comet is the right place to fix this, I feel.

+              // val df = sql("select * from test where A > 5")
+              // checkSparkAnswer(df)
+            }
+          }
+        })
+    }
+  }
 }

class ParquetReadV2Suite extends ParquetReadSuite with AdaptiveSparkPlanHelper {