Skip to content

Commit 2b7bf6d

Browse files
committed
wip on tiered ordering of files
1 parent ceaa07d commit 2b7bf6d

File tree

2 files changed

+743
-63
lines changed

2 files changed

+743
-63
lines changed

datafusion/core/src/datasource/listing/table.rs

Lines changed: 128 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ use datafusion_datasource::{
4040
compute_all_files_statistics,
4141
file::FileSource,
4242
file_groups::FileGroup,
43-
file_scan_config::{FileScanConfig, FileScanConfigBuilder},
43+
file_scan_config::{FileGroupPartitioning, FileScanConfig, FileScanConfigBuilder},
4444
schema_adapter::{DefaultSchemaAdapterFactory, SchemaAdapter, SchemaAdapterFactory},
4545
};
4646
use datafusion_execution::{
@@ -1131,6 +1131,124 @@ impl ListingTable {
11311131
fn try_create_output_ordering(&self) -> Result<Vec<LexOrdering>> {
11321132
create_ordering(&self.table_schema, &self.options.file_sort_order)
11331133
}
1134+
1135+
/// Checks if the requested ordering can be satisfied using file statistics.
1136+
///
1137+
/// Only simple column references (not expressions) can be used for file ordering
1138+
/// because statistics are typically available only at the column level.
1139+
fn can_use_ordering_from_statistics(&self, ordering: &[SortExpr]) -> bool {
1140+
ordering.iter().all(|sort_expr| {
1141+
// Check if sort expression contains only simple column references
1142+
let mut is_simple_column = true;
1143+
let _ = sort_expr.apply_elements(|e| {
1144+
if !matches!(e, Expr::Column(_)) {
1145+
is_simple_column = false;
1146+
Ok(TreeNodeRecursion::Stop)
1147+
} else {
1148+
Ok(TreeNodeRecursion::Continue)
1149+
}
1150+
});
1151+
is_simple_column
1152+
})
1153+
}
1154+
1155+
/// Resolves the desired file ordering based on query requirements and natural ordering.
1156+
///
1157+
/// This method prioritizes query-requested ordering if it can be satisfied using statistics,
1158+
/// otherwise falls back to any natural file ordering defined in the table configuration.
1159+
fn resolve_desired_ordering(
1160+
&self,
1161+
requested_ordering: Option<&[SortExpr]>,
1162+
) -> Result<Option<LexOrdering>> {
1163+
// Check if query requested specific ordering that we can use
1164+
if let Some(ordering) = requested_ordering {
1165+
if !ordering.is_empty() && self.can_use_ordering_from_statistics(ordering) {
1166+
return create_ordering(&self.table_schema, &[ordering.to_vec()])
1167+
.map(|orderings| orderings.first().cloned());
1168+
}
1169+
}
1170+
1171+
// Fall back to natural file ordering if any
1172+
self.try_create_output_ordering()
1173+
.map(|orderings| orderings.first().cloned())
1174+
}
1175+
1176+
/// Determines the optimal file grouping and ordering strategy.
1177+
///
1178+
/// This method orchestrates the file grouping process by:
1179+
/// 1. Resolving the desired ordering (query-requested vs natural)
1180+
/// 2. Applying statistics-based splitting if enabled and available
1181+
/// 3. Returning both the file groups and any output ordering that can be guaranteed
1182+
///
1183+
/// # Arguments
1184+
/// * `state` - The session state containing configuration options
1185+
/// * `partitioned_file_lists` - Original file groups to potentially reorganize
1186+
/// * `requested_ordering` - Ordering requested by the query, if any
1187+
///
1188+
/// # Returns
1189+
/// A tuple of (file_groups, optional_output_ordering) where:
1190+
/// - file_groups: The optimized file group arrangement
1191+
/// - optional_output_ordering: Output ordering that can be guaranteed (if any)
1192+
fn determine_file_groups_and_ordering(
1193+
&self,
1194+
state: &dyn Session,
1195+
partitioned_file_lists: Vec<FileGroup>,
1196+
requested_ordering: Option<&[SortExpr]>,
1197+
) -> Result<(Vec<FileGroup>, Option<Vec<LexOrdering>>)> {
1198+
// 1. Determine desired ordering (query-requested vs natural)
1199+
let desired_ordering = self.resolve_desired_ordering(requested_ordering)?;
1200+
1201+
// 2. Check if statistics-based splitting is enabled
1202+
if !state
1203+
.config_options()
1204+
.execution
1205+
.split_file_groups_by_statistics
1206+
{
1207+
return Ok((partitioned_file_lists, desired_ordering.map(|o| vec![o])));
1208+
}
1209+
1210+
// 3. Apply statistics-based splitting if we have an ordering requirement
1211+
let Some(ordering) = desired_ordering else {
1212+
// No ordering requirement, keep original groups
1213+
return Ok((partitioned_file_lists, None));
1214+
};
1215+
1216+
match FileScanConfig::split_groups_by_statistics_with_overlap_handling(
1217+
&self.table_schema,
1218+
&partitioned_file_lists,
1219+
&ordering,
1220+
self.options.target_partitions,
1221+
) {
1222+
Ok(FileGroupPartitioning::TotalOrder(groups)) => {
1223+
// Files don't overlap - can guarantee output ordering
1224+
log::debug!(
1225+
"Files arranged in total order across {} partitions",
1226+
groups.len()
1227+
);
1228+
Ok((groups, Some(vec![ordering])))
1229+
}
1230+
Ok(FileGroupPartitioning::PartialOrder(groups)) => {
1231+
// Files overlap but are ordered within partitions
1232+
log::debug!(
1233+
"Files arranged in partial order across {} partitions",
1234+
groups.len()
1235+
);
1236+
Ok((groups, None))
1237+
}
1238+
Ok(FileGroupPartitioning::Unordered(groups)) => {
1239+
// No statistics available, files ordered by path
1240+
log::debug!(
1241+
"Files arranged by path across {} partitions (no statistics)",
1242+
groups.len()
1243+
);
1244+
Ok((groups, None))
1245+
}
1246+
Err(e) => {
1247+
log::debug!("Failed to split file groups by statistics: {e}");
1248+
Ok((partitioned_file_lists, None))
1249+
}
1250+
}
1251+
}
11341252
}
11351253

11361254
// Expressions can be used for partition pruning if they can be evaluated using
@@ -1207,7 +1325,7 @@ impl TableProvider for ListingTable {
12071325
// at the same time. This is because the limit should be applied after the filters are applied.
12081326
let statistic_file_limit = if filters.is_empty() { limit } else { None };
12091327

1210-
let (mut partitioned_file_lists, statistics) = self
1328+
let (partitioned_file_lists, statistics) = self
12111329
.list_files_for_scan(state, &partition_filters, statistic_file_limit)
12121330
.await?;
12131331

@@ -1220,66 +1338,13 @@ impl TableProvider for ListingTable {
12201338
));
12211339
}
12221340

1223-
let known_file_ordering = self.try_create_output_ordering()?;
1224-
let desired_file_ordering = match args.preferred_ordering() {
1225-
Some(ordering) if !ordering.is_empty() => {
1226-
// Prefer the ordering requested by the query to any natural file ordering.
1227-
// We'll try to re-order the file reads to match the requested ordering as best we can using statistics.
1228-
// Whatever the result is, it's likely better than a natural file ordering that doesn't match the query's ordering.
1229-
// But we can only do this if the query's ordering is a simple ordering of columns (no expressions).
1230-
let can_use_preferred_ordering = ordering.iter().all(|sort_expr| {
1231-
let mut contains_only_columns = true;
1232-
sort_expr
1233-
.apply_elements(|e| {
1234-
if !matches!(e, Expr::Column(_)) {
1235-
contains_only_columns = false;
1236-
Ok(TreeNodeRecursion::Stop)
1237-
} else {
1238-
Ok(TreeNodeRecursion::Continue)
1239-
}
1240-
})
1241-
.expect("infallible closure cannot fail");
1242-
contains_only_columns
1243-
});
1244-
if can_use_preferred_ordering {
1245-
create_ordering(&self.table_schema, &[ordering.to_vec()])?
1246-
.first()
1247-
.cloned()
1248-
} else {
1249-
known_file_ordering.first().cloned()
1250-
}
1251-
}
1252-
Some(_) | None => {
1253-
// If the query did not request a specific ordering, fall back to any inherent file ordering
1254-
known_file_ordering.first().cloned()
1255-
}
1256-
};
1257-
match state
1258-
.config_options()
1259-
.execution
1260-
.split_file_groups_by_statistics
1261-
.then(|| {
1262-
desired_file_ordering.map(|ordering| {
1263-
FileScanConfig::split_groups_by_statistics_with_target_partitions(
1264-
&self.table_schema,
1265-
&partitioned_file_lists,
1266-
&ordering,
1267-
self.options.target_partitions,
1268-
)
1269-
})
1270-
})
1271-
.flatten()
1272-
{
1273-
Some(Err(e)) => log::debug!("failed to split file groups by statistics: {e}"),
1274-
Some(Ok(new_groups)) => {
1275-
if new_groups.len() <= self.options.target_partitions {
1276-
partitioned_file_lists = new_groups;
1277-
} else {
1278-
log::debug!("attempted to split file groups by statistics, but there were more file groups than target_partitions; falling back to unordered")
1279-
}
1280-
}
1281-
None => {} // no ordering required
1282-
};
1341+
// Determine optimal file grouping and ordering strategy
1342+
let (partitioned_file_lists, output_ordering) = self
1343+
.determine_file_groups_and_ordering(
1344+
state,
1345+
partitioned_file_lists,
1346+
args.preferred_ordering(),
1347+
)?;
12831348

12841349
let Some(object_store_url) =
12851350
self.table_paths.first().map(ListingTableUrl::object_store)
@@ -1308,7 +1373,7 @@ impl TableProvider for ListingTable {
13081373
.with_statistics(statistics)
13091374
.with_projection(projection)
13101375
.with_limit(limit)
1311-
.with_output_ordering(known_file_ordering)
1376+
.with_output_ordering(output_ordering.unwrap_or_default())
13121377
.with_table_partition_cols(table_partition_cols)
13131378
.with_expr_adapter(self.expr_adapter_factory.clone())
13141379
.build(),

0 commit comments

Comments
 (0)