From 879e0e15a071473fb0f52228de9ee9465cb27570 Mon Sep 17 00:00:00 2001 From: Emily Matheys Date: Tue, 12 Aug 2025 17:15:17 +0300 Subject: [PATCH 1/4] Add optional extended metrics to sort_batch function --- .../aggregation_fuzzer/data_generator.rs | 2 +- .../physical-plan/src/aggregates/row_hash.rs | 12 +++- .../physical-plan/src/sorts/partial_sort.rs | 9 ++- datafusion/physical-plan/src/sorts/sort.rs | 55 +++++++++++++++---- 4 files changed, 59 insertions(+), 19 deletions(-) diff --git a/datafusion/core/tests/fuzz_cases/aggregation_fuzzer/data_generator.rs b/datafusion/core/tests/fuzz_cases/aggregation_fuzzer/data_generator.rs index 753a74995d8f..84036fbcf1df 100644 --- a/datafusion/core/tests/fuzz_cases/aggregation_fuzzer/data_generator.rs +++ b/datafusion/core/tests/fuzz_cases/aggregation_fuzzer/data_generator.rs @@ -152,7 +152,7 @@ impl DatasetGenerator { .map(|key| col(key, schema).map(PhysicalSortExpr::new_default)) .collect::>>()?; let batch = if let Some(ordering) = LexOrdering::new(sort_exprs) { - sort_batch(&base_batch, &ordering, None)? + sort_batch(&base_batch, &ordering, None, None)? } else { base_batch.clone() }; diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs index 6132a8b0add5..b637ba798e8d 100644 --- a/datafusion/physical-plan/src/aggregates/row_hash.rs +++ b/datafusion/physical-plan/src/aggregates/row_hash.rs @@ -30,7 +30,7 @@ use crate::aggregates::{ PhysicalGroupBy, }; use crate::metrics::{BaselineMetrics, MetricBuilder, RecordOutput}; -use crate::sorts::sort::sort_batch; +use crate::sorts::sort::{sort_batch, LexSortMetrics}; use crate::sorts::streaming_merge::{SortedSpillFile, StreamingMergeBuilder}; use crate::spill::spill_manager::SpillManager; use crate::stream::RecordBatchStreamAdapter; @@ -430,6 +430,9 @@ pub(crate) struct GroupedHashAggregateStream { /// Execution metrics baseline_metrics: BaselineMetrics, + + /// Metrics for sorting in the spill manager + lexsort_metrics: LexSortMetrics, } impl GroupedHashAggregateStream { @@ -447,6 +450,7 @@ impl GroupedHashAggregateStream { let batch_size = context.session_config().batch_size(); let input = agg.input.execute(partition, Arc::clone(&context))?; let baseline_metrics = BaselineMetrics::new(&agg.metrics, partition); + let lexsort_metrics = LexSortMetrics::new(&agg.metrics, partition); let timer = baseline_metrics.elapsed_compute().timer(); @@ -609,6 +613,7 @@ impl GroupedHashAggregateStream { current_group_indices: Default::default(), exec_state, baseline_metrics, + lexsort_metrics, batch_size, group_ordering, input_done: false, @@ -996,7 +1001,7 @@ impl GroupedHashAggregateStream { let Some(emit) = self.emit(EmitTo::All, true)? else { return Ok(()); }; - let sorted = sort_batch(&emit, &self.spill_state.spill_expr, None)?; + let sorted = sort_batch(&emit, &self.spill_state.spill_expr, Some(&self.lexsort_metrics), None)?; // Spill sorted state to disk let spillfile = self @@ -1068,10 +1073,11 @@ impl GroupedHashAggregateStream { let mut streams: Vec = vec![]; let expr = self.spill_state.spill_expr.clone(); let schema = batch.schema(); + let lexsort_metrics = self.lexsort_metrics.clone(); streams.push(Box::pin(RecordBatchStreamAdapter::new( Arc::clone(&schema), futures::stream::once(futures::future::lazy(move |_| { - sort_batch(&batch, &expr, None) + sort_batch(&batch, &expr, Some(&lexsort_metrics), None) })), ))); diff --git a/datafusion/physical-plan/src/sorts/partial_sort.rs b/datafusion/physical-plan/src/sorts/partial_sort.rs index 513081e627e1..8b5bb2de8e45 100644 --- a/datafusion/physical-plan/src/sorts/partial_sort.rs +++ b/datafusion/physical-plan/src/sorts/partial_sort.rs @@ -58,7 +58,7 @@ use std::sync::Arc; use std::task::{Context, Poll}; use crate::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}; -use crate::sorts::sort::sort_batch; +use crate::sorts::sort::{sort_batch, LexSortMetrics}; use crate::{ DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, ExecutionPlanProperties, Partitioning, PlanProperties, SendableRecordBatchStream, Statistics, @@ -309,6 +309,7 @@ impl ExecutionPlan for PartialSortExec { fetch: self.fetch, is_closed: false, baseline_metrics: BaselineMetrics::new(&self.metrics_set, partition), + lexsort_metrics: LexSortMetrics::new(&self.metrics_set, partition) })) } @@ -341,6 +342,8 @@ struct PartialSortStream { is_closed: bool, /// Execution metrics baseline_metrics: BaselineMetrics, + /// Metrics for performing the actual sort + lexsort_metrics: LexSortMetrics, } impl Stream for PartialSortStream { @@ -398,7 +401,7 @@ impl PartialSortStream { slice_point, self.in_mem_batch.num_rows() - slice_point, ); - let sorted_batch = sort_batch(&sorted, &self.expr, self.fetch)?; + let sorted_batch = sort_batch(&sorted, &self.expr, Some(&self.lexsort_metrics), self.fetch)?; if let Some(fetch) = self.fetch.as_mut() { *fetch -= sorted_batch.num_rows(); } @@ -430,7 +433,7 @@ impl PartialSortStream { fn sort_in_mem_batch(self: &mut Pin<&mut Self>) -> Result { let input_batch = self.in_mem_batch.clone(); self.in_mem_batch = RecordBatch::new_empty(self.schema()); - let result = sort_batch(&input_batch, &self.expr, self.fetch)?; + let result = sort_batch(&input_batch, &self.expr, Some(&self.lexsort_metrics), self.fetch)?; if let Some(remaining_fetch) = self.fetch { // remaining_fetch - result.num_rows() is always be >= 0 // because result length of sort_batch with limit cannot be diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index dc2a5640f40b..616226a6cfc7 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -31,9 +31,7 @@ use crate::filter_pushdown::{ ChildFilterDescription, FilterDescription, FilterPushdownPhase, }; use crate::limit::LimitStream; -use crate::metrics::{ - BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, SpillMetrics, -}; +use crate::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet, SpillMetrics, Time}; use crate::projection::{make_with_child, update_ordering, ProjectionExec}; use crate::sorts::streaming_merge::{SortedSpillFile, StreamingMergeBuilder}; use crate::spill::get_record_batch_memory_size; @@ -62,11 +60,32 @@ use datafusion_physical_expr::PhysicalExpr; use futures::{StreamExt, TryStreamExt}; use log::{debug, trace}; +#[derive(Clone, Debug)] +pub struct LexSortMetrics { + pub time_evaluating_sort_columns: Time, + + pub time_calculating_lexsort_indices: Time, + + pub time_taking_indices_in_lexsort: Time, +} + +impl LexSortMetrics { + pub fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self { + Self { + time_evaluating_sort_columns: MetricBuilder::new(metrics).subset_time("time_evaluating_sort_columns", partition), + time_calculating_lexsort_indices: MetricBuilder::new(metrics).subset_time("time_calculating_lexsort_indices", partition), + time_taking_indices_in_lexsort: MetricBuilder::new(metrics).subset_time("time_taking_indices_in_lexsort", partition), + } + } +} + struct ExternalSorterMetrics { /// metrics baseline: BaselineMetrics, spill_metrics: SpillMetrics, + + lexsort_metrics: LexSortMetrics, } impl ExternalSorterMetrics { @@ -74,6 +93,7 @@ impl ExternalSorterMetrics { Self { baseline: BaselineMetrics::new(metrics, partition), spill_metrics: SpillMetrics::new(metrics, partition), + lexsort_metrics: LexSortMetrics::new(metrics, partition) } } } @@ -708,11 +728,12 @@ impl ExternalSorter { ); let schema = batch.schema(); + let lexsort_metrics = self.metrics.lexsort_metrics.clone(); let expressions = self.expr.clone(); let stream = futures::stream::once(async move { let _timer = metrics.elapsed_compute().timer(); - let sorted = sort_batch(&batch, &expressions, None)?; + let sorted = sort_batch(&batch, &expressions, Some(&lexsort_metrics), None)?; metrics.record_output(sorted.num_rows()); drop(batch); @@ -811,15 +832,25 @@ impl Debug for ExternalSorter { pub fn sort_batch( batch: &RecordBatch, expressions: &LexOrdering, + metrics: Option<&LexSortMetrics>, fetch: Option, ) -> Result { - let sort_columns = expressions - .iter() - .map(|expr| expr.evaluate_to_sort_column(batch)) - .collect::>>()?; - - let indices = lexsort_to_indices(&sort_columns, fetch)?; - let mut columns = take_arrays(batch.columns(), &indices, None)?; + let sort_columns = { + let _timer = metrics.map(|metrics| metrics.time_evaluating_sort_columns.timer()); + expressions + .iter() + .map(|expr| expr.evaluate_to_sort_column(batch)) + .collect::>>()? + }; + + let indices = { + let _timer = metrics.map(|metrics| metrics.time_calculating_lexsort_indices.timer()); + lexsort_to_indices(&sort_columns, fetch)? + }; + let mut columns = { + let _timer = metrics.map(|metrics| metrics.time_taking_indices_in_lexsort.timer()); + take_arrays(batch.columns(), &indices, None)? + }; // The columns may be larger than the unsorted columns in `batch` especially for variable length // data types due to exponential growth when building the sort columns. We shrink the columns @@ -2051,7 +2082,7 @@ mod tests { }] .into(); - let result = sort_batch(&batch, &expressions, None).unwrap(); + let result = sort_batch(&batch, &expressions, None, None).unwrap(); assert_eq!(result.num_rows(), 1); } From e7cbf197b51a258fba8c7a108498050c4234a33f Mon Sep 17 00:00:00 2001 From: Emily Matheys Date: Tue, 12 Aug 2025 22:32:50 +0300 Subject: [PATCH 2/4] ok commit hook wasn't enough --- .../physical-plan/src/aggregates/row_hash.rs | 7 ++++- .../physical-plan/src/sorts/partial_sort.rs | 16 +++++++++--- datafusion/physical-plan/src/sorts/sort.rs | 26 ++++++++++++------- 3 files changed, 36 insertions(+), 13 deletions(-) diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs index b637ba798e8d..9f2d12071ed7 100644 --- a/datafusion/physical-plan/src/aggregates/row_hash.rs +++ b/datafusion/physical-plan/src/aggregates/row_hash.rs @@ -1001,7 +1001,12 @@ impl GroupedHashAggregateStream { let Some(emit) = self.emit(EmitTo::All, true)? else { return Ok(()); }; - let sorted = sort_batch(&emit, &self.spill_state.spill_expr, Some(&self.lexsort_metrics), None)?; + let sorted = sort_batch( + &emit, + &self.spill_state.spill_expr, + Some(&self.lexsort_metrics), + None, + )?; // Spill sorted state to disk let spillfile = self diff --git a/datafusion/physical-plan/src/sorts/partial_sort.rs b/datafusion/physical-plan/src/sorts/partial_sort.rs index 8b5bb2de8e45..126b652586a2 100644 --- a/datafusion/physical-plan/src/sorts/partial_sort.rs +++ b/datafusion/physical-plan/src/sorts/partial_sort.rs @@ -309,7 +309,7 @@ impl ExecutionPlan for PartialSortExec { fetch: self.fetch, is_closed: false, baseline_metrics: BaselineMetrics::new(&self.metrics_set, partition), - lexsort_metrics: LexSortMetrics::new(&self.metrics_set, partition) + lexsort_metrics: LexSortMetrics::new(&self.metrics_set, partition), })) } @@ -401,7 +401,12 @@ impl PartialSortStream { slice_point, self.in_mem_batch.num_rows() - slice_point, ); - let sorted_batch = sort_batch(&sorted, &self.expr, Some(&self.lexsort_metrics), self.fetch)?; + let sorted_batch = sort_batch( + &sorted, + &self.expr, + Some(&self.lexsort_metrics), + self.fetch, + )?; if let Some(fetch) = self.fetch.as_mut() { *fetch -= sorted_batch.num_rows(); } @@ -433,7 +438,12 @@ impl PartialSortStream { fn sort_in_mem_batch(self: &mut Pin<&mut Self>) -> Result { let input_batch = self.in_mem_batch.clone(); self.in_mem_batch = RecordBatch::new_empty(self.schema()); - let result = sort_batch(&input_batch, &self.expr, Some(&self.lexsort_metrics), self.fetch)?; + let result = sort_batch( + &input_batch, + &self.expr, + Some(&self.lexsort_metrics), + self.fetch, + )?; if let Some(remaining_fetch) = self.fetch { // remaining_fetch - result.num_rows() is always be >= 0 // because result length of sort_batch with limit cannot be diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index 616226a6cfc7..137b857b0f5e 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -31,7 +31,10 @@ use crate::filter_pushdown::{ ChildFilterDescription, FilterDescription, FilterPushdownPhase, }; use crate::limit::LimitStream; -use crate::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet, SpillMetrics, Time}; +use crate::metrics::{ + BaselineMetrics, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet, SpillMetrics, + Time, +}; use crate::projection::{make_with_child, update_ordering, ProjectionExec}; use crate::sorts::streaming_merge::{SortedSpillFile, StreamingMergeBuilder}; use crate::spill::get_record_batch_memory_size; @@ -71,12 +74,15 @@ pub struct LexSortMetrics { impl LexSortMetrics { pub fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self { - Self { - time_evaluating_sort_columns: MetricBuilder::new(metrics).subset_time("time_evaluating_sort_columns", partition), - time_calculating_lexsort_indices: MetricBuilder::new(metrics).subset_time("time_calculating_lexsort_indices", partition), - time_taking_indices_in_lexsort: MetricBuilder::new(metrics).subset_time("time_taking_indices_in_lexsort", partition), - } + Self { + time_evaluating_sort_columns: MetricBuilder::new(metrics) + .subset_time("time_evaluating_sort_columns", partition), + time_calculating_lexsort_indices: MetricBuilder::new(metrics) + .subset_time("time_calculating_lexsort_indices", partition), + time_taking_indices_in_lexsort: MetricBuilder::new(metrics) + .subset_time("time_taking_indices_in_lexsort", partition), } + } } struct ExternalSorterMetrics { @@ -93,7 +99,7 @@ impl ExternalSorterMetrics { Self { baseline: BaselineMetrics::new(metrics, partition), spill_metrics: SpillMetrics::new(metrics, partition), - lexsort_metrics: LexSortMetrics::new(metrics, partition) + lexsort_metrics: LexSortMetrics::new(metrics, partition), } } } @@ -844,11 +850,13 @@ pub fn sort_batch( }; let indices = { - let _timer = metrics.map(|metrics| metrics.time_calculating_lexsort_indices.timer()); + let _timer = + metrics.map(|metrics| metrics.time_calculating_lexsort_indices.timer()); lexsort_to_indices(&sort_columns, fetch)? }; let mut columns = { - let _timer = metrics.map(|metrics| metrics.time_taking_indices_in_lexsort.timer()); + let _timer = + metrics.map(|metrics| metrics.time_taking_indices_in_lexsort.timer()); take_arrays(batch.columns(), &indices, None)? }; From ff9f759e7bc620f527f15f1a41e0b7bcfde38a23 Mon Sep 17 00:00:00 2001 From: Emily Matheys Date: Thu, 14 Aug 2025 13:15:22 +0300 Subject: [PATCH 3/4] update doc, remove evaluation metrics --- .../physical-plan/src/aggregates/row_hash.rs | 2 +- datafusion/physical-plan/src/sorts/sort.rs | 16 ++++------------ 2 files changed, 5 insertions(+), 13 deletions(-) diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs index 9f2d12071ed7..c182de6451b5 100644 --- a/datafusion/physical-plan/src/aggregates/row_hash.rs +++ b/datafusion/physical-plan/src/aggregates/row_hash.rs @@ -431,7 +431,7 @@ pub(crate) struct GroupedHashAggregateStream { /// Execution metrics baseline_metrics: BaselineMetrics, - /// Metrics for sorting in the spill manager + /// Metrics for the sort_batch function itself, used when spilling and when stream-merging. lexsort_metrics: LexSortMetrics, } diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index 137b857b0f5e..e5950a9d5173 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -65,8 +65,6 @@ use log::{debug, trace}; #[derive(Clone, Debug)] pub struct LexSortMetrics { - pub time_evaluating_sort_columns: Time, - pub time_calculating_lexsort_indices: Time, pub time_taking_indices_in_lexsort: Time, @@ -75,8 +73,6 @@ pub struct LexSortMetrics { impl LexSortMetrics { pub fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self { Self { - time_evaluating_sort_columns: MetricBuilder::new(metrics) - .subset_time("time_evaluating_sort_columns", partition), time_calculating_lexsort_indices: MetricBuilder::new(metrics) .subset_time("time_calculating_lexsort_indices", partition), time_taking_indices_in_lexsort: MetricBuilder::new(metrics) @@ -841,17 +837,13 @@ pub fn sort_batch( metrics: Option<&LexSortMetrics>, fetch: Option, ) -> Result { - let sort_columns = { - let _timer = metrics.map(|metrics| metrics.time_evaluating_sort_columns.timer()); - expressions - .iter() - .map(|expr| expr.evaluate_to_sort_column(batch)) - .collect::>>()? - }; - let indices = { let _timer = metrics.map(|metrics| metrics.time_calculating_lexsort_indices.timer()); + let sort_columns = expressions + .iter() + .map(|expr| expr.evaluate_to_sort_column(batch)) + .collect::>>()?; lexsort_to_indices(&sort_columns, fetch)? }; let mut columns = { From 900f3069341e88548226fa0aad82cf10249698e1 Mon Sep 17 00:00:00 2001 From: Emily Matheys Date: Mon, 18 Aug 2025 16:00:17 +0300 Subject: [PATCH 4/4] reduce syscalls count, some renaming --- datafusion/physical-plan/src/sorts/sort.rs | 57 +++++++++++++--------- 1 file changed, 34 insertions(+), 23 deletions(-) diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index e5950a9d5173..511d7ebbb409 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -52,6 +52,7 @@ use arrow::array::{Array, RecordBatch, RecordBatchOptions, StringViewArray}; use arrow::compute::{concat_batches, lexsort_to_indices, take_arrays}; use arrow::datatypes::SchemaRef; use datafusion_common::config::SpillCompression; +use datafusion_common::instant::Instant; use datafusion_common::{internal_datafusion_err, internal_err, DataFusionError, Result}; use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation}; use datafusion_execution::runtime_env::RuntimeEnv; @@ -63,20 +64,22 @@ use datafusion_physical_expr::PhysicalExpr; use futures::{StreamExt, TryStreamExt}; use log::{debug, trace}; +/// Our current sort_batch implementation can be divided into two sections: +/// 1. The first section calculates the indices of the sorted order +/// 2. We use a take kernel to recreate the batch in sorted order #[derive(Clone, Debug)] pub struct LexSortMetrics { - pub time_calculating_lexsort_indices: Time, - - pub time_taking_indices_in_lexsort: Time, + pub calculate_indices_time: Time, + pub recreate_batch_time: Time, } impl LexSortMetrics { pub fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self { Self { - time_calculating_lexsort_indices: MetricBuilder::new(metrics) - .subset_time("time_calculating_lexsort_indices", partition), - time_taking_indices_in_lexsort: MetricBuilder::new(metrics) - .subset_time("time_taking_indices_in_lexsort", partition), + calculate_indices_time: MetricBuilder::new(metrics) + .subset_time("calculate_indices_time", partition), + recreate_batch_time: MetricBuilder::new(metrics) + .subset_time("recreate_batch_time", partition), } } } @@ -837,28 +840,36 @@ pub fn sort_batch( metrics: Option<&LexSortMetrics>, fetch: Option, ) -> Result { - let indices = { - let _timer = - metrics.map(|metrics| metrics.time_calculating_lexsort_indices.timer()); - let sort_columns = expressions - .iter() - .map(|expr| expr.evaluate_to_sort_column(batch)) - .collect::>>()?; - lexsort_to_indices(&sort_columns, fetch)? - }; - let mut columns = { - let _timer = - metrics.map(|metrics| metrics.time_taking_indices_in_lexsort.timer()); - take_arrays(batch.columns(), &indices, None)? + let start_time = Instant::now(); + + let sort_columns = expressions + .iter() + .map(|expr| expr.evaluate_to_sort_column(batch)) + .collect::>>()?; + let indices = lexsort_to_indices(&sort_columns, fetch)?; + let indices_time = if let Some(metrics) = metrics { + let indices_time = Instant::now(); + metrics + .calculate_indices_time + .add_duration(indices_time - start_time); + Some(indices_time) + } else { + None }; + let mut columns = take_arrays(batch.columns(), &indices, None)?; // The columns may be larger than the unsorted columns in `batch` especially for variable length // data types due to exponential growth when building the sort columns. We shrink the columns // to prevent memory reservation failures, as well as excessive memory allocation when running // merges in `SortPreservingMergeStream`. - columns.iter_mut().for_each(|c| { - c.shrink_to_fit(); - }); + columns.iter_mut().for_each(|c| c.shrink_to_fit()); + + if let Some(metrics) = metrics { + let batch_time = Instant::now(); + metrics + .recreate_batch_time + .add_duration(batch_time - indices_time.unwrap()); + } let options = RecordBatchOptions::new().with_row_count(Some(indices.len())); Ok(RecordBatch::try_new_with_options(