Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
/.idea
/target
/benchmarks/data/
testdata/tpch/data/
testdata/tpch/data/
*/**/*.pending-snap
5 changes: 5 additions & 0 deletions src/execution_plans/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ impl MetricsWrapperExec {
children: None,
}
}

/// Returns a reference to the inner execution plan held by this wrapper.
///
/// The plan is handed out as a borrowed [Arc]; callers that need ownership
/// clone the [Arc] themselves.
pub(crate) fn get_inner(&self) -> &Arc<dyn ExecutionPlan> {
&self.inner
}
}

/// MetricsWrapperExec is invisible during display.
Expand Down
2 changes: 1 addition & 1 deletion src/execution_plans/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,4 @@ pub use network_shuffle::{NetworkShuffleExec, NetworkShuffleReadyExec};
pub use partition_isolator::PartitionIsolatorExec;
pub(crate) use stage::InputStage;
pub use stage::display_plan_graphviz;
pub use stage::{DistributedTaskContext, ExecutionTask, StageExec};
pub use stage::{DisplayCtx, DistributedTaskContext, ExecutionTask, StageExec};
255 changes: 220 additions & 35 deletions src/execution_plans/stage.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
use crate::channel_resolver_ext::get_distributed_channel_resolver;
use crate::execution_plans::MetricsWrapperExec;
use crate::execution_plans::NetworkCoalesceExec;
use crate::metrics::TaskMetricsRewriter;
use crate::{ChannelResolver, NetworkShuffleExec, PartitionIsolatorExec};
use datafusion::common::{exec_err, internal_datafusion_err, internal_err, plan_err};
use datafusion::error::{DataFusionError, Result};
use datafusion::execution::TaskContext;
use datafusion::physical_plan::display::DisplayableExecutionPlan;
use datafusion::physical_plan::{
DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, displayable,
};
Expand Down Expand Up @@ -91,6 +94,8 @@ pub struct StageExec {
pub tasks: Vec<ExecutionTask>,
/// tree depth of our location in the stage tree, used for display only
pub depth: usize,
/// Optional extra information used at display time.
pub display_ctx: Option<DisplayCtx>,
}

/// A [StageExec] that is the input of another [StageExec].
Expand Down Expand Up @@ -192,6 +197,7 @@ impl StageExec {
.collect(),
tasks: vec![ExecutionTask { url: None }; n_tasks],
depth: 0,
display_ctx: None,
}
}

Expand Down Expand Up @@ -239,6 +245,7 @@ impl StageExec {
inputs: assigned_input_stages,
tasks: assigned_tasks,
depth: self.depth,
display_ctx: self.display_ctx.clone(),
};

Ok(assigned_stage)
Expand Down Expand Up @@ -292,9 +299,31 @@ impl ExecutionPlan for StageExec {

/// Rebuilds this stage with `children` as its input stages.
///
/// Every child must be a [StageExec]; each one is cloned and stored as an
/// [InputStage::Decoded]. All other fields (`query_id`, `num`, `name`, `plan`,
/// `tasks`, `depth`, `display_ctx`) are carried over from `self`.
///
/// # Errors
/// Returns a plan error if any child is not a [StageExec].
fn with_new_children(
    self: Arc<Self>,
    children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
    // Downcast each child exactly once. Collecting into `Option<Vec<_>>` yields
    // `None` as soon as a single child is not a StageExec.
    let inputs = children
        .iter()
        .map(|child| {
            child
                .as_any()
                .downcast_ref::<StageExec>()
                .map(|stage| InputStage::Decoded(Arc::new(stage.clone())))
        })
        .collect::<Option<Vec<_>>>();

    match inputs {
        Some(inputs) => Ok(Arc::new(StageExec {
            query_id: self.query_id,
            num: self.num,
            name: self.name.clone(),
            plan: self.plan.clone(),
            inputs,
            tasks: self.tasks.clone(),
            depth: self.depth,
            display_ctx: self.display_ctx.clone(),
        })),
        None => plan_err!("The children of a StageExec must all be StageExec"),
    }
}

fn properties(&self) -> &datafusion::physical_plan::PlanProperties {
Expand Down Expand Up @@ -344,9 +373,13 @@ impl ExecutionPlan for StageExec {
}
}

use crate::metrics::proto::MetricsSetProto;
use crate::protobuf::StageKey;
use bytes::Bytes;
use datafusion::common::HashMap;
use datafusion::common::tree_node::{TreeNode, TreeNodeRecursion};
use datafusion::physical_expr::Partitioning;

/// Be able to display a nice tree for stages.
///
/// The challenge to doing this at the moment is that `TreeRenderVisitor`
Expand All @@ -367,9 +400,44 @@ const LDCORNER: &str = "└"; // Left bottom corner
const VERTICAL: &str = "│"; // Vertical line
const HORIZONTAL: &str = "─"; // Horizontal line

/// Context used to display a [StageExec], its tasks, and their plans.
#[derive(Debug, Clone)]
pub struct DisplayCtx {
/// Metrics sets keyed by [StageKey]. The verbose display of a [StageExec] looks up
/// one entry per task (query id, stage number, task number) in this map.
metrics: Arc<HashMap<StageKey, Vec<MetricsSetProto>>>,
}

impl DisplayCtx {
    /// Creates a [DisplayCtx] that takes ownership of `metrics` and stores the
    /// map behind an [Arc].
    pub fn new(metrics: HashMap<StageKey, Vec<MetricsSetProto>>) -> Self {
        let metrics = Arc::new(metrics);
        Self { metrics }
    }
}

/// TaskFmt is used to control how tasks are displayed when displaying a [StageExec]'s inner plan.
#[derive(Clone, Copy)]
enum TaskFmt {
/// Display all tasks.
All,
/// Display only the task with the given `task_id`.
TaskID { task_id: usize },
}

impl StageExec {
fn format(&self, plan: &dyn ExecutionPlan, indent: usize, f: &mut String) -> std::fmt::Result {
let mut node_str = displayable(plan).one_line().to_string();
fn format(
&self,
plan: &dyn ExecutionPlan,
indent: usize,
task_fmt: TaskFmt,
f: &mut String,
) -> std::fmt::Result {
// If metrics are available, then display the plan with metrics.
let mut node_str = match &self.display_ctx {
None => displayable(plan).one_line().to_string(),
Some(_) => DisplayableExecutionPlan::with_metrics(plan)
.one_line()
.to_string(),
};
node_str.pop();
write!(f, "{} {node_str}", " ".repeat(indent))?;

Expand Down Expand Up @@ -406,17 +474,29 @@ impl StageExec {
)?;
}

if let Some(isolator) = plan.as_any().downcast_ref::<PartitionIsolatorExec>() {
write!(
f,
" {}",
format_tasks_for_partition_isolator(isolator, &self.tasks)
)?;
let mut maybe_partition_isolator = plan;
// It's possible that the plan node is wrapped in a MetricsWrapperExec for displaying purposes.
// Check if that's the case.
if let Some(wrapper) = plan.as_any().downcast_ref::<MetricsWrapperExec>() {
maybe_partition_isolator = wrapper.get_inner().as_ref();
}

if let Some(isolator) = maybe_partition_isolator
.as_any()
.downcast_ref::<PartitionIsolatorExec>()
{
let task_info = match task_fmt {
TaskFmt::All => format_tasks_for_partition_isolator(isolator, &self.tasks),
TaskFmt::TaskID { task_id } => {
format_task_for_partition_isolator(isolator, task_id, self.tasks.len())
}
};
write!(f, " {}", task_info)?;
}
writeln!(f)?;

for child in plan.children() {
self.format(child.as_ref(), indent + 2, f)?;
self.format(child.as_ref(), indent + 2, task_fmt, f)?;
}
Ok(())
}
Expand All @@ -430,32 +510,99 @@ impl DisplayAs for StageExec {
write!(f, "{}", self.name)
}
DisplayFormatType::Verbose => {
writeln!(
f,
"{}{} {} {}",
LTCORNER,
HORIZONTAL.repeat(5),
self.name,
format_tasks_for_stage(self.tasks.len(), &self.plan)
)?;
match &self.display_ctx {
None => {
writeln!(
f,
"{}{} {} {}",
LTCORNER,
HORIZONTAL.repeat(5),
self.name,
format_tasks_for_stage(self.tasks.len(), &self.plan)
)?;

let mut plan_str = String::new();
self.format(self.plan.as_ref(), 0, &mut plan_str)?;
let plan_str = plan_str
.split('\n')
.filter(|v| !v.is_empty())
.collect::<Vec<_>>()
.join(&format!("\n{}{}", " ".repeat(self.depth), VERTICAL));
writeln!(f, "{}{}{}", " ".repeat(self.depth), VERTICAL, plan_str)?;
write!(
f,
"{}{}{}",
" ".repeat(self.depth),
LDCORNER,
HORIZONTAL.repeat(50)
)?;
let mut plan_str = String::new();
self.format(self.plan.as_ref(), 0, TaskFmt::All, &mut plan_str)?;
let plan_str = plan_str
.split('\n')
.filter(|v| !v.is_empty())
.collect::<Vec<_>>()
.join(&format!("\n{}{}", " ".repeat(self.depth), VERTICAL));
writeln!(f, "{}{}{}", " ".repeat(self.depth), VERTICAL, plan_str)?;
// Add bottom border
write!(
f,
"{}{}{}",
" ".repeat(self.depth),
LDCORNER,
HORIZONTAL.repeat(50)
)?;

Ok(())
Ok(())
}
Some(display_ctx) => {
// If metrics are available, always display each task separately.
for (i, _) in self.tasks.iter().enumerate() {
let mut extra_spacing = "".to_string();
if i > 0 {
// Add newline for each task
writeln!(f)?;
// with_indent() in DisplayableExecutionPlan will not add indentation for tasks, so we add it manually.
extra_spacing = " ".repeat(self.depth);
}
writeln!(
f,
"{}{}{}{} {}",
extra_spacing,
LTCORNER,
HORIZONTAL.repeat(5),
format!(" {} ", self.name),
format_task_for_stage(i, &self.plan),
)?;
// Uniquely identify the task.
let key = StageKey {
query_id: self.query_id.to_string(),
stage_id: self.num as u64,
task_number: i as u64,
};

let mut plan_str = String::new();
let plan = match display_ctx.metrics.get(&key) {
Some(metrics) => {
let result = TaskMetricsRewriter::new(metrics.to_owned())
.enrich_task_with_metrics(self.plan.clone());
if let Err(e) = result {
write!(f, "Error enriching task with metrics: {}", e)?;
return Err(std::fmt::Error);
}
result.unwrap()
}
None => self.plan.clone(),
};
self.format(
plan.as_ref(),
0,
TaskFmt::TaskID { task_id: i },
&mut plan_str,
)?;
let plan_str = plan_str
.split('\n')
.filter(|v| !v.is_empty())
.collect::<Vec<_>>()
.join(&format!("\n{}{}", " ".repeat(self.depth), VERTICAL));
writeln!(f, "{}{}{}", " ".repeat(self.depth), VERTICAL, plan_str)?;
// Add bottom border
write!(
f,
"{}{}{}",
" ".repeat(self.depth),
LDCORNER,
HORIZONTAL.repeat(50)
)?;
}
Ok(())
}
}
}
DisplayFormatType::TreeRender => write!(
f,
Expand Down Expand Up @@ -483,6 +630,22 @@ fn format_tasks_for_stage(n_tasks: usize, head: &Arc<dyn ExecutionPlan>) -> Stri
result
}

/// Renders the header label for one task of a stage, e.g. `Task t1:[p2,p3] `.
///
/// The listed partition range is as wide as the output partition count of `head`.
/// When `head` is hash partitioned the range starts at 0 for every task; otherwise
/// it starts at `task_number * partition_count`.
fn format_task_for_stage(task_number: usize, head: &Arc<dyn ExecutionPlan>) -> String {
    let partitioning = head.properties().output_partitioning();
    let width = partitioning.partition_count();

    // Hash-partitioned stages show the same partition ids for every task.
    let per_task_stride = if matches!(partitioning, Partitioning::Hash(_, _)) {
        0
    } else {
        width
    };
    let start = task_number * per_task_stride;

    let labels = (start..(start + width))
        .map(|p| format!("p{p}"))
        .collect::<Vec<_>>()
        .join(",");

    format!("Task t{task_number}:[{labels}] ")
}

fn format_tasks_for_partition_isolator(
isolator: &PartitionIsolatorExec,
tasks: &[ExecutionTask],
Expand All @@ -506,6 +669,28 @@ fn format_tasks_for_partition_isolator(
result
}

/// Renders the label shown next to a [PartitionIsolatorExec] node for a single task.
///
/// One slot is emitted per partition across all groups returned by
/// `PartitionIsolatorExec::partition_groups`. Slots that belong to the group of
/// `task_number` read `p0`, `p1`, ... in group order; every other slot reads `__`.
///
/// # Panics
/// Panics if there is no partition group at index `task_number`.
fn format_task_for_partition_isolator(
    isolator: &PartitionIsolatorExec,
    task_number: usize,
    num_tasks: usize,
) -> String {
    let input_partitions = isolator.input().output_partitioning().partition_count();
    let groups = PartitionIsolatorExec::partition_groups(input_partitions, num_tasks);

    // Start with every partition marked as not handled by this task.
    let total: usize = groups.iter().map(|group| group.len()).sum();
    let mut slots = vec!["__".to_string(); total];

    let own_group = groups.get(task_number).unwrap();
    for (position, partition) in own_group.iter().enumerate() {
        slots[*partition] = format!("p{position}");
    }

    format!("Task t{task_number}:[{}] ", slots.join(","))
}

// num_colors must agree with the colorscheme selected from
// https://graphviz.org/doc/info/colors.html
const NUM_COLORS: usize = 6;
Expand Down
Loading