Skip to content

Commit c59d20e

Browse files
feat: unify feedback queue and stored commands into single task queue (#43)
Replace the feedback-only `Vec<String>` queue with a tagged `QueueItem` enum supporting both Feedback and Command variants. This lets users queue commands on running tasks the same way they queue feedback. Key changes: - Add `QueueItem` enum with `#[serde(tag = "type")]` for clean JSON - Rename `feedback_queue.json` → `queue.json` with migration on read - Rename all `feedback_queue` methods to generic `queue` equivalents - Commands are queued when task is running, run immediately when stopped - Stranded queue processor handles both feedback and command items - Queue view shows typed items with `[feedback]`/`[cmd]` prefixes - Agent flow runner only processes Feedback items (not commands) - Add tests for queue_command, mixed queue, and migration path Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
1 parent fab3904 commit c59d20e

6 files changed

Lines changed: 472 additions & 163 deletions

File tree

src/agent.rs

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use std::process::{Command, Stdio};
44

55
use crate::config::Config;
66
use crate::flow::{AgentStep, Flow, FlowStep, StopCondition};
7-
use crate::task::{Task, TaskStatus};
7+
use crate::task::{QueueItem, Task, TaskStatus};
88
use crate::tmux::Tmux;
99

1010
pub struct Agent {
@@ -497,21 +497,32 @@ impl AgentRunner {
497497
}
498498
}
499499

500-
/// Process queued feedback if any exists
500+
/// Process queued feedback if any exists (skips non-feedback items).
501501
/// Returns Some(StopCondition) if feedback was processed and a new flow completed,
502-
/// or None if no feedback was queued
502+
/// or None if no feedback was queued.
503503
fn process_queued_feedback(&self, task: &mut Task) -> Result<Option<StopCondition>> {
504-
// Queue is stored in a separate file (feedback_queue.json), so no need
504+
// Queue is stored in a separate file (queue.json), so no need
505505
// to reload meta — the queue methods read directly from disk.
506-
if !task.has_queued_feedback() {
506+
if !task.has_queued_items() {
507507
return Ok(None);
508508
}
509509

510+
// Peek at the first item — only process feedback, leave commands for the TUI
511+
let queue = task.read_queue();
512+
match queue.first() {
513+
Some(QueueItem::Feedback { .. }) => {}
514+
_ => return Ok(None),
515+
}
516+
510517
println!();
511-
println!("=== Processing queued feedback ({} items) ===", task.queued_feedback_count());
518+
println!("=== Processing queued feedback ({} items in queue) ===", task.queued_item_count());
512519

513-
// Pop the first feedback item
514-
let feedback = task.pop_feedback_queue()?.expect("Queue was not empty");
520+
// Pop the first item (we already verified it's Feedback)
521+
let item = task.pop_queue()?.expect("Queue was not empty");
522+
let feedback = match item {
523+
QueueItem::Feedback { text } => text,
524+
_ => return Ok(None), // Shouldn't happen, but be safe
525+
};
515526

516527
println!("Feedback: {}", if feedback.len() > 100 {
517528
format!("{}...", &feedback[..100])

src/task.rs

Lines changed: 69 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,14 @@ use std::path::PathBuf;
55

66
use crate::config::Config;
77

8+
/// A single item in the task queue (feedback or command).
9+
#[derive(Debug, Clone, Serialize, Deserialize)]
10+
#[serde(tag = "type")]
11+
pub enum QueueItem {
12+
Feedback { text: String },
13+
Command { command_id: String, branch: Option<String> },
14+
}
15+
816
#[derive(Debug, Clone, Copy)]
917
enum SectionKind {
1018
AgentOutput,
@@ -778,25 +786,50 @@ impl Task {
778786
Ok(())
779787
}
780788

781-
/// Path to the separate feedback queue file (avoids meta.json write conflicts)
789+
/// Path to the task queue file
782790
fn queue_file_path(&self) -> PathBuf {
791+
self.dir.join("queue.json")
792+
}
793+
794+
/// Path to the old feedback queue file (for migration)
795+
fn old_queue_file_path(&self) -> PathBuf {
783796
self.dir.join("feedback_queue.json")
784797
}
785798

786-
/// Read the feedback queue from its dedicated file
787-
fn read_queue_file(&self) -> Vec<String> {
799+
/// Read the queue from its dedicated file, with migration from old format
800+
fn read_queue_file(&self) -> Vec<QueueItem> {
788801
let path = self.queue_file_path();
789-
if !path.exists() {
790-
return Vec::new();
802+
if path.exists() {
803+
return match std::fs::read_to_string(&path) {
804+
Ok(content) => serde_json::from_str(&content).unwrap_or_default(),
805+
Err(_) => Vec::new(),
806+
};
791807
}
792-
match std::fs::read_to_string(&path) {
793-
Ok(content) => serde_json::from_str(&content).unwrap_or_default(),
794-
Err(_) => Vec::new(),
808+
809+
// Migration: if old feedback_queue.json exists, convert it
810+
let old_path = self.old_queue_file_path();
811+
if old_path.exists() {
812+
let old_items: Vec<String> = match std::fs::read_to_string(&old_path) {
813+
Ok(content) => serde_json::from_str(&content).unwrap_or_default(),
814+
Err(_) => return Vec::new(),
815+
};
816+
let migrated: Vec<QueueItem> = old_items
817+
.into_iter()
818+
.map(|text| QueueItem::Feedback { text })
819+
.collect();
820+
// Write new format and delete old file
821+
if let Ok(content) = serde_json::to_string_pretty(&migrated) {
822+
let _ = std::fs::write(&path, content);
823+
}
824+
let _ = std::fs::remove_file(&old_path);
825+
return migrated;
795826
}
827+
828+
Vec::new()
796829
}
797830

798-
/// Write the feedback queue to its dedicated file (deletes file if empty)
799-
fn write_queue_file(&self, queue: &[String]) -> Result<()> {
831+
/// Write the queue to its dedicated file (deletes file if empty)
832+
fn write_queue_file(&self, queue: &[QueueItem]) -> Result<()> {
800833
let path = self.queue_file_path();
801834
if queue.is_empty() {
802835
if path.exists() {
@@ -813,38 +846,49 @@ impl Task {
813846
pub fn queue_feedback(&self, feedback: &str) -> Result<()> {
814847
let mut queue = self.read_queue_file();
815848
tracing::debug!(task_id = %self.meta.task_id(), queue_size = queue.len() + 1, "queuing feedback");
816-
queue.push(feedback.to_string());
849+
queue.push(QueueItem::Feedback { text: feedback.to_string() });
850+
self.write_queue_file(&queue)
851+
}
852+
853+
/// Queue a command to be run when the task stops
854+
pub fn queue_command(&self, command_id: &str, branch: Option<&str>) -> Result<()> {
855+
let mut queue = self.read_queue_file();
856+
tracing::debug!(task_id = %self.meta.task_id(), queue_size = queue.len() + 1, command_id, "queuing command");
857+
queue.push(QueueItem::Command {
858+
command_id: command_id.to_string(),
859+
branch: branch.map(|b| b.to_string()),
860+
});
817861
self.write_queue_file(&queue)
818862
}
819863

820-
/// Pop the first feedback item from the queue
821-
pub fn pop_feedback_queue(&self) -> Result<Option<String>> {
864+
/// Pop the first item from the queue
865+
pub fn pop_queue(&self) -> Result<Option<QueueItem>> {
822866
let mut queue = self.read_queue_file();
823867
if queue.is_empty() {
824868
return Ok(None);
825869
}
826-
let feedback = queue.remove(0);
870+
let item = queue.remove(0);
827871
self.write_queue_file(&queue)?;
828-
Ok(Some(feedback))
872+
Ok(Some(item))
829873
}
830874

831-
/// Check if there's queued feedback
832-
pub fn has_queued_feedback(&self) -> bool {
875+
/// Check if there are queued items
876+
pub fn has_queued_items(&self) -> bool {
833877
!self.read_queue_file().is_empty()
834878
}
835879

836-
/// Get the number of queued feedback items
837-
pub fn queued_feedback_count(&self) -> usize {
880+
/// Get the number of queued items
881+
pub fn queued_item_count(&self) -> usize {
838882
self.read_queue_file().len()
839883
}
840884

841-
/// Read all queued feedback items (for display purposes)
842-
pub fn read_feedback_queue(&self) -> Vec<String> {
885+
/// Read all queued items (for display purposes)
886+
pub fn read_queue(&self) -> Vec<QueueItem> {
843887
self.read_queue_file()
844888
}
845889

846-
/// Remove a single feedback item by index
847-
pub fn remove_feedback_queue_item(&self, index: usize) -> Result<()> {
890+
/// Remove a single queue item by index
891+
pub fn remove_queue_item(&self, index: usize) -> Result<()> {
848892
let mut queue = self.read_queue_file();
849893
if index < queue.len() {
850894
queue.remove(index);
@@ -853,8 +897,8 @@ impl Task {
853897
Ok(())
854898
}
855899

856-
/// Clear all queued feedback
857-
pub fn clear_feedback_queue(&self) -> Result<()> {
900+
/// Clear all queued items
901+
pub fn clear_queue(&self) -> Result<()> {
858902
let path = self.queue_file_path();
859903
if path.exists() {
860904
std::fs::remove_file(&path)?;

0 commit comments

Comments
 (0)