Description
[BUG] AgentInterrupt::trigger() uses notify_one() — concurrent tasks silently miss interrupt signals
Summary
In crates/mofa-runtime/src/interrupt.rs, the AgentInterrupt::trigger() method uses tokio::sync::Notify::notify_one() to signal an interrupt. When an agent has multiple concurrent tasks (e.g., parallel node execution, background streamers, periodic runners) all waiting on the same AgentInterrupt, only one of them is woken up. The remaining tasks continue running indefinitely, completely unaware that an interrupt was requested.
Root Cause
// crates/mofa-runtime/src/interrupt.rs
pub struct AgentInterrupt {
    is_interrupted: AtomicBool,
    notify: Notify,
}

impl AgentInterrupt {
    pub fn trigger(&self) {
        self.is_interrupted.store(true, Ordering::SeqCst);
        self.notify.notify_one(); // ← BUG: only wakes ONE waiter
    }

    pub async fn wait(&self) {
        if self.is_interrupted.load(Ordering::SeqCst) {
            return;
        }
        self.notify.notified().await;
    }
}

Why notify_one() is Wrong Here
Notify::notify_one() wakes at most one task that is waiting on a notified() future; if no task is waiting, it stores a single permit for the next waiter. When multiple tasks share the same AgentInterrupt (the normal case, since the interrupt handle is Arc-cloned across concurrent branches), every waiter except one stays blocked on notified().await and never observes the interrupt.
Although is_interrupted is set atomically, a task that is already parked on self.notify.notified().await does not re-check the AtomicBool flag. It is waiting for a Notify wakeup that never arrives.
Impact
1. Zombie Tasks on User Cancellation
When a user cancels an agent execution (e.g., HTTP disconnect, CLI Ctrl+C), the interrupt is triggered but only one branch stops. All other parallel branches become zombie tasks — consuming CPU, making LLM API calls, and mutating state for a request that has already been abandoned.
2. Resource Leak in Multi-Task Agents
Agents using ParallelAgent, concurrent node execution in StateGraph, or periodic runners with background streamers will leak tasks on every interrupt. Over time the leaked tasks accumulate, holding memory and any resources they own for as long as the runtime lives.
3. Inconsistent Shutdown Semantics
The AgentInterrupt API appears to provide broadcast cancellation semantics (it sets a global AtomicBool), but the notification layer only provides point-to-point semantics. This mismatch makes it impossible for consumers to reason about shutdown correctness.
Reproduction
// Assumes the AgentInterrupt definition above is in scope, with a new()
// constructor that starts in the non-interrupted state.
use std::sync::Arc;
use tokio::sync::Notify;
use std::sync::atomic::{AtomicBool, Ordering};

#[tokio::main]
async fn main() {
    let interrupt = Arc::new(AgentInterrupt::new());

    // Simulate 3 concurrent tasks sharing the same interrupt
    let mut handles = vec![];
    for i in 0..3 {
        let int = interrupt.clone();
        handles.push(tokio::spawn(async move {
            int.wait().await;
            println!("Task {i} received interrupt");
        }));
    }

    // Give tasks time to park on notified().await
    tokio::time::sleep(std::time::Duration::from_millis(50)).await;

    // Trigger interrupt: only ONE task wakes up
    interrupt.trigger();

    // Wait a bit: the other two tasks are still blocked
    tokio::time::sleep(std::time::Duration::from_secs(2)).await;

    // Output: Only "Task 0 received interrupt" (or whichever one was lucky)
    // The other two tasks never observe the interrupt.
}

Proposed Fix
Immediate: Replace notify_one() with notify_waiters()
 pub fn trigger(&self) {
     self.is_interrupted.store(true, Ordering::SeqCst);
-    self.notify.notify_one();
+    self.notify.notify_waiters();
 }

notify_waiters() wakes all tasks currently awaiting the Notify, which matches the broadcast cancellation semantics that AgentInterrupt is designed to provide.
Defensive: Add a poll-based check in wait()
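Unlike notify_one(), notify_waiters() stores no permit. A trigger() that fires between the flag check and the creation of the notified() future would therefore be missed, leaving that task blocked. To close this window, wait() should create the notified() future before checking the flag, and re-check the flag after every wakeup:
pub async fn wait(&self) {
    loop {
        // Create the future first: a Notified future receives notify_waiters()
        // wakeups from the moment it is created, even before it is polled.
        let notified = self.notify.notified();
        if self.is_interrupted.load(Ordering::SeqCst) {
            return;
        }
        notified.await;
        // Loop back and re-check the flag after every wakeup
    }
}

Affected Consumers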
Any code path that:
- Clones an Arc<AgentInterrupt> into multiple concurrent tokio::spawn tasks
- Calls .wait() from more than one task
- Expects .trigger() to stop all of them
This includes (but is not limited to):
- AgentRunner with periodic execution and background tasks
- ParallelAgent spawned branch execution
- StateGraph parallel node execution
- Any custom agent implementation using AgentInterrupt for cooperative cancellation
Labels
kind/bug, area/runtime, priority/p1