Skip to content

Conversation

@chenquan
Copy link
Collaborator

@chenquan chenquan commented Jul 9, 2025

Summary by CodeRabbit

  • New Features
    • Remote configuration via a new CLI subcommand to fetch and manage stream pipelines from a remote server.
    • Configurable polling interval and optional authentication for remote config; automatic start/stop/restart of streams on remote changes.
  • Improvements
    • Local config is optional when using remote; --validate can check local config without starting the engine.
    • Graceful cancellation and signal-aware shutdown for both remote and local runs.

@coderabbitai
Copy link
Contributor

coderabbitai bot commented Jul 9, 2025

Walkthrough

Adds a remote configuration subsystem and CLI support for a "remote" subcommand, introduces a RemoteConfigManager module that polls and manages stream lifecycles, updates the engine to accept an external CancellationToken, and adds the reqwest dependency with JSON support.

Changes

Cohort / File(s) Change Summary
Crate manifest
crates/arkflow-core/Cargo.toml
Added reqwest dependency with the "json" feature, using workspace version.
CLI
crates/arkflow-core/src/cli/mod.rs
Added remote subcommand and args; Cli gains pub remote_config_manager: Option<RemoteConfigManager>; parse logic initializes manager when used; --config is optional if remote used; run became async and cancellation-aware, creating a CancellationToken and wiring signal handlers to cancel it; init_logging visibility changed to pub(crate).
Library root
crates/arkflow-core/src/lib.rs
Declared new public module: pub mod remote_config;.
Remote config module
crates/arkflow-core/src/remote_config.rs
New module providing RemoteConfigManager, request/response types (RemoteConfigResponse, StreamInfo, StreamStatus), polling loop, version-aware stream start/stop/restart, per-stream runtimes with cancellation tokens, Reqwest client usage, and logging/error handling.
Engine
crates/arkflow-core/src/engine/mod.rs
Engine::run signature changed to accept token: CancellationToken; removed internal signal handling and creation of its own cancellation token (caller now provides cancellation).

Sequence Diagram(s)

sequenceDiagram
    participant User
    participant CLI
    participant RemoteConfigManager
    participant Engine

    User->>CLI: Run with "remote" subcommand
    CLI->>RemoteConfigManager: new(url, interval, token)
    CLI->>RemoteConfigManager: run(token)
    loop every poll interval
        RemoteConfigManager->>RemoteConfigManager: fetch remote config
        alt version changed
            RemoteConfigManager->>RemoteConfigManager: start/stop/restart streams
        end
    end
    User->>CLI: SIGINT/SIGTERM
    CLI->>RemoteConfigManager: cancel token
    RemoteConfigManager->>RemoteConfigManager: graceful shutdown
Loading
sequenceDiagram
    participant User
    participant CLI
    participant Engine

    User->>CLI: Run with local config
    CLI->>Engine: construct(...)
    CLI->>Engine: run(token)
    User->>CLI: SIGINT/SIGTERM
    CLI->>Engine: cancel token
    Engine->>Engine: graceful shutdown
Loading

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Poem

I am a rabbit, code in paw,
Polling skies for config law.
Streams awake or take a nap,
Tokens hold the tidy map.
Hopping logs and restful hum—🐇✨

Tip

🔌 Remote MCP (Model Context Protocol) integration is now available!

Pro plan users can now connect to remote MCP servers from the Integrations page. Connect with popular remote MCPs such as Notion and Linear to add more context to your reviews and chats.

✨ Finishing Touches
  • 📝 Generate Docstrings
🧪 Generate unit tests
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch feat/remote-manage

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.

Support

Need help? Create a ticket on our support page for assistance with any issues or questions.

CodeRabbit Commands (Invoked using PR/Issue comments)

Type @coderabbitai help to get the list of available commands.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Status, Documentation and Community

  • Visit our Status Page to check the current availability of CodeRabbit.
  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 6

🧹 Nitpick comments (4)
docs/remote-config.md (1)

36-40: Add language specifiers to fenced code blocks.

The static analysis correctly identifies that fenced code blocks should have language specifiers for better syntax highlighting.

Apply these diffs to add language specifiers:

For line 36:

-```
+```http
 GET /pipelines
 Authorization: Bearer <token>  # 如果提供了token
 Content-Type: application/json

For line 176:
```diff
-```
+```log
 2024-01-15T10:30:00Z INFO Starting remote configuration manager
 2024-01-15T10:30:00Z INFO Polling interval: 30 seconds
 2024-01-15T10:30:00Z INFO API endpoint: http://api.example.com/pipelines
 2024-01-15T10:30:30Z INFO Configuration changed, updating pipelines (version: v1.0.1)
 2024-01-15T10:30:30Z INFO Starting new pipeline 'Data Processing Pipeline'
 2024-01-15T10:30:30Z INFO Pipeline 'Data Processing Pipeline' started successfully


Also applies to: 176-183

</blockquote></details>
<details>
<summary>crates/arkflow-core/src/remote_config.rs (3)</summary><blockquote>

`210-214`: **Consider graceful task termination instead of abort.**

Using `abort()` on task handles forcefully terminates tasks without cleanup. Consider awaiting cancellation instead.

Instead of immediately aborting, give tasks a chance to shut down gracefully:

```diff
 // Stop existing stream
 existing.cancellation_token.cancel();
 if let Some(handle) = &existing.handle {
-    let _ = handle.abort();
+    // Give the task a chance to shut down gracefully
+    tokio::select! {
+        _ = tokio::time::sleep(Duration::from_secs(5)) => {
+            // Timeout - force abort
+            handle.abort();
+        }
+        _ = handle => {
+            // Task completed gracefully
+        }
+    }
 }

Also applies to: 228-231, 237-241, 257-260


248-248: Fix typo in comment.

-        // Remove stream that are no longer in the configuration
+        // Remove streams that are no longer in the configuration

168-186: Add retry logic for transient network failures.

Network requests can fail due to transient issues. Consider adding retry logic with exponential backoff.

Consider using a retry mechanism for better resilience:

use tokio_retry::{Retry, strategy::ExponentialBackoff};

async fn fetch_remote_config_with_retry(&self) -> Result<RemoteConfigResponse, Box<dyn std::error::Error>> {
    let retry_strategy = ExponentialBackoff::from_millis(100)
        .max_delay(Duration::from_secs(10))
        .take(3);
    
    Retry::spawn(retry_strategy, || async {
        self.fetch_remote_config().await
    }).await
}

This would require adding the tokio-retry crate as a dependency.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between d7abf3f and 94d6f96.

⛔ Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (5)
  • crates/arkflow-core/Cargo.toml (1 hunks)
  • crates/arkflow-core/src/cli/mod.rs (3 hunks)
  • crates/arkflow-core/src/lib.rs (1 hunks)
  • crates/arkflow-core/src/remote_config.rs (1 hunks)
  • docs/remote-config.md (1 hunks)
🧰 Additional context used
🪛 markdownlint-cli2 (0.17.2)
docs/remote-config.md

36-36: Fenced code blocks should have a language specified

(MD040, fenced-code-language)


176-176: Fenced code blocks should have a language specified

(MD040, fenced-code-language)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
  • GitHub Check: build
  • GitHub Check: Build Docusaurus
  • GitHub Check: build
🔇 Additional comments (1)
crates/arkflow-core/src/lib.rs (1)

38-38: LGTM!

The module declaration follows the existing naming convention and properly exposes the remote configuration functionality.

remote_manager.run().await?;
} else {
// Run with local configuration
let config = self.config.clone().unwrap();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Avoid unwrap() on potentially None config.

While the logic ensures config is Some when remote_config_manager is None, using unwrap() here is fragile and could panic if the logic changes.

Apply this diff to handle the None case explicitly:

-            let config = self.config.clone().unwrap();
+            let config = self.config.clone().ok_or("Configuration not found")?;
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
let config = self.config.clone().unwrap();
let config = self.config.clone().ok_or("Configuration not found")?;
🤖 Prompt for AI Agents
In crates/arkflow-core/src/cli/mod.rs at line 121, avoid using unwrap() on
self.config as it may panic if None. Instead, explicitly handle the None case by
matching on self.config or using if let to safely extract the config value,
returning an error or handling the absence gracefully to prevent potential
runtime panics.

@chenquan chenquan force-pushed the feat/remote-manage branch from e0b0dc2 to 622819b Compare July 9, 2025 15:23
@chenquan chenquan marked this pull request as draft July 9, 2025 16:08
@chenquan chenquan self-assigned this Jul 11, 2025
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

♻️ Duplicate comments (1)
crates/arkflow-core/src/cli/mod.rs (1)

131-131: Avoid unwrap() on self.config

This can panic if parse() returned early under --validate, leaving self.config as None. Prefer returning a proper error.

Apply this diff:

-            let config = self.config.clone().unwrap();
+            let config = self
+                .config
+                .clone()
+                .ok_or_else(|| std::io::Error::new(std::io::ErrorKind::Other, "Configuration not found"))?;
🧹 Nitpick comments (7)
crates/arkflow-core/src/cli/mod.rs (7)

107-111: Don’t exit the process inside library-style parsing; propagate errors instead

Calling process::exit(1) here prevents callers from handling the error and makes testing harder. Return the error and let the top-level binary decide.

-                Err(e) => {
-                    println!("Failed to load configuration file: {}", e);
-                    process::exit(1);
-                }
+                Err(e) => {
+                    eprintln!("Failed to load configuration file: {}", e);
+                    return Err(Box::new(e));
+                }

113-121: --validate early return can leave self.config unset; set it before returning

If callers invoke run() after parse(), this leads to a panic at the unwrap(). To be safe, set self.config before the early return.

-            // If you just verify the configuration, exit it
-            if matches.get_flag("validate") {
-                info!("The config is validated.");
-                return Ok(());
-            }
-
-            self.config = Some(config);
+            // If you just verify the configuration, exit it
+            if matches.get_flag("validate") {
+                self.config = Some(config);
+                info!("The config is validated.");
+                return Ok(());
+            }
+
+            self.config = Some(config);

58-80: Consider making --token optional and allowing ENV override for flexibility

RemoteConfigManager::new takes Option<String>, but the CLI requires --token. Making it optional (and supporting an env var) lets users access unauthenticated endpoints or configure tokens via environment.

-                .arg(
-                    Arg::new("token")
-                        .long("token")
-                        .value_name("TOKEN")
-                        .help("Authentication token for remote configuration API")
-                        .required( true),
-                ))
+                .arg(
+                    Arg::new("token")
+                        .long("token")
+                        .value_name("TOKEN")
+                        .help("Authentication token for remote configuration API")
+                        .env("ARKFLOW_REMOTE_TOKEN")
+                ))

91-96: Avoid expect() for URL; propagate a user-friendly error instead

Even though clap enforces required(true), prefer consistent error handling by returning an error rather than panicking.

-            let remote_url = remote
-                .get_one::<String>("url")
-                .expect("Remote configuration URL not found");
+            let remote_url = remote
+                .get_one::<String>("url")
+                .ok_or_else(|| std::io::Error::new(std::io::ErrorKind::InvalidInput, "Remote configuration URL not found"))?;

20-21: Unix-only signal handling; consider cross-platform support

Importing tokio::signal::unix limits builds to Unix targets. If Windows support is desired, add a ctrl_c fallback with conditional compilation.

Example changes:

- use tokio::signal::unix::{signal, SignalKind};
+ #[cfg(unix)]
+ use tokio::signal::unix::{signal, SignalKind};
+ #[cfg(not(unix))]
+ use tokio::signal;

And in run():

#[cfg(unix)]
// existing SIGINT/SIGTERM handling

#[cfg(not(unix))]
tokio::spawn({
    let token = token.clone();
    async move {
        if signal::ctrl_c().await.is_ok() {
            info!("Received Ctrl+C, exiting...");
        }
        token.cancel();
    }
});

156-224: Nit: remove explicit -> () and avoid .pretty() with JSON format

Minor polish:

  • Returning () explicitly is redundant in Rust.
  • When using .json(), .pretty() is ignored; drop it to reduce confusion.
-pub(crate) fn init_logging(config: &EngineConfig) -> () {
+pub(crate) fn init_logging(config: &EngineConfig) {
@@
-                    LogFormat::JSON => {
+                    LogFormat::JSON => {
                         let subscriber = subscriber_builder
                             .with_writer(std::sync::Mutex::new(file))
-                            .pretty()
                             .json()
                             .finish();
                         tracing::subscriber::set_global_default(subscriber)
                             .expect("You can't set a global default log subscriber");
                     }
@@
     match config.logging.format {
         LogFormat::JSON => {
-            let subscriber = subscriber_builder.pretty().json().finish();
+            let subscriber = subscriber_builder.json().finish();
             tracing::subscriber::set_global_default(subscriber)
                 .expect("You can't set a global default log subscriber");
         }

43-55: Nit: help text wording improvements

Tighten phrasing for clarity and correctness.

-            .about("High-performance Rust stream processing engine, providing powerful data stream processing capabilities, supporting multiple input/output sources and processors")
+            .about("High-performance Rust stream processing engine with multiple inputs/outputs and processors")
@@
-                    .help("Specify the profile path")
+                    .help("Path to the configuration file")
@@
-                    .help("Only the profile is verified, not the engine is started")
+                    .help("Validate the configuration file without starting the engine")
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between 17da896 and c75ad66.

⛔ Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (1)
  • crates/arkflow-core/src/cli/mod.rs (2 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (1)
crates/arkflow-core/src/cli/mod.rs (2)
crates/arkflow-core/src/config.rs (3)
  • default (148-156)
  • default (165-171)
  • from_file (89-106)
crates/arkflow-core/src/remote_config.rs (2)
  • new (93-107)
  • run (110-134)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
  • GitHub Check: build
  • GitHub Check: build
🔇 Additional comments (1)
crates/arkflow-core/src/cli/mod.rs (1)

86-90: Interval parsing: good defensive handling

Nice use of .and_then(|s| s.parse::<u64>().ok()).unwrap_or(30) to avoid panics and provide a sane default.

Comment on lines +101 to +104
let config_path = matches
.get_one::<String>("config")
.ok_or("Configuration not found")?;

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Compilation bug: using ok_or("...")? with &str won't compile

&str doesn’t implement std::error::Error, so ? cannot convert it into Box<dyn std::error::Error>. Use an error type that implements Error (e.g., std::io::Error) or handle it explicitly.

Apply this diff:

-            let config_path = matches
-                .get_one::<String>("config")
-                .ok_or("Configuration not found")?;
+            let config_path = matches
+                .get_one::<String>("config")
+                .ok_or_else(|| std::io::Error::new(std::io::ErrorKind::InvalidInput, "Configuration not found"))?;
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
let config_path = matches
.get_one::<String>("config")
.ok_or("Configuration not found")?;
let config_path = matches
.get_one::<String>("config")
.ok_or_else(|| std::io::Error::new(std::io::ErrorKind::InvalidInput, "Configuration not found"))?;
🤖 Prompt for AI Agents
crates/arkflow-core/src/cli/mod.rs around lines 101 to 104: the use of
.ok_or("Configuration not found")? fails because &str does not implement
std::error::Error and cannot be converted by ?. Replace the call so it produces
a concrete error type that implements Error (for example use .ok_or_else(||
std::io::Error::new(std::io::ErrorKind::Other, "Configuration not found"))?), or
if the project uses anyhow, use .ok_or_else(|| anyhow::anyhow!("Configuration
not found"))?; ensure the resulting error type matches the function return type
(or convert/wrap it into Box<dyn std::error::Error> as needed).

Comment on lines +137 to +152
// Set up signal handlers
let mut sigint = signal(SignalKind::interrupt()).expect("Failed to set signal handler");
let mut sigterm = signal(SignalKind::terminate()).expect("Failed to set signal handler");

tokio::spawn(async move {
tokio::select! {
_ = sigint.recv() => {
info!("Received SIGINT, exiting...");

},
_ = sigterm.recv() => {
info!("Received SIGTERM, exiting...");
}
}
token.cancel();
});
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Signal handlers are set up after the run completes (unreachable), breaking graceful shutdown

You await the engine/remote manager before spawning signal listeners. As a result, the token is never cancelled on SIGINT/SIGTERM and the process won’t shut down gracefully.

Move the signal setup before awaiting run(...):

 pub async fn run(&self) -> Result<(), Box<dyn std::error::Error>> {
-        let token = CancellationToken::new();
+        let token = CancellationToken::new();
+
+        // Set up signal handlers BEFORE running the engine/remote manager
+        let token_for_signals = token.clone();
+        #[cfg(unix)]
+        tokio::spawn(async move {
+            let mut sigint = signal(SignalKind::interrupt()).expect("Failed to set signal handler");
+            let mut sigterm = signal(SignalKind::terminate()).expect("Failed to set signal handler");
+            tokio::select! {
+                _ = sigint.recv() => {
+                    info!("Received SIGINT, exiting...");
+                },
+                _ = sigterm.recv() => {
+                    info!("Received SIGTERM, exiting...");
+                }
+            }
+            token_for_signals.cancel();
+        });
+
+        #[cfg(not(unix))]
+        tokio::spawn(async move {
+            if tokio::signal::ctrl_c().await.is_ok() {
+                info!("Received Ctrl+C, exiting...");
+            }
+            token_for_signals.cancel();
+        });
 
-        if let Some(remote_manager) = &self.remote_config_manager {
+        if let Some(remote_manager) = &self.remote_config_manager {
             // Run with remote configuration management
             remote_manager.run(token.clone()).await?;
         } else {
             // Run with local configuration
             let config = self.config.clone().unwrap();
             init_logging(&config);
             let engine = Engine::new(config);
             engine.run(token.clone()).await?;
         }
 
-        // Set up signal handlers
-        let mut sigint = signal(SignalKind::interrupt()).expect("Failed to set signal handler");
-        let mut sigterm = signal(SignalKind::terminate()).expect("Failed to set signal handler");
-
-        tokio::spawn(async move {
-            tokio::select! {
-                _ = sigint.recv() => {
-                    info!("Received SIGINT, exiting...");
-
-                },
-                _ = sigterm.recv() => {
-                    info!("Received SIGTERM, exiting...");
-                }
-            }
-            token.cancel();
-        });
         Ok(())
     }
🤖 Prompt for AI Agents
In crates/arkflow-core/src/cli/mod.rs around lines 137 to 152, the signal
handler setup currently runs after awaiting run(...), so SIGINT/SIGTERM can't
cancel the token; move the signal creation and tokio::spawn block to before the
call to run(...) (so the signal streams and spawned task live while run
executes), ensure any cancellation token used inside the spawned task is
cloned/moved into the async block correctly, and then await run(...) — this
guarantees signals are observed and token.cancel() is called on SIGINT/SIGTERM
for graceful shutdown.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants