Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/arkflow-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,5 @@ clap = { workspace = true }
colored = { workspace = true }
flume = { workspace = true }
axum = { workspace = true }
reqwest = { workspace = true, features = ["json"] }
num_cpus = "1.17.0"
123 changes: 98 additions & 25 deletions crates/arkflow-core/src/cli/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,24 @@

use crate::config::{EngineConfig, LogFormat};
use crate::engine::Engine;
use crate::remote_config::RemoteConfigManager;
use clap::{Arg, Command};
use std::process;
use tokio::signal::unix::{signal, SignalKind};
use tokio_util::sync::CancellationToken;
use tracing::{info, Level};
use tracing_subscriber::fmt;

pub struct Cli {
pub config: Option<EngineConfig>,
pub remote_config_manager: Option<RemoteConfigManager>,
}
impl Default for Cli {
fn default() -> Self {
Self { config: None }
Self {
config: None,
remote_config_manager: None,
}
}
}

Expand All @@ -33,54 +40,120 @@ impl Cli {
let matches = Command::new("arkflow")
.version("0.4.0-rc1")
.author("chenquan")
.about("High-performance Rust stream processing engine, providing powerful data stream processing capabilities, supporting multiple input/output sources and processors.")
.about("High-performance Rust stream processing engine, providing powerful data stream processing capabilities, supporting multiple input/output sources and processors")
.arg(
Arg::new("config")
.short('c')
.long("config")
.value_name("FILE")
.help("Specify the profile path.")
.required(true),
.help("Specify the profile path")
)
.arg(
Arg::new("validate")
.short('v')
.long("validate")
.help("Only the profile is verified, not the engine is started.")
.help("Only the profile is verified, not the engine is started")
.action(clap::ArgAction::SetTrue),
)
.subcommand(
Command::new("remote").about("Use remote configuration for automatic stream management")
.arg(
Arg::new("url")
.long("url")
.value_name("URL")
.help("Remote configuration API endpoint URL for automatic stream management")
.required( true)
)
.arg(
Arg::new("interval")
.long("interval")
.value_name("SECONDS")
.help("Interval in seconds for polling remote configuration")
.default_value("30"),
)
.arg(
Arg::new("token")
.long("token")
.value_name("TOKEN")
.help("Authentication token for remote configuration API")
.required( true),
))
.get_matches();

// Get the profile path
let config_path = matches.get_one::<String>("config").unwrap();
// Check if using remote configuration
if let Some(remote) = matches.subcommand_matches("remote") {
// Initialize remote configuration manager
let interval = remote
.get_one::<String>("interval")
.and_then(|s| s.parse::<u64>().ok())
.unwrap_or(30);
let token = remote.get_one::<String>("token").cloned();
let remote_url = remote
.get_one::<String>("url")
.expect("Remote configuration URL not found");

// Get the profile path
let config = match EngineConfig::from_file(config_path) {
Ok(config) => config,
Err(e) => {
println!("Failed to load configuration file: {}", e);
process::exit(1);
let remote_manager = RemoteConfigManager::new(remote_url.clone(), interval, token);

self.remote_config_manager.replace(remote_manager);
info!("Using remote configuration from: {}", remote_url);
} else {
// Use local configuration file
let config_path = matches
.get_one::<String>("config")
.ok_or("Configuration not found")?;

Comment on lines +101 to +104
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Compilation bug: using ok_or("...")? with &str won't compile

&str doesn’t implement std::error::Error, so ? cannot convert it into Box<dyn std::error::Error>. Use an error type that implements Error (e.g., std::io::Error) or handle it explicitly.

Apply this diff:

-            let config_path = matches
-                .get_one::<String>("config")
-                .ok_or("Configuration not found")?;
+            let config_path = matches
+                .get_one::<String>("config")
+                .ok_or_else(|| std::io::Error::new(std::io::ErrorKind::InvalidInput, "Configuration not found"))?;
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
let config_path = matches
.get_one::<String>("config")
.ok_or("Configuration not found")?;
let config_path = matches
.get_one::<String>("config")
.ok_or_else(|| std::io::Error::new(std::io::ErrorKind::InvalidInput, "Configuration not found"))?;
🤖 Prompt for AI Agents
crates/arkflow-core/src/cli/mod.rs around lines 101 to 104: the use of
.ok_or("Configuration not found")? fails because &str does not implement
std::error::Error and cannot be converted by ?. Replace the call so it produces
a concrete error type that implements Error (for example use .ok_or_else(||
std::io::Error::new(std::io::ErrorKind::Other, "Configuration not found"))?), or
if the project uses anyhow, use .ok_or_else(|| anyhow::anyhow!("Configuration
not found"))?; ensure the resulting error type matches the function return type
(or convert/wrap it into Box<dyn std::error::Error> as needed).

let config = match EngineConfig::from_file(config_path) {
Ok(config) => config,
Err(e) => {
println!("Failed to load configuration file: {}", e);
process::exit(1);
}
};

// If you just verify the configuration, exit it
if matches.get_flag("validate") {
info!("The config is validated.");
return Ok(());
}
};

// If you just verify the configuration, exit it
if matches.get_flag("validate") {
info!("The config is validated.");
return Ok(());
self.config = Some(config);
}
self.config = Some(config);
Ok(())
}
pub async fn run(&self) -> Result<(), Box<dyn std::error::Error>> {
// Initialize the logging system
let config = self.config.clone().unwrap();
init_logging(&config);
let engine = Engine::new(config);
engine.run().await?;
let token = CancellationToken::new();

if let Some(remote_manager) = &self.remote_config_manager {
// Run with remote configuration management
remote_manager.run(token.clone()).await?;
} else {
// Run with local configuration
let config = self.config.clone().unwrap();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Avoid unwrap() on potentially None config.

While the logic ensures config is Some when remote_config_manager is None, using unwrap() here is fragile and could panic if the logic changes.

Apply this diff to handle the None case explicitly:

-            let config = self.config.clone().unwrap();
+            let config = self.config.clone().ok_or("Configuration not found")?;
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
let config = self.config.clone().unwrap();
let config = self.config.clone().ok_or("Configuration not found")?;
🤖 Prompt for AI Agents
In crates/arkflow-core/src/cli/mod.rs at line 121, avoid using unwrap() on
self.config as it may panic if None. Instead, explicitly handle the None case by
matching on self.config or using if let to safely extract the config value,
returning an error or handling the absence gracefully to prevent potential
runtime panics.

init_logging(&config);
let engine = Engine::new(config);
engine.run(token.clone()).await?;
}

// Set up signal handlers
let mut sigint = signal(SignalKind::interrupt()).expect("Failed to set signal handler");
let mut sigterm = signal(SignalKind::terminate()).expect("Failed to set signal handler");

tokio::spawn(async move {
tokio::select! {
_ = sigint.recv() => {
info!("Received SIGINT, exiting...");

},
_ = sigterm.recv() => {
info!("Received SIGTERM, exiting...");
}
}
token.cancel();
});
Comment on lines +137 to +152
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Signal handlers are set up after the run completes (unreachable), breaking graceful shutdown

You await the engine/remote manager before spawning signal listeners. As a result, the token is never cancelled on SIGINT/SIGTERM and the process won’t shut down gracefully.

Move the signal setup before awaiting run(...):

 pub async fn run(&self) -> Result<(), Box<dyn std::error::Error>> {
-        let token = CancellationToken::new();
+        let token = CancellationToken::new();
+
+        // Set up signal handlers BEFORE running the engine/remote manager
+        let token_for_signals = token.clone();
+        #[cfg(unix)]
+        tokio::spawn(async move {
+            let mut sigint = signal(SignalKind::interrupt()).expect("Failed to set signal handler");
+            let mut sigterm = signal(SignalKind::terminate()).expect("Failed to set signal handler");
+            tokio::select! {
+                _ = sigint.recv() => {
+                    info!("Received SIGINT, exiting...");
+                },
+                _ = sigterm.recv() => {
+                    info!("Received SIGTERM, exiting...");
+                }
+            }
+            token_for_signals.cancel();
+        });
+
+        #[cfg(not(unix))]
+        tokio::spawn(async move {
+            if tokio::signal::ctrl_c().await.is_ok() {
+                info!("Received Ctrl+C, exiting...");
+            }
+            token_for_signals.cancel();
+        });
 
-        if let Some(remote_manager) = &self.remote_config_manager {
+        if let Some(remote_manager) = &self.remote_config_manager {
             // Run with remote configuration management
             remote_manager.run(token.clone()).await?;
         } else {
             // Run with local configuration
             let config = self.config.clone().unwrap();
             init_logging(&config);
             let engine = Engine::new(config);
             engine.run(token.clone()).await?;
         }
 
-        // Set up signal handlers
-        let mut sigint = signal(SignalKind::interrupt()).expect("Failed to set signal handler");
-        let mut sigterm = signal(SignalKind::terminate()).expect("Failed to set signal handler");
-
-        tokio::spawn(async move {
-            tokio::select! {
-                _ = sigint.recv() => {
-                    info!("Received SIGINT, exiting...");
-
-                },
-                _ = sigterm.recv() => {
-                    info!("Received SIGTERM, exiting...");
-                }
-            }
-            token.cancel();
-        });
         Ok(())
     }
🤖 Prompt for AI Agents
In crates/arkflow-core/src/cli/mod.rs around lines 137 to 152, the signal
handler setup currently runs after awaiting run(...), so SIGINT/SIGTERM can't
cancel the token; move the signal creation and tokio::spawn block to before the
call to run(...) (so the signal streams and spawned task live while run
executes), ensure any cancellation token used inside the spawned task is
cloned/moved into the async block correctly, and then await run(...) — this
guarantees signals are observed and token.cancel() is called on SIGINT/SIGTERM
for graceful shutdown.

Ok(())
}
}
fn init_logging(config: &EngineConfig) -> () {
pub(crate) fn init_logging(config: &EngineConfig) -> () {
let log_level = match config.logging.level.as_str() {
"trace" => Level::TRACE,
"debug" => Level::DEBUG,
Expand Down
22 changes: 1 addition & 21 deletions crates/arkflow-core/src/engine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use crate::config::EngineConfig;
use std::process;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use tokio::signal::unix::{signal, SignalKind};
use tokio_util::sync::CancellationToken;
use tracing::{error, info};

Expand Down Expand Up @@ -217,9 +216,7 @@ impl Engine {
/// 5. Waits for all streams to complete
///
/// Returns an error if any part of the initialization or execution fails
pub async fn run(&self) -> Result<(), Box<dyn std::error::Error>> {
let token = CancellationToken::new();

pub async fn run(&self, token: CancellationToken) -> Result<(), Box<dyn std::error::Error>> {
// Start the health check server
self.start_health_check_server(token.clone()).await?;

Expand All @@ -243,23 +240,6 @@ impl Engine {

// Set the readiness status
self.health_state.is_ready.store(true, Ordering::SeqCst);
// Set up signal handlers
let mut sigint = signal(SignalKind::interrupt()).expect("Failed to set signal handler");
let mut sigterm = signal(SignalKind::terminate()).expect("Failed to set signal handler");
let token_clone = token.clone();
tokio::spawn(async move {
tokio::select! {
_ = sigint.recv() => {
info!("Received SIGINT, exiting...");

},
_ = sigterm.recv() => {
info!("Received SIGTERM, exiting...");
}
}

token_clone.cancel();
});

for (i, mut stream) in streams.into_iter().enumerate() {
info!("Starting flow #{}", i + 1);
Expand Down
1 change: 1 addition & 0 deletions crates/arkflow-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ pub mod input;
pub mod output;
pub mod pipeline;
pub mod processor;
pub mod remote_config;
pub mod stream;
pub mod temporary;

Expand Down
Loading
Loading