diff --git a/internal/daemon/daemon.go b/internal/daemon/daemon.go index c755412..83f51ad 100644 --- a/internal/daemon/daemon.go +++ b/internal/daemon/daemon.go @@ -419,6 +419,14 @@ func (d *Daemon) routeMessages() { d.logger.Info("Delivered message %s from %s to %s/%s", msg.ID, msg.From, repoName, agentName) } + + // Clean up acknowledged messages to prevent pile-up + count, err := msgMgr.DeleteAcked(repoName, agentName) + if err != nil { + d.logger.Error("Failed to clean up acked messages for %s/%s: %v", repoName, agentName, err) + } else if count > 0 { + d.logger.Debug("Cleaned up %d acked messages for %s/%s", count, repoName, agentName) + } } } } diff --git a/internal/daemon/daemon_test.go b/internal/daemon/daemon_test.go index a882edb..d814d59 100644 --- a/internal/daemon/daemon_test.go +++ b/internal/daemon/daemon_test.go @@ -1182,6 +1182,82 @@ func TestMessageRoutingWithRealTmux(t *testing.T) { } } +func TestMessageRoutingCleansUpAckedMessages(t *testing.T) { + tmuxClient := tmux.NewClient() + if !tmuxClient.IsTmuxAvailable() { + t.Fatal("tmux is required for this test but not available") + } + + d, cleanup := setupTestDaemon(t) + defer cleanup() + + // Create a real tmux session + sessionName := "mc-test-cleanup" + if err := tmuxClient.CreateSession(context.Background(), sessionName, true); err != nil { + t.Fatalf("tmux is required for this test but cannot create sessions in this environment: %v", err) + } + defer tmuxClient.KillSession(context.Background(), sessionName) + + // Create window for worker + if err := tmuxClient.CreateWindow(context.Background(), sessionName, "worker1"); err != nil { + t.Fatalf("Failed to create worker window: %v", err) + } + + // Add repo and agent + repo := &state.Repository{ + GithubURL: "https://github.com/test/repo", + TmuxSession: sessionName, + Agents: make(map[string]state.Agent), + } + if err := d.state.AddRepo("test-repo", repo); err != nil { + t.Fatalf("Failed to add repo: %v", err) + } + + worker := state.Agent{ + Type: state.AgentTypeWorker, + TmuxWindow: "worker1", + Task: "Test task", + CreatedAt: time.Now(), + } + if err := d.state.AddAgent("test-repo", "worker1", worker); err != nil { + t.Fatalf("Failed to add worker: %v", err) + } + + // Create messages and immediately ack them + msgMgr := messages.NewManager(d.paths.MessagesDir) + for i := 0; i < 5; i++ { + msg, err := msgMgr.Send("test-repo", "supervisor", "worker1", "Test message") + if err != nil { + t.Fatalf("Failed to send message: %v", err) + } + // Mark as acked + if err := msgMgr.Ack("test-repo", "worker1", msg.ID); err != nil { + t.Fatalf("Failed to ack message: %v", err) + } + } + + // Verify we have 5 acked messages + allMsgs, err := msgMgr.List("test-repo", "worker1") + if err != nil { + t.Fatalf("Failed to list messages: %v", err) + } + if len(allMsgs) != 5 { + t.Fatalf("Expected 5 messages, got %d", len(allMsgs)) + } + + // Trigger message routing which should clean up acked messages + d.TriggerMessageRouting() + + // Verify acked messages were deleted + remainingMsgs, err := msgMgr.List("test-repo", "worker1") + if err != nil { + t.Fatalf("Failed to list messages after cleanup: %v", err) + } + if len(remainingMsgs) != 0 { + t.Errorf("Expected 0 messages after cleanup, got %d", len(remainingMsgs)) + } +} + func TestWakeLoopUpdatesNudgeTime(t *testing.T) { tmuxClient := tmux.NewClient() if !tmuxClient.IsTmuxAvailable() {