feat: Add optional extended metrics to sort_batch function #17147
Conversation
FYI @ding-young and @2010YOUY01 @rluvaton, could you also help review this PR?
This is a beneficial change. Having detailed sort metrics is valuable, and I think the direction of eventually extending this to cover sort-preserving merge and multi-level merge makes a lot of sense.
One small concern I have is that, unlike aggregation, where metrics are recorded for a single large batch, sorting often involves many small batches, each of which would now report timing to LexSortMetrics. Since much of the work in the current sort implementation happens in the merge phase rather than in sort_batch, I'm a bit concerned that the relative cost of metric reporting within sort_batch might become non-trivial in practice.
Have you had a chance to measure or evaluate the overhead introduced by this change? If so, I’d be very interested in seeing the results.
```rust
/// Metrics for sorting in the spill manager
lexsort_metrics: LexSortMetrics,
```
The current comment is fine, but perhaps we could clarify that this only applies to sort_batch, not to SortPreservingMerge via StreamingMergeBuilder, since "sorting in the spill manager" could imply both.
Updated^
I think the extended metrics in this PR are too fine-grained: we would rarely check them, and it's also possible to measure them through a flamegraph (https://datafusion.apache.org/library-user-guide/profiling.html), so it might not be worth implementing them as metrics. However, for metrics that cannot be obtained from flamegraphs (such as, within a single in-memory sort, the average number of batches handled at a time, or the number of merge levels), it would be a good idea to include them.
When used in distributed compute environments (such as when using DataFusion via Comet, which is where this arose), it can get very unwieldy to use a flamegraph, and I also don't always have control over how the executable was launched.
@alamb sure
I think so too.
Ideally, aggregation batches and sort batches should be the same size. What do you mean by a single large batch?
I did not know that, actually, and these metrics would have shown me. Also, it depends on many things in order for that to be the case:
@2010YOUY01 metrics are made for production; a flamegraph requires an environment where you can actually run and reproduce the workload. Having this in metrics would genuinely benefit visibility. The only problem I have is the naming of the metrics: take indices and lexsort are implementation details. Maybe we can update the naming to be better understood by people who are not aware of the internals, like "calculating sort indices for a single batch", and "sorting", which is the
I think this is a valuable change
```rust
#[derive(Clone, Debug)]
pub struct LexSortMetrics {
    pub time_calculating_lexsort_indices: Time,

    pub time_taking_indices_in_lexsort: Time,
}

impl LexSortMetrics {
    pub fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
        Self {
            time_calculating_lexsort_indices: MetricBuilder::new(metrics)
                .subset_time("time_calculating_lexsort_indices", partition),
            time_taking_indices_in_lexsort: MetricBuilder::new(metrics)
                .subset_time("time_taking_indices_in_lexsort", partition),
        }
    }
}
```
I think the naming for these metrics is an implementation detail; maybe rename them to help the user understand what each metric is for, like finding sort indices and copying the data?
If it's possible to use those metrics to find better Comet tuning, I think including them makes sense. I was imagining these as metrics that a DataFusion internal developer would care about, checked in order to optimize.
Which issue does this PR close?
Rationale for this change
It makes accurate benchmarking much easier.
I just added a new metrics struct for the lex sort done on the sort columns, which contains the column evaluation, the sort time on the indices, and how long the take kernel itself took.
Are there any user-facing changes?
The sort_batch function itself now takes another parameter, which is an Option, so yes: any user calling that function will have to either provide the metrics struct or pass None. Actual calls to this function within the crates are internal, IIUC.