Skip to content

Commit 04c7b2f

Browse files
committed
concurrent session support with tab view to switch between sessions
Signed-off-by: Christopher Petito <chrisjpetito@gmail.com>
1 parent d2aba4e commit 04c7b2f

39 files changed

+5543
-886
lines changed

cmd/root/new.go

Lines changed: 34 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package root
22

33
import (
44
"context"
5+
"os"
56
"strings"
67

78
tea "charm.land/bubbletea/v2"
@@ -14,6 +15,7 @@ import (
1415
"github.com/docker/cagent/pkg/session"
1516
"github.com/docker/cagent/pkg/telemetry"
1617
"github.com/docker/cagent/pkg/tui"
18+
"github.com/docker/cagent/pkg/tui/background"
1719
tuiinput "github.com/docker/cagent/pkg/tui/input"
1820
)
1921

@@ -86,14 +88,14 @@ func (f *newFlags) runNewCommand(cmd *cobra.Command, args []string) error {
8688
return runTUI(ctx, rt, sess, appOpts...)
8789
}
8890

89-
func runTUI(ctx context.Context, rt runtime.Runtime, sess *session.Session, opts ...app.Opt) error {
90-
// If the runtime can provide a title generator, use it.
91+
// newTUIApp creates the shared app and wheel coalescer/filter used by both
92+
// single-session and multi-session TUI entry points.
93+
func newTUIApp(ctx context.Context, rt runtime.Runtime, sess *session.Session, opts []app.Opt) (*app.App, *tuiinput.WheelCoalescer, func(tea.Model, tea.Msg) tea.Msg) {
9194
if gen := rt.TitleGenerator(); gen != nil {
9295
opts = append(opts, app.WithTitleGenerator(gen))
9396
}
9497

9598
a := app.New(ctx, rt, sess, opts...)
96-
m := tui.New(ctx, a)
9799

98100
coalescer := tuiinput.NewWheelCoalescer()
99101
filter := func(model tea.Model, msg tea.Msg) tea.Msg {
@@ -107,10 +109,39 @@ func runTUI(ctx context.Context, rt runtime.Runtime, sess *session.Session, opts
107109
return msg
108110
}
109111

112+
return a, coalescer, filter
113+
}
114+
115+
func runTUI(ctx context.Context, rt runtime.Runtime, sess *session.Session, opts ...app.Opt) error {
116+
a, coalescer, filter := newTUIApp(ctx, rt, sess, opts)
117+
118+
m := tui.New(ctx, a)
119+
110120
p := tea.NewProgram(m, tea.WithContext(ctx), tea.WithFilter(filter))
111121
coalescer.SetSender(p.Send)
112122
go a.Subscribe(ctx, p)
113123

114124
_, err := p.Run()
115125
return err
116126
}
127+
128+
func runMultiSessionTUI(ctx context.Context, rt runtime.Runtime, sess *session.Session, spawner background.SessionSpawner, cleanup func(), opts ...app.Opt) error {
129+
a, coalescer, filter := newTUIApp(ctx, rt, sess, opts)
130+
131+
if cleanup == nil {
132+
cleanup = func() {}
133+
}
134+
wd, _ := os.Getwd()
135+
bgModel := background.New(ctx, spawner, a, wd, cleanup)
136+
137+
p := tea.NewProgram(bgModel, tea.WithContext(ctx), tea.WithFilter(filter))
138+
coalescer.SetSender(p.Send)
139+
140+
// Set program on background model for routed message sending
141+
if m, ok := bgModel.(interface{ SetProgram(p *tea.Program) }); ok {
142+
m.SetProgram(p)
143+
}
144+
145+
_, err := p.Run()
146+
return err
147+
}

cmd/root/run.go

Lines changed: 133 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,10 @@ import (
2121
"github.com/docker/cagent/pkg/paths"
2222
"github.com/docker/cagent/pkg/runtime"
2323
"github.com/docker/cagent/pkg/session"
24+
"github.com/docker/cagent/pkg/sessiontitle"
2425
"github.com/docker/cagent/pkg/teamloader"
2526
"github.com/docker/cagent/pkg/telemetry"
27+
"github.com/docker/cagent/pkg/tui/background"
2628
"github.com/docker/cagent/pkg/tui/styles"
2729
)
2830

@@ -204,45 +206,38 @@ func (f *runExecFlags) runOrExec(ctx context.Context, out *cli.Printer, args []s
204206
out.Println("Recording mode enabled, cassette: " + cassettePath)
205207
}
206208

207-
var (
208-
rt runtime.Runtime
209-
sess *session.Session
210-
cleanup func()
211-
)
209+
// Remote runtime
212210
if f.remoteAddress != "" {
213-
rt, sess, err = f.createRemoteRuntimeAndSession(ctx, agentFileName)
214-
if err != nil {
215-
return err
216-
}
217-
cleanup = func() {} // Remote runtime doesn't need local cleanup
218-
} else {
219-
agentSource, err := config.Resolve(agentFileName, f.runConfig.EnvProvider())
211+
rt, sess, err := f.createRemoteRuntimeAndSession(ctx, agentFileName)
220212
if err != nil {
221213
return err
222214
}
215+
return f.launchTUI(ctx, out, rt, sess, args, tui)
216+
}
223217

224-
loadResult, err := f.loadAgentFrom(ctx, agentSource)
225-
if err != nil {
226-
return err
227-
}
218+
// Local runtime
219+
agentSource, err := config.Resolve(agentFileName, f.runConfig.EnvProvider())
220+
if err != nil {
221+
return err
222+
}
228223

229-
rt, sess, err = f.createLocalRuntimeAndSession(ctx, loadResult)
230-
if err != nil {
231-
return err
232-
}
224+
loadResult, err := f.loadAgentFrom(ctx, agentSource)
225+
if err != nil {
226+
return err
227+
}
233228

234-
// Setup cleanup for local runtime
235-
cleanup = func() {
236-
// Use a fresh context for cleanup since the original may be canceled
237-
cleanupCtx := context.WithoutCancel(ctx)
238-
if err := loadResult.Team.StopToolSets(cleanupCtx); err != nil {
239-
slog.Error("Failed to stop tool sets", "error", err)
240-
}
229+
rt, sess, err := f.createLocalRuntimeAndSession(ctx, loadResult)
230+
if err != nil {
231+
return err
232+
}
233+
cleanup := func() {
234+
cleanupCtx := context.WithoutCancel(ctx)
235+
if err := loadResult.Team.StopToolSets(cleanupCtx); err != nil {
236+
slog.Error("Failed to stop tool sets", "error", err)
241237
}
242238
}
243239
defer cleanup()
244240

245-
// Apply theme before TUI starts
246241
if tui {
247242
applyTheme()
248243
}
@@ -256,7 +251,25 @@ func (f *runExecFlags) runOrExec(ctx context.Context, out *cli.Printer, args []s
256251
return f.handleExecMode(ctx, out, rt, sess, args)
257252
}
258253

259-
return f.handleRunMode(ctx, rt, sess, args)
254+
opts, err := f.buildAppOpts(args)
255+
if err != nil {
256+
return err
257+
}
258+
259+
// Concurrent agents: use multi-session TUI
260+
if background.IsEnabled() {
261+
var sessStore session.Store
262+
switch typedRt := rt.(type) {
263+
case *runtime.LocalRuntime:
264+
sessStore = typedRt.SessionStore()
265+
case *runtime.PersistentRuntime:
266+
sessStore = typedRt.SessionStore()
267+
}
268+
269+
return runMultiSessionTUI(ctx, rt, sess, f.createSessionSpawner(agentSource, sessStore), cleanup, opts...)
270+
}
271+
272+
return runTUI(ctx, rt, sess, opts...)
260273
}
261274

262275
func (f *runExecFlags) loadAgentFrom(ctx context.Context, agentSource config.Source) (*teamloader.LoadResult, error) {
@@ -456,12 +469,34 @@ func readInitialMessage(args []string) (*string, error) {
456469
return &args[1], nil
457470
}
458471

459-
func (f *runExecFlags) handleRunMode(ctx context.Context, rt runtime.Runtime, sess *session.Session, args []string) error {
460-
firstMessage, err := readInitialMessage(args)
472+
func (f *runExecFlags) launchTUI(ctx context.Context, out *cli.Printer, rt runtime.Runtime, sess *session.Session, args []string, tui bool) error {
473+
if tui {
474+
applyTheme()
475+
}
476+
477+
if f.dryRun {
478+
out.Println("Dry run mode enabled. Agent initialized but will not execute.")
479+
return nil
480+
}
481+
482+
if !tui {
483+
return f.handleExecMode(ctx, out, rt, sess, args)
484+
}
485+
486+
opts, err := f.buildAppOpts(args)
461487
if err != nil {
462488
return err
463489
}
464490

491+
return runTUI(ctx, rt, sess, opts...)
492+
}
493+
494+
func (f *runExecFlags) buildAppOpts(args []string) ([]app.Opt, error) {
495+
firstMessage, err := readInitialMessage(args)
496+
if err != nil {
497+
return nil, err
498+
}
499+
465500
var opts []app.Opt
466501
if firstMessage != nil {
467502
opts = append(opts, app.WithFirstMessage(*firstMessage))
@@ -472,8 +507,74 @@ func (f *runExecFlags) handleRunMode(ctx context.Context, rt runtime.Runtime, se
472507
if f.exitAfterResponse {
473508
opts = append(opts, app.WithExitAfterFirstResponse())
474509
}
510+
return opts, nil
511+
}
475512

476-
return runTUI(ctx, rt, sess, opts...)
513+
// createSessionSpawner creates a function that can spawn new sessions with different working directories.
514+
func (f *runExecFlags) createSessionSpawner(agentSource config.Source, sessStore session.Store) background.SessionSpawner {
515+
return func(spawnCtx context.Context, workingDir string) (*app.App, *session.Session, func(), error) {
516+
// Create a copy of the runtime config with the new working directory
517+
runConfigCopy := f.runConfig.Clone()
518+
runConfigCopy.WorkingDir = workingDir
519+
520+
// Load team with the new working directory
521+
loadResult, err := teamloader.LoadWithConfig(spawnCtx, agentSource, runConfigCopy, teamloader.WithModelOverrides(f.modelOverrides))
522+
if err != nil {
523+
return nil, nil, nil, err
524+
}
525+
526+
team := loadResult.Team
527+
agent, err := team.Agent(f.agentName)
528+
if err != nil {
529+
return nil, nil, nil, err
530+
}
531+
532+
// Create model switcher config
533+
modelSwitcherCfg := &runtime.ModelSwitcherConfig{
534+
Models: loadResult.Models,
535+
Providers: loadResult.Providers,
536+
ModelsGateway: runConfigCopy.ModelsGateway,
537+
EnvProvider: runConfigCopy.EnvProvider(),
538+
AgentDefaultModels: loadResult.AgentDefaultModels,
539+
}
540+
541+
// Create the local runtime
542+
localRt, err := runtime.New(team,
543+
runtime.WithSessionStore(sessStore),
544+
runtime.WithCurrentAgent(f.agentName),
545+
runtime.WithTracer(otel.Tracer(AppName)),
546+
runtime.WithModelSwitcherConfig(modelSwitcherCfg),
547+
)
548+
if err != nil {
549+
return nil, nil, nil, err
550+
}
551+
552+
// Create a new session
553+
newSess := session.New(
554+
session.WithMaxIterations(agent.MaxIterations()),
555+
session.WithToolsApproved(f.autoApprove),
556+
session.WithThinking(agent.ThinkingConfigured()),
557+
session.WithWorkingDir(workingDir),
558+
)
559+
560+
// Create cleanup function
561+
cleanup := func() {
562+
cleanupCtx := context.WithoutCancel(spawnCtx)
563+
_ = team.StopToolSets(cleanupCtx)
564+
}
565+
566+
// Create the app
567+
var appOpts []app.Opt
568+
if pr, ok := localRt.(*runtime.PersistentRuntime); ok {
569+
if model := pr.CurrentAgent().Model(); model != nil {
570+
appOpts = append(appOpts, app.WithTitleGenerator(sessiontitle.New(model)))
571+
}
572+
}
573+
574+
a := app.New(spawnCtx, localRt, newSess, appOpts...)
575+
576+
return a, newSess, cleanup, nil
577+
}
477578
}
478579

479580
// applyTheme applies the theme from user config, or the built-in default.

pkg/app/app.go

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -339,10 +339,13 @@ func (a *App) Run(ctx context.Context, cancel context.CancelFunc, message string
339339
a.session.AddMessage(session.UserMessage(message))
340340
}
341341
for event := range a.runtime.RunStream(ctx, a.session) {
342-
// If context is cancelled, continue draining but don't forward events.
343-
// This prevents the runtime from blocking on event sends.
342+
// If context is cancelled, continue draining but don't forward events
343+
// — except StreamStoppedEvent, which must always propagate so the
344+
// supervisor can mark the session as no longer running.
344345
if ctx.Err() != nil {
345-
continue
346+
if _, ok := event.(*runtime.StreamStoppedEvent); !ok {
347+
continue
348+
}
346349
}
347350

348351
// Clear titleGenerating flag when title is generated (from server for remote runtime)
@@ -379,10 +382,13 @@ func (a *App) RunWithMessage(ctx context.Context, cancel context.CancelFunc, msg
379382
go func() {
380383
a.session.AddMessage(msg)
381384
for event := range a.runtime.RunStream(ctx, a.session) {
382-
// If context is cancelled, continue draining but don't forward events.
383-
// This prevents the runtime from blocking on event sends.
385+
// If context is cancelled, continue draining but don't forward events
386+
// — except StreamStoppedEvent, which must always propagate so the
387+
// supervisor can mark the session as no longer running.
384388
if ctx.Err() != nil {
385-
continue
389+
if _, ok := event.(*runtime.StreamStoppedEvent); !ok {
390+
continue
391+
}
386392
}
387393

388394
// Clear titleGenerating flag when title is generated (from server for remote runtime)
@@ -401,6 +407,14 @@ func (a *App) RunBangCommand(ctx context.Context, command string) {
401407
}
402408

403409
func (a *App) Subscribe(ctx context.Context, program *tea.Program) {
410+
a.SubscribeWith(ctx, program.Send)
411+
}
412+
413+
// SubscribeWith subscribes to app events using a custom send function.
414+
// This allows callers to wrap or transform messages before sending them
415+
// to the Bubble Tea program. It's used by the concurrent agents feature
416+
// to tag events with their session ID for proper routing.
417+
func (a *App) SubscribeWith(ctx context.Context, send func(tea.Msg)) {
404418
throttledChan := a.throttleEvents(ctx, a.events)
405419
for {
406420
select {
@@ -411,7 +425,7 @@ func (a *App) Subscribe(ctx context.Context, program *tea.Program) {
411425
return
412426
}
413427

414-
program.Send(msg)
428+
send(msg)
415429
}
416430
}
417431
}

pkg/config/resolve_test.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,8 @@ agents:
8686
}
8787

8888
func TestResolveAgentFile_EmptyIsDefault(t *testing.T) {
89-
t.Parallel()
89+
home := t.TempDir()
90+
t.Setenv("HOME", home)
9091

9192
resolved, err := resolve("")
9293

@@ -95,7 +96,8 @@ func TestResolveAgentFile_EmptyIsDefault(t *testing.T) {
9596
}
9697

9798
func TestResolveAgentFile_DefaultIsDefault(t *testing.T) {
98-
t.Parallel()
99+
home := t.TempDir()
100+
t.Setenv("HOME", home)
99101

100102
resolved, err := resolve("default")
101103

pkg/tui/background/feature.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
// Package background provides concurrent agents feature support.
2+
package background
3+
4+
import (
5+
"os"
6+
)
7+
8+
const (
9+
// EnvConcurrentAgents is the environment variable to enable concurrent agents.
10+
EnvConcurrentAgents = "CAGENT_EXPERIMENTAL_CONCURRENT_AGENTS"
11+
)
12+
13+
// IsEnabled returns true if the concurrent agents feature is enabled.
14+
func IsEnabled() bool {
15+
return os.Getenv(EnvConcurrentAgents) == "1"
16+
}

0 commit comments

Comments
 (0)