diff --git a/benchmarks/cdk/bin/datafusion-bench.ts b/benchmarks/cdk/bin/datafusion-bench.ts index 7363130..bc0953a 100644 --- a/benchmarks/cdk/bin/datafusion-bench.ts +++ b/benchmarks/cdk/bin/datafusion-bench.ts @@ -16,6 +16,7 @@ async function main () { .option('--files-per-task ', 'Files per task', '4') .option('--cardinality-task-sf ', 'Cardinality task scale factor', '2') .option('--shuffle-batch-size ', 'Shuffle batch coalescing size (number of rows)', '8192') + .option('--collect-metrics ', 'Propagates metric collection', 'true') .option('--query ', 'A specific query to run', undefined) .parse(process.argv); @@ -26,6 +27,7 @@ async function main () { const filesPerTask = parseInt(options.filesPerTask); const cardinalityTaskSf = parseInt(options.cardinalityTaskSf); const shuffleBatchSize = parseInt(options.shuffleBatchSize); + const collectMetrics = options.collectMetrics === 'true' || options.collectMetrics === 1 // Compare with previous results first const results: BenchmarkResults = { queries: [] }; @@ -36,7 +38,8 @@ async function main () { await query(` SET distributed.files_per_task=${filesPerTask}; SET distributed.cardinality_task_count_factor=${cardinalityTaskSf}; - SET distributed.shuffle_batch_size=${shuffleBatchSize} + SET distributed.shuffle_batch_size=${shuffleBatchSize}; + SET distributed.collect_metrics=${collectMetrics} `) for (let id of IDS) { diff --git a/benchmarks/src/tpch/run.rs b/benchmarks/src/tpch/run.rs index 72e6088..b5f9451 100644 --- a/benchmarks/src/tpch/run.rs +++ b/benchmarks/src/tpch/run.rs @@ -120,6 +120,10 @@ pub struct RunOpt { /// Task count scale factor for when nodes in stages change the cardinality of the data #[structopt(long)] cardinality_task_sf: Option, + + /// Collects metrics across network boundaries + #[structopt(long)] + collect_metrics: bool, } #[async_trait] @@ -147,7 +151,8 @@ impl DistributedSessionBuilder for RunOpt { )? 
.with_distributed_cardinality_effect_task_scale_factor( self.cardinality_task_sf.unwrap_or(1.0), - )?; + )? + .with_distributed_metrics_collection(self.collect_metrics)?; if self.mem_table { builder = builder.with_physical_optimizer_rule(Arc::new(InMemoryDataSourceRule)); diff --git a/src/channel_resolver_ext.rs b/src/channel_resolver_ext.rs index a9e7fd6..7d48fb0 100644 --- a/src/channel_resolver_ext.rs +++ b/src/channel_resolver_ext.rs @@ -1,4 +1,5 @@ use crate::DistributedConfig; +use crate::config_extension_ext::set_distributed_option_extension; use arrow_flight::flight_service_client::FlightServiceClient; use async_trait::async_trait; use datafusion::common::exec_err; @@ -17,10 +18,10 @@ pub(crate) fn set_distributed_channel_resolver( if let Some(distributed_cfg) = opts.extensions.get_mut::() { distributed_cfg.__private_channel_resolver = channel_resolver_ext; } else { - opts.extensions.insert(DistributedConfig { + set_distributed_option_extension(cfg, DistributedConfig { __private_channel_resolver: channel_resolver_ext, ..Default::default() - }); + }).expect("Calling set_distributed_option_extension with a default DistributedConfig should never fail"); } } diff --git a/src/config_extension_ext.rs b/src/config_extension_ext.rs index a287bf2..9462fe5 100644 --- a/src/config_extension_ext.rs +++ b/src/config_extension_ext.rs @@ -19,6 +19,14 @@ pub(crate) fn set_distributed_option_extension( let mut meta = HeaderMap::new(); for entry in t.entries() { + // assume that fields starting with "__" are private, and are not supposed to be sent + // over the wire. This accounts for the fact that we need to send our DistributedConfig + // options without setting the __private_task_estimator and __private_channel_resolver. + // Ideally those two fields should not even be there in the first place, but until + // https://github.com/apache/datafusion/pull/18739 we need to put them there.
+ if entry.key.starts_with("__") { + continue; + } if let Some(value) = entry.value { meta.insert( HeaderName::from_str(&format!( @@ -44,29 +52,51 @@ pub(crate) fn set_distributed_option_extension( Ok(()) } -pub(crate) fn set_distributed_option_extension_from_headers( - cfg: &mut SessionConfig, +pub(crate) fn set_distributed_option_extension_from_headers<'a, T: ConfigExtension + Default>( + cfg: &'a mut SessionConfig, headers: &HeaderMap, -) -> Result<(), DataFusionError> { - let mut result = T::default(); - let mut found_some = false; +) -> Result<&'a T, DataFusionError> { + enum MutOrOwned<'a, T> { + Mut(&'a mut T), + Owned(T), + } + + impl<'a, T> MutOrOwned<'a, T> { + fn as_mut(&mut self) -> &mut T { + match self { + MutOrOwned::Mut(v) => v, + MutOrOwned::Owned(v) => v, + } + } + } + + // If the config extension existed before, we want to modify instead of adding a new one from + // scratch. If not, we'll start from scratch with a new one. + let mut result = match cfg.options_mut().extensions.get_mut::() { + Some(v) => MutOrOwned::Mut(v), + None => MutOrOwned::Owned(T::default()), + }; + for (k, v) in headers.iter() { let key = k.as_str().trim_start_matches(FLIGHT_METADATA_CONFIG_PREFIX); let prefix = format!("{}.", T::PREFIX); if key.starts_with(&prefix) { - found_some = true; - result.set( + result.as_mut().set( key.trim_start_matches(&prefix), v.to_str() .map_err(|err| internal_datafusion_err!("Cannot parse header value: {err}"))?, )?; } } - if !found_some { - return Ok(()); + + // Only insert the extension if it is not already there. If this is otherwise MutOrOwned::Mut it + // means that the extension was already there, and we already modified it. 
+ if let MutOrOwned::Owned(v) = result { + cfg.options_mut().extensions.insert(v); } - cfg.options_mut().extensions.insert(result); - Ok(()) + cfg.options().extensions.get().ok_or_else(|| { + internal_datafusion_err!("ProgrammingError: a config option extension was just inserted, but it was not immediately retrievable") + }) } #[derive(Clone, Debug, Default)] @@ -190,8 +220,15 @@ mod tests { &Default::default(), )?; - let extension = config.options().extensions.get::(); - assert!(extension.is_none()); + let extension = config + .options() + .extensions + .get::() + .unwrap(); + let default = CustomExtension::default(); + assert_eq!(extension.foo, default.foo); + assert_eq!(extension.bar, default.bar); + assert_eq!(extension.baz, default.baz); Ok(()) } @@ -207,8 +244,15 @@ mod tests { set_distributed_option_extension_from_headers::(&mut config, &header_map)?; - let extension = config.options().extensions.get::(); - assert!(extension.is_none()); + let extension = config + .options() + .extensions + .get::() + .unwrap(); + let default = CustomExtension::default(); + assert_eq!(extension.foo, default.foo); + assert_eq!(extension.bar, default.bar); + assert_eq!(extension.baz, default.baz); Ok(()) } diff --git a/src/distributed_ext.rs b/src/distributed_ext.rs index 6efdda0..49e797f 100644 --- a/src/distributed_ext.rs +++ b/src/distributed_ext.rs @@ -77,8 +77,10 @@ pub trait DistributedExt: Sized { /// method with their own extensions to be able to access them in any place in the /// plan. /// - /// This method also adds the provided [ConfigExtension] to the current session option - /// extensions, the same as calling [SessionConfig::with_option_extension]. + /// - If there was a [ConfigExtension] of the same type already present, it's updated with an + /// in-place mutation based on the headers that came over the wire. + /// - If there was no [ConfigExtension] set before, it will get added, as if + /// [SessionConfig::with_option_extension] was being called.
/// /// Example: /// @@ -358,6 +360,13 @@ pub trait DistributedExt: Sized { &mut self, factor: f64, ) -> Result<(), DataFusionError>; + + /// Enables metrics collection across network boundaries so that all the metrics gathered in + /// each node are accessible from the head stage that started running the query. + fn with_distributed_metrics_collection(self, enabled: bool) -> Result; + + /// Same as [DistributedExt::with_distributed_metrics_collection] but with an in-place mutation. + fn set_distributed_metrics_collection(&mut self, enabled: bool) -> Result<(), DataFusionError>; } impl DistributedExt for SessionConfig { @@ -372,7 +381,8 @@ impl DistributedExt for SessionConfig { &mut self, headers: &HeaderMap, ) -> Result<(), DataFusionError> { - set_distributed_option_extension_from_headers::(self, headers) + set_distributed_option_extension_from_headers::(self, headers)?; + Ok(()) } fn set_distributed_user_codec(&mut self, codec: T) { @@ -415,6 +425,12 @@ impl DistributedExt for SessionConfig { Ok(()) } + fn set_distributed_metrics_collection(&mut self, enabled: bool) -> Result<(), DataFusionError> { + let d_cfg = DistributedConfig::from_config_options_mut(self.options_mut())?; + d_cfg.collect_metrics = enabled; + Ok(()) + } + delegate!
{ to self { #[call(set_distributed_option_extension)] @@ -448,6 +464,10 @@ impl DistributedExt for SessionConfig { #[call(set_distributed_cardinality_effect_task_scale_factor)] #[expr($?;Ok(self))] fn with_distributed_cardinality_effect_task_scale_factor(mut self, factor: f64) -> Result; + + #[call(set_distributed_metrics_collection)] + #[expr($?;Ok(self))] + fn with_distributed_metrics_collection(mut self, enabled: bool) -> Result; } } } @@ -494,6 +514,11 @@ impl DistributedExt for SessionStateBuilder { #[call(set_distributed_cardinality_effect_task_scale_factor)] #[expr($?;Ok(self))] fn with_distributed_cardinality_effect_task_scale_factor(mut self, factor: f64) -> Result; + + fn set_distributed_metrics_collection(&mut self, enabled: bool) -> Result<(), DataFusionError>; + #[call(set_distributed_metrics_collection)] + #[expr($?;Ok(self))] + fn with_distributed_metrics_collection(mut self, enabled: bool) -> Result; } } } @@ -540,6 +565,11 @@ impl DistributedExt for SessionState { #[call(set_distributed_cardinality_effect_task_scale_factor)] #[expr($?;Ok(self))] fn with_distributed_cardinality_effect_task_scale_factor(mut self, factor: f64) -> Result; + + fn set_distributed_metrics_collection(&mut self, enabled: bool) -> Result<(), DataFusionError>; + #[call(set_distributed_metrics_collection)] + #[expr($?;Ok(self))] + fn with_distributed_metrics_collection(mut self, enabled: bool) -> Result; } } } @@ -586,6 +616,11 @@ impl DistributedExt for SessionContext { #[call(set_distributed_cardinality_effect_task_scale_factor)] #[expr($?;Ok(self))] fn with_distributed_cardinality_effect_task_scale_factor(self, factor: f64) -> Result; + + fn set_distributed_metrics_collection(&mut self, enabled: bool) -> Result<(), DataFusionError>; + #[call(set_distributed_metrics_collection)] + #[expr($?;Ok(self))] + fn with_distributed_metrics_collection(self, enabled: bool) -> Result; } } } diff --git a/src/distributed_planner/distributed_config.rs 
b/src/distributed_planner/distributed_config.rs index 22294ae..135b716 100644 --- a/src/distributed_planner/distributed_config.rs +++ b/src/distributed_planner/distributed_config.rs @@ -31,6 +31,9 @@ extensions_options! { /// batches over the wire. /// If set to 0, batch coalescing is disabled on network shuffle operations. pub shuffle_batch_size: usize, default = 8192 + /// Propagate collected metrics from all nodes in the plan across network boundaries + /// so that they can be reconstructed on the head node of the plan. + pub collect_metrics: bool, default = false /// Collection of [TaskEstimator]s that will be applied to leaf nodes in order to /// estimate how many tasks should be spawned for the [Stage] containing the leaf node. pub(crate) __private_task_estimator: CombinedTaskEstimator, default = CombinedTaskEstimator::default() diff --git a/src/distributed_planner/task_estimator.rs b/src/distributed_planner/task_estimator.rs index 36d3c4a..01c7e06 100644 --- a/src/distributed_planner/task_estimator.rs +++ b/src/distributed_planner/task_estimator.rs @@ -1,3 +1,4 @@ +use crate::config_extension_ext::set_distributed_option_extension; use crate::{ChannelResolver, DistributedConfig, PartitionIsolatorExec}; use datafusion::catalog::memory::DataSourceExec; use datafusion::config::ConfigOptions; @@ -103,10 +104,10 @@ pub(crate) fn set_distributed_task_estimator( } else { let mut estimators = CombinedTaskEstimator::default(); estimators.user_provided.push(Arc::new(estimator)); - opts.extensions.insert(DistributedConfig { + set_distributed_option_extension(cfg, DistributedConfig { __private_task_estimator: estimators, ..Default::default() - }); + }).expect("Calling set_distributed_option_extension with a default DistributedConfig should never fail"); } } diff --git a/src/execution_plans/common.rs b/src/execution_plans/common.rs index f085b3e..7238ca0 100644 --- a/src/execution_plans/common.rs +++ b/src/execution_plans/common.rs @@ -1,6 +1,8 @@ +use 
crate::DistributedConfig; use datafusion::common::{DataFusionError, plan_err}; use datafusion::physical_expr::Partitioning; use datafusion::physical_plan::{ExecutionPlan, PlanProperties}; +use http::HeaderMap; use std::borrow::Borrow; use std::sync::Arc; @@ -40,3 +42,16 @@ pub(super) fn scale_partitioning( Partitioning::UnknownPartitioning(p) => Partitioning::UnknownPartitioning(f(*p)), } } + +/// Manual propagation of the [DistributedConfig] fields relevant for execution. Can be removed +/// after https://github.com/datafusion-contrib/datafusion-distributed/issues/247 is fixed, as this will become automatic. +pub(super) fn manually_propagate_distributed_config( + mut headers: HeaderMap, + d_cfg: &DistributedConfig, +) -> HeaderMap { + headers.insert( + "distributed.collect_metrics", + d_cfg.collect_metrics.to_string().parse().unwrap(), + ); + headers +} diff --git a/src/execution_plans/network_coalesce.rs b/src/execution_plans/network_coalesce.rs index f6653f2..cbbe347 100644 --- a/src/execution_plans/network_coalesce.rs +++ b/src/execution_plans/network_coalesce.rs @@ -1,13 +1,15 @@ use crate::channel_resolver_ext::get_distributed_channel_resolver; use crate::config_extension_ext::ContextGrpcMetadata; use crate::distributed_planner::{InputStageInfo, NetworkBoundary, limit_tasks_err}; -use crate::execution_plans::common::{require_one_child, scale_partitioning_props}; +use crate::execution_plans::common::{ + manually_propagate_distributed_config, require_one_child, scale_partitioning_props, +}; use crate::flight_service::DoGet; use crate::metrics::MetricsCollectingStream; use crate::metrics::proto::MetricsSetProto; use crate::protobuf::{StageKey, map_flight_to_datafusion_error, map_status_to_datafusion_error}; use crate::stage::{MaybeEncodedPlan, Stage}; -use crate::{ChannelResolver, DistributedTaskContext}; +use crate::{ChannelResolver, DistributedConfig, DistributedTaskContext}; use arrow_flight::Ticket; use arrow_flight::decode::FlightRecordBatchStream; use 
arrow_flight::error::FlightError; @@ -18,7 +20,7 @@ use datafusion::error::DataFusionError; use datafusion::execution::{SendableRecordBatchStream, TaskContext}; use datafusion::physical_plan::stream::RecordBatchStreamAdapter; use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties}; -use futures::{TryFutureExt, TryStreamExt}; +use futures::{StreamExt, TryFutureExt, TryStreamExt}; use http::Extensions; use prost::Message; use std::any::Any; @@ -258,6 +260,9 @@ impl ExecutionPlan for NetworkCoalesceExec { // get the channel manager and current stage from our context let channel_resolver = get_distributed_channel_resolver(context.session_config())?; + let d_cfg = DistributedConfig::from_config_options(context.session_config().options())?; + let retrieve_metrics = d_cfg.collect_metrics; + let input_stage = &self_ready.input_stage; let encoded_input_plan = input_stage.plan.encoded()?; @@ -273,8 +278,10 @@ impl ExecutionPlan for NetworkCoalesceExec { let target_task = partition / partitions_per_task; let target_partition = partition % partitions_per_task; + // TODO: this propagation should be automatic + let context_headers = manually_propagate_distributed_config(context_headers, d_cfg); let ticket = Request::from_parts( - MetadataMap::from_headers(context_headers.clone()), + MetadataMap::from_headers(context_headers), Extensions::default(), Ticket { ticket: DoGet { @@ -311,13 +318,14 @@ impl ExecutionPlan for NetworkCoalesceExec { .into_inner() .map_err(|err| FlightError::Tonic(Box::new(err))); - let metrics_collecting_stream = - MetricsCollectingStream::new(stream, metrics_collection_capture); + let stream = if retrieve_metrics { + MetricsCollectingStream::new(stream, metrics_collection_capture).left_stream() + } else { + stream.right_stream() + }; - Ok( - FlightRecordBatchStream::new_from_flight_data(metrics_collecting_stream) - .map_err(map_flight_to_datafusion_error), - ) + Ok(FlightRecordBatchStream::new_from_flight_data(stream) 
+ .map_err(map_flight_to_datafusion_error)) } .try_flatten_stream(); diff --git a/src/execution_plans/network_shuffle.rs b/src/execution_plans/network_shuffle.rs index fe0c34a..a2aea64 100644 --- a/src/execution_plans/network_shuffle.rs +++ b/src/execution_plans/network_shuffle.rs @@ -1,13 +1,17 @@ use crate::channel_resolver_ext::get_distributed_channel_resolver; use crate::config_extension_ext::ContextGrpcMetadata; -use crate::execution_plans::common::{require_one_child, scale_partitioning}; +use crate::execution_plans::common::{ + manually_propagate_distributed_config, require_one_child, scale_partitioning, +}; use crate::flight_service::DoGet; use crate::metrics::MetricsCollectingStream; use crate::metrics::proto::MetricsSetProto; use crate::protobuf::StageKey; use crate::protobuf::{map_flight_to_datafusion_error, map_status_to_datafusion_error}; use crate::stage::{MaybeEncodedPlan, Stage}; -use crate::{ChannelResolver, DistributedTaskContext, InputStageInfo, NetworkBoundary}; +use crate::{ + ChannelResolver, DistributedConfig, DistributedTaskContext, InputStageInfo, NetworkBoundary, +}; use arrow_flight::Ticket; use arrow_flight::decode::FlightRecordBatchStream; use arrow_flight::error::FlightError; @@ -325,6 +329,9 @@ impl ExecutionPlan for NetworkShuffleExec { // get the channel manager and current stage from our context let channel_resolver = get_distributed_channel_resolver(context.session_config())?; + let d_cfg = DistributedConfig::from_config_options(context.session_config().options())?; + let retrieve_metrics = d_cfg.collect_metrics; + let input_stage = &self_ready.input_stage; let encoded_input_plan = input_stage.plan.encoded()?; @@ -337,6 +344,8 @@ impl ExecutionPlan for NetworkShuffleExec { let task_context = DistributedTaskContext::from_ctx(&context); let off = self_ready.properties.partitioning.partition_count() * task_context.task_index; + // TODO: this propagation should be automatic + let context_headers = 
manually_propagate_distributed_config(context_headers, d_cfg); let stream = input_stage_tasks.into_iter().enumerate().map(|(i, task)| { let channel_resolver = Arc::clone(&channel_resolver); @@ -370,13 +379,14 @@ impl ExecutionPlan for NetworkShuffleExec { .into_inner() .map_err(|err| FlightError::Tonic(Box::new(err))); - let metrics_collecting_stream = - MetricsCollectingStream::new(stream, metrics_collection_capture); + let stream = if retrieve_metrics { + MetricsCollectingStream::new(stream, metrics_collection_capture).left_stream() + } else { + stream.right_stream() + }; - Ok( - FlightRecordBatchStream::new_from_flight_data(metrics_collecting_stream) - .map_err(map_flight_to_datafusion_error), - ) + Ok(FlightRecordBatchStream::new_from_flight_data(stream) + .map_err(map_flight_to_datafusion_error)) } .try_flatten_stream() .boxed() diff --git a/src/flight_service/do_get.rs b/src/flight_service/do_get.rs index 6e03324..f030f38 100644 --- a/src/flight_service/do_get.rs +++ b/src/flight_service/do_get.rs @@ -1,6 +1,7 @@ -use crate::DistributedTaskContext; use crate::common::map_last_stream; -use crate::config_extension_ext::ContextGrpcMetadata; +use crate::config_extension_ext::{ + ContextGrpcMetadata, set_distributed_option_extension_from_headers, +}; use crate::flight_service::service::ArrowFlightEndpoint; use crate::flight_service::session_builder::DistributedSessionBuilderContext; use crate::metrics::TaskMetricsCollector; @@ -9,6 +10,7 @@ use crate::protobuf::{ AppMetadata, DistributedCodec, FlightAppMetadata, MetricsCollection, StageKey, TaskMetrics, datafusion_error_to_tonic_status, }; +use crate::{DistributedConfig, DistributedTaskContext}; use arrow_flight::FlightData; use arrow_flight::Ticket; use arrow_flight::encode::{DictionaryHandling, FlightDataEncoderBuilder}; @@ -74,11 +76,12 @@ impl ArrowFlightEndpoint { Status::invalid_argument(format!("Cannot decode DoGet message: {err}")) })?; + let headers = metadata.into_headers(); let mut session_state = self 
.session_builder .build_session_state(DistributedSessionBuilderContext { runtime_env: Arc::clone(&self.runtime), - headers: metadata.clone().into_headers(), + headers: headers.clone(), }) .await .map_err(|err| datafusion_error_to_tonic_status(&err))?; @@ -114,7 +117,11 @@ impl ArrowFlightEndpoint { // Find out which partition group we are executing let cfg = session_state.config_mut(); - cfg.set_extension(Arc::new(ContextGrpcMetadata(metadata.into_headers()))); + let d_cfg = + set_distributed_option_extension_from_headers::(cfg, &headers) + .map_err(|err| datafusion_error_to_tonic_status(&err))?; + let send_metrics = d_cfg.collect_metrics; + cfg.set_extension(Arc::new(ContextGrpcMetadata(headers))); cfg.set_extension(Arc::new(DistributedTaskContext { task_index: doget.target_task_index as usize, task_count: doget.target_task_count as usize, @@ -171,7 +178,11 @@ impl ArrowFlightEndpoint { let stream = map_last_stream(stream, move |last| { if num_partitions_remaining.fetch_sub(1, Ordering::SeqCst) == 1 { task_data_entries.remove(key.clone()); - return last.and_then(|el| collect_and_create_metrics_flight_data(key, plan, el)); + return if send_metrics { + last.and_then(|el| collect_and_create_metrics_flight_data(key, plan, el)) + } else { + last + }; } last }); @@ -345,7 +356,9 @@ mod tests { for (task_number, task_key) in task_keys.iter().enumerate() { for partition in 0..num_partitions_per_task - 1 { let result = do_get(partition as u64, task_number as u64, task_key.clone()).await; - assert!(result.is_ok()); + if let Err(err) = result { + panic!("do_get call failed with error: {err}") + } } } // As many plans as tasks should have been received. 
diff --git a/src/metrics/task_metrics_collector.rs b/src/metrics/task_metrics_collector.rs index 5d8665f..0642a7b 100644 --- a/src/metrics/task_metrics_collector.rs +++ b/src/metrics/task_metrics_collector.rs @@ -154,6 +154,8 @@ mod tests { .with_distributed_channel_resolver(InMemoryChannelResolver::new(10)) .with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule)) .with_distributed_task_estimator(2) + .with_distributed_metrics_collection(true) + .unwrap() .build(); let ctx = SessionContext::from(state); diff --git a/src/metrics/task_metrics_rewriter.rs b/src/metrics/task_metrics_rewriter.rs index fa8c963..4c7d359 100644 --- a/src/metrics/task_metrics_rewriter.rs +++ b/src/metrics/task_metrics_rewriter.rs @@ -241,6 +241,8 @@ mod tests { if distributed { builder = builder .with_distributed_channel_resolver(InMemoryChannelResolver::new(10)) + .with_distributed_metrics_collection(true) + .unwrap() .with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule)) .with_distributed_task_estimator(2) } diff --git a/src/protobuf/errors/mod.rs b/src/protobuf/errors/mod.rs index 3466fe9..20bf80c 100644 --- a/src/protobuf/errors/mod.rs +++ b/src/protobuf/errors/mod.rs @@ -1,5 +1,7 @@ #![allow(clippy::upper_case_acronyms, clippy::vec_box)] +use std::borrow::Borrow; + use arrow_flight::error::FlightError; use datafusion::common::internal_datafusion_err; use datafusion::error::DataFusionError; diff --git a/tests/tpch_explain_analyze.rs b/tests/tpch_explain_analyze.rs index 85aa41f..f298156 100644 --- a/tests/tpch_explain_analyze.rs +++ b/tests/tpch_explain_analyze.rs @@ -4,7 +4,9 @@ mod tests { use datafusion::prelude::SessionContext; use datafusion_distributed::test_utils::localhost::start_localhost_context; use datafusion_distributed::test_utils::tpch; - use datafusion_distributed::{DefaultSessionBuilder, assert_snapshot, explain_analyze}; + use datafusion_distributed::{ + DefaultSessionBuilder, DistributedExt, assert_snapshot, explain_analyze, + }; 
use futures::TryStreamExt; use std::error::Error; use std::fs; @@ -1158,7 +1160,8 @@ mod tests { } async fn test_tpch_query(query_id: u8) -> Result> { - let (ctx, _guard) = start_localhost_context(4, DefaultSessionBuilder).await; + let (mut ctx, _guard) = start_localhost_context(4, DefaultSessionBuilder).await; + ctx.set_distributed_metrics_collection(true)?; run_tpch_query(ctx, query_id).await }