add first_message_network_latency metric for network* nodes
#322
base: main
Conversation
let bytes_transferred = MetricBuilder::new(metrics).global_counter("bytes_transferred");
// Track the time between calling do_get on the client and receiving the first message
// from any partition.
let time_to_first_byte = Time::new();
🤔 Do you think we could get any meaningful conclusion from this metric?
What I imagine is that we are going to be measuring details about how the plan is formed, like for example, in this plan:
┌───── DistributedExec ── Tasks: t0:[p0]
│ CoalescePartitionsExec
│ [Stage 2] => NetworkCoalesceExec: output_partitions=9, input_tasks=3
└──────────────────────────────────────────────────
┌───── Stage 2 ── Tasks: t0:[p0..p2] t1:[p0..p2] t2:[p0..p2]
│ ProjectionExec: expr=[count(Int64(1))@1 as count(*), RainToday@0 as RainToday]
│ AggregateExec: mode=FinalPartitioned, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
│ [Stage 1] => NetworkShuffleExec: output_partitions=3, input_tasks=4
└──────────────────────────────────────────────────
┌───── Stage 1 ── Tasks: t0:[p0..p8] t1:[p0..p8] t2:[p0..p8] t3:[p0..p8]
│ RepartitionExec: partitioning=Hash([RainToday@0], 9), input_partitions=1
│ AggregateExec: mode=Partial, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
│ PartitionIsolatorExec: t0:[p0,__,__] t1:[__,p0,__] t2:[__,__,p0] t3:[__,__,__]
│ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[RainToday], file_type=parquet
└──────────────────────────────────────────────────
We can expect the network coalesce above not to receive anything until the query is pretty much finished. So if the query takes 10s, you would probably see something like time_to_first_byte=9.8s, which tells you something about the plan topology but not much about the networking details of the NetworkCoalesceExec.
More or less the same goes for NetworkShuffleExec: the time_to_first_byte would bake in information irrelevant to the networking, namely the time it takes DataSourceExec > PartitionIsolatorExec > AggregateExec(partial) > RepartitionExec to produce something plus the network latency, which I don't think tells you much about how well the NetworkShuffleExec network transfer is going.
Ack. I've updated the metric to be the delta between sending and receiving.
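Roughly, the receiving side now computes something like the sketch below (illustrative only; `created_timestamp_unix_millis` is the field the worker stamps on the flight metadata in this PR, while the surrounding helper is made up for the example):

```rust
use std::time::{Duration, SystemTime, UNIX_EPOCH};

/// Illustrative helper: the send -> receive delta for a single message.
/// The worker stamps `created_timestamp_unix_millis` when it produces the
/// message; the client compares that against its own wall clock on arrival.
fn message_latency_millis(created_timestamp_unix_millis: u64) -> u64 {
    let sent_time = UNIX_EPOCH + Duration::from_millis(created_timestamp_unix_millis);
    SystemTime::now()
        .duration_since(sent_time)
        .map(|delta| delta.as_millis() as u64)
        // If the clocks disagree and `sent_time` is in the future, fall back to 0.
        .unwrap_or(0)
}
```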
There's still a bit of a weird thing that happens.
The two tasks report 7.82ms and 7.67ms respectively:
[Stage 1] => NetworkShuffleExec: output_partitions=3, input_tasks=3, metrics=[elapsed_compute_0=742.58µs, elapsed_compute_1=835.25µs, bytes_transferred_0=16.47 K, bytes_transferred_1=10.60 K, max_mem_used_0=3.44 K, max_mem_used_1=3.45 K, first_message_network_latency_0=7.82ms, first_message_network_latency_1=7.67ms]
Time metrics are aggregated by summing:
[Stage 1] => NetworkShuffleExec: output_partitions=3, input_tasks=3, metrics=[elapsed_compute=1.66ms, bytes_transferred=27.07 K, max_mem_used=6.88 K, first_message_network_latency=15.57ms]
In the aggregated form, the metric makes less sense.
Right... I imagine time metrics were designed with "elapsed compute" in mind, which makes sense to aggregate by summing, but when measuring latencies we would probably prefer an AVG.
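If an average ever becomes desirable, one possible workaround (just a sketch, not something this PR implements) is to aggregate a summed latency together with a sample count and divide when rendering:

```rust
use std::time::Duration;

/// Hypothetical aggregate that keeps a summed latency plus a sample count so
/// an average can be derived instead of reporting the raw sum across tasks.
#[derive(Default)]
struct LatencyAggregate {
    total: Duration,
    samples: u64,
}

impl LatencyAggregate {
    fn record(&mut self, latency: Duration) {
        self.total += latency;
        self.samples += 1;
    }

    /// Returns `None` until at least one latency has been recorded.
    fn average(&self) -> Option<Duration> {
        (self.samples > 0).then(|| self.total / self.samples as u32)
    }
}
```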
Looking at MetricValue::aggregate upstream, the aggregation functions seem to be all over the place...
Force-pushed from 1813629 to 1816afa
Title changed: "time_to_first_byte metric for network* nodes" → "first_message_network_latency metric for network* nodes"
SystemTime::now()
    .duration_since(UNIX_EPOCH)
    .map(|duration| duration.as_millis() as u64)
    .unwrap_or(0)
Doesn't seem worth erroring here so...
Yeah, this should be completely fine.
│ AggregateExec: mode=Partial, gby=[RainToday@0 as RainToday, WindGustDir@1 as WindGustDir], aggr=[]
│ RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
│ PartitionIsolatorExec: t0:[p0,__,__] t1:[__,p0,__] t2:[__,__,p0]
│ PartitionIsolatorExec: t0:[p0,__,__] t1:[__,p0,__] t2:[__,__,p0]
I wonder why insta decides randomly that it's a good idea to change the formatting of the snapshots. I think this happens to all of us.
This is from my other PR where I removed the trailing whitespace. I changed the base of this PR so these diffs wouldn't appear, but it seems the base didn't change. Will fix that.
SystemTime::now()
    .duration_since(UNIX_EPOCH)
    .map(|duration| duration.as_millis() as u64)
    .unwrap_or(0)
Yeah, this should be completely fine.
let sent_time =
    UNIX_EPOCH + Duration::from_millis(flight_metadata.created_timestamp_unix_millis);
I have thought about this for a while in the past, and there's something to take into account.
This metric will be reliable only as long as the two machines involved in the communication have their system clocks perfectly in sync; a clock skew of even a few milliseconds would affect the measurement.
I have no idea how much clock skew is typical between machines; maybe it's negligible if we are measuring in the order of milliseconds?
I think this metric is too good not to have, so it's probably still worth a try even if clock skew affects the measurement.
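To make the concern concrete, here is a minimal sketch (not code from the PR) of what the receiver actually observes when the sender's timestamp is compared against the receiver's clock:

```rust
use std::time::Duration;

// What the receiver records is the true one-way latency plus the clock offset
// between the two machines. A few ms of skew is noise for second-scale queries
// but can dominate (or push the value to zero) when the true latency is itself
// only a few milliseconds.
fn observed_latency(true_latency: Duration, clock_skew_ms: i64) -> Duration {
    let observed_ms = true_latency.as_millis() as i64 + clock_skew_ms;
    // Clamp at zero, mirroring the `.unwrap_or(0)` fallback used in the PR
    // when the sender's clock appears to be ahead of the receiver's.
    Duration::from_millis(observed_ms.max(0) as u64)
}

fn main() {
    // Hypothetical numbers: 3 ms of real latency, sender's clock 5 ms ahead.
    let observed = observed_latency(Duration::from_millis(3), -5);
    assert_eq!(observed, Duration::ZERO);
}
```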
It would be cool to measure this in the benchmarking cluster.
Tbh I thought the clock skew wouldn't matter too much for simple timing metrics. We could look into NTP or something, but I feel like that's for more hardcore use cases like transaction timestamps. E.g. https://www.cockroachlabs.com/blog/living-without-atomic-clocks/
let value =
    node_metrics::<NetworkCoalesceExec>(&d_physical, "first_message_network_latency", 1);
assert!(value > 0);
It would even be interesting to measure more than just the first one. Maybe this can unveil things like latency degradation as the query progresses?
Sure. Any ideas for how to display the metric for N messages?
Maybe first and last make sense?
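For example, a rough sketch (hypothetical, not in this PR) that keeps only the first and the most recent observation per task, so both could be published as separate metric entries:

```rust
use std::time::Duration;

/// Hypothetical per-task tracker exposing the first and latest message latency.
#[derive(Default)]
struct MessageLatencies {
    first: Option<Duration>,
    last: Option<Duration>,
}

impl MessageLatencies {
    fn observe(&mut self, latency: Duration) {
        // Only the very first observation populates `first`...
        self.first.get_or_insert(latency);
        // ...while `last` is overwritten on every message, so comparing the two
        // could hint at latency degradation as the query progresses.
        self.last = Some(latency);
    }
}
```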
This change adds a `first_message_network_latency` metric that measures the time from when a message is created on a worker to when the client receives it. This is only measured for the first message received by a task (in any partition). I considered tracking it for all partitions (using labels for the partition id); however, `Time` metrics are aggregated by summation, not average, so this would not work. We would need a custom metric for an avg time aggregator, and these are unsupported.

Example plan:

```
┌───── DistributedExec ── Tasks: t0:[p0]
│ CoalescePartitionsExec, metrics=[output_rows=2, elapsed_compute=16.58µs, output_bytes=256.0 KB, output_batches=2]
│   HashJoinExec: mode=CollectLeft, join_type=Left, on=[(RainTomorrow@1, RainTomorrow@1)], projection=[MinTemp@0, MaxTemp@2], metrics=[output_rows=2, elapsed_compute=166.58µs, output_bytes=256.0 KB, output_batches=2, build_input_batches=2, build_input_rows=2, input_batches=3, input_rows=2, build_mem_used=731, build_time=85.54µs, join_time=55.96µs, avg_fanout=100% (2/2), probe_hit_rate=100% (2/2)]
│     CoalescePartitionsExec, metrics=[output_rows=2, elapsed_compute=6.25µs, output_bytes=640.0 B, output_batches=2]
│       [Stage 2] => NetworkCoalesceExec: output_partitions=6, input_tasks=2, metrics=[elapsed_compute=444.37µs, bytes_transferred=23.05 K, max_mem_used=616, first_message_network_latency=2.96ms]
│     ProjectionExec: expr=[avg(weather.MaxTemp)@1 as MaxTemp, RainTomorrow@0 as RainTomorrow], metrics=[output_rows=2, elapsed_compute=3.04µs, output_bytes=32.1 KB, output_batches=2, expr_0_eval_time=376ns, expr_1_eval_time=167ns]
│       AggregateExec: mode=FinalPartitioned, gby=[RainTomorrow@0 as RainTomorrow], aggr=[avg(weather.MaxTemp)], metrics=[output_rows=2, elapsed_compute=107.00µs, output_bytes=32.1 KB, output_batches=2, spill_count=0, spilled_bytes=0.0 B, spilled_rows=0, peak_mem_used=50.57 K, aggregate_arguments_time=6.04µs, aggregation_time=6.29µs, emitting_time=6.75µs, time_calculating_group_ids=12.29µs]
│         [Stage 3] => NetworkShuffleExec: output_partitions=3, input_tasks=3, metrics=[elapsed_compute=1.36ms, bytes_transferred=23.21 K, max_mem_used=4.71 K, first_message_network_latency=7.30ms]
└──────────────────────────────────────────────────
┌───── Stage 2 ── Tasks: t0:[p0..p2] t1:[p0..p2]
│ ProjectionExec: expr=[avg(weather.MinTemp)@1 as MinTemp, RainTomorrow@0 as RainTomorrow], metrics=[output_rows=2, elapsed_compute=3.71µs, output_bytes=32.1 KB, output_batches=2, expr_0_eval_time=379ns, expr_1_eval_time=213ns]
│   AggregateExec: mode=FinalPartitioned, gby=[RainTomorrow@0 as RainTomorrow], aggr=[avg(weather.MinTemp)], metrics=[output_rows=2, elapsed_compute=141.42µs, output_bytes=32.1 KB, output_batches=2, spill_count=0, spilled_bytes=0.0 B, spilled_rows=0, peak_mem_used=100.9 K, aggregate_arguments_time=7.63µs, aggregation_time=10.51µs, emitting_time=12.25µs, time_calculating_group_ids=11.25µs]
│     [Stage 1] => NetworkShuffleExec: output_partitions=3, input_tasks=3, metrics=[elapsed_compute=1.51ms, bytes_transferred=27.07 K, max_mem_used=7.61 K, first_message_network_latency=12.17ms]
└──────────────────────────────────────────────────
┌───── Stage 1 ── Tasks: t0:[p0..p5] t1:[p0..p5] t2:[p0..p5]
│ RepartitionExec: partitioning=Hash([RainTomorrow@0], 6), input_partitions=1, metrics=[]
│   AggregateExec: mode=Partial, gby=[RainTomorrow@1 as RainTomorrow], aggr=[avg(weather.MinTemp)], metrics=[]
│     FilterExec: RainToday@1 = Yes, projection=[MinTemp@0, RainTomorrow@2], metrics=[]
│       PartitionIsolatorExec: t0:[p0,__,__] t1:[__,p0,__] t2:[__,__,p0], metrics=[]
│         DataSourceExec: file_groups={3 groups: [[Users/jayant.shrivastava/code/datafusion-distributed/testdata/weather/result-000000.parquet], [Users/jayant.shrivastava/code/datafusion-distributed/testdata/weather/result-000001.parquet], [Users/jayant.shrivastava/code/datafusion-distributed/testdata/weather/result-000002.parquet]]}, projection=[MinTemp, RainToday, RainTomorrow], file_type=parquet, predicate=RainToday@19 = Yes, pruning_predicate=RainToday_null_count@2 != row_count@3 AND RainToday_min@0 <= Yes AND Yes <= RainToday_max@1, required_guarantees=[RainToday in (Yes)], metrics=[]
└──────────────────────────────────────────────────
┌───── Stage 3 ── Tasks: t0:[p0..p2] t1:[p0..p2] t2:[p0..p2]
│ RepartitionExec: partitioning=Hash([RainTomorrow@0], 3), input_partitions=1, metrics=[output_rows=6, elapsed_compute=46.38µs, output_bytes=1536.0 KB, output_batches=6, spill_count=0, spilled_bytes=0.0 B, spilled_rows=0, fetch_time=14.54ms, repartition_time=38.58µs, send_time=11.34µs]
│   AggregateExec: mode=Partial, gby=[RainTomorrow@1 as RainTomorrow], aggr=[avg(weather.MaxTemp)], metrics=[output_rows=6, elapsed_compute=175.88µs, output_bytes=48.2 KB, output_batches=3, spill_count=0, spilled_bytes=0.0 B, spilled_rows=0, skipped_aggregation_rows=0, peak_mem_used=55.94 K, aggregate_arguments_time=4.83µs, aggregation_time=17.29µs, emitting_time=13.42µs, time_calculating_group_ids=68.54µs, reduction_factor=2% (6/300)]
│     FilterExec: RainToday@1 = No, projection=[MaxTemp@0, RainTomorrow@2], metrics=[output_rows=300, elapsed_compute=144.58µs, output_bytes=576.0 KB, output_batches=3, selectivity=82% (300/366)]
│       PartitionIsolatorExec: t0:[p0,__,__] t1:[__,p0,__] t2:[__,__,p0], metrics=[]
│         DataSourceExec: file_groups={3 groups: [[Users/jayant.shrivastava/code/datafusion-distributed/testdata/weather/result-000000.parquet], [Users/jayant.shrivastava/code/datafusion-distributed/testdata/weather/result-000001.parquet], [Users/jayant.shrivastava/code/datafusion-distributed/testdata/weather/result-000002.parquet]]}, projection=[MaxTemp, RainToday, RainTomorrow], file_type=parquet, predicate=RainToday@19 = No, pruning_predicate=RainToday_null_count@2 != row_count@3 AND RainToday_min@0 <= No AND No <= RainToday_max@1, required_guarantees=[RainToday in (No)], metrics=[output_rows=366, elapsed_compute=3ns, output_bytes=19.4 KB, output_batches=3, files_ranges_pruned_statistics=3 total → 3 matched, row_groups_pruned_statistics=3 total → 3 matched, row_groups_pruned_bloom_filter=3 total → 3 matched, page_index_rows_pruned=366 total → 366 matched, batches_split=0, bytes_scanned=40.37 K, file_open_errors=0, file_scan_errors=0, num_predicate_creation_errors=0, predicate_cache_inner_records=0, predicate_cache_records=0, predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, bloom_filter_eval_time=73.17µs, metadata_load_time=7.64ms, page_index_eval_time=125.83µs, row_pushdown_eval_time=6ns, statistics_eval_time=87.30µs, time_elapsed_opening=8.56ms, time_elapsed_processing=4.19ms, time_elapsed_scanning_total=5.69ms, time_elapsed_scanning_until_data=5.40ms, scan_efficiency_ratio=29% (40.37 K/137.4 K)]
└──────────────────────────────────────────────────
```
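For illustration, the "first message only" behaviour described above could be implemented with a flag shared by all partition streams of a task; this is a sketch under that assumption, not the PR's actual code:

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;

/// Hypothetical guard shared across a task's partition streams so that only
/// the first message received (in any partition) records the latency metric.
struct FirstMessageOnce {
    recorded: AtomicBool,
}

impl FirstMessageOnce {
    fn new() -> Self {
        Self { recorded: AtomicBool::new(false) }
    }

    /// Invokes `record_metric` only for the very first message; returns
    /// whether this call was the one that recorded the value.
    fn record(&self, latency: Duration, record_metric: impl FnOnce(Duration)) -> bool {
        if self.recorded.swap(true, Ordering::Relaxed) {
            return false;
        }
        record_metric(latency);
        true
    }
}
```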
Force-pushed from 72cfc87 to b0fc783