Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions internal/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,14 @@ func (d *Daemon) routeMessages() {

d.logger.Info("Delivered message %s from %s to %s/%s", msg.ID, msg.From, repoName, agentName)
}

// Clean up acknowledged messages to prevent pile-up
count, err := msgMgr.DeleteAcked(repoName, agentName)
if err != nil {
d.logger.Error("Failed to clean up acked messages for %s/%s: %v", repoName, agentName, err)
} else if count > 0 {
d.logger.Debug("Cleaned up %d acked messages for %s/%s", count, repoName, agentName)
}
}
}
}
Expand Down
76 changes: 76 additions & 0 deletions internal/daemon/daemon_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1182,6 +1182,82 @@ func TestMessageRoutingWithRealTmux(t *testing.T) {
}
}

func TestMessageRoutingCleansUpAckedMessages(t *testing.T) {
tmuxClient := tmux.NewClient()
if !tmuxClient.IsTmuxAvailable() {
t.Fatal("tmux is required for this test but not available")
}

d, cleanup := setupTestDaemon(t)
defer cleanup()

// Create a real tmux session
sessionName := "mc-test-cleanup"
if err := tmuxClient.CreateSession(context.Background(), sessionName, true); err != nil {
t.Fatalf("tmux is required for this test but cannot create sessions in this environment: %v", err)
}
defer tmuxClient.KillSession(context.Background(), sessionName)

// Create window for worker
if err := tmuxClient.CreateWindow(context.Background(), sessionName, "worker1"); err != nil {
t.Fatalf("Failed to create worker window: %v", err)
}

// Add repo and agent
repo := &state.Repository{
GithubURL: "https://github.com/test/repo",
TmuxSession: sessionName,
Agents: make(map[string]state.Agent),
}
if err := d.state.AddRepo("test-repo", repo); err != nil {
t.Fatalf("Failed to add repo: %v", err)
}

worker := state.Agent{
Type: state.AgentTypeWorker,
TmuxWindow: "worker1",
Task: "Test task",
CreatedAt: time.Now(),
}
if err := d.state.AddAgent("test-repo", "worker1", worker); err != nil {
t.Fatalf("Failed to add worker: %v", err)
}

// Create messages and immediately ack them
msgMgr := messages.NewManager(d.paths.MessagesDir)
for i := 0; i < 5; i++ {
msg, err := msgMgr.Send("test-repo", "supervisor", "worker1", "Test message")
if err != nil {
t.Fatalf("Failed to send message: %v", err)
}
// Mark as acked
if err := msgMgr.Ack("test-repo", "worker1", msg.ID); err != nil {
t.Fatalf("Failed to ack message: %v", err)
}
}

// Verify we have 5 acked messages
allMsgs, err := msgMgr.List("test-repo", "worker1")
if err != nil {
t.Fatalf("Failed to list messages: %v", err)
}
if len(allMsgs) != 5 {
t.Fatalf("Expected 5 messages, got %d", len(allMsgs))
}

// Trigger message routing which should clean up acked messages
d.TriggerMessageRouting()

// Verify acked messages were deleted
remainingMsgs, err := msgMgr.List("test-repo", "worker1")
if err != nil {
t.Fatalf("Failed to list messages after cleanup: %v", err)
}
if len(remainingMsgs) != 0 {
t.Errorf("Expected 0 messages after cleanup, got %d", len(remainingMsgs))
}
}

func TestWakeLoopUpdatesNudgeTime(t *testing.T) {
tmuxClient := tmux.NewClient()
if !tmuxClient.IsTmuxAvailable() {
Expand Down