2 changes: 1 addition & 1 deletion datafusion-examples/examples/data_io/parquet_encrypted.rs
@@ -55,7 +55,7 @@ pub async fn parquet_encrypted() -> datafusion::common::Result<()> {

// Create a temporary file location for the encrypted parquet file
let tmp_source = TempDir::new()?;
- let tempfile = tmp_source.path().join("cars_encrypted");
+ let tempfile = tmp_source.path().join("cars_encrypted.parquet");
Contributor commented:
does it need to be called .parquet even though the dataframe explicitly says write_parquet? Or is this just to clean up the code?

@martin-g (Member) replied on Jan 23, 2026:
If the path has no file extension and single_file_output is not explicitly configured (Some(true)), the heuristic treats the path as a folder and writes a partitioned Parquet file inside it.
If there is no explicit config but the path does have a file extension, all of the content is written to that one file (no partitions).
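As a quick illustration of the two behaviors described above (a sketch, not part of this PR; the /tmp paths and the wrapper function are made up):

```rust
use datafusion::dataframe::DataFrameWriteOptions;
use datafusion::error::Result;
use datafusion::prelude::*;

// Sketch only: `df` is any existing DataFrame.
async fn write_both_ways(df: DataFrame) -> Result<()> {
    // No extension and no explicit option: the path is treated as a folder and
    // a partitioned Parquet file is written underneath it.
    df.clone()
        .write_parquet("/tmp/cars_encrypted", DataFrameWriteOptions::new(), None)
        .await?;

    // Extension present (or with_single_file_output(true) set): all content is
    // written into this one file, no partition folder.
    df.write_parquet(
        "/tmp/cars_encrypted.parquet",
        DataFrameWriteOptions::new(),
        None,
    )
    .await?;
    Ok(())
}
```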


// Write encrypted parquet
let mut options = TableParquetOptions::default();
1 change: 1 addition & 0 deletions datafusion/catalog-listing/src/table.rs
@@ -674,6 +674,7 @@ impl TableProvider for ListingTable {
insert_op,
keep_partition_by_columns,
file_extension: self.options().format.get_ext(),
single_file_output: None, // Use extension heuristic for table inserts
};

// For writes, we only use user-specified ordering (no file groups to derive from)
16 changes: 14 additions & 2 deletions datafusion/core/src/dataframe/mod.rs
@@ -2048,11 +2048,17 @@ impl DataFrame {
.build()?
};

// Build copy options, including single_file_output if explicitly set
let mut copy_options: HashMap<String, String> = HashMap::new();
Contributor commented:
since this seems to be a common piece of functionality, what do you think about creating a common function that takes a DataFrameWriteOptions and returns a CopyOptions?
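One possible shape for such a helper, sketched under the assumption that it lives in the same module as DataFrame (so it can read the single_file_output field); the name write_options_to_copy_options is hypothetical and not part of this PR:

```rust
use std::collections::HashMap;

// Hypothetical helper: gathers sink-level overrides from DataFrameWriteOptions
// into the option map handed to LogicalPlanBuilder::copy_to.
fn write_options_to_copy_options(options: &DataFrameWriteOptions) -> HashMap<String, String> {
    let mut copy_options = HashMap::new();
    if options.single_file_output {
        copy_options.insert("single_file_output".to_string(), "true".to_string());
    }
    copy_options
}
```

The three copy_to call sites in this PR could then pass write_options_to_copy_options(&options) instead of building the map inline.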

if options.single_file_output {
copy_options.insert("single_file_output".to_string(), "true".to_string());
}

let plan = LogicalPlanBuilder::copy_to(
plan,
path.into(),
file_type,
- HashMap::new(),
+ copy_options,
options.partition_by,
)?
.build()?;
@@ -2116,11 +2122,17 @@ impl DataFrame {
.build()?
};

// Build copy options, including single_file_output if explicitly set
let mut copy_options: HashMap<String, String> = HashMap::new();
if options.single_file_output {
copy_options.insert("single_file_output".to_string(), "true".to_string());
}

let plan = LogicalPlanBuilder::copy_to(
plan,
path.into(),
file_type,
- Default::default(),
+ copy_options,
options.partition_by,
)?
.build()?;
57 changes: 56 additions & 1 deletion datafusion/core/src/dataframe/parquet.rs
@@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use std::collections::HashMap;
use std::sync::Arc;

use crate::datasource::file_format::{
@@ -84,11 +85,17 @@ impl DataFrame {
.build()?
};

// Build copy options, including single_file_output if explicitly set
let mut copy_options = HashMap::<String, String>::new();
if options.single_file_output {
copy_options.insert("single_file_output".to_string(), "true".to_string());
}

let plan = LogicalPlanBuilder::copy_to(
plan,
path.into(),
file_type,
- Default::default(),
+ copy_options,
options.partition_by,
)?
.build()?;
@@ -324,4 +331,52 @@ mod tests {

Ok(())
}

/// Test that single_file_output works for paths WITHOUT file extensions.
/// This verifies the fix for the regression where extension heuristics
/// ignored the explicit with_single_file_output(true) setting.
#[tokio::test]
async fn test_single_file_output_without_extension() -> Result<()> {
use arrow::array::Int32Array;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;

let ctx = SessionContext::new();
let tmp_dir = TempDir::new()?;

// Path WITHOUT .parquet extension - this is the key scenario
let output_path = tmp_dir.path().join("data_no_ext");
let output_path_str = output_path.to_str().unwrap();

let df = ctx.read_batch(RecordBatch::try_new(
Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)])),
vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
)?)?;

// Explicitly request single file output
df.write_parquet(
output_path_str,
DataFrameWriteOptions::new().with_single_file_output(true),
None,
)
.await?;

// Verify: output should be a FILE, not a directory
assert!(
output_path.is_file(),
"Expected single file at {:?}, but got is_file={}, is_dir={}",
output_path,
output_path.is_file(),
output_path.is_dir()
);

// Verify the file is readable as parquet
let file = std::fs::File::open(&output_path)?;
let reader = parquet::file::reader::SerializedFileReader::new(file)?;
let metadata = reader.metadata();
assert_eq!(metadata.num_row_groups(), 1);
assert_eq!(metadata.file_metadata().num_rows(), 3);

Ok(())
}
}
3 changes: 3 additions & 0 deletions datafusion/core/src/datasource/file_format/parquet.rs
@@ -1547,6 +1547,7 @@ mod tests {
insert_op: InsertOp::Overwrite,
keep_partition_by_columns: false,
file_extension: "parquet".into(),
single_file_output: None,
};
let parquet_sink = Arc::new(ParquetSink::new(
file_sink_config,
@@ -1638,6 +1639,7 @@
insert_op: InsertOp::Overwrite,
keep_partition_by_columns: false,
file_extension: "parquet".into(),
single_file_output: None,
};
let parquet_sink = Arc::new(ParquetSink::new(
file_sink_config,
@@ -1728,6 +1730,7 @@
insert_op: InsertOp::Overwrite,
keep_partition_by_columns: false,
file_extension: "parquet".into(),
single_file_output: None,
};
let parquet_sink = Arc::new(ParquetSink::new(
file_sink_config,
25 changes: 24 additions & 1 deletion datafusion/core/src/physical_planner.rs
@@ -549,8 +549,30 @@ impl DefaultPhysicalPlanner {
}
};

// Parse single_file_output option if explicitly set
let single_file_output = match source_option_tuples
.get("single_file_output")
.map(|v| v.trim())
{
None => None,
Some("true") => Some(true),
Some("false") => Some(false),
Some(value) => {
return Err(DataFusionError::Configuration(format!(
"provided value for 'single_file_output' was not recognized: \"{value}\""
)));
}
};

// Filter out sink-related options that are not format options
let format_options: HashMap<String, String> = source_option_tuples
.iter()
.filter(|(k, _)| k.as_str() != "single_file_output")
.map(|(k, v)| (k.clone(), v.clone()))
.collect();

let sink_format = file_type_to_format(file_type)?
- .create(session_state, source_option_tuples)?;
+ .create(session_state, &format_options)?;

// Determine extension based on format extension and compression
let file_extension = match sink_format.compression_type() {
@@ -571,6 +593,7 @@
insert_op: InsertOp::Append,
keep_partition_by_columns,
file_extension,
single_file_output,
};

let ordering = input_exec.properties().output_ordering().cloned();
5 changes: 5 additions & 0 deletions datafusion/datasource/src/file_sink_config.rs
@@ -112,6 +112,11 @@ pub struct FileSinkConfig {
pub keep_partition_by_columns: bool,
/// File extension without a dot(.)
pub file_extension: String,
/// Override for single file output behavior.
/// - `None`: use extension heuristic (path with extension = single file)
/// - `Some(true)`: force single file output at exact path
/// - `Some(false)`: force directory output with generated filenames
pub single_file_output: Option<bool>,
}

impl FileSinkConfig {
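A minimal sketch of how a sink resolves the effective behavior from this field, mirroring the fallback added to write/demux.rs below; path_has_extension stands in for the real ListingTableUrl checks:

```rust
/// Explicit override wins; otherwise the extension heuristic decides.
/// `path_has_extension` is a stand-in for
/// `!base_output_path.is_collection() && base_output_path.file_extension().is_some()`.
fn effective_single_file_output(
    single_file_output: Option<bool>,
    path_has_extension: bool,
) -> bool {
    single_file_output.unwrap_or(path_has_extension)
}

fn main() {
    assert!(effective_single_file_output(Some(true), false)); // forced single file
    assert!(!effective_single_file_output(Some(false), true)); // forced folder output
    assert!(effective_single_file_output(None, true)); // heuristic: extension => single file
    assert!(!effective_single_file_output(None, false)); // heuristic: no extension => folder
}
```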
7 changes: 5 additions & 2 deletions datafusion/datasource/src/write/demux.rs
@@ -106,8 +106,11 @@ pub(crate) fn start_demuxer_task(
let file_extension = config.file_extension.clone();
let base_output_path = config.table_paths[0].clone();
let task = if config.table_partition_cols.is_empty() {
- let single_file_output = !base_output_path.is_collection()
-     && base_output_path.file_extension().is_some();
+ // Use explicit single_file_output if set, otherwise fall back to extension heuristic
+ let single_file_output = config.single_file_output.unwrap_or_else(|| {
+     !base_output_path.is_collection()
+         && base_output_path.file_extension().is_some()
+ });
SpawnedTask::spawn(async move {
row_count_demuxer(
tx,
5 changes: 5 additions & 0 deletions datafusion/proto/proto/datafusion.proto
@@ -771,6 +771,11 @@ message FileSinkConfig {
bool keep_partition_by_columns = 9;
InsertOp insert_op = 10;
string file_extension = 11;
// Optional override for single file output behavior.
// When not set, uses extension heuristic (path with extension = single file).
// When set to true, forces single file output at exact path.
// When set to false, forces directory output with generated filenames.
optional bool single_file_output = 12;
}

enum InsertOp {
18 changes: 18 additions & 0 deletions datafusion/proto/src/generated/pbjson.rs

Generated file; diff not rendered.

6 changes: 6 additions & 0 deletions datafusion/proto/src/generated/prost.rs

Generated file; diff not rendered.

2 changes: 2 additions & 0 deletions datafusion/proto/src/physical_plan/from_proto.rs
@@ -737,6 +737,8 @@ impl TryFrom<&protobuf::FileSinkConfig> for FileSinkConfig {
insert_op,
keep_partition_by_columns: conf.keep_partition_by_columns,
file_extension: conf.file_extension.clone(),
// Read from proto; None if not present (backward compatible with old plans)
single_file_output: conf.single_file_output,
})
}
}
1 change: 1 addition & 0 deletions datafusion/proto/src/physical_plan/to_proto.rs
@@ -704,6 +704,7 @@ impl TryFrom<&FileSinkConfig> for protobuf::FileSinkConfig {
keep_partition_by_columns: conf.keep_partition_by_columns,
insert_op: conf.insert_op as i32,
file_extension: conf.file_extension.to_string(),
single_file_output: conf.single_file_output,
})
}
}
3 changes: 3 additions & 0 deletions datafusion/proto/tests/cases/roundtrip_physical_plan.rs
@@ -1475,6 +1475,7 @@ fn roundtrip_json_sink() -> Result<()> {
insert_op: InsertOp::Overwrite,
keep_partition_by_columns: true,
file_extension: "json".into(),
single_file_output: None,
};
let data_sink = Arc::new(JsonSink::new(
file_sink_config,
@@ -1513,6 +1514,7 @@
insert_op: InsertOp::Overwrite,
keep_partition_by_columns: true,
file_extension: "csv".into(),
single_file_output: None,
};
let data_sink = Arc::new(CsvSink::new(
file_sink_config,
@@ -1570,6 +1572,7 @@
insert_op: InsertOp::Overwrite,
keep_partition_by_columns: true,
file_extension: "parquet".into(),
single_file_output: None,
};
let data_sink = Arc::new(ParquetSink::new(
file_sink_config,