Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions datafusion-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,12 @@ datafusion = { workspace = true, features = [
"sql",
"unicode_expressions",
] }
datafusion-common = { workspace = true }
datafusion-common-runtime = { workspace = true }
dirs = "6.0.0"
env_logger = { workspace = true }
futures = { workspace = true }
is-terminal = "0.4"
log = { workspace = true }
mimalloc = { version = "0.1", default-features = false }
object_store = { workspace = true, features = ["aws", "gcp", "http"] }
Expand Down
27 changes: 25 additions & 2 deletions datafusion-cli/src/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use crate::{
helper::CliHelper,
object_storage::get_object_store,
print_options::{MaxRows, PrintOptions},
progress::ProgressReporter,
};
use datafusion::common::instant::Instant;
use datafusion::common::{plan_datafusion_err, plan_err};
Expand Down Expand Up @@ -265,6 +266,16 @@ impl StatementExecutor {
let task_ctx = ctx.task_ctx();
let options = task_ctx.session_config().options();

// Start progress reporter if enabled
let progress_reporter = if print_options.progress.should_show_progress() {
Some(
ProgressReporter::start(&physical_plan, print_options.progress.clone())
.await?,
)
} else {
None
};

// Track memory usage for the query result if it's bounded
let mut reservation =
MemoryConsumer::new("DataFusion-Cli").register(task_ctx.memory_pool());
Expand All @@ -279,9 +290,16 @@ impl StatementExecutor {
// As the input stream comes, we can generate results.
// However, memory safety is not guaranteed.
let stream = execute_stream(physical_plan, task_ctx.clone())?;
print_options
let result = print_options
.print_stream(stream, now, &options.format)
.await?;
.await;

// Stop progress reporter before returning
if let Some(reporter) = &progress_reporter {
reporter.stop().await;
}

result?;
} else {
// Bounded stream; collected results size is limited by the maxrows option
let schema = physical_plan.schema();
Expand Down Expand Up @@ -314,6 +332,11 @@ impl StatementExecutor {
reservation.free();
}

// Stop progress reporter
if let Some(reporter) = progress_reporter {
reporter.stop().await;
}

Ok(())
}

Expand Down
1 change: 1 addition & 0 deletions datafusion-cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,4 @@ pub mod object_storage;
pub mod pool_type;
pub mod print_format;
pub mod print_options;
pub mod progress;
25 changes: 25 additions & 0 deletions datafusion-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use datafusion_cli::{
pool_type::PoolType,
print_format::PrintFormat,
print_options::{MaxRows, PrintOptions},
progress::{ProgressConfig, ProgressEstimator, ProgressMode, ProgressStyle},
DATAFUSION_CLI_VERSION,
};

Expand Down Expand Up @@ -145,6 +146,22 @@ struct Args {
value_parser(extract_disk_limit)
)]
disk_limit: Option<usize>,

#[clap(long, value_enum, default_value_t = ProgressMode::Auto, help = "Progress bar mode")]
progress: ProgressMode,

#[clap(long, value_enum, default_value_t = ProgressStyle::Bar, help = "Progress bar style")]
progress_style: ProgressStyle,

#[clap(
long,
default_value = "200",
help = "Progress update interval in milliseconds"
)]
progress_interval: u64,

#[clap(long, value_enum, default_value_t = ProgressEstimator::Kalman, help = "ETA estimation algorithm")]
progress_estimator: ProgressEstimator,
}

#[tokio::main]
Expand Down Expand Up @@ -228,11 +245,19 @@ async fn main_inner() -> Result<()> {
)),
);

let progress_config = ProgressConfig {
mode: args.progress,
style: args.progress_style,
interval_ms: args.progress_interval,
estimator: args.progress_estimator,
};

let mut print_options = PrintOptions {
format: args.format,
quiet: args.quiet,
maxrows: args.maxrows,
color: args.color,
progress: progress_config,
};

let commands = args.command;
Expand Down
2 changes: 2 additions & 0 deletions datafusion-cli/src/print_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use std::pin::Pin;
use std::str::FromStr;

use crate::print_format::PrintFormat;
use crate::progress::ProgressConfig;

use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
Expand Down Expand Up @@ -73,6 +74,7 @@ pub struct PrintOptions {
pub quiet: bool,
pub maxrows: MaxRows,
pub color: bool,
pub progress: ProgressConfig,
}

// Returns the query execution details formatted
Expand Down
105 changes: 105 additions & 0 deletions datafusion-cli/src/progress/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Configuration for progress reporting

use clap::ValueEnum;
use is_terminal::IsTerminal;
use std::io;

/// Configuration for progress reporting
///
/// Groups the display mode, visual style, refresh interval and ETA
/// estimator that together control progress output.
#[derive(Debug, Clone)]
pub struct ProgressConfig {
/// Display mode; consulted by `should_show_progress` to decide whether
/// progress is shown always, never, or only when stdout is a terminal
pub mode: ProgressMode,
/// Visual style of the progress display (bar or spinner)
pub style: ProgressStyle,
/// Progress update interval in milliseconds
pub interval_ms: u64,
/// ETA estimation algorithm
pub estimator: ProgressEstimator,
}

impl ProgressConfig {
    /// Build a configuration populated with the default settings.
    ///
    /// Equivalent to calling [`ProgressConfig::default`].
    pub fn new() -> Self {
        <Self as Default>::default()
    }

    /// Decide whether progress output should be displayed.
    ///
    /// `On` and `Off` are unconditional; `Auto` enables progress only when
    /// standard output is attached to a terminal.
    pub fn should_show_progress(&self) -> bool {
        match self.mode {
            ProgressMode::Off => false,
            ProgressMode::On => true,
            ProgressMode::Auto => {
                // Only the automatic mode needs to inspect the environment.
                let stdout = io::stdout();
                stdout.is_terminal()
            }
        }
    }
}

impl Default for ProgressConfig {
fn default() -> Self {
Self {
mode: ProgressMode::Auto,
style: ProgressStyle::Bar,
interval_ms: 200,
estimator: ProgressEstimator::Kalman,
}
}
}

/// Progress bar display mode
///
/// The default mode is [`ProgressMode::Auto`].
// `Default` is derived (with `#[default]` marking the variant) instead of a
// hand-written impl, so the default sits next to the variant it selects.
#[derive(Debug, Clone, Copy, Default, ValueEnum, PartialEq, Eq)]
pub enum ProgressMode {
    /// Show progress bar on TTY, off otherwise
    #[default]
    Auto,
    /// Always show progress bar
    On,
    /// Never show progress bar
    Off,
}

/// Progress bar visual style
///
/// The default style is [`ProgressStyle::Bar`].
// `Default` is derived (with `#[default]` marking the variant) instead of a
// hand-written impl, so the default sits next to the variant it selects.
#[derive(Debug, Clone, Copy, Default, ValueEnum, PartialEq, Eq)]
pub enum ProgressStyle {
    /// Show a progress bar when percent is known
    #[default]
    Bar,
    /// Show a spinner with counters
    Spinner,
}

/// ETA estimation algorithm
///
/// The default estimator is [`ProgressEstimator::Kalman`].
// `Default` is derived (with `#[default]` marking the variant) instead of a
// hand-written impl, so the default sits next to the variant it selects.
#[derive(Debug, Clone, Copy, Default, ValueEnum, PartialEq, Eq)]
pub enum ProgressEstimator {
    /// Simple linear estimation
    Linear,
    /// Kalman filter smoothed estimation
    #[default]
    Kalman,
}
Loading
Loading