18 changes: 3 additions & 15 deletions src/execution_plans/metrics.rs
@@ -15,18 +15,11 @@ pub(crate) struct MetricsWrapperExec {
     inner: Arc<dyn ExecutionPlan>,
     /// metrics for this plan node.
     metrics: MetricsSet,
-    /// children is initially None. When used by the [TaskMetricsRewriter], the children will be updated
-    /// to point at other wrapped nodes.
-    children: Option<Vec<Arc<dyn ExecutionPlan>>>,
 }
 
 impl MetricsWrapperExec {
     pub(crate) fn new(inner: Arc<dyn ExecutionPlan>, metrics: MetricsSet) -> Self {
-        Self {
-            inner,
-            metrics,
-            children: None,
-        }
+        Self { inner, metrics }
     }
 }

@@ -53,22 +46,17 @@ impl ExecutionPlan for MetricsWrapperExec {
         }
     }
 
-    /// Retrusn
     fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
-        match &self.children {
-            Some(children) => children.iter().collect(),
-            None => self.inner.children(),
-        }
+        self.inner.children()
     }

     fn with_new_children(
         self: Arc<Self>,
         children: Vec<Arc<dyn ExecutionPlan>>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         Ok(Arc::new(MetricsWrapperExec {
-            inner: self.inner.clone(),
+            inner: Arc::clone(&self.inner).with_new_children(children.clone())?,
             metrics: self.metrics.clone(),
-            children: Some(children),
Comment on lines -69 to -71 (Collaborator Author):
The problem I found was that, when this method was called, self.children was being updated, but not the actual children of self.inner.

This is fine as long as you access MetricsWrapperExec through the children() method, since then the freshly injected children are returned, but it's not fine if you access the children through input_stage(), which is what was happening in display_plan_ascii.

I think it should be fine to just remove self.children in favor of applying the new children to self.inner directly, but I may be missing something.

WDYT @jayshrivastava?

Collaborator:
This sounds okay.

I forget why with_new_children is even called on MetricsWrapperExec. It means we're transforming a node after it's wrapped...

I made this change intentionally though - we used to error in this method, and I remember updating it so we don't error. It might be in the git blame.

Collaborator Author:

Yeah, but if we want input_stage() on a network boundary to return the correct thing, we need to transform it if we expect it to return the wrapped children rather than the original ones.

If we don't transform the original node with the new wrapped children, any time we call input_stage() we will get back the original children, not the wrapped ones.

Collaborator Author:

In the snippet "let transformed = plan.transform_down(|plan| {", we call transform_down, which means that if we return Transformed::yes inside the closure (which we do), DataFusion will call with_new_children under the hood on pretty much every node.

But this should be fine and expected; why would we error in this situation? I see that just removing children from MetricsWrapperExec has no negative implications.
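For context, here is a minimal sketch of that rewrite pattern (the wrap_all helper and the single shared MetricsSet are illustrative assumptions, not the actual TaskMetricsRewriter): returning Transformed::yes from the transform_down closure is exactly what makes DataFusion rebuild each parent via with_new_children on the way back up.

```rust
use std::sync::Arc;

use datafusion::common::tree_node::{Transformed, TreeNode};
use datafusion::error::Result;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_plan::metrics::MetricsSet;

// Illustrative only (assumes in-crate access to MetricsWrapperExec): wraps
// every node. Because the closure returns Transformed::yes, transform_down
// keeps descending through the wrapper's children (which delegate to the
// inner plan), and DataFusion stitches the rewritten children back into each
// parent by calling with_new_children, the call path discussed in this thread.
fn wrap_all(
    plan: Arc<dyn ExecutionPlan>,
    metrics: MetricsSet, // the real rewriter would look up per-node metrics
) -> Result<Arc<dyn ExecutionPlan>> {
    let transformed = plan.transform_down(|node| {
        let wrapped: Arc<dyn ExecutionPlan> =
            Arc::new(MetricsWrapperExec::new(node, metrics.clone()));
        Ok(Transformed::yes(wrapped))
    })?;
    Ok(transformed.data)
}
```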

}))
}

5 changes: 5 additions & 0 deletions src/execution_plans/network_broadcast.rs
@@ -9,6 +9,7 @@ use dashmap::DashMap;
 use datafusion::common::internal_datafusion_err;
 use datafusion::error::DataFusionError;
 use datafusion::execution::{SendableRecordBatchStream, TaskContext};
+use datafusion::physical_expr_common::metrics::MetricsSet;
 use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
 use datafusion::physical_plan::{
     DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
@@ -263,4 +264,8 @@ impl ExecutionPlan for NetworkBroadcastExec {
             futures::stream::select_all(streams),
         )))
     }
+
+    fn metrics(&self) -> Option<MetricsSet> {
+        Some(self.worker_connections.metrics.clone_inner())
+    }
Collaborator:
Update the other network boundary nodes too? Like shuffle, coalesce.

Collaborator Author:
It's already in the other ones; it was missing in this one because these two PRs were merged roughly at the same time:
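As a quick illustration of what this gives callers (hypothetical snippet, not code from this PR): once every network boundary node implements metrics(), a plan can be walked and its counters read directly instead of hitting None.

```rust
use std::sync::Arc;

use datafusion::physical_plan::ExecutionPlan;

// Hypothetical helper: print output_rows for a node and all of its children.
// Nodes that still return None from metrics() show up as "none".
fn dump_metrics(plan: &Arc<dyn ExecutionPlan>, depth: usize) {
    let rows = plan
        .metrics()
        .and_then(|m| m.output_rows())
        .map(|r| r.to_string())
        .unwrap_or_else(|| "none".to_string());
    println!("{}{}: output_rows={}", "  ".repeat(depth), plan.name(), rows);
    for child in plan.children() {
        dump_metrics(child, depth + 1);
    }
}
```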

}
18 changes: 17 additions & 1 deletion src/execution_plans/partition_isolator.rs
@@ -1,7 +1,10 @@
 use crate::DistributedTaskContext;
 use crate::common::require_one_child;
 use datafusion::execution::TaskContext;
+use datafusion::physical_expr_common::metrics::MetricsSet;
 use datafusion::physical_plan::ExecutionPlanProperties;
+use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder};
+use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
 use datafusion::{
     error::Result,
     execution::SendableRecordBatchStream,
@@ -10,6 +13,7 @@ use datafusion::{
         PlanProperties,
     },
 };
+use futures::TryStreamExt;
 use std::{fmt::Formatter, sync::Arc};

/// This is a simple [ExecutionPlan] that isolates a set of N partitions from an input
@@ -52,6 +56,7 @@ pub struct PartitionIsolatorExec {
     pub(crate) input: Arc<dyn ExecutionPlan>,
     pub(crate) properties: PlanProperties,
     pub(crate) n_tasks: usize,
+    pub(crate) metrics: ExecutionPlanMetricsSet,
 }

impl PartitionIsolatorExec {
@@ -69,6 +74,7 @@ impl PartitionIsolatorExec {
             input: input.clone(),
             properties,
             n_tasks,
+            metrics: ExecutionPlanMetricsSet::new(),
         }

@@ -150,6 +156,7 @@ impl ExecutionPlan for PartitionIsolatorExec {
     ) -> Result<SendableRecordBatchStream> {
         let task_context = DistributedTaskContext::from_ctx(&context);
 
+        let metric = MetricBuilder::new(&self.metrics).output_rows(partition);
         let input_partitions = self.input.output_partitioning().partition_count();
 
         let partition_group = Self::partition_group(
@@ -169,13 +176,22 @@
             Ok(Box::pin(EmptyRecordBatchStream::new(self.input.schema()))
                 as SendableRecordBatchStream)
         } else {
-            self.input.execute(*actual_partition_number, context)
+            Ok(Box::pin(RecordBatchStreamAdapter::new(
+                self.schema(),
+                self.input
+                    .execute(*actual_partition_number, context)?
+                    .inspect_ok(move |v| metric.add(v.num_rows())),
+            )))
Collaborator:
There are some assertions in #314 which were made weaker to skip over PartitionIsolatorExec nodes. Do you think you can revisit those assertions?

Collaborator Author:
Yes! Done.
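For reference, the kind of assertion this strengthens (a sketch with assumed names; isolator and expected_rows are hypothetical): since PartitionIsolatorExec now registers an output_rows counter per partition, tests can compare it against actual row counts after draining the stream.

```rust
// Assumed setup: `isolator` is a PartitionIsolatorExec whose stream from
// execute() has been fully drained, and `expected_rows` was counted from the
// batches. The counter registered via MetricBuilder::output_rows in execute()
// should match exactly.
let metrics = isolator.metrics().expect("metrics are registered in execute()");
assert_eq!(metrics.output_rows(), Some(expected_rows));
```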

                 }
             }
             None => Ok(Box::pin(EmptyRecordBatchStream::new(self.input.schema()))
                 as SendableRecordBatchStream),
         }
     }
+
+    fn metrics(&self) -> Option<MetricsSet> {
+        Some(self.metrics.clone_inner())
+    }
 }

#[cfg(test)]
27 changes: 27 additions & 0 deletions tests/metrics_collection.rs
@@ -1,7 +1,9 @@
 #[cfg(all(feature = "integration", test))]
 mod tests {
     use datafusion::catalog::memory::DataSourceExec;
+    use datafusion::common::assert_not_contains;
     use datafusion::common::tree_node::{Transformed, TreeNode, TreeNodeRecursion};
+    use datafusion::physical_plan::display::DisplayableExecutionPlan;
     use datafusion::physical_plan::{ExecutionPlan, execute_stream};
     use datafusion::prelude::SessionContext;
     use datafusion_distributed::test_utils::localhost::start_localhost_context;
@@ -163,6 +165,31 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn test_metric_collection_display_all_have_metrics()
+    -> Result<(), Box<dyn std::error::Error>> {
+        let format = DistributedMetricsFormat::PerTask;
+        let (d_ctx, _guard, _) = start_localhost_context(3, DefaultSessionBuilder).await;
+
+        let query =
+            r#"SELECT count(*), "RainToday" FROM weather GROUP BY "RainToday" ORDER BY count(*)"#;
+
+        let s_ctx = SessionContext::default();
+        let (_, mut d_physical) = execute(&s_ctx, &d_ctx, query).await?;
+        d_physical = rewrite_distributed_plan_with_metrics(d_physical.clone(), format)?;
+
+        let display =
+            DisplayableExecutionPlan::with_metrics(d_physical.children().swap_remove(0).as_ref())
+                .indent(true)
+                .to_string();
+        assert_not_contains!(display, "metrics=[]");
+
+        let display = display_plan_ascii(d_physical.as_ref(), true);
+        assert_not_contains!(display, "metrics=[]");
Collaborator:
Might as well use the node_metrics helper. You can also assert the exact output rows since that's very deterministic (or at least, assert they are equal).

Collaborator Author (@gabotechs, Feb 3, 2026):
The bug was specific to the display_plan_ascii visualization; the metrics have always been placed correctly, so accessing them programmatically was succeeding. This is why we were not able to catch the bug before.
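A sketch of the suggestion above (the node_metrics signature and the s_physical binding for the single-node plan are assumed for illustration; the helper in this file may differ): besides checking the rendered string, fetch the same node's metrics from both plans and compare the row counts directly.

```rust
// Assumed signature: node_metrics::<T>(plan, index) returns the MetricsSet of
// the index-th node of type T. output_rows is deterministic for this query,
// so the single-node plan (s_physical, hypothetical here) and the distributed
// plan should agree.
let single = node_metrics::<DataSourceExec>(s_physical.as_ref(), 0);
let distributed = node_metrics::<DataSourceExec>(d_physical.as_ref(), 0);
assert_eq!(single.output_rows(), distributed.output_rows());
```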


+        Ok(())
+    }
+
/// Looks for an [ExecutionPlan] that matches the provided type parameter `T` in
/// both root nodes and compares its metrics.
/// There might be more than one, so `index` determines which one is compared.