Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ impl DatasetGenerator {
.map(|key| col(key, schema).map(PhysicalSortExpr::new_default))
.collect::<Result<Vec<_>>>()?;
let batch = if let Some(ordering) = LexOrdering::new(sort_exprs) {
sort_batch(&base_batch, &ordering, None)?
sort_batch(&base_batch, &ordering, None, None)?
} else {
base_batch.clone()
};
Expand Down
17 changes: 14 additions & 3 deletions datafusion/physical-plan/src/aggregates/row_hash.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use crate::aggregates::{
PhysicalGroupBy,
};
use crate::metrics::{BaselineMetrics, MetricBuilder, RecordOutput};
use crate::sorts::sort::sort_batch;
use crate::sorts::sort::{sort_batch, LexSortMetrics};
use crate::sorts::streaming_merge::{SortedSpillFile, StreamingMergeBuilder};
use crate::spill::spill_manager::SpillManager;
use crate::stream::RecordBatchStreamAdapter;
Expand Down Expand Up @@ -430,6 +430,9 @@ pub(crate) struct GroupedHashAggregateStream {

/// Execution metrics
baseline_metrics: BaselineMetrics,

/// Timing metrics for the `sort_batch` calls made by this stream (does not cover the `StreamingMergeBuilder` merge)
lexsort_metrics: LexSortMetrics,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The current comment is fine, but perhaps we could clarify that this only applies to sort_batch, not SortPreservingMerge via StreamingMergeBuilder, since "sorting in the spill manager" could imply both.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated^

}

impl GroupedHashAggregateStream {
Expand All @@ -447,6 +450,7 @@ impl GroupedHashAggregateStream {
let batch_size = context.session_config().batch_size();
let input = agg.input.execute(partition, Arc::clone(&context))?;
let baseline_metrics = BaselineMetrics::new(&agg.metrics, partition);
let lexsort_metrics = LexSortMetrics::new(&agg.metrics, partition);

let timer = baseline_metrics.elapsed_compute().timer();

Expand Down Expand Up @@ -609,6 +613,7 @@ impl GroupedHashAggregateStream {
current_group_indices: Default::default(),
exec_state,
baseline_metrics,
lexsort_metrics,
batch_size,
group_ordering,
input_done: false,
Expand Down Expand Up @@ -996,7 +1001,12 @@ impl GroupedHashAggregateStream {
let Some(emit) = self.emit(EmitTo::All, true)? else {
return Ok(());
};
let sorted = sort_batch(&emit, &self.spill_state.spill_expr, None)?;
let sorted = sort_batch(
&emit,
&self.spill_state.spill_expr,
Some(&self.lexsort_metrics),
None,
)?;

// Spill sorted state to disk
let spillfile = self
Expand Down Expand Up @@ -1068,10 +1078,11 @@ impl GroupedHashAggregateStream {
let mut streams: Vec<SendableRecordBatchStream> = vec![];
let expr = self.spill_state.spill_expr.clone();
let schema = batch.schema();
let lexsort_metrics = self.lexsort_metrics.clone();
streams.push(Box::pin(RecordBatchStreamAdapter::new(
Arc::clone(&schema),
futures::stream::once(futures::future::lazy(move |_| {
sort_batch(&batch, &expr, None)
sort_batch(&batch, &expr, Some(&lexsort_metrics), None)
})),
)));

Expand Down
19 changes: 16 additions & 3 deletions datafusion/physical-plan/src/sorts/partial_sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ use std::sync::Arc;
use std::task::{Context, Poll};

use crate::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
use crate::sorts::sort::sort_batch;
use crate::sorts::sort::{sort_batch, LexSortMetrics};
use crate::{
DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, ExecutionPlanProperties,
Partitioning, PlanProperties, SendableRecordBatchStream, Statistics,
Expand Down Expand Up @@ -309,6 +309,7 @@ impl ExecutionPlan for PartialSortExec {
fetch: self.fetch,
is_closed: false,
baseline_metrics: BaselineMetrics::new(&self.metrics_set, partition),
lexsort_metrics: LexSortMetrics::new(&self.metrics_set, partition),
}))
}

Expand Down Expand Up @@ -341,6 +342,8 @@ struct PartialSortStream {
is_closed: bool,
/// Execution metrics
baseline_metrics: BaselineMetrics,
/// Metrics for performing the actual sort
lexsort_metrics: LexSortMetrics,
}

impl Stream for PartialSortStream {
Expand Down Expand Up @@ -398,7 +401,12 @@ impl PartialSortStream {
slice_point,
self.in_mem_batch.num_rows() - slice_point,
);
let sorted_batch = sort_batch(&sorted, &self.expr, self.fetch)?;
let sorted_batch = sort_batch(
&sorted,
&self.expr,
Some(&self.lexsort_metrics),
self.fetch,
)?;
if let Some(fetch) = self.fetch.as_mut() {
*fetch -= sorted_batch.num_rows();
}
Expand Down Expand Up @@ -430,7 +438,12 @@ impl PartialSortStream {
fn sort_in_mem_batch(self: &mut Pin<&mut Self>) -> Result<RecordBatch> {
let input_batch = self.in_mem_batch.clone();
self.in_mem_batch = RecordBatch::new_empty(self.schema());
let result = sort_batch(&input_batch, &self.expr, self.fetch)?;
let result = sort_batch(
&input_batch,
&self.expr,
Some(&self.lexsort_metrics),
self.fetch,
)?;
if let Some(remaining_fetch) = self.fetch {
// remaining_fetch - result.num_rows() is always >= 0
// because result length of sort_batch with limit cannot be
Expand Down
59 changes: 49 additions & 10 deletions datafusion/physical-plan/src/sorts/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ use crate::filter_pushdown::{
};
use crate::limit::LimitStream;
use crate::metrics::{
BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, SpillMetrics,
BaselineMetrics, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet, SpillMetrics,
Time,
};
use crate::projection::{make_with_child, update_ordering, ProjectionExec};
use crate::sorts::streaming_merge::{SortedSpillFile, StreamingMergeBuilder};
Expand Down Expand Up @@ -62,18 +63,43 @@ use datafusion_physical_expr::PhysicalExpr;
use futures::{StreamExt, TryStreamExt};
use log::{debug, trace};

/// Timers for the individual phases of `sort_batch`.
///
/// Each field is a separate `Time` metric, so the cost of a lexicographic
/// sort can be broken down into its three steps.
#[derive(Clone, Debug)]
pub struct LexSortMetrics {
/// Time spent evaluating the sort expressions against the batch to
/// produce the sort columns
pub time_evaluating_sort_columns: Time,

/// Time spent in `lexsort_to_indices` computing the sorted row indices
pub time_calculating_lexsort_indices: Time,

/// Time spent in `take_arrays` gathering the batch columns by the
/// sorted indices
pub time_taking_indices_in_lexsort: Time,
}

impl LexSortMetrics {
    /// Builds the three sort-phase timers for `partition` against `metrics`,
    /// one `MetricBuilder::subset_time` call per timer.
    pub fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
        // Every timer is created the same way; only the metric name differs.
        let make_timer =
            |name: &'static str| MetricBuilder::new(metrics).subset_time(name, partition);

        let time_evaluating_sort_columns = make_timer("time_evaluating_sort_columns");
        let time_calculating_lexsort_indices =
            make_timer("time_calculating_lexsort_indices");
        let time_taking_indices_in_lexsort = make_timer("time_taking_indices_in_lexsort");

        Self {
            time_evaluating_sort_columns,
            time_calculating_lexsort_indices,
            time_taking_indices_in_lexsort,
        }
    }
}

/// Groups the metric sets that `ExternalSorterMetrics::new` creates together
/// from a single `ExecutionPlanMetricsSet` and partition.
struct ExternalSorterMetrics {
/// Baseline metrics
baseline: BaselineMetrics,

/// Spill metrics
spill_metrics: SpillMetrics,

/// Per-phase sort timers handed to `sort_batch`
lexsort_metrics: LexSortMetrics,
}

impl ExternalSorterMetrics {
    /// Creates the baseline, spill and lexsort metric groups, all built from
    /// the same `metrics` set and `partition`.
    fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
        let baseline = BaselineMetrics::new(metrics, partition);
        let spill_metrics = SpillMetrics::new(metrics, partition);
        let lexsort_metrics = LexSortMetrics::new(metrics, partition);

        Self {
            baseline,
            spill_metrics,
            lexsort_metrics,
        }
    }
}
Expand Down Expand Up @@ -708,11 +734,12 @@ impl ExternalSorter {
);
let schema = batch.schema();

let lexsort_metrics = self.metrics.lexsort_metrics.clone();
let expressions = self.expr.clone();
let stream = futures::stream::once(async move {
let _timer = metrics.elapsed_compute().timer();

let sorted = sort_batch(&batch, &expressions, None)?;
let sorted = sort_batch(&batch, &expressions, Some(&lexsort_metrics), None)?;

metrics.record_output(sorted.num_rows());
drop(batch);
Expand Down Expand Up @@ -811,15 +838,27 @@ impl Debug for ExternalSorter {
pub fn sort_batch(
batch: &RecordBatch,
expressions: &LexOrdering,
metrics: Option<&LexSortMetrics>,
fetch: Option<usize>,
) -> Result<RecordBatch> {
let sort_columns = expressions
.iter()
.map(|expr| expr.evaluate_to_sort_column(batch))
.collect::<Result<Vec<_>>>()?;

let indices = lexsort_to_indices(&sort_columns, fetch)?;
let mut columns = take_arrays(batch.columns(), &indices, None)?;
let sort_columns = {
let _timer = metrics.map(|metrics| metrics.time_evaluating_sort_columns.timer());
expressions
.iter()
.map(|expr| expr.evaluate_to_sort_column(batch))
.collect::<Result<Vec<_>>>()?
};

let indices = {
let _timer =
metrics.map(|metrics| metrics.time_calculating_lexsort_indices.timer());
lexsort_to_indices(&sort_columns, fetch)?
};
let mut columns = {
let _timer =
metrics.map(|metrics| metrics.time_taking_indices_in_lexsort.timer());
take_arrays(batch.columns(), &indices, None)?
};

// The columns may be larger than the unsorted columns in `batch` especially for variable length
// data types due to exponential growth when building the sort columns. We shrink the columns
Expand Down Expand Up @@ -2051,7 +2090,7 @@ mod tests {
}]
.into();

let result = sort_batch(&batch, &expressions, None).unwrap();
let result = sort_batch(&batch, &expressions, None, None).unwrap();
assert_eq!(result.num_rows(), 1);
}

Expand Down