diff --git a/datafusion-examples/examples/data_io/parquet_encrypted.rs b/datafusion-examples/examples/data_io/parquet_encrypted.rs
index d3cc6a121f8ea..26361e9b52be0 100644
--- a/datafusion-examples/examples/data_io/parquet_encrypted.rs
+++ b/datafusion-examples/examples/data_io/parquet_encrypted.rs
@@ -55,7 +55,7 @@ pub async fn parquet_encrypted() -> datafusion::common::Result<()> {
 
     // Create a temporary file location for the encrypted parquet file
     let tmp_source = TempDir::new()?;
-    let tempfile = tmp_source.path().join("cars_encrypted");
+    let tempfile = tmp_source.path().join("cars_encrypted.parquet");
 
     // Write encrypted parquet
     let mut options = TableParquetOptions::default();
diff --git a/datafusion/catalog-listing/src/table.rs b/datafusion/catalog-listing/src/table.rs
index 38456944075fc..a01e4c5a1f72e 100644
--- a/datafusion/catalog-listing/src/table.rs
+++ b/datafusion/catalog-listing/src/table.rs
@@ -674,6 +674,7 @@ impl TableProvider for ListingTable {
             insert_op,
             keep_partition_by_columns,
             file_extension: self.options().format.get_ext(),
+            single_file_output: None, // Use extension heuristic for table inserts
         };
 
         // For writes, we only use user-specified ordering (no file groups to derive from)
diff --git a/datafusion/core/src/dataframe/mod.rs b/datafusion/core/src/dataframe/mod.rs
index 1e9f72501e4cc..96c57049fd35d 100644
--- a/datafusion/core/src/dataframe/mod.rs
+++ b/datafusion/core/src/dataframe/mod.rs
@@ -2048,11 +2048,17 @@ impl DataFrame {
                 .build()?
         };
 
+        // Build copy options, including single_file_output if explicitly set
+        let mut copy_options: HashMap<String, String> = HashMap::new();
+        if options.single_file_output {
+            copy_options.insert("single_file_output".to_string(), "true".to_string());
+        }
+
         let plan = LogicalPlanBuilder::copy_to(
             plan,
             path.into(),
             file_type,
-            HashMap::new(),
+            copy_options,
             options.partition_by,
         )?
         .build()?;
@@ -2116,11 +2122,17 @@ impl DataFrame {
                 .build()?
         };
 
+        // Build copy options, including single_file_output if explicitly set
+        let mut copy_options: HashMap<String, String> = HashMap::new();
+        if options.single_file_output {
+            copy_options.insert("single_file_output".to_string(), "true".to_string());
+        }
+
         let plan = LogicalPlanBuilder::copy_to(
             plan,
             path.into(),
             file_type,
-            Default::default(),
+            copy_options,
             options.partition_by,
         )?
         .build()?;
diff --git a/datafusion/core/src/dataframe/parquet.rs b/datafusion/core/src/dataframe/parquet.rs
index 6edf628e2d6d6..da2bddb623476 100644
--- a/datafusion/core/src/dataframe/parquet.rs
+++ b/datafusion/core/src/dataframe/parquet.rs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use std::collections::HashMap;
 use std::sync::Arc;
 
 use crate::datasource::file_format::{
@@ -84,11 +85,17 @@ impl DataFrame {
                 .build()?
         };
 
+        // Build copy options, including single_file_output if explicitly set
+        let mut copy_options = HashMap::<String, String>::new();
+        if options.single_file_output {
+            copy_options.insert("single_file_output".to_string(), "true".to_string());
+        }
+
         let plan = LogicalPlanBuilder::copy_to(
             plan,
             path.into(),
             file_type,
-            Default::default(),
+            copy_options,
             options.partition_by,
         )?
         .build()?;
@@ -324,4 +331,52 @@ mod tests {
 
         Ok(())
     }
+
+    /// Test that single_file_output works for paths WITHOUT file extensions.
+    /// This verifies the fix for the regression where extension heuristics
+    /// ignored the explicit with_single_file_output(true) setting.
+    #[tokio::test]
+    async fn test_single_file_output_without_extension() -> Result<()> {
+        use arrow::array::Int32Array;
+        use arrow::datatypes::{DataType, Field, Schema};
+        use arrow::record_batch::RecordBatch;
+
+        let ctx = SessionContext::new();
+        let tmp_dir = TempDir::new()?;
+
+        // Path WITHOUT .parquet extension - this is the key scenario
+        let output_path = tmp_dir.path().join("data_no_ext");
+        let output_path_str = output_path.to_str().unwrap();
+
+        let df = ctx.read_batch(RecordBatch::try_new(
+            Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)])),
+            vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
+        )?)?;
+
+        // Explicitly request single file output
+        df.write_parquet(
+            output_path_str,
+            DataFrameWriteOptions::new().with_single_file_output(true),
+            None,
+        )
+        .await?;
+
+        // Verify: output should be a FILE, not a directory
+        assert!(
+            output_path.is_file(),
+            "Expected single file at {:?}, but got is_file={}, is_dir={}",
+            output_path,
+            output_path.is_file(),
+            output_path.is_dir()
+        );
+
+        // Verify the file is readable as parquet
+        let file = std::fs::File::open(&output_path)?;
+        let reader = parquet::file::reader::SerializedFileReader::new(file)?;
+        let metadata = reader.metadata();
+        assert_eq!(metadata.num_row_groups(), 1);
+        assert_eq!(metadata.file_metadata().num_rows(), 3);
+
+        Ok(())
+    }
 }
diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index 47ce519f01289..dd63440ff3359 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -1547,6 +1547,7 @@ mod tests {
             insert_op: InsertOp::Overwrite,
             keep_partition_by_columns: false,
             file_extension: "parquet".into(),
+            single_file_output: None,
         };
         let parquet_sink = Arc::new(ParquetSink::new(
             file_sink_config,
@@ -1638,6 +1639,7 @@ mod tests {
             insert_op: InsertOp::Overwrite,
             keep_partition_by_columns: false,
             file_extension: "parquet".into(),
+            single_file_output: None,
         };
         let parquet_sink = Arc::new(ParquetSink::new(
             file_sink_config,
@@ -1728,6 +1730,7 @@ mod tests {
             insert_op: InsertOp::Overwrite,
             keep_partition_by_columns: false,
             file_extension: "parquet".into(),
+            single_file_output: None,
         };
         let parquet_sink = Arc::new(ParquetSink::new(
             file_sink_config,
diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs
index 94c8fd510a382..c062166312920 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -549,8 +549,30 @@ impl DefaultPhysicalPlanner {
                     }
                 };
 
+                // Parse single_file_output option if explicitly set
+                let single_file_output = match source_option_tuples
+                    .get("single_file_output")
+                    .map(|v| v.trim())
+                {
+                    None => None,
+                    Some("true") => Some(true),
+                    Some("false") => Some(false),
+                    Some(value) => {
+                        return Err(DataFusionError::Configuration(format!(
+                            "provided value for 'single_file_output' was not recognized: \"{value}\""
+                        )));
+                    }
+                };
+
+                // Filter out sink-related options that are not format options
+                let format_options: HashMap<String, String> = source_option_tuples
+                    .iter()
+                    .filter(|(k, _)| k.as_str() != "single_file_output")
+                    .map(|(k, v)| (k.clone(), v.clone()))
+                    .collect();
+
                 let sink_format = file_type_to_format(file_type)?
-                    .create(session_state, source_option_tuples)?;
+                    .create(session_state, &format_options)?;
 
                 // Determine extension based on format extension and compression
                 let file_extension = match sink_format.compression_type() {
@@ -571,6 +593,7 @@ impl DefaultPhysicalPlanner {
                     insert_op: InsertOp::Append,
                     keep_partition_by_columns,
                     file_extension,
+                    single_file_output,
                 };
 
                 let ordering = input_exec.properties().output_ordering().cloned();
diff --git a/datafusion/datasource/src/file_sink_config.rs b/datafusion/datasource/src/file_sink_config.rs
index 643831a1199f8..8c5bc560780f6 100644
--- a/datafusion/datasource/src/file_sink_config.rs
+++ b/datafusion/datasource/src/file_sink_config.rs
@@ -112,6 +112,11 @@ pub struct FileSinkConfig {
     pub keep_partition_by_columns: bool,
     /// File extension without a dot(.)
     pub file_extension: String,
+    /// Override for single file output behavior.
+    /// - `None`: use extension heuristic (path with extension = single file)
+    /// - `Some(true)`: force single file output at exact path
+    /// - `Some(false)`: force directory output with generated filenames
+    pub single_file_output: Option<bool>,
 }
 
 impl FileSinkConfig {
diff --git a/datafusion/datasource/src/write/demux.rs b/datafusion/datasource/src/write/demux.rs
index bec5b8b0bff0e..921c1f3b41b55 100644
--- a/datafusion/datasource/src/write/demux.rs
+++ b/datafusion/datasource/src/write/demux.rs
@@ -106,8 +106,11 @@ pub(crate) fn start_demuxer_task(
     let file_extension = config.file_extension.clone();
     let base_output_path = config.table_paths[0].clone();
     let task = if config.table_partition_cols.is_empty() {
-        let single_file_output = !base_output_path.is_collection()
-            && base_output_path.file_extension().is_some();
+        // Use explicit single_file_output if set, otherwise fall back to extension heuristic
+        let single_file_output = config.single_file_output.unwrap_or_else(|| {
+            !base_output_path.is_collection()
+                && base_output_path.file_extension().is_some()
+        });
         SpawnedTask::spawn(async move {
             row_count_demuxer(
                 tx,
diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto
index 2b5e2368c1fa1..626b520188062 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -771,6 +771,11 @@ message FileSinkConfig {
   bool keep_partition_by_columns = 9;
   InsertOp insert_op = 10;
   string file_extension = 11;
+  // Optional override for single file output behavior.
+  // When not set, uses extension heuristic (path with extension = single file).
+  // When set to true, forces single file output at exact path.
+  // When set to false, forces directory output with generated filenames.
+  optional bool single_file_output = 12;
 }
 
 enum InsertOp {
diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs
index 842dc7f6326dd..1939218cdb548 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/generated/pbjson.rs
@@ -6382,6 +6382,9 @@ impl serde::Serialize for FileSinkConfig {
         if !self.file_extension.is_empty() {
             len += 1;
         }
+        if self.single_file_output.is_some() {
+            len += 1;
+        }
         let mut struct_ser = serializer.serialize_struct("datafusion.FileSinkConfig", len)?;
         if !self.object_store_url.is_empty() {
             struct_ser.serialize_field("objectStoreUrl", &self.object_store_url)?;
@@ -6409,6 +6412,9 @@ impl serde::Serialize for FileSinkConfig {
         if !self.file_extension.is_empty() {
             struct_ser.serialize_field("fileExtension", &self.file_extension)?;
         }
+        if let Some(v) = self.single_file_output.as_ref() {
+            struct_ser.serialize_field("singleFileOutput", v)?;
+        }
         struct_ser.end()
     }
 }
@@ -6435,6 +6441,8 @@ impl<'de> serde::Deserialize<'de> for FileSinkConfig {
             "insertOp",
             "file_extension",
             "fileExtension",
+            "single_file_output",
+            "singleFileOutput",
         ];
 
         #[allow(clippy::enum_variant_names)]
@@ -6447,6 +6455,7 @@ impl<'de> serde::Deserialize<'de> for FileSinkConfig {
             KeepPartitionByColumns,
             InsertOp,
             FileExtension,
+            SingleFileOutput,
         }
         impl<'de> serde::Deserialize<'de> for GeneratedField {
            fn deserialize<D>(deserializer: D) -> std::result::Result<GeneratedField, D::Error>
@@ -6476,6 +6485,7 @@ impl<'de> serde::Deserialize<'de> for FileSinkConfig {
                             "keepPartitionByColumns" | "keep_partition_by_columns" => Ok(GeneratedField::KeepPartitionByColumns),
                             "insertOp" | "insert_op" => Ok(GeneratedField::InsertOp),
                             "fileExtension" | "file_extension" => Ok(GeneratedField::FileExtension),
+                            "singleFileOutput" | "single_file_output" => Ok(GeneratedField::SingleFileOutput),
                             _ => Err(serde::de::Error::unknown_field(value, FIELDS)),
                         }
                     }
@@ -6503,6 +6513,7 @@ impl<'de> serde::Deserialize<'de> for FileSinkConfig {
                 let mut keep_partition_by_columns__ = None;
                 let mut insert_op__ = None;
                 let mut file_extension__ = None;
+                let mut single_file_output__ = None;
                 while let Some(k) = map_.next_key()? {
                     match k {
                         GeneratedField::ObjectStoreUrl => {
@@ -6553,6 +6564,12 @@ impl<'de> serde::Deserialize<'de> for FileSinkConfig {
                             }
                             file_extension__ = Some(map_.next_value()?);
                         }
+                        GeneratedField::SingleFileOutput => {
+                            if single_file_output__.is_some() {
+                                return Err(serde::de::Error::duplicate_field("singleFileOutput"));
+                            }
+                            single_file_output__ = map_.next_value()?;
+                        }
                     }
                 }
                 Ok(FileSinkConfig {
@@ -6564,6 +6581,7 @@ impl<'de> serde::Deserialize<'de> for FileSinkConfig {
                     keep_partition_by_columns: keep_partition_by_columns__.unwrap_or_default(),
                     insert_op: insert_op__.unwrap_or_default(),
                     file_extension: file_extension__.unwrap_or_default(),
+                    single_file_output: single_file_output__,
                 })
             }
         }
diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs
index 3a7b35509eaa1..d53c7327941b7 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/generated/prost.rs
@@ -1186,6 +1186,12 @@ pub struct FileSinkConfig {
     pub insert_op: i32,
     #[prost(string, tag = "11")]
     pub file_extension: ::prost::alloc::string::String,
+    /// Optional override for single file output behavior.
+    /// When not set, uses extension heuristic (path with extension = single file).
+    /// When set to true, forces single file output at exact path.
+    /// When set to false, forces directory output with generated filenames.
+    #[prost(bool, optional, tag = "12")]
+    pub single_file_output: ::core::option::Option<bool>,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct JsonSink {
diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs
index 3cfc796700dae..d12e145f113c6 100644
--- a/datafusion/proto/src/physical_plan/from_proto.rs
+++ b/datafusion/proto/src/physical_plan/from_proto.rs
@@ -737,6 +737,8 @@ impl TryFrom<&protobuf::FileSinkConfig> for FileSinkConfig {
             insert_op,
             keep_partition_by_columns: conf.keep_partition_by_columns,
             file_extension: conf.file_extension.clone(),
+            // Read from proto; None if not present (backward compatible with old plans)
+            single_file_output: conf.single_file_output,
         })
     }
 }
diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs
index 9558effb8a2a6..26fcf32e6eefd 100644
--- a/datafusion/proto/src/physical_plan/to_proto.rs
+++ b/datafusion/proto/src/physical_plan/to_proto.rs
@@ -704,6 +704,7 @@ impl TryFrom<&FileSinkConfig> for protobuf::FileSinkConfig {
             keep_partition_by_columns: conf.keep_partition_by_columns,
             insert_op: conf.insert_op as i32,
             file_extension: conf.file_extension.to_string(),
+            single_file_output: conf.single_file_output,
         })
     }
 }
diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
index b54b7030fc52a..33f7ec6b881e8 100644
--- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
@@ -1475,6 +1475,7 @@ fn roundtrip_json_sink() -> Result<()> {
         insert_op: InsertOp::Overwrite,
         keep_partition_by_columns: true,
         file_extension: "json".into(),
+        single_file_output: None,
     };
     let data_sink = Arc::new(JsonSink::new(
         file_sink_config,
@@ -1513,6 +1514,7 @@ fn roundtrip_csv_sink() -> Result<()> {
         insert_op: InsertOp::Overwrite,
         keep_partition_by_columns: true,
         file_extension: "csv".into(),
+        single_file_output: None,
    };
    let data_sink = Arc::new(CsvSink::new(
        file_sink_config,
@@ -1570,6 +1572,7 @@ fn roundtrip_parquet_sink() -> Result<()> {
        insert_op: InsertOp::Overwrite,
        keep_partition_by_columns: true,
        file_extension: "parquet".into(),
+        single_file_output: None,
    };
    let data_sink = Arc::new(ParquetSink::new(
        file_sink_config,
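
For reviewers, a minimal usage sketch of the override this diff adds. The paths are illustrative, and it assumes the `DataFrameWriteOptions::with_single_file_output` builder exercised in the test above plus a tokio runtime; this is a sketch of the intended behavior, not part of the patch:

```rust
use datafusion::dataframe::DataFrameWriteOptions;
use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    let df = ctx.sql("SELECT 1 AS id").await?;

    // Path with an extension: the existing heuristic already writes a
    // single file here, with or without the override.
    df.clone()
        .write_parquet("/tmp/out.parquet", DataFrameWriteOptions::new(), None)
        .await?;

    // Path without an extension: before this change the heuristic forced
    // a directory of generated part files; the explicit override now wins
    // and a single file is written at exactly this path.
    df.write_parquet(
        "/tmp/data_no_ext",
        DataFrameWriteOptions::new().with_single_file_output(true),
        None,
    )
    .await?;

    Ok(())
}
```

Because the physical planner now parses the `single_file_output` key out of the `CopyTo` option map (and strips it before building format options), the same override should in principle be reachable from `COPY ... OPTIONS` as well, though the exact SQL spelling is not shown in this diff.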