Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions datafusion-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,12 @@ datafusion = { workspace = true, features = [
"sql",
"unicode_expressions",
] }
datafusion-common = { workspace = true }
datafusion-common-runtime = { workspace = true }
dirs = "6.0.0"
env_logger = { workspace = true }
futures = { workspace = true }
is-terminal = "0.4"
log = { workspace = true }
mimalloc = { version = "0.1", default-features = false }
object_store = { workspace = true, features = ["aws", "gcp", "http"] }
Expand Down
1 change: 1 addition & 0 deletions datafusion-cli/examples/cli-session-context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ pub async fn main() {
quiet: false,
maxrows: datafusion_cli::print_options::MaxRows::Unlimited,
color: true,
progress: datafusion_cli::progress::ProgressConfig::default(),
instrumented_registry: Arc::new(InstrumentedObjectStoreRegistry::new()),
};

Expand Down
2 changes: 2 additions & 0 deletions datafusion-cli/src/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,7 @@ mod tests {
InstrumentedObjectStoreMode, InstrumentedObjectStoreRegistry,
},
print_options::MaxRows,
progress::ProgressConfig,
};

use super::*;
Expand All @@ -300,6 +301,7 @@ mod tests {
quiet: false,
maxrows: MaxRows::Unlimited,
color: true,
progress: ProgressConfig::default(),
instrumented_registry: Arc::new(InstrumentedObjectStoreRegistry::new()),
};

Expand Down
27 changes: 25 additions & 2 deletions datafusion-cli/src/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use crate::{
helper::CliHelper,
object_storage::get_object_store,
print_options::{MaxRows, PrintOptions},
progress::ProgressReporter,
};
use datafusion::common::instant::Instant;
use datafusion::common::{plan_datafusion_err, plan_err};
Expand Down Expand Up @@ -265,6 +266,16 @@ impl StatementExecutor {
let task_ctx = ctx.task_ctx();
let options = task_ctx.session_config().options();

// Start progress reporter if enabled
let progress_reporter = if print_options.progress.should_show_progress() {
Some(
ProgressReporter::start(&physical_plan, print_options.progress.clone())
.await?,
)
} else {
None
};

// Track memory usage for the query result if it's bounded
let mut reservation =
MemoryConsumer::new("DataFusion-Cli").register(task_ctx.memory_pool());
Expand All @@ -279,9 +290,16 @@ impl StatementExecutor {
// As the input stream comes, we can generate results.
// However, memory safety is not guaranteed.
let stream = execute_stream(physical_plan, task_ctx.clone())?;
print_options
let result = print_options
.print_stream(stream, now, &options.format)
.await?;
.await;

// Stop progress reporter before returning
if let Some(reporter) = &progress_reporter {
reporter.stop().await;
}

result?;
} else {
// Bounded stream; collected results size is limited by the maxrows option
let schema = physical_plan.schema();
Expand Down Expand Up @@ -314,6 +332,11 @@ impl StatementExecutor {
reservation.free();
}

// Stop progress reporter
if let Some(reporter) = progress_reporter {
reporter.stop().await;
}

Ok(())
}

Expand Down
1 change: 1 addition & 0 deletions datafusion-cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,4 @@ pub mod object_storage;
pub mod pool_type;
pub mod print_format;
pub mod print_options;
pub mod progress;
25 changes: 25 additions & 0 deletions datafusion-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ use datafusion_cli::{
pool_type::PoolType,
print_format::PrintFormat,
print_options::{MaxRows, PrintOptions},
progress::{ProgressConfig, ProgressEstimator, ProgressMode, ProgressStyle},
DATAFUSION_CLI_VERSION,
};

Expand Down Expand Up @@ -149,6 +150,22 @@ struct Args {
)]
disk_limit: Option<usize>,

#[clap(long, value_enum, default_value_t = ProgressMode::Auto, help = "Progress bar mode")]
progress: ProgressMode,

#[clap(long, value_enum, default_value_t = ProgressStyle::Bar, help = "Progress bar style")]
progress_style: ProgressStyle,

#[clap(
long,
default_value = "200",
help = "Progress update interval in milliseconds"
)]
progress_interval: u64,

#[clap(long, value_enum, default_value_t = ProgressEstimator::Alpha, help = "ETA estimation algorithm")]
progress_estimator: ProgressEstimator,

#[clap(
long,
help = "Specify the default object_store_profiling mode, defaults to 'disabled'.\n[possible values: disabled, enabled]",
Expand Down Expand Up @@ -244,11 +261,19 @@ async fn main_inner() -> Result<()> {
)),
);

let progress_config = ProgressConfig {
mode: args.progress,
style: args.progress_style,
interval_ms: args.progress_interval,
estimator: args.progress_estimator,
};

let mut print_options = PrintOptions {
format: args.format,
quiet: args.quiet,
maxrows: args.maxrows,
color: args.color,
progress: progress_config,
instrumented_registry: Arc::clone(&instrumented_registry),
};

Expand Down
3 changes: 3 additions & 0 deletions datafusion-cli/src/print_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use crate::object_storage::instrumented::{
InstrumentedObjectStoreMode, InstrumentedObjectStoreRegistry,
};
use crate::print_format::PrintFormat;
use crate::progress::ProgressConfig;

use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
Expand Down Expand Up @@ -79,6 +80,7 @@ pub struct PrintOptions {
pub quiet: bool,
pub maxrows: MaxRows,
pub color: bool,
pub progress: ProgressConfig,
pub instrumented_registry: Arc<InstrumentedObjectStoreRegistry>,
}

Expand Down Expand Up @@ -225,6 +227,7 @@ mod tests {
quiet: true,
maxrows: MaxRows::Unlimited,
color: true,
progress: ProgressConfig::default(),
instrumented_registry: Arc::clone(&instrumented_registry),
};

Expand Down
105 changes: 105 additions & 0 deletions datafusion-cli/src/progress/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Configuration for progress reporting

use clap::ValueEnum;
use is_terminal::IsTerminal;
use std::io;

/// Configuration for progress reporting
#[derive(Debug, Clone)]
pub struct ProgressConfig {
pub mode: ProgressMode,
pub style: ProgressStyle,
pub interval_ms: u64,
pub estimator: ProgressEstimator,
}

impl ProgressConfig {
/// Create a new progress configuration with default values
pub fn new() -> Self {
Self::default()
}

/// Check whether progress should be shown based on the configuration and environment
pub fn should_show_progress(&self) -> bool {
match self.mode {
ProgressMode::On => true,
ProgressMode::Off => false,
ProgressMode::Auto => io::stdout().is_terminal(),
}
}
}

impl Default for ProgressConfig {
fn default() -> Self {
Self {
mode: ProgressMode::Auto,
style: ProgressStyle::Bar,
interval_ms: 200,
estimator: ProgressEstimator::Alpha,
}
}
}

/// Progress bar display mode
#[derive(Debug, Clone, Copy, ValueEnum, PartialEq, Eq)]
pub enum ProgressMode {
/// Show progress bar on TTY, off otherwise
Auto,
/// Always show progress bar
On,
/// Never show progress bar
Off,
}

impl Default for ProgressMode {
fn default() -> Self {
Self::Auto
}
}

/// Progress bar visual style
#[derive(Debug, Clone, Copy, ValueEnum, PartialEq, Eq)]
pub enum ProgressStyle {
/// Show a progress bar when percent is known
Bar,
/// Show a spinner with counters
Spinner,
}

impl Default for ProgressStyle {
fn default() -> Self {
Self::Bar
}
}

/// ETA estimation algorithm
#[derive(Debug, Clone, Copy, ValueEnum, PartialEq, Eq)]
pub enum ProgressEstimator {
/// Simple linear estimation
Linear,
/// Alpha filter (exponential moving average) smoothed estimation
Alpha,
}

impl Default for ProgressEstimator {
fn default() -> Self {
Self::Alpha
}
}
Loading