Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion benchmarks/cdk/bin/datafusion-bench.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ async function main () {
.option('--files-per-task <number>', 'Files per task', '4')
.option('--cardinality-task-sf <number>', 'Cardinality task scale factor', '2')
.option('--shuffle-batch-size <number>', 'Shuffle batch coalescing size (number of rows)', '8192')
.option('--collect-metrics <boolean>', 'Propagates metric collection', 'true')
.option('--query <number>', 'A specific query to run', undefined)
.parse(process.argv);

Expand All @@ -26,6 +27,7 @@ async function main () {
const filesPerTask = parseInt(options.filesPerTask);
const cardinalityTaskSf = parseInt(options.cardinalityTaskSf);
const shuffleBatchSize = parseInt(options.shuffleBatchSize);
const collectMetrics = options.collectMetrics === 'true' || options.collectMetrics === 1

// Compare with previous results first
const results: BenchmarkResults = { queries: [] };
Expand All @@ -36,7 +38,8 @@ async function main () {
await query(`
SET distributed.files_per_task=${filesPerTask};
SET distributed.cardinality_task_count_factor=${cardinalityTaskSf};
SET distributed.shuffle_batch_size=${shuffleBatchSize}
SET distributed.shuffle_batch_size=${shuffleBatchSize};
SET distributed.collect_metrics=${collectMetrics}
`)

for (let id of IDS) {
Expand Down
7 changes: 6 additions & 1 deletion benchmarks/src/tpch/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,10 @@ pub struct RunOpt {
/// Task count scale factor for when nodes in stages change the cardinality of the data
#[structopt(long)]
cardinality_task_sf: Option<f64>,

/// Collects metrics across network boundaries
#[structopt(long)]
collect_metrics: bool,
}

#[async_trait]
Expand Down Expand Up @@ -147,7 +151,8 @@ impl DistributedSessionBuilder for RunOpt {
)?
.with_distributed_cardinality_effect_task_scale_factor(
self.cardinality_task_sf.unwrap_or(1.0),
)?;
)?
.with_distributed_metrics_collection(self.collect_metrics)?;

if self.mem_table {
builder = builder.with_physical_optimizer_rule(Arc::new(InMemoryDataSourceRule));
Expand Down
5 changes: 3 additions & 2 deletions src/channel_resolver_ext.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::DistributedConfig;
use crate::config_extension_ext::set_distributed_option_extension;
use arrow_flight::flight_service_client::FlightServiceClient;
use async_trait::async_trait;
use datafusion::common::exec_err;
Expand All @@ -17,10 +18,10 @@ pub(crate) fn set_distributed_channel_resolver(
if let Some(distributed_cfg) = opts.extensions.get_mut::<DistributedConfig>() {
distributed_cfg.__private_channel_resolver = channel_resolver_ext;
} else {
opts.extensions.insert(DistributedConfig {
set_distributed_option_extension(cfg, DistributedConfig {
__private_channel_resolver: channel_resolver_ext,
..Default::default()
});
}).expect("Calling set_distributed_option_extension with a default DistributedConfig should never fail");
}
}

Expand Down
74 changes: 59 additions & 15 deletions src/config_extension_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,14 @@ pub(crate) fn set_distributed_option_extension<T: ConfigExtension + Default>(
let mut meta = HeaderMap::new();

for entry in t.entries() {
// assume that fields starting with "__" are private, and are not supposed to be sent
// over the wire. This accounts for the fact that we need to send our DistributedConfig
// options without setting the __private_task_estimator and __private_channel_resolver.
// Ideally those two fields should not even be there in the first place, but until
// https://github.com/apache/datafusion/pull/18739 we need to put them there.
if entry.key.starts_with("__") {
continue;
}
if let Some(value) = entry.value {
meta.insert(
HeaderName::from_str(&format!(
Expand All @@ -44,29 +52,51 @@ pub(crate) fn set_distributed_option_extension<T: ConfigExtension + Default>(
Ok(())
}

pub(crate) fn set_distributed_option_extension_from_headers<T: ConfigExtension + Default>(
cfg: &mut SessionConfig,
pub(crate) fn set_distributed_option_extension_from_headers<'a, T: ConfigExtension + Default>(
cfg: &'a mut SessionConfig,
headers: &HeaderMap,
) -> Result<(), DataFusionError> {
let mut result = T::default();
let mut found_some = false;
) -> Result<&'a T, DataFusionError> {
enum MutOrOwned<'a, T> {
Mut(&'a mut T),
Owned(T),
}

impl<'a, T> MutOrOwned<'a, T> {
fn as_mut(&mut self) -> &mut T {
match self {
MutOrOwned::Mut(v) => v,
MutOrOwned::Owned(v) => v,
}
}
}

// If the config extension existed before, we want to modify instead of adding a new one from
// scratch. If not, we'll start from scratch with a new one.
let mut result = match cfg.options_mut().extensions.get_mut::<T>() {
Some(v) => MutOrOwned::Mut(v),
None => MutOrOwned::Owned(T::default()),
};

for (k, v) in headers.iter() {
let key = k.as_str().trim_start_matches(FLIGHT_METADATA_CONFIG_PREFIX);
let prefix = format!("{}.", T::PREFIX);
if key.starts_with(&prefix) {
found_some = true;
result.set(
result.as_mut().set(
key.trim_start_matches(&prefix),
v.to_str()
.map_err(|err| internal_datafusion_err!("Cannot parse header value: {err}"))?,
)?;
}
}
if !found_some {
return Ok(());

// Only insert the extension if it is not already there. If this is otherwise MutOrOwned::Mut it
// means that the extension was already there, and we already modified it.
if let MutOrOwned::Owned(v) = result {
cfg.options_mut().extensions.insert(v);
}
cfg.options_mut().extensions.insert(result);
Ok(())
cfg.options().extensions.get().ok_or_else(|| {
internal_datafusion_err!("ProgrammingError: a config option extension was just inserted, but it was not immediately retrievable")
})
}

#[derive(Clone, Debug, Default)]
Expand Down Expand Up @@ -190,8 +220,15 @@ mod tests {
&Default::default(),
)?;

let extension = config.options().extensions.get::<CustomExtension>();
assert!(extension.is_none());
let extension = config
.options()
.extensions
.get::<CustomExtension>()
.unwrap();
let default = CustomExtension::default();
assert_eq!(extension.foo, default.foo);
assert_eq!(extension.bar, default.bar);
assert_eq!(extension.baz, default.baz);

Ok(())
}
Expand All @@ -207,8 +244,15 @@ mod tests {

set_distributed_option_extension_from_headers::<CustomExtension>(&mut config, &header_map)?;

let extension = config.options().extensions.get::<CustomExtension>();
assert!(extension.is_none());
let extension = config
.options()
.extensions
.get::<CustomExtension>()
.unwrap();
let default = CustomExtension::default();
assert_eq!(extension.foo, default.foo);
assert_eq!(extension.bar, default.bar);
assert_eq!(extension.baz, default.baz);

Ok(())
}
Expand Down
41 changes: 38 additions & 3 deletions src/distributed_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,10 @@ pub trait DistributedExt: Sized {
/// method with their own extensions to be able to access them in any place in the
/// plan.
///
/// This method also adds the provided [ConfigExtension] to the current session option
/// extensions, the same as calling [SessionConfig::with_option_extension].
/// - If there was a [ConfigExtension] of the same type already present, it's updated with an
/// in-place mutation based on the headers that came over the wire.
/// - If there was no [ConfigExtension] set before, it will get added, as if
/// [SessionConfig::with_option_extension] was being called.
///
/// Example:
///
Expand Down Expand Up @@ -358,6 +360,13 @@ pub trait DistributedExt: Sized {
&mut self,
factor: f64,
) -> Result<(), DataFusionError>;

/// Enables metrics collection across network boundaries so that all the metrics gathered in
/// each node are accessible from the head stage that started running the query.
fn with_distributed_metrics_collection(self, enabled: bool) -> Result<Self, DataFusionError>;

/// Same as [DistributedExt::with_distributed_metrics_collection] but with an in-place mutation.
fn set_distributed_metrics_collection(&mut self, enabled: bool) -> Result<(), DataFusionError>;
}

impl DistributedExt for SessionConfig {
Expand All @@ -372,7 +381,8 @@ impl DistributedExt for SessionConfig {
&mut self,
headers: &HeaderMap,
) -> Result<(), DataFusionError> {
set_distributed_option_extension_from_headers::<T>(self, headers)
set_distributed_option_extension_from_headers::<T>(self, headers)?;
Ok(())
}

fn set_distributed_user_codec<T: PhysicalExtensionCodec + 'static>(&mut self, codec: T) {
Expand Down Expand Up @@ -415,6 +425,12 @@ impl DistributedExt for SessionConfig {
Ok(())
}

fn set_distributed_metrics_collection(&mut self, enabled: bool) -> Result<(), DataFusionError> {
let d_cfg = DistributedConfig::from_config_options_mut(self.options_mut())?;
d_cfg.collect_metrics = enabled;
Ok(())
}

delegate! {
to self {
#[call(set_distributed_option_extension)]
Expand Down Expand Up @@ -448,6 +464,10 @@ impl DistributedExt for SessionConfig {
#[call(set_distributed_cardinality_effect_task_scale_factor)]
#[expr($?;Ok(self))]
fn with_distributed_cardinality_effect_task_scale_factor(mut self, factor: f64) -> Result<Self, DataFusionError>;

#[call(set_distributed_metrics_collection)]
#[expr($?;Ok(self))]
fn with_distributed_metrics_collection(mut self, enabled: bool) -> Result<Self, DataFusionError>;
}
}
}
Expand Down Expand Up @@ -494,6 +514,11 @@ impl DistributedExt for SessionStateBuilder {
#[call(set_distributed_cardinality_effect_task_scale_factor)]
#[expr($?;Ok(self))]
fn with_distributed_cardinality_effect_task_scale_factor(mut self, factor: f64) -> Result<Self, DataFusionError>;

fn set_distributed_metrics_collection(&mut self, enabled: bool) -> Result<(), DataFusionError>;
#[call(set_distributed_metrics_collection)]
#[expr($?;Ok(self))]
fn with_distributed_metrics_collection(mut self, enabled: bool) -> Result<Self, DataFusionError>;
}
}
}
Expand Down Expand Up @@ -540,6 +565,11 @@ impl DistributedExt for SessionState {
#[call(set_distributed_cardinality_effect_task_scale_factor)]
#[expr($?;Ok(self))]
fn with_distributed_cardinality_effect_task_scale_factor(mut self, factor: f64) -> Result<Self, DataFusionError>;

fn set_distributed_metrics_collection(&mut self, enabled: bool) -> Result<(), DataFusionError>;
#[call(set_distributed_metrics_collection)]
#[expr($?;Ok(self))]
fn with_distributed_metrics_collection(mut self, enabled: bool) -> Result<Self, DataFusionError>;
}
}
}
Expand Down Expand Up @@ -586,6 +616,11 @@ impl DistributedExt for SessionContext {
#[call(set_distributed_cardinality_effect_task_scale_factor)]
#[expr($?;Ok(self))]
fn with_distributed_cardinality_effect_task_scale_factor(self, factor: f64) -> Result<Self, DataFusionError>;

fn set_distributed_metrics_collection(&mut self, enabled: bool) -> Result<(), DataFusionError>;
#[call(set_distributed_metrics_collection)]
#[expr($?;Ok(self))]
fn with_distributed_metrics_collection(self, enabled: bool) -> Result<Self, DataFusionError>;
}
}
}
3 changes: 3 additions & 0 deletions src/distributed_planner/distributed_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ extensions_options! {
/// batches over the wire.
/// If set to 0, batch coalescing is disabled on network shuffle operations.
pub shuffle_batch_size: usize, default = 8192
/// Propagate collected metrics from all nodes in the plan across network boundaries
/// so that they can be reconstructed on the head node of the plan.
pub collect_metrics: bool, default = false
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: defaulting to true might be better since that's how vanilla df works.

/// Collection of [TaskEstimator]s that will be applied to leaf nodes in order to
/// estimate how many tasks should be spawned for the [Stage] containing the leaf node.
pub(crate) __private_task_estimator: CombinedTaskEstimator, default = CombinedTaskEstimator::default()
Expand Down
5 changes: 3 additions & 2 deletions src/distributed_planner/task_estimator.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::config_extension_ext::set_distributed_option_extension;
use crate::{ChannelResolver, DistributedConfig, PartitionIsolatorExec};
use datafusion::catalog::memory::DataSourceExec;
use datafusion::config::ConfigOptions;
Expand Down Expand Up @@ -103,10 +104,10 @@ pub(crate) fn set_distributed_task_estimator(
} else {
let mut estimators = CombinedTaskEstimator::default();
estimators.user_provided.push(Arc::new(estimator));
opts.extensions.insert(DistributedConfig {
set_distributed_option_extension(cfg, DistributedConfig {
__private_task_estimator: estimators,
..Default::default()
});
}).expect("Calling set_distributed_option_extension with a default DistributedConfig should never fail");
}
}

Expand Down
15 changes: 15 additions & 0 deletions src/execution_plans/common.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use crate::DistributedConfig;
use datafusion::common::{DataFusionError, plan_err};
use datafusion::physical_expr::Partitioning;
use datafusion::physical_plan::{ExecutionPlan, PlanProperties};
use http::HeaderMap;
use std::borrow::Borrow;
use std::sync::Arc;

Expand Down Expand Up @@ -40,3 +42,16 @@ pub(super) fn scale_partitioning(
Partitioning::UnknownPartitioning(p) => Partitioning::UnknownPartitioning(f(*p)),
}
}

/// Manually propagates the [DistributedConfig] fields relevant for execution by writing them
/// into the given headers, which are then handed back to the caller.
///
/// At the moment only `collect_metrics` is propagated, under the
/// `distributed.collect_metrics` header key, rendered as `"true"` or `"false"`.
///
/// Can be removed after https://github.com/datafusion-contrib/datafusion-distributed/issues/247
/// is fixed, as this will become automatic.
pub(super) fn manually_propagate_distributed_config(
    mut headers: HeaderMap,
    d_cfg: &DistributedConfig,
) -> HeaderMap {
    // A bool only ever renders as one of these two literals, so there is no need to allocate
    // an intermediate `String` before turning it into a header value.
    let collect_metrics = if d_cfg.collect_metrics {
        "true"
    } else {
        "false"
    };
    headers.insert(
        "distributed.collect_metrics",
        collect_metrics
            .parse()
            .expect("\"true\" and \"false\" are always valid header values"),
    );
    headers
}
Loading