Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 19 additions & 8 deletions src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::process::{Command, Stdio};

use crate::config::Config;
use crate::flow::{AgentStep, Flow, FlowStep, StopCondition};
use crate::task::{Task, TaskStatus};
use crate::task::{QueueItem, Task, TaskStatus};
use crate::tmux::Tmux;

pub struct Agent {
Expand Down Expand Up @@ -497,21 +497,32 @@ impl AgentRunner {
}
}

/// Process queued feedback if any exists
/// Process queued feedback if any exists (skips non-feedback items).
/// Returns Some(StopCondition) if feedback was processed and a new flow completed,
/// or None if no feedback was queued
/// or None if no feedback was queued.
fn process_queued_feedback(&self, task: &mut Task) -> Result<Option<StopCondition>> {
// Queue is stored in a separate file (feedback_queue.json), so no need
// Queue is stored in a separate file (queue.json), so no need
// to reload meta — the queue methods read directly from disk.
if !task.has_queued_feedback() {
if !task.has_queued_items() {
return Ok(None);
}

// Peek at the first item — only process feedback, leave commands for the TUI
let queue = task.read_queue();
match queue.first() {
Some(QueueItem::Feedback { .. }) => {}
_ => return Ok(None),
}

println!();
println!("=== Processing queued feedback ({} items) ===", task.queued_feedback_count());
println!("=== Processing queued feedback ({} items in queue) ===", task.queued_item_count());

// Pop the first feedback item
let feedback = task.pop_feedback_queue()?.expect("Queue was not empty");
// Pop the first item (we already verified it's Feedback)
let item = task.pop_queue()?.expect("Queue was not empty");
let feedback = match item {
QueueItem::Feedback { text } => text,
_ => return Ok(None), // Shouldn't happen, but be safe
};

println!("Feedback: {}", if feedback.len() > 100 {
format!("{}...", &feedback[..100])
Expand Down
94 changes: 69 additions & 25 deletions src/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,14 @@ use std::path::PathBuf;

use crate::config::Config;

/// A single item in the task queue (feedback or command).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum QueueItem {
Feedback { text: String },
Command { command_id: String, branch: Option<String> },
}

#[derive(Debug, Clone, Copy)]
enum SectionKind {
AgentOutput,
Expand Down Expand Up @@ -778,25 +786,50 @@ impl Task {
Ok(())
}

/// Path to the separate feedback queue file (avoids meta.json write conflicts)
/// Path to the task queue file
fn queue_file_path(&self) -> PathBuf {
self.dir.join("queue.json")
}

/// Path to the old feedback queue file (for migration)
fn old_queue_file_path(&self) -> PathBuf {
self.dir.join("feedback_queue.json")
}

/// Read the feedback queue from its dedicated file
fn read_queue_file(&self) -> Vec<String> {
/// Read the queue from its dedicated file, with migration from old format
fn read_queue_file(&self) -> Vec<QueueItem> {
let path = self.queue_file_path();
if !path.exists() {
return Vec::new();
if path.exists() {
return match std::fs::read_to_string(&path) {
Ok(content) => serde_json::from_str(&content).unwrap_or_default(),
Err(_) => Vec::new(),
};
}
match std::fs::read_to_string(&path) {
Ok(content) => serde_json::from_str(&content).unwrap_or_default(),
Err(_) => Vec::new(),

// Migration: if old feedback_queue.json exists, convert it
let old_path = self.old_queue_file_path();
if old_path.exists() {
let old_items: Vec<String> = match std::fs::read_to_string(&old_path) {
Ok(content) => serde_json::from_str(&content).unwrap_or_default(),
Err(_) => return Vec::new(),
};
let migrated: Vec<QueueItem> = old_items
.into_iter()
.map(|text| QueueItem::Feedback { text })
.collect();
// Write new format and delete old file
if let Ok(content) = serde_json::to_string_pretty(&migrated) {
let _ = std::fs::write(&path, content);
}
let _ = std::fs::remove_file(&old_path);
return migrated;
}

Vec::new()
}

/// Write the feedback queue to its dedicated file (deletes file if empty)
fn write_queue_file(&self, queue: &[String]) -> Result<()> {
/// Write the queue to its dedicated file (deletes file if empty)
fn write_queue_file(&self, queue: &[QueueItem]) -> Result<()> {
let path = self.queue_file_path();
if queue.is_empty() {
if path.exists() {
Expand All @@ -813,38 +846,49 @@ impl Task {
pub fn queue_feedback(&self, feedback: &str) -> Result<()> {
let mut queue = self.read_queue_file();
tracing::debug!(task_id = %self.meta.task_id(), queue_size = queue.len() + 1, "queuing feedback");
queue.push(feedback.to_string());
queue.push(QueueItem::Feedback { text: feedback.to_string() });
self.write_queue_file(&queue)
}

/// Queue a command to be run when the task stops
pub fn queue_command(&self, command_id: &str, branch: Option<&str>) -> Result<()> {
let mut queue = self.read_queue_file();
tracing::debug!(task_id = %self.meta.task_id(), queue_size = queue.len() + 1, command_id, "queuing command");
queue.push(QueueItem::Command {
command_id: command_id.to_string(),
branch: branch.map(|b| b.to_string()),
});
self.write_queue_file(&queue)
}

/// Pop the first feedback item from the queue
pub fn pop_feedback_queue(&self) -> Result<Option<String>> {
/// Pop the first item from the queue
pub fn pop_queue(&self) -> Result<Option<QueueItem>> {
let mut queue = self.read_queue_file();
if queue.is_empty() {
return Ok(None);
}
let feedback = queue.remove(0);
let item = queue.remove(0);
self.write_queue_file(&queue)?;
Ok(Some(feedback))
Ok(Some(item))
}

/// Check if there's queued feedback
pub fn has_queued_feedback(&self) -> bool {
/// Check if there are queued items
pub fn has_queued_items(&self) -> bool {
!self.read_queue_file().is_empty()
}

/// Get the number of queued feedback items
pub fn queued_feedback_count(&self) -> usize {
/// Get the number of queued items
pub fn queued_item_count(&self) -> usize {
self.read_queue_file().len()
}

/// Read all queued feedback items (for display purposes)
pub fn read_feedback_queue(&self) -> Vec<String> {
/// Read all queued items (for display purposes)
pub fn read_queue(&self) -> Vec<QueueItem> {
self.read_queue_file()
}

/// Remove a single feedback item by index
pub fn remove_feedback_queue_item(&self, index: usize) -> Result<()> {
/// Remove a single queue item by index
pub fn remove_queue_item(&self, index: usize) -> Result<()> {
let mut queue = self.read_queue_file();
if index < queue.len() {
queue.remove(index);
Expand All @@ -853,8 +897,8 @@ impl Task {
Ok(())
}

/// Clear all queued feedback
pub fn clear_feedback_queue(&self) -> Result<()> {
/// Clear all queued items
pub fn clear_queue(&self) -> Result<()> {
let path = self.queue_file_path();
if path.exists() {
std::fs::remove_file(&path)?;
Expand Down
Loading