Skip to content

Commit 5fc4b41

Browse files
committed
Make metrics collection optional
1 parent 558c3c4 commit 5fc4b41

File tree

15 files changed

+142
-33
lines changed

15 files changed

+142
-33
lines changed

benchmarks/cdk/bin/datafusion-bench.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ async function main () {
1616
.option('--files-per-task <number>', 'Files per task', '4')
1717
.option('--cardinality-task-sf <number>', 'Cardinality task scale factor', '2')
1818
.option('--shuffle-batch-size <number>', 'Shuffle batch coalescing size (number of rows)', '8192')
19+
.option('--collect-metrics <boolean>', 'Propagates metric collection', 'true')
1920
.option('--query <number>', 'A specific query to run', undefined)
2021
.parse(process.argv);
2122

@@ -26,6 +27,7 @@ async function main () {
2627
const filesPerTask = parseInt(options.filesPerTask);
2728
const cardinalityTaskSf = parseInt(options.cardinalityTaskSf);
2829
const shuffleBatchSize = parseInt(options.shuffleBatchSize);
30+
const collectMetrics = options.collectMetrics === 'true' || options.collectMetrics === 1
2931

3032
// Compare with previous results first
3133
const results: BenchmarkResults = { queries: [] };
@@ -36,7 +38,8 @@ async function main () {
3638
await query(`
3739
SET distributed.files_per_task=${filesPerTask};
3840
SET distributed.cardinality_task_count_factor=${cardinalityTaskSf};
39-
SET distributed.shuffle_batch_size=${shuffleBatchSize}
41+
SET distributed.shuffle_batch_size=${shuffleBatchSize};
42+
SET distributed.collect_metrics=${collectMetrics}
4043
`)
4144

4245
for (let id of IDS) {

benchmarks/src/tpch/run.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,10 @@ pub struct RunOpt {
120120
/// Task count scale factor for when nodes in stages change the cardinality of the data
121121
#[structopt(long)]
122122
cardinality_task_sf: Option<f64>,
123+
124+
/// Collects metrics across network boundaries
125+
#[structopt(long)]
126+
collect_metrics: bool,
123127
}
124128

125129
#[async_trait]
@@ -147,7 +151,8 @@ impl DistributedSessionBuilder for RunOpt {
147151
)?
148152
.with_distributed_cardinality_effect_task_scale_factor(
149153
self.cardinality_task_sf.unwrap_or(1.0),
150-
)?;
154+
)?
155+
.with_distributed_metrics_collection(self.collect_metrics)?;
151156

152157
if self.mem_table {
153158
builder = builder.with_physical_optimizer_rule(Arc::new(InMemoryDataSourceRule));

src/channel_resolver_ext.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use crate::DistributedConfig;
2+
use crate::config_extension_ext::set_distributed_option_extension;
23
use arrow_flight::flight_service_client::FlightServiceClient;
34
use async_trait::async_trait;
45
use datafusion::common::exec_err;
@@ -17,10 +18,10 @@ pub(crate) fn set_distributed_channel_resolver(
1718
if let Some(distributed_cfg) = opts.extensions.get_mut::<DistributedConfig>() {
1819
distributed_cfg.__private_channel_resolver = channel_resolver_ext;
1920
} else {
20-
opts.extensions.insert(DistributedConfig {
21+
set_distributed_option_extension(cfg, DistributedConfig {
2122
__private_channel_resolver: channel_resolver_ext,
2223
..Default::default()
23-
});
24+
}).expect("Calling set_distributed_option_extension with a default DistributedConfig should never fail");
2425
}
2526
}
2627

src/config_extension_ext.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,14 @@ pub(crate) fn set_distributed_option_extension<T: ConfigExtension + Default>(
1919
let mut meta = HeaderMap::new();
2020

2121
for entry in t.entries() {
22+
// assume that fields starting with "__" are private, and are not supposed to be sent
23+
// over the wire. This accounts for the fact that we need to send our DistributedConfig
24+
// options without setting the __private_task_estimator and __private_channel_resolver.
25+
// Ideally those two fields should not even be there on the first place, but until
26+
// https://github.com/apache/datafusion/pull/18739 we need to put them there.
27+
if entry.key.starts_with("__") {
28+
continue;
29+
}
2230
if let Some(value) = entry.value {
2331
meta.insert(
2432
HeaderName::from_str(&format!(

src/distributed_ext.rs

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -360,6 +360,13 @@ pub trait DistributedExt: Sized {
360360
&mut self,
361361
factor: f64,
362362
) -> Result<(), DataFusionError>;
363+
364+
/// Enables metrics collection across network boundaries so that all the metrics gather in
365+
/// each node are accessible from the head stage that started running the query.
366+
fn with_distributed_metrics_collection(self, enabled: bool) -> Result<Self, DataFusionError>;
367+
368+
/// Same as [DistributedExt::with_distributed_metrics_collection] but with an in-place mutation.
369+
fn set_distributed_metrics_collection(&mut self, enabled: bool) -> Result<(), DataFusionError>;
363370
}
364371

365372
impl DistributedExt for SessionConfig {
@@ -374,7 +381,8 @@ impl DistributedExt for SessionConfig {
374381
&mut self,
375382
headers: &HeaderMap,
376383
) -> Result<(), DataFusionError> {
377-
set_distributed_option_extension_from_headers::<T>(self, headers)
384+
set_distributed_option_extension_from_headers::<T>(self, headers)?;
385+
Ok(())
378386
}
379387

380388
fn set_distributed_user_codec<T: PhysicalExtensionCodec + 'static>(&mut self, codec: T) {
@@ -417,6 +425,12 @@ impl DistributedExt for SessionConfig {
417425
Ok(())
418426
}
419427

428+
fn set_distributed_metrics_collection(&mut self, enabled: bool) -> Result<(), DataFusionError> {
429+
let d_cfg = DistributedConfig::from_config_options_mut(self.options_mut())?;
430+
d_cfg.collect_metrics = enabled;
431+
Ok(())
432+
}
433+
420434
delegate! {
421435
to self {
422436
#[call(set_distributed_option_extension)]
@@ -450,6 +464,10 @@ impl DistributedExt for SessionConfig {
450464
#[call(set_distributed_cardinality_effect_task_scale_factor)]
451465
#[expr($?;Ok(self))]
452466
fn with_distributed_cardinality_effect_task_scale_factor(mut self, factor: f64) -> Result<Self, DataFusionError>;
467+
468+
#[call(set_distributed_metrics_collection)]
469+
#[expr($?;Ok(self))]
470+
fn with_distributed_metrics_collection(mut self, enabled: bool) -> Result<Self, DataFusionError>;
453471
}
454472
}
455473
}
@@ -496,6 +514,11 @@ impl DistributedExt for SessionStateBuilder {
496514
#[call(set_distributed_cardinality_effect_task_scale_factor)]
497515
#[expr($?;Ok(self))]
498516
fn with_distributed_cardinality_effect_task_scale_factor(mut self, factor: f64) -> Result<Self, DataFusionError>;
517+
518+
fn set_distributed_metrics_collection(&mut self, enabled: bool) -> Result<(), DataFusionError>;
519+
#[call(set_distributed_metrics_collection)]
520+
#[expr($?;Ok(self))]
521+
fn with_distributed_metrics_collection(mut self, enabled: bool) -> Result<Self, DataFusionError>;
499522
}
500523
}
501524
}
@@ -542,6 +565,11 @@ impl DistributedExt for SessionState {
542565
#[call(set_distributed_cardinality_effect_task_scale_factor)]
543566
#[expr($?;Ok(self))]
544567
fn with_distributed_cardinality_effect_task_scale_factor(mut self, factor: f64) -> Result<Self, DataFusionError>;
568+
569+
fn set_distributed_metrics_collection(&mut self, enabled: bool) -> Result<(), DataFusionError>;
570+
#[call(set_distributed_metrics_collection)]
571+
#[expr($?;Ok(self))]
572+
fn with_distributed_metrics_collection(mut self, enabled: bool) -> Result<Self, DataFusionError>;
545573
}
546574
}
547575
}
@@ -588,6 +616,11 @@ impl DistributedExt for SessionContext {
588616
#[call(set_distributed_cardinality_effect_task_scale_factor)]
589617
#[expr($?;Ok(self))]
590618
fn with_distributed_cardinality_effect_task_scale_factor(self, factor: f64) -> Result<Self, DataFusionError>;
619+
620+
fn set_distributed_metrics_collection(&mut self, enabled: bool) -> Result<(), DataFusionError>;
621+
#[call(set_distributed_metrics_collection)]
622+
#[expr($?;Ok(self))]
623+
fn with_distributed_metrics_collection(self, enabled: bool) -> Result<Self, DataFusionError>;
591624
}
592625
}
593626
}

src/distributed_planner/distributed_config.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,9 @@ extensions_options! {
3131
/// batches over the wire.
3232
/// If set to 0, batch coalescing is disabled on network shuffle operations.
3333
pub shuffle_batch_size: usize, default = 8192
34+
/// Propagate collected metrics from all nodes in the plan across network boundaries
35+
/// so that they can be reconstructed on the head node of the plan.
36+
pub collect_metrics: bool, default = false
3437
/// Collection of [TaskEstimator]s that will be applied to leaf nodes in order to
3538
/// estimate how many tasks should be spawned for the [Stage] containing the leaf node.
3639
pub(crate) __private_task_estimator: CombinedTaskEstimator, default = CombinedTaskEstimator::default()

src/distributed_planner/task_estimator.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use crate::config_extension_ext::set_distributed_option_extension;
12
use crate::{ChannelResolver, DistributedConfig, PartitionIsolatorExec};
23
use datafusion::catalog::memory::DataSourceExec;
34
use datafusion::config::ConfigOptions;
@@ -103,10 +104,10 @@ pub(crate) fn set_distributed_task_estimator(
103104
} else {
104105
let mut estimators = CombinedTaskEstimator::default();
105106
estimators.user_provided.push(Arc::new(estimator));
106-
opts.extensions.insert(DistributedConfig {
107+
set_distributed_option_extension(cfg, DistributedConfig {
107108
__private_task_estimator: estimators,
108109
..Default::default()
109-
});
110+
}).expect("Calling set_distributed_option_extension with a default DistributedConfig should never fail");
110111
}
111112
}
112113

src/execution_plans/common.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
1+
use crate::DistributedConfig;
12
use datafusion::common::{DataFusionError, plan_err};
23
use datafusion::physical_expr::Partitioning;
34
use datafusion::physical_plan::{ExecutionPlan, PlanProperties};
5+
use http::HeaderMap;
46
use std::borrow::Borrow;
57
use std::sync::Arc;
68

@@ -40,3 +42,16 @@ pub(super) fn scale_partitioning(
4042
Partitioning::UnknownPartitioning(p) => Partitioning::UnknownPartitioning(f(*p)),
4143
}
4244
}
45+
46+
/// Manual propagation of the [DistributedConfig] fields relevant for execution. Can be removed
47+
/// after https://github.com/datafusion-contrib/datafusion-distributed/issues/247 is fixed, as this will become automatic.
48+
pub(super) fn manually_propagate_distributed_config(
49+
mut headers: HeaderMap,
50+
d_cfg: &DistributedConfig,
51+
) -> HeaderMap {
52+
headers.insert(
53+
"distributed.collect_metrics",
54+
d_cfg.collect_metrics.to_string().parse().unwrap(),
55+
);
56+
headers
57+
}

src/execution_plans/network_coalesce.rs

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,15 @@
11
use crate::channel_resolver_ext::get_distributed_channel_resolver;
22
use crate::config_extension_ext::ContextGrpcMetadata;
33
use crate::distributed_planner::{InputStageInfo, NetworkBoundary, limit_tasks_err};
4-
use crate::execution_plans::common::{require_one_child, scale_partitioning_props};
4+
use crate::execution_plans::common::{
5+
manually_propagate_distributed_config, require_one_child, scale_partitioning_props,
6+
};
57
use crate::flight_service::DoGet;
68
use crate::metrics::MetricsCollectingStream;
79
use crate::metrics::proto::MetricsSetProto;
810
use crate::protobuf::{StageKey, map_flight_to_datafusion_error, map_status_to_datafusion_error};
911
use crate::stage::{MaybeEncodedPlan, Stage};
10-
use crate::{ChannelResolver, DistributedTaskContext};
12+
use crate::{ChannelResolver, DistributedConfig, DistributedTaskContext};
1113
use arrow_flight::Ticket;
1214
use arrow_flight::decode::FlightRecordBatchStream;
1315
use arrow_flight::error::FlightError;
@@ -18,7 +20,7 @@ use datafusion::error::DataFusionError;
1820
use datafusion::execution::{SendableRecordBatchStream, TaskContext};
1921
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
2022
use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
21-
use futures::{TryFutureExt, TryStreamExt};
23+
use futures::{StreamExt, TryFutureExt, TryStreamExt};
2224
use http::Extensions;
2325
use prost::Message;
2426
use std::any::Any;
@@ -258,6 +260,9 @@ impl ExecutionPlan for NetworkCoalesceExec {
258260
// get the channel manager and current stage from our context
259261
let channel_resolver = get_distributed_channel_resolver(context.session_config())?;
260262

263+
let d_cfg = DistributedConfig::from_config_options(context.session_config().options())?;
264+
let retrieve_metrics = d_cfg.collect_metrics;
265+
261266
let input_stage = &self_ready.input_stage;
262267
let encoded_input_plan = input_stage.plan.encoded()?;
263268

@@ -273,8 +278,10 @@ impl ExecutionPlan for NetworkCoalesceExec {
273278
let target_task = partition / partitions_per_task;
274279
let target_partition = partition % partitions_per_task;
275280

281+
// TODO: this propagation should be automatic <link to issue>
282+
let context_headers = manually_propagate_distributed_config(context_headers, d_cfg);
276283
let ticket = Request::from_parts(
277-
MetadataMap::from_headers(context_headers.clone()),
284+
MetadataMap::from_headers(context_headers),
278285
Extensions::default(),
279286
Ticket {
280287
ticket: DoGet {
@@ -311,13 +318,14 @@ impl ExecutionPlan for NetworkCoalesceExec {
311318
.into_inner()
312319
.map_err(|err| FlightError::Tonic(Box::new(err)));
313320

314-
let metrics_collecting_stream =
315-
MetricsCollectingStream::new(stream, metrics_collection_capture);
321+
let stream = if retrieve_metrics {
322+
MetricsCollectingStream::new(stream, metrics_collection_capture).left_stream()
323+
} else {
324+
stream.right_stream()
325+
};
316326

317-
Ok(
318-
FlightRecordBatchStream::new_from_flight_data(metrics_collecting_stream)
319-
.map_err(map_flight_to_datafusion_error),
320-
)
327+
Ok(FlightRecordBatchStream::new_from_flight_data(stream)
328+
.map_err(map_flight_to_datafusion_error))
321329
}
322330
.try_flatten_stream();
323331

src/execution_plans/network_shuffle.rs

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,17 @@
11
use crate::channel_resolver_ext::get_distributed_channel_resolver;
22
use crate::config_extension_ext::ContextGrpcMetadata;
3-
use crate::execution_plans::common::{require_one_child, scale_partitioning};
3+
use crate::execution_plans::common::{
4+
manually_propagate_distributed_config, require_one_child, scale_partitioning,
5+
};
46
use crate::flight_service::DoGet;
57
use crate::metrics::MetricsCollectingStream;
68
use crate::metrics::proto::MetricsSetProto;
79
use crate::protobuf::StageKey;
810
use crate::protobuf::{map_flight_to_datafusion_error, map_status_to_datafusion_error};
911
use crate::stage::{MaybeEncodedPlan, Stage};
10-
use crate::{ChannelResolver, DistributedTaskContext, InputStageInfo, NetworkBoundary};
12+
use crate::{
13+
ChannelResolver, DistributedConfig, DistributedTaskContext, InputStageInfo, NetworkBoundary,
14+
};
1115
use arrow_flight::Ticket;
1216
use arrow_flight::decode::FlightRecordBatchStream;
1317
use arrow_flight::error::FlightError;
@@ -325,6 +329,9 @@ impl ExecutionPlan for NetworkShuffleExec {
325329
// get the channel manager and current stage from our context
326330
let channel_resolver = get_distributed_channel_resolver(context.session_config())?;
327331

332+
let d_cfg = DistributedConfig::from_config_options(context.session_config().options())?;
333+
let retrieve_metrics = d_cfg.collect_metrics;
334+
328335
let input_stage = &self_ready.input_stage;
329336
let encoded_input_plan = input_stage.plan.encoded()?;
330337

@@ -337,6 +344,8 @@ impl ExecutionPlan for NetworkShuffleExec {
337344
let task_context = DistributedTaskContext::from_ctx(&context);
338345
let off = self_ready.properties.partitioning.partition_count() * task_context.task_index;
339346

347+
// TODO: this propagation should be automatic <link to issue>
348+
let context_headers = manually_propagate_distributed_config(context_headers, d_cfg);
340349
let stream = input_stage_tasks.into_iter().enumerate().map(|(i, task)| {
341350
let channel_resolver = Arc::clone(&channel_resolver);
342351

@@ -370,13 +379,14 @@ impl ExecutionPlan for NetworkShuffleExec {
370379
.into_inner()
371380
.map_err(|err| FlightError::Tonic(Box::new(err)));
372381

373-
let metrics_collecting_stream =
374-
MetricsCollectingStream::new(stream, metrics_collection_capture);
382+
let stream = if retrieve_metrics {
383+
MetricsCollectingStream::new(stream, metrics_collection_capture).left_stream()
384+
} else {
385+
stream.right_stream()
386+
};
375387

376-
Ok(
377-
FlightRecordBatchStream::new_from_flight_data(metrics_collecting_stream)
378-
.map_err(map_flight_to_datafusion_error),
379-
)
388+
Ok(FlightRecordBatchStream::new_from_flight_data(stream)
389+
.map_err(map_flight_to_datafusion_error))
380390
}
381391
.try_flatten_stream()
382392
.boxed()

0 commit comments

Comments
 (0)