
Commit 53d2057

wip
1 parent 0d86f65 commit 53d2057

File tree

9 files changed: +3124 -749 lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default.

Cargo.toml

Lines changed: 1 addition & 0 deletions
@@ -64,3 +64,4 @@ parquet = "55.2.0"
 arrow = "55.2.0"
 tokio-stream = "0.1.17"
 hyper-util = "0.1.16"
+regex = "1.0"

src/execution_plans/metrics.rs

Lines changed: 4 additions & 0 deletions
@@ -27,6 +27,10 @@ impl MetricsWrapperExec {
             children: None,
         }
     }
+
+    pub(crate) fn get_inner(&self) -> &Arc<dyn ExecutionPlan> {
+        &self.inner
+    }
 }
 
 /// MetricsWrapperExec is invisible during display.

src/execution_plans/mod.rs

Lines changed: 1 addition & 1 deletion
@@ -10,4 +10,4 @@ pub use network_shuffle::{NetworkShuffleExec, NetworkShuffleReadyExec};
 pub use partition_isolator::PartitionIsolatorExec;
 pub(crate) use stage::InputStage;
 pub use stage::display_plan_graphviz;
-pub use stage::{DistributedTaskContext, ExecutionTask, StageExec, DisplayCtx};
+pub use stage::{DisplayCtx, DistributedTaskContext, ExecutionTask, StageExec};

src/execution_plans/stage.rs

Lines changed: 92 additions & 39 deletions
@@ -1,4 +1,5 @@
 use crate::channel_resolver_ext::get_distributed_channel_resolver;
+use crate::execution_plans::MetricsWrapperExec;
 use crate::execution_plans::NetworkCoalesceExec;
 use crate::metrics::TaskMetricsRewriter;
 use crate::{ChannelResolver, NetworkShuffleExec, PartitionIsolatorExec};
@@ -10,7 +11,6 @@ use datafusion::physical_plan::{
     DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, displayable,
 };
 use datafusion::prelude::SessionContext;
-use datafusion::sql::sqlparser::keywords::CHAIN;
 use itertools::Itertools;
 use rand::Rng;
 use std::collections::VecDeque;
@@ -303,18 +303,22 @@ impl ExecutionPlan for StageExec {
     ) -> Result<Arc<dyn ExecutionPlan>> {
         let num_children = children.len();
         let child_stage_execs = children
-            .into_iter()
-            .filter(|child| child.as_any().downcast_ref::<StageExec>().is_some())
-            .map(|child| child.as_any().downcast_ref::<StageExec>().unwrap().clone()).collect::<Vec<_>>();
+            .into_iter()
+            .filter(|child| child.as_any().downcast_ref::<StageExec>().is_some())
+            .map(|child| child.as_any().downcast_ref::<StageExec>().unwrap().clone())
+            .collect::<Vec<_>>();
         if child_stage_execs.len() != num_children {
             return plan_err!("not all children are StageExec");
         }
-        let stage = StageExec{
-            query_id: self.query_id.clone(),
+        let stage = StageExec {
+            query_id: self.query_id,
             num: self.num,
             name: self.name.clone(),
             plan: self.plan.clone(),
-            inputs: child_stage_execs.into_iter().map(|s| InputStage::Decoded(Arc::new(s))).collect(),
+            inputs: child_stage_execs
+                .into_iter()
+                .map(|s| InputStage::Decoded(Arc::new(s)))
+                .collect(),
             tasks: self.tasks.clone(),
             depth: self.depth,
             display_ctx: self.display_ctx.clone(),
@@ -369,12 +373,12 @@
     }
 }
 
+use crate::metrics::proto::MetricsSetProto;
+use crate::protobuf::StageKey;
 use bytes::Bytes;
+use datafusion::common::HashMap;
 use datafusion::common::tree_node::{TreeNode, TreeNodeRecursion};
 use datafusion::physical_expr::Partitioning;
-use datafusion::common::HashMap;
-use crate::metrics::proto::MetricsSetProto;
-use crate::protobuf::StageKey;
 
 /// Be able to display a nice tree for stages.
 ///
@@ -410,14 +414,25 @@ impl DisplayCtx {
     }
 }
 
+#[derive(Clone, Copy)]
+enum TaskFmt {
+    All,
+    TaskID { task_id: usize },
+}
+
 impl StageExec {
-    fn format(&self, plan: &dyn ExecutionPlan, indent: usize, f: &mut String) -> std::fmt::Result {
-        // println!("plan {:?}", plan);
+    fn format(
+        &self,
+        plan: &dyn ExecutionPlan,
+        indent: usize,
+        task_fmt: TaskFmt,
+        f: &mut String,
+    ) -> std::fmt::Result {
         let mut node_str = match &self.display_ctx {
             None => displayable(plan).one_line().to_string(),
-            Some(_) => {
-                DisplayableExecutionPlan::with_metrics(plan).one_line().to_string()
-            }
+            Some(_) => DisplayableExecutionPlan::with_metrics(plan)
+                .one_line()
+                .to_string(),
         };
         node_str.pop();
         write!(f, "{} {node_str}", " ".repeat(indent))?;
@@ -455,17 +470,29 @@ impl StageExec {
             )?;
         }
 
-        if let Some(isolator) = plan.as_any().downcast_ref::<PartitionIsolatorExec>() {
-            write!(
-                f,
-                " {}",
-                format_tasks_for_partition_isolator(isolator, &self.tasks)
-            )?;
+        let mut maybe_partition_isolator = plan;
+        if self.display_ctx.is_some() {
+            if let Some(wrapper) = plan.as_any().downcast_ref::<MetricsWrapperExec>() {
+                maybe_partition_isolator = wrapper.get_inner().as_ref();
+            }
+        }
+
+        if let Some(isolator) = maybe_partition_isolator
+            .as_any()
+            .downcast_ref::<PartitionIsolatorExec>()
+        {
+            let task_info = match task_fmt {
+                TaskFmt::All => format_tasks_for_partition_isolator(isolator, &self.tasks),
+                TaskFmt::TaskID { task_id } => {
+                    format_task_for_partition_isolator(isolator, task_id, self.tasks.len())
+                }
+            };
+            write!(f, " {}", task_info)?;
         }
         writeln!(f)?;
 
         for child in plan.children() {
-            self.format(child.as_ref(), indent + 2, f)?;
+            self.format(child.as_ref(), indent + 2, task_fmt, f)?;
         }
         Ok(())
     }
@@ -491,7 +518,7 @@ impl DisplayAs for StageExec {
                 )?;
 
                 let mut plan_str = String::new();
-                self.format(self.plan.as_ref(), 0, &mut plan_str)?;
+                self.format(self.plan.as_ref(), 0, TaskFmt::All, &mut plan_str)?;
                 let plan_str = plan_str
                     .split('\n')
                     .filter(|v| !v.is_empty())
@@ -511,7 +538,7 @@
             }
             Some(display_ctx) => {
                 for (i, _) in self.tasks.iter().enumerate() {
-                    let mut extra_spacing = "".to_string(); 
+                    let mut extra_spacing = "".to_string();
                     if i > 0 {
                         writeln!(f)?; // Add newline for each task
                         extra_spacing = " ".repeat(self.depth); // with_indent() in DisplayableExectutionPlan will not add indentation for tasks, so we add it manually.
@@ -535,34 +562,38 @@
                     let mut plan_str = String::new();
                     let plan = match display_ctx.metrics.get(&key) {
                         Some(metrics) => {
-                            let result = TaskMetricsRewriter::new(metrics.to_owned()).enrich_task_with_metrics(self.plan.clone());
+                            let result = TaskMetricsRewriter::new(metrics.to_owned())
+                                .enrich_task_with_metrics(self.plan.clone());
                             if let Err(e) = result {
                                 write!(f, "Error enriching task with metrics: {}", e)?;
                                 return Err(std::fmt::Error);
                             }
                             result.unwrap()
                         }
-                        None => {
-                            self.plan.clone()
-                        }
+                        None => self.plan.clone(),
                     };
-                    self.format(plan.as_ref(), 0, &mut plan_str)?;
-                    let plan_str = plan_str
-                        .split('\n')
-                        .filter(|v| !v.is_empty())
-                        .collect::<Vec<_>>()
-                        .join(&format!("\n{}{}", " ".repeat(self.depth), VERTICAL));
-                    writeln!(f, "{}{}{}", " ".repeat(self.depth), VERTICAL, plan_str)?;
-                    // Add bottom border
+                    self.format(
+                        plan.as_ref(),
+                        0,
+                        TaskFmt::TaskID { task_id: i },
+                        &mut plan_str,
+                    )?;
+                    let plan_str = plan_str
+                        .split('\n')
+                        .filter(|v| !v.is_empty())
+                        .collect::<Vec<_>>()
+                        .join(&format!("\n{}{}", " ".repeat(self.depth), VERTICAL));
+                    writeln!(f, "{}{}{}", " ".repeat(self.depth), VERTICAL, plan_str)?;
+                    // Add bottom border
                     write!(
                         f,
                         "{}{}{}",
                         " ".repeat(self.depth),
                         LDCORNER,
                         HORIZONTAL.repeat(50)
-                    )?;
+                    )?;
                 }
-                return Ok(());
+                Ok(())
             }
         }
     }
@@ -604,7 +635,7 @@ fn format_task_for_stage(task_number: usize, head: &Arc<dyn ExecutionPlan>) -> S
         .map(|v| format!("p{v}"))
        .join(",");
    result += "] ";
-
+
    result
 }
 
@@ -631,6 +662,28 @@ fn format_tasks_for_partition_isolator(
     result
 }
 
+fn format_task_for_partition_isolator(
+    isolator: &PartitionIsolatorExec,
+    task_number: usize,
+    num_tasks: usize,
+) -> String {
+    let input_partitions = isolator.input().output_partitioning().partition_count();
+    let partition_groups = PartitionIsolatorExec::partition_groups(input_partitions, num_tasks);
+
+    let n: usize = partition_groups.iter().map(|v| v.len()).sum();
+    let mut partitions = vec!["__".to_string(); n];
+
+    let mut result = "Task ".to_string();
+    partition_groups
+        .get(task_number)
+        .unwrap()
+        .iter()
+        .enumerate()
+        .for_each(|(j, p)| partitions[*p] = format!("p{j}"));
+    result += &format!("t{task_number}:[{}] ", partitions.join(","));
+    result
+}
+
 // num_colors must agree with the colorscheme selected from
 // https://graphviz.org/doc/info/colors.html
 const NUM_COLORS: usize = 6;
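
Note on the new format_task_for_partition_isolator helper above: it renders the partition assignment of a single task, printing "__" for slots owned by other tasks. Below is a minimal standalone sketch of the same string-building logic using plain vectors instead of the actual PartitionIsolatorExec types; the sketch_format_task name and the example groups are hypothetical.

// Sketch only: mirrors the per-task formatting under the assumption that
// PartitionIsolatorExec::partition_groups returns one group of partition
// indices per task, as it is used in the diff above.
fn sketch_format_task(partition_groups: &[Vec<usize>], task_number: usize) -> String {
    // Total number of input partitions across all tasks.
    let n: usize = partition_groups.iter().map(|v| v.len()).sum();
    // Start every slot as unassigned ("__"), then fill in this task's slots.
    let mut partitions = vec!["__".to_string(); n];
    for (j, p) in partition_groups[task_number].iter().enumerate() {
        partitions[*p] = format!("p{j}");
    }
    format!("Task t{task_number}:[{}] ", partitions.join(","))
}

fn main() {
    // e.g. 4 input partitions split across 2 tasks
    let groups = vec![vec![0, 1], vec![2, 3]];
    assert_eq!(sketch_format_task(&groups, 1), "Task t1:[__,__,p0,p1] ");
}

With two tasks over four partitions, task 1 renders as "Task t1:[__,__,p0,p1]", which matches the shape the per-task display path (TaskFmt::TaskID) appends next to the isolator node.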

src/explain.rs

Lines changed: 33 additions & 23 deletions
@@ -1,16 +1,16 @@
-use datafusion::physical_plan::display::DisplayableExecutionPlan;
 use crate::execution_plans::{DisplayCtx, StageExec};
-use crate::metrics::proto::df_metrics_set_to_proto;
-use crate::protobuf::StageKey;
-use std::sync::Arc;
-use datafusion::physical_plan::ExecutionPlan;
-use crate::metrics::TaskMetricsCollector;
-use datafusion::error::DataFusionError;
 use crate::metrics::MetricsCollectorResult;
+use crate::metrics::TaskMetricsCollector;
 use crate::metrics::proto::MetricsSetProto;
-use datafusion::common::tree_node::{TreeNode, TreeNodeRewriter};
+use crate::metrics::proto::df_metrics_set_to_proto;
+use crate::protobuf::StageKey;
 use datafusion::common::tree_node::Transformed;
 use datafusion::common::tree_node::TreeNodeRecursion;
+use datafusion::common::tree_node::{TreeNode, TreeNodeRewriter};
+use datafusion::error::DataFusionError;
+use datafusion::physical_plan::ExecutionPlan;
+use datafusion::physical_plan::display::DisplayableExecutionPlan;
+use std::sync::Arc;
 
 pub struct DisplayCtxReWriter {
     display_ctx: DisplayCtx,
@@ -40,33 +40,43 @@ impl TreeNodeRewriter for DisplayCtxReWriter {
             Some(stage_exec) => {
                 let mut copy = stage_exec.clone();
                 copy.display_ctx = Some(self.display_ctx.clone());
-                Ok(Transformed::new(Arc::new(copy), true, TreeNodeRecursion::Continue))
-            },
+                Ok(Transformed::new(
+                    Arc::new(copy),
+                    true,
+                    TreeNodeRecursion::Continue,
+                ))
+            }
             None => Err(DataFusionError::Internal("expected stage exec".to_string())),
         }
     }
 }
 
-
 pub fn explain_analyze(executed: Arc<dyn ExecutionPlan>) -> Result<String, DataFusionError> {
     let plan = match executed.as_any().downcast_ref::<StageExec>() {
         None => executed,
         Some(stage_exec) => {
-            let MetricsCollectorResult{task_metrics, mut input_task_metrics} = TaskMetricsCollector::new()
-                .collect(stage_exec.plan.clone())?;
-            input_task_metrics.insert(StageKey{
-                query_id: stage_exec.query_id.to_string(),
-                stage_id: stage_exec.num as u64,
-                task_number: 0,
-            }, task_metrics.into_iter()
-                .map(|metrics| df_metrics_set_to_proto(&metrics))
-                .collect::<Result<Vec<MetricsSetProto>, DataFusionError>>()?);
+            let MetricsCollectorResult {
+                task_metrics,
+                mut input_task_metrics,
+            } = TaskMetricsCollector::new().collect(stage_exec.plan.clone())?;
+            input_task_metrics.insert(
+                StageKey {
+                    query_id: stage_exec.query_id.to_string(),
+                    stage_id: stage_exec.num as u64,
+                    task_number: 0,
+                },
+                task_metrics
+                    .into_iter()
+                    .map(|metrics| df_metrics_set_to_proto(&metrics))
+                    .collect::<Result<Vec<MetricsSetProto>, DataFusionError>>()?,
+            );
 
             let display_ctx = DisplayCtx::new(input_task_metrics);
             DisplayCtxReWriter::new(display_ctx).rewrite(executed.clone())?
-        },
+        }
     };
 
-    Ok(DisplayableExecutionPlan::new(plan.as_ref()).indent(true).to_string())
+    Ok(DisplayableExecutionPlan::new(plan.as_ref())
+        .indent(true)
+        .to_string())
 }
-
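
The new public explain_analyze entry point (re-exported from src/lib.rs below) collects per-task metrics from an executed StageExec, attaches them through DisplayCtx, and renders the indented plan with metrics. A minimal usage sketch, assuming the crate is imported as datafusion_distributed (the actual crate path may differ) and that the executed root plan is already at hand; session setup and query execution are omitted.

use std::sync::Arc;

use datafusion::error::DataFusionError;
use datafusion::physical_plan::ExecutionPlan;
// Crate path assumed for illustration; adjust to how this crate is named in Cargo.toml.
use datafusion_distributed::explain_analyze;

// `executed` is assumed to be the root plan after the query has run (typically a
// StageExec), so its metrics are already populated.
fn print_analyzed(executed: Arc<dyn ExecutionPlan>) -> Result<(), DataFusionError> {
    let rendered = explain_analyze(executed)?;
    println!("{rendered}");
    Ok(())
}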

src/lib.rs

Lines changed: 2 additions & 2 deletions
@@ -6,9 +6,9 @@ mod config_extension_ext;
 mod distributed_ext;
 mod distributed_physical_optimizer_rule;
 mod execution_plans;
+mod explain;
 mod flight_service;
 mod metrics;
-mod explain;
 
 mod protobuf;
 #[cfg(any(feature = "integration", test))]
@@ -22,9 +22,9 @@ pub use execution_plans::{
     DistributedTaskContext, ExecutionTask, NetworkCoalesceExec, NetworkShuffleExec,
     PartitionIsolatorExec, StageExec,
 };
+pub use explain::explain_analyze;
 pub use flight_service::{
     ArrowFlightEndpoint, DefaultSessionBuilder, DistributedSessionBuilder,
     DistributedSessionBuilderContext, MappedDistributedSessionBuilder,
     MappedDistributedSessionBuilderExt,
 };
-pub use explain::explain_analyze;

src/metrics/mod.rs

Lines changed: 1 addition & 2 deletions
@@ -3,6 +3,5 @@ pub(crate) mod proto;
 mod task_metrics_collector;
 mod task_metrics_rewriter;
 pub(crate) use metrics_collecting_stream::MetricsCollectingStream;
-pub(crate) use task_metrics_collector::{TaskMetricsCollector, MetricsCollectorResult};
+pub(crate) use task_metrics_collector::{MetricsCollectorResult, TaskMetricsCollector};
 pub(crate) use task_metrics_rewriter::TaskMetricsRewriter;
-
