Skip to content

Commit 20870da

Browse files
adriangbclaudezhuqi-lucas
authored
infer parquet file order from metadata and use it to optimize scans (#19433)
The idea here is to use the metadata in parquet files to infer sort orders, thus it is not required for users to specify it manually. This should probably be split into multiple PRs: - Record sort order when writing into a table created as `WITH ORDER` - Refactor `PartitionedFile` construction - Collect ordering during statistics collection --------- Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com> Co-authored-by: Qi Zhu <821684824@qq.com>
1 parent 209a0a2 commit 20870da

File tree

7 files changed

+535
-43
lines changed

7 files changed

+535
-43
lines changed

datafusion/catalog-listing/src/table.rs

Lines changed: 274 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -337,16 +337,103 @@ impl ListingTable {
337337
self.options.format.file_source(table_schema)
338338
}
339339

340-
/// If file_sort_order is specified, creates the appropriate physical expressions
340+
/// Creates output ordering from user-specified file_sort_order or derives
341+
/// from file orderings when user doesn't specify.
342+
///
343+
/// If user specified `file_sort_order`, that takes precedence.
344+
/// Otherwise, attempts to derive common ordering from file orderings in
345+
/// the provided file groups.
341346
pub fn try_create_output_ordering(
342347
&self,
343348
execution_props: &ExecutionProps,
349+
file_groups: &[FileGroup],
344350
) -> datafusion_common::Result<Vec<LexOrdering>> {
345-
create_lex_ordering(
346-
&self.table_schema,
347-
&self.options.file_sort_order,
348-
execution_props,
349-
)
351+
// If user specified sort order, use that
352+
if !self.options.file_sort_order.is_empty() {
353+
return create_lex_ordering(
354+
&self.table_schema,
355+
&self.options.file_sort_order,
356+
execution_props,
357+
);
358+
}
359+
if let Some(ordering) = derive_common_ordering_from_files(file_groups) {
360+
return Ok(vec![ordering]);
361+
}
362+
Ok(vec![])
363+
}
364+
}
365+
366+
/// Derives a common ordering from file orderings across all file groups.
367+
///
368+
/// Returns the common ordering if all files have compatible orderings,
369+
/// otherwise returns None.
370+
///
371+
/// The function finds the longest common prefix among all file orderings.
372+
/// For example, if files have orderings `[a, b, c]` and `[a, b]`, the common
373+
/// ordering is `[a, b]`.
374+
fn derive_common_ordering_from_files(file_groups: &[FileGroup]) -> Option<LexOrdering> {
375+
enum CurrentOrderingState {
376+
/// Initial state before processing any files
377+
FirstFile,
378+
/// Some common ordering found so far
379+
SomeOrdering(LexOrdering),
380+
/// No files have ordering
381+
NoOrdering,
382+
}
383+
let mut state = CurrentOrderingState::FirstFile;
384+
385+
// Collect file orderings and track counts
386+
for group in file_groups {
387+
for file in group.iter() {
388+
state = match (&state, &file.ordering) {
389+
// If this is the first file with ordering, set it as current
390+
(CurrentOrderingState::FirstFile, Some(ordering)) => {
391+
CurrentOrderingState::SomeOrdering(ordering.clone())
392+
}
393+
(CurrentOrderingState::FirstFile, None) => {
394+
CurrentOrderingState::NoOrdering
395+
}
396+
// If we have an existing ordering, find common prefix with new ordering
397+
(CurrentOrderingState::SomeOrdering(current), Some(ordering)) => {
398+
// Find common prefix between current and new ordering
399+
let prefix_len = current
400+
.as_ref()
401+
.iter()
402+
.zip(ordering.as_ref().iter())
403+
.take_while(|(a, b)| a == b)
404+
.count();
405+
if prefix_len == 0 {
406+
log::trace!(
407+
"Cannot derive common ordering: no common prefix between orderings {current:?} and {ordering:?}"
408+
);
409+
return None;
410+
} else {
411+
let ordering =
412+
LexOrdering::new(current.as_ref()[..prefix_len].to_vec())
413+
.expect("prefix_len > 0, so ordering must be valid");
414+
CurrentOrderingState::SomeOrdering(ordering)
415+
}
416+
}
417+
// If one file has ordering and another doesn't, no common ordering
418+
// Return None and log a trace message explaining why
419+
(CurrentOrderingState::SomeOrdering(ordering), None)
420+
| (CurrentOrderingState::NoOrdering, Some(ordering)) => {
421+
log::trace!(
422+
"Cannot derive common ordering: some files have ordering {ordering:?}, others don't"
423+
);
424+
return None;
425+
}
426+
// Both have no ordering, remain in NoOrdering state
427+
(CurrentOrderingState::NoOrdering, None) => {
428+
CurrentOrderingState::NoOrdering
429+
}
430+
};
431+
}
432+
}
433+
434+
match state {
435+
CurrentOrderingState::SomeOrdering(ordering) => Some(ordering),
436+
_ => None,
350437
}
351438
}
352439

@@ -439,7 +526,10 @@ impl TableProvider for ListingTable {
439526
return Ok(ScanResult::new(Arc::new(EmptyExec::new(projected_schema))));
440527
}
441528

442-
let output_ordering = self.try_create_output_ordering(state.execution_props())?;
529+
let output_ordering = self.try_create_output_ordering(
530+
state.execution_props(),
531+
&partitioned_file_lists,
532+
)?;
443533
match state
444534
.config_options()
445535
.execution
@@ -586,7 +676,8 @@ impl TableProvider for ListingTable {
586676
file_extension: self.options().format.get_ext(),
587677
};
588678

589-
let orderings = self.try_create_output_ordering(state.execution_props())?;
679+
// For writes, we only use user-specified ordering (no file groups to derive from)
680+
let orderings = self.try_create_output_ordering(state.execution_props(), &[])?;
590681
// It is sufficient to pass only one of the equivalent orderings:
591682
let order_requirements = orderings.into_iter().next().map(Into::into);
592683

@@ -635,16 +726,19 @@ impl ListingTable {
635726
let meta_fetch_concurrency =
636727
ctx.config_options().execution.meta_fetch_concurrency;
637728
let file_list = stream::iter(file_list).flatten_unordered(meta_fetch_concurrency);
638-
// collect the statistics if required by the config
729+
// collect the statistics and ordering if required by the config
639730
let files = file_list
640731
.map(|part_file| async {
641732
let part_file = part_file?;
642-
let statistics = if self.options.collect_stat {
643-
self.do_collect_statistics(ctx, &store, &part_file).await?
733+
let (statistics, ordering) = if self.options.collect_stat {
734+
self.do_collect_statistics_and_ordering(ctx, &store, &part_file)
735+
.await?
644736
} else {
645-
Arc::new(Statistics::new_unknown(&self.file_schema))
737+
(Arc::new(Statistics::new_unknown(&self.file_schema)), None)
646738
};
647-
Ok(part_file.with_statistics(statistics))
739+
Ok(part_file
740+
.with_statistics(statistics)
741+
.with_ordering(ordering))
648742
})
649743
.boxed()
650744
.buffer_unordered(ctx.config_options().execution.meta_fetch_concurrency);
@@ -699,53 +793,50 @@ impl ListingTable {
699793
})
700794
}
701795

702-
/// Collects statistics for a given partitioned file.
796+
/// Collects statistics and ordering for a given partitioned file.
703797
///
704-
/// This method first checks if the statistics for the given file are already cached.
705-
/// If they are, it returns the cached statistics.
706-
/// If they are not, it infers the statistics from the file and stores them in the cache.
707-
async fn do_collect_statistics(
798+
/// This method checks if statistics are cached. If cached, it returns the
799+
/// cached statistics and infers ordering separately. If not cached, it infers
800+
/// both statistics and ordering in a single metadata read for efficiency.
801+
async fn do_collect_statistics_and_ordering(
708802
&self,
709803
ctx: &dyn Session,
710804
store: &Arc<dyn ObjectStore>,
711805
part_file: &PartitionedFile,
712-
) -> datafusion_common::Result<Arc<Statistics>> {
806+
) -> datafusion_common::Result<(Arc<Statistics>, Option<LexOrdering>)> {
713807
use datafusion_execution::cache::cache_manager::CachedFileMetadata;
714808

715-
// Check cache first
716-
if let Some(cached) = self
717-
.collected_statistics
718-
.get(&part_file.object_meta.location)
809+
let path = &part_file.object_meta.location;
810+
let meta = &part_file.object_meta;
811+
812+
// Check cache first - if we have valid cached statistics and ordering
813+
if let Some(cached) = self.collected_statistics.get(path)
814+
&& cached.is_valid_for(meta)
719815
{
720-
// Validate that cached entry is still valid
721-
if cached.is_valid_for(&part_file.object_meta) {
722-
return Ok(cached.statistics);
723-
}
816+
// Return cached statistics and ordering
817+
return Ok((Arc::clone(&cached.statistics), cached.ordering.clone()));
724818
}
725819

726-
// Cache miss or invalid - infer statistics
727-
let statistics = self
820+
// Cache miss or invalid: fetch both statistics and ordering in a single metadata read
821+
let file_meta = self
728822
.options
729823
.format
730-
.infer_stats(
731-
ctx,
732-
store,
733-
Arc::clone(&self.file_schema),
734-
&part_file.object_meta,
735-
)
824+
.infer_stats_and_ordering(ctx, store, Arc::clone(&self.file_schema), meta)
736825
.await?;
737-
let statistics = Arc::new(statistics);
826+
827+
let statistics = Arc::new(file_meta.statistics);
738828

739829
// Store in cache
740830
self.collected_statistics.put(
741-
&part_file.object_meta.location,
831+
path,
742832
CachedFileMetadata::new(
743-
part_file.object_meta.clone(),
833+
meta.clone(),
744834
Arc::clone(&statistics),
745-
None, // No ordering information in this PR
835+
file_meta.ordering.clone(),
746836
),
747837
);
748-
Ok(statistics)
838+
839+
Ok((statistics, file_meta.ordering))
749840
}
750841
}
751842

@@ -821,3 +912,146 @@ async fn get_files_with_limit(
821912
let inexact_stats = all_files.next().await.is_some();
822913
Ok((file_group, inexact_stats))
823914
}
915+
916+
#[cfg(test)]
917+
mod tests {
918+
use super::*;
919+
use arrow::compute::SortOptions;
920+
use datafusion_physical_expr::expressions::Column;
921+
use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
922+
use std::sync::Arc;
923+
924+
/// Helper to create a PhysicalSortExpr
925+
fn sort_expr(
926+
name: &str,
927+
idx: usize,
928+
descending: bool,
929+
nulls_first: bool,
930+
) -> PhysicalSortExpr {
931+
PhysicalSortExpr::new(
932+
Arc::new(Column::new(name, idx)),
933+
SortOptions {
934+
descending,
935+
nulls_first,
936+
},
937+
)
938+
}
939+
940+
/// Helper to create a LexOrdering (unwraps the Option)
941+
fn lex_ordering(exprs: Vec<PhysicalSortExpr>) -> LexOrdering {
942+
LexOrdering::new(exprs).expect("expected non-empty ordering")
943+
}
944+
945+
/// Helper to create a PartitionedFile with optional ordering
946+
fn create_file(name: &str, ordering: Option<LexOrdering>) -> PartitionedFile {
947+
PartitionedFile::new(name.to_string(), 1024).with_ordering(ordering)
948+
}
949+
950+
#[test]
951+
fn test_derive_common_ordering_all_files_same_ordering() {
952+
// All files have the same ordering -> returns that ordering
953+
let ordering = lex_ordering(vec![
954+
sort_expr("a", 0, false, true),
955+
sort_expr("b", 1, true, false),
956+
]);
957+
958+
let file_groups = vec![
959+
FileGroup::new(vec![
960+
create_file("f1.parquet", Some(ordering.clone())),
961+
create_file("f2.parquet", Some(ordering.clone())),
962+
]),
963+
FileGroup::new(vec![create_file("f3.parquet", Some(ordering.clone()))]),
964+
];
965+
966+
let result = derive_common_ordering_from_files(&file_groups);
967+
assert_eq!(result, Some(ordering));
968+
}
969+
970+
#[test]
971+
fn test_derive_common_ordering_common_prefix() {
972+
// Files have different orderings but share a common prefix
973+
let ordering_abc = lex_ordering(vec![
974+
sort_expr("a", 0, false, true),
975+
sort_expr("b", 1, false, true),
976+
sort_expr("c", 2, false, true),
977+
]);
978+
let ordering_ab = lex_ordering(vec![
979+
sort_expr("a", 0, false, true),
980+
sort_expr("b", 1, false, true),
981+
]);
982+
983+
let file_groups = vec![FileGroup::new(vec![
984+
create_file("f1.parquet", Some(ordering_abc)),
985+
create_file("f2.parquet", Some(ordering_ab.clone())),
986+
])];
987+
988+
let result = derive_common_ordering_from_files(&file_groups);
989+
assert_eq!(result, Some(ordering_ab));
990+
}
991+
992+
#[test]
993+
fn test_derive_common_ordering_no_common_prefix() {
994+
// Files have completely different orderings -> returns None
995+
let ordering_a = lex_ordering(vec![sort_expr("a", 0, false, true)]);
996+
let ordering_b = lex_ordering(vec![sort_expr("b", 1, false, true)]);
997+
998+
let file_groups = vec![FileGroup::new(vec![
999+
create_file("f1.parquet", Some(ordering_a)),
1000+
create_file("f2.parquet", Some(ordering_b)),
1001+
])];
1002+
1003+
let result = derive_common_ordering_from_files(&file_groups);
1004+
assert_eq!(result, None);
1005+
}
1006+
1007+
#[test]
1008+
fn test_derive_common_ordering_mixed_with_none() {
1009+
// Some files have ordering, some don't -> returns None
1010+
let ordering = lex_ordering(vec![sort_expr("a", 0, false, true)]);
1011+
1012+
let file_groups = vec![FileGroup::new(vec![
1013+
create_file("f1.parquet", Some(ordering)),
1014+
create_file("f2.parquet", None),
1015+
])];
1016+
1017+
let result = derive_common_ordering_from_files(&file_groups);
1018+
assert_eq!(result, None);
1019+
}
1020+
1021+
#[test]
1022+
fn test_derive_common_ordering_all_none() {
1023+
// No files have ordering -> returns None
1024+
let file_groups = vec![FileGroup::new(vec![
1025+
create_file("f1.parquet", None),
1026+
create_file("f2.parquet", None),
1027+
])];
1028+
1029+
let result = derive_common_ordering_from_files(&file_groups);
1030+
assert_eq!(result, None);
1031+
}
1032+
1033+
#[test]
1034+
fn test_derive_common_ordering_empty_groups() {
1035+
// Empty file groups -> returns None
1036+
let file_groups: Vec<FileGroup> = vec![];
1037+
let result = derive_common_ordering_from_files(&file_groups);
1038+
assert_eq!(result, None);
1039+
}
1040+
1041+
#[test]
1042+
fn test_derive_common_ordering_single_file() {
1043+
// Single file with ordering -> returns that ordering
1044+
let ordering = lex_ordering(vec![
1045+
sort_expr("a", 0, false, true),
1046+
sort_expr("b", 1, true, false),
1047+
]);
1048+
1049+
let file_groups = vec![FileGroup::new(vec![create_file(
1050+
"f1.parquet",
1051+
Some(ordering.clone()),
1052+
)])];
1053+
1054+
let result = derive_common_ordering_from_files(&file_groups);
1055+
assert_eq!(result, Some(ordering));
1056+
}
1057+
}

datafusion/core/src/datasource/listing/table.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -347,7 +347,7 @@ mod tests {
347347
let table =
348348
ListingTable::try_new(config.clone()).expect("Creating the table");
349349
let ordering_result =
350-
table.try_create_output_ordering(state.execution_props());
350+
table.try_create_output_ordering(state.execution_props(), &[]);
351351

352352
match (expected_result, ordering_result) {
353353
(Ok(expected), Ok(result)) => {

0 commit comments

Comments
 (0)