Skip to content

Commit 262c1f1

Browse files
docs(eino): update agent cancel and turnloop quickstart (#1550)
1 parent 0927602 commit 262c1f1

2 files changed

Lines changed: 368 additions & 192 deletions

File tree

content/en/docs/eino/core_modules/eino_adk/agent_cancel_and_turnloop_quickstart.md

Lines changed: 184 additions & 96 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
---
22
Description: ""
3-
date: "2026-05-17"
3+
date: "2026-05-22"
44
lastmod: ""
55
tags: []
66
title: Agent Cancel and TurnLoop Quick Start
@@ -14,19 +14,15 @@ A quick start guide for the two core features in Eino ADK: **Agent Cancel** and
1414
All examples in this document use the following generic instantiations:
1515

1616
- `T = string` (the business item type pushed to TurnLoop)
17-
- `M = *schema.Message` (the Agent message type, i.e., the standard `Message`)
18-
19-
ADK type aliases:
17+
- `M = *schema.Message` (the Agent message type, i.e., the standard `Message`) ADK type aliases:
2018

2119
```go
2220
type Agent = TypedAgent[*schema.Message]
2321
type AgentInput = TypedAgentInput[*schema.Message]
2422
type AgentEvent = TypedAgentEvent[*schema.Message]
2523
```
2624

27-
When using `*schema.AgenticMessage`, simply replace `M` with the corresponding type—all API signatures are completely symmetric.
28-
29-
---
25+
## When using `*schema.AgenticMessage`, simply replace `M` with the corresponding type—all API signatures are completely symmetric.
3026

3127
## Part 1: Agent Cancel
3228

@@ -130,8 +126,6 @@ Build a continuously running agent service: users send messages at any time, the
130126

131127
### Turn Lifecycle
132128

133-
<a href="/img/eino/XrWqwC669hGGoibW1q3c2ToTnvf.png" target="_blank"><img src="/img/eino/XrWqwC669hGGoibW1q3c2ToTnvf.png" width="100%" /></a>
134-
135129
### Basic Usage
136130

137131
```go
@@ -406,105 +400,199 @@ On normal exit (without saving a new checkpoint), TurnLoop will attempt to delet
406400

407401
## Part 4: Complete Example
408402

409-
Simulates a chat service supporting priority scheduling, preemption, and checkpoint recovery:
403+
Simulates a chat service supporting priority scheduling, preemption, and checkpoint recovery. This example can be compiled and run directly (replace `myModel` with a real ChatModel implementation).
410404

411405
```go
412406
package main
413407

414408
import (
415-
"context"
416-
"log"
417-
"strings"
418-
"time"
419-
420-
"github.com/cloudwego/eino/adk"
421-
"github.com/cloudwego/eino/schema"
409+
"context"
410+
"fmt"
411+
"log"
412+
"sort"
413+
"strings"
414+
"sync"
415+
"time"
416+
417+
"github.com/cloudwego/eino/adk"
418+
"github.com/cloudwego/eino/schema"
422419
)
423420

424-
func main() {
425-
ctx := context.Background()
426-
store := adk.NewInMemoryStore()
427-
428-
cfg := adk.TurnLoopConfig[string, *schema.Message]{
429-
GenInput: func(ctx context.Context, loop *adk.TurnLoop[string, *schema.Message], items []string) (*adk.GenInputResult[string, *schema.Message], error) {
430-
// Sort by priority, consume only the first item, keep the rest for subsequent turns
431-
sorted := sortByPriority(items)
432-
return &adk.GenInputResult[string, *schema.Message]{
433-
Input: &adk.AgentInput{Messages: []*schema.Message{schema.UserMessage(sorted[0])}},
434-
Consumed: sorted[:1],
435-
Remaining: sorted[1:], // Items not in either will be discarded
436-
}, nil
437-
},
438-
439-
GenResume: func(ctx context.Context, loop *adk.TurnLoop[string, *schema.Message], interruptedItems, unhandledItems, newItems []string) (*adk.GenResumeResult[string, *schema.Message], error) {
440-
all := append(append(interruptedItems, unhandledItems...), newItems...)
441-
return &adk.GenResumeResult[string, *schema.Message]{
442-
Consumed: all[:1],
443-
Remaining: all[1:],
444-
}, nil
445-
},
446-
447-
PrepareAgent: func(ctx context.Context, loop *adk.TurnLoop[string, *schema.Message], consumed []string) (adk.Agent, error) {
448-
return buildAgent(consumed), nil
449-
},
450-
451-
OnAgentEvents: func(ctx context.Context, tc *adk.TurnContext[string, *schema.Message], events *adk.AsyncIterator[*adk.AgentEvent]) error {
452-
for {
453-
event, ok := events.Next()
454-
if !ok {
455-
break
456-
}
457-
// Detect preemption/stop signals for cleanup
458-
select {
459-
case <-tc.Preempted:
460-
log.Println("Preempted by higher priority message")
461-
case <-tc.Stopped:
462-
log.Printf("Service shutting down: %s", tc.StopCause())
463-
default:
464-
}
465-
if event.Err != nil {
466-
// Don't propagate CancelError, framework handles it automatically
467-
return event.Err
468-
}
469-
log.Printf("[%s] %s", event.AgentName, extractText(event))
470-
}
471-
return nil
472-
},
421+
// --- 1. Implement CheckPointStore interface ---
473422

474-
Store: store,
475-
CheckpointID: "chat-session-001",
476-
}
423+
type InMemoryStore struct {
424+
mu sync.Mutex
425+
m map[string][]byte
426+
}
427+
428+
func NewInMemoryStore() *InMemoryStore {
429+
return &InMemoryStore{m: make(map[string][]byte)}
430+
}
431+
432+
func (s *InMemoryStore) Get(_ context.Context, id string) ([]byte, bool, error) {
433+
s.mu.Lock()
434+
defer s.mu.Unlock()
435+
data, ok := s.m[id]
436+
return data, ok, nil
437+
}
477438

478-
loop := adk.NewTurnLoop(cfg)
479-
loop.Push("Hello, help me check the weather")
480-
loop.Run(ctx)
481-
482-
// Send urgent message to preempt after 1 second
483-
time.AfterFunc(1*time.Second, func() {
484-
loop.Push("Stop! Handle this urgent issue first",
485-
adk.WithPreempt[string, *schema.Message](adk.AnySafePoint),
486-
)
487-
})
488-
489-
// Graceful shutdown after 5 seconds
490-
time.AfterFunc(5*time.Second, func() {
491-
loop.Stop(
492-
adk.WithGracefulTimeout(3*time.Second),
493-
adk.WithStopCause("service shutdown"),
494-
)
495-
})
496-
497-
result := loop.Wait()
498-
log.Printf("Exit reason: %v", result.ExitReason)
499-
log.Printf("Unhandled messages: %v", result.UnhandledItems)
500-
log.Printf("Stop cause: %s", result.StopCause)
501-
log.Printf("checkpoint: attempted=%v, err=%v", result.CheckpointAttempted, result.CheckpointErr)
502-
503-
// Next startup with the same cfg will automatically resume from checkpoint
439+
func (s *InMemoryStore) Set(_ context.Context, id string, data []byte) error {
440+
s.mu.Lock()
441+
defer s.mu.Unlock()
442+
s.m[id] = data
443+
return nil
444+
}
445+
446+
// Optional: implement CheckPointDeleter to support automatic cleanup of expired checkpoints
447+
func (s *InMemoryStore) Delete(_ context.Context, id string) error {
448+
s.mu.Lock()
449+
defer s.mu.Unlock()
450+
delete(s.m, id)
451+
return nil
452+
}
453+
454+
// --- 2. Implement a minimal Agent (use adk.NewChatModelAgent in production) ---
455+
456+
type echoAgent struct{}
457+
458+
func (a *echoAgent) Name(_ context.Context) string { return "EchoAgent" }
459+
func (a *echoAgent) Description(_ context.Context) string { return "echoes input" }
460+
461+
func (a *echoAgent) Run(ctx context.Context, input *adk.AgentInput, _ ...adk.AgentRunOption) *adk.AsyncIterator[*adk.AgentEvent] {
462+
iter, gen := adk.NewAsyncIteratorPair[*adk.AgentEvent]()
463+
go func() {
464+
defer gen.Close()
465+
// Simulate time-consuming processing
466+
select {
467+
case <-time.After(500 * time.Millisecond):
468+
case <-ctx.Done():
469+
gen.Send(&adk.AgentEvent{Err: ctx.Err()})
470+
return
471+
}
472+
// Return echo result
473+
reply := "Echo: "
474+
if len(input.Messages) > 0 {
475+
reply += input.Messages[0].Content
476+
}
477+
gen.Send(&adk.AgentEvent{
478+
AgentName: "EchoAgent",
479+
Output: &adk.AgentOutput{
480+
MessageOutput: &adk.MessageVariant{
481+
Message: schema.AssistantMessage(reply, nil),
482+
},
483+
},
484+
})
485+
}()
486+
return iter
487+
}
488+
489+
// --- 3. Priority sorting helper function ---
490+
491+
func sortByPriority(items []string) []string {
492+
sorted := make([]string, len(items))
493+
copy(sorted, items)
494+
sort.SliceStable(sorted, func(i, j int) bool {
495+
// Items starting with "!" are treated as high priority
496+
return strings.HasPrefix(sorted[i], "!") && !strings.HasPrefix(sorted[j], "!")
497+
})
498+
return sorted
499+
}
500+
501+
// --- 4. Main flow ---
502+
503+
func main() {
504+
ctx := context.Background()
505+
store := NewInMemoryStore()
506+
agent := &echoAgent{}
507+
508+
cfg := adk.TurnLoopConfig[string, *schema.Message]{
509+
GenInput: func(ctx context.Context, loop *adk.TurnLoop[string, *schema.Message], items []string) (*adk.GenInputResult[string, *schema.Message], error) {
510+
// Sort by priority, consume only the first item, keep the rest for subsequent turns
511+
sorted := sortByPriority(items)
512+
return &adk.GenInputResult[string, *schema.Message]{
513+
Input: &adk.AgentInput{Messages: []*schema.Message{schema.UserMessage(sorted[0])}},
514+
Consumed: sorted[:1],
515+
Remaining: sorted[1:],
516+
}, nil
517+
},
518+
519+
GenResume: func(ctx context.Context, loop *adk.TurnLoop[string, *schema.Message], interruptedItems, unhandledItems, newItems []string) (*adk.GenResumeResult[string, *schema.Message], error) {
520+
all := append(append(interruptedItems, unhandledItems...), newItems...)
521+
return &adk.GenResumeResult[string, *schema.Message]{
522+
Consumed: all[:1],
523+
Remaining: all[1:],
524+
}, nil
525+
},
526+
527+
PrepareAgent: func(ctx context.Context, loop *adk.TurnLoop[string, *schema.Message], consumed []string) (adk.Agent, error) {
528+
return agent, nil
529+
},
530+
531+
OnAgentEvents: func(ctx context.Context, tc *adk.TurnContext[string, *schema.Message], events *adk.AsyncIterator[*adk.AgentEvent]) error {
532+
for {
533+
event, ok := events.Next()
534+
if !ok {
535+
break
536+
}
537+
// Detect preemption/stop signals for cleanup
538+
select {
539+
case <-tc.Preempted:
540+
log.Println("Preempted by higher priority message")
541+
case <-tc.Stopped:
542+
log.Printf("Service shutting down: %s", tc.StopCause())
543+
default:
544+
}
545+
if event.Err != nil {
546+
// Don't propagate CancelError, framework handles it automatically
547+
return event.Err
548+
}
549+
if event.Output != nil && event.Output.MessageOutput != nil {
550+
fmt.Printf("[%s] %s\n", event.AgentName, event.Output.MessageOutput.Message.Content)
551+
}
552+
}
553+
return nil
554+
},
555+
556+
Store: store,
557+
CheckpointID: "session-123",
558+
}
559+
560+
// First run
561+
loop := adk.NewTurnLoop(cfg)
562+
loop.Push("normal message")
563+
loop.Push("low priority task")
564+
loop.Run(ctx)
565+
566+
// Simulate pushing an urgent message after a delay (triggers preemption)
567+
time.AfterFunc(200*time.Millisecond, func() {
568+
accepted, ack := loop.Push("!urgent message",
569+
adk.WithPreempt[string, *schema.Message](adk.AnySafePoint),
570+
)
571+
if accepted {
572+
<-ack
573+
log.Println("Preemption signal acknowledged")
574+
}
575+
})
576+
577+
// Graceful stop after 2 seconds
578+
time.AfterFunc(2*time.Second, func() {
579+
loop.Stop(
580+
adk.WithGraceful(),
581+
adk.WithStopCause("demo timeout"),
582+
)
583+
})
584+
585+
result := loop.Wait()
586+
fmt.Printf("Exit reason: %v\n", result.ExitReason)
587+
fmt.Printf("Stop cause: %s\n", result.StopCause)
588+
fmt.Printf("checkpoint: attempted=%v, err=%v\n", result.CheckpointAttempted, result.CheckpointErr)
589+
590+
// Second run (same cfg, containing the same CheckpointID) will automatically resume from checkpoint
504591
}
505592
```
506593

507-
---
594+
> 💡
595+
> In production, replace `echoAgent` with `adk.NewChatModelAgent(ctx, &adk.ChatModelAgentConfig{...})`. The `CheckPointStore` implementation can use Redis / database or other persistence solutions.
508596
509597
## FAQ
510598

0 commit comments

Comments
 (0)