# Streaming Tool Call Architecture
This document specifies how the dartantic_ai compatibility layer handles streaming messages and tool calls across different providers using the new orchestrator-based architecture.
- Architecture Overview
- Core Concepts
- Provider Capabilities
- Streaming Patterns
- Orchestration Layer
- Tool Execution Layer
- State Management
- Implementation Details
- Provider-Specific Details
- Testing and Validation
## Architecture Overview

The system operates through a six-layer architecture with specialized components for streaming and tool execution:
```mermaid
flowchart TB
subgraph API["API Layer"]
A1["Agent: User-facing interface (run/runStream)"]
A2["Orchestrator selection and coordination"]
A3["Final result formatting and validation"]
end
subgraph ORCH["Orchestration Layer"]
O1["StreamingOrchestrator: Workflow coordination"]
O2["Business logic and streaming management"]
O3["Provider-agnostic tool execution orchestration"]
O4["Streaming state transitions and UX enhancement"]
end
subgraph PABS["Provider Abstraction Layer"]
P1["ChatModel: Provider-agnostic interface"]
P2["MessageAccumulator: Provider-specific message accumulation"]
P3["ProviderCaps: Capability-based feature detection"]
end
subgraph PIMP["Provider Implementation Layer"]
I1["Provider-specific models and mappers"]
I2["Protocol-specific streaming accumulation"]
I3["Tool ID assignment and argument handling"]
end
subgraph INFRA["Infrastructure Layer"]
N1["ToolExecutor: Centralized tool execution"]
N2["StreamingState: Mutable state encapsulation"]
N3["Resource management via try/finally patterns"]
N4["RetryHttpClient: Cross-cutting HTTP concerns"]
end
subgraph PROTO["Protocol Layer"]
R1["HTTP clients for each provider"]
R2["Network communication and error handling"]
R3["Request/response serialization"]
end
API --> ORCH
ORCH --> PABS
PABS --> PIMP
PIMP --> INFRA
INFRA --> PROTO
style API fill:#f9f,stroke:#333,stroke-width:2px
style ORCH fill:#bbf,stroke:#333,stroke-width:2px
style PABS fill:#bfb,stroke:#333,stroke-width:2px
style PIMP fill:#fbf,stroke:#333,stroke-width:2px
style INFRA fill:#ffb,stroke:#333,stroke-width:2px
style PROTO fill:#fbb,stroke:#333,stroke-width:2px
```
### Metadata Preservation

A critical aspect of streaming is preserving metadata through the accumulation process:

- During Streaming: Metadata flows through chunks via `ChatResult.metadata`
- Message Accumulation: The `MessageAccumulator` merges metadata from chunks:
  - Text parts are accumulated and consolidated
  - Metadata from each chunk is merged into the accumulated message
  - Provider-specific metadata (e.g., session IDs) must be preserved
- Final Message Construction: When streaming completes:
  - With streamed text: the provider returns a metadata-only message in the `output` field (empty parts array but full metadata) to avoid text duplication
  - Without streaming: the provider returns the complete message with metadata
  - The orchestrator's accumulator merges this final metadata

This ensures critical metadata, such as OpenAI Responses' `_responses_session`, is preserved for features like session persistence.
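A minimal sketch of the merge rule, assuming chunk metadata is exposed as a plain map (the real `MessageAccumulator` operates on whole `ChatMessage` objects):

```dart
/// Later chunks win on key conflicts, so the final metadata-only
/// message can still contribute fields like `_responses_session`.
Map<String, Object?> mergeMetadata(
  Map<String, Object?> accumulated,
  Map<String, Object?> chunk,
) =>
    {...accumulated, ...chunk};

void main() {
  var meta = <String, Object?>{};
  meta = mergeMetadata(meta, {'model': 'gpt-4o'});
  meta = mergeMetadata(meta, {'_responses_session': 'sess_123'});
  print(meta); // {model: gpt-4o, _responses_session: sess_123}
}
```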
## Core Concepts

### Streaming

- Text Streaming: Immediate output of text chunks to users
- Tool Accumulation: Building complete tool calls across chunks
- Message Boundaries: Preserving complete messages in history
- Streaming UX: Visual separation between tool calls and responses
### Tool Calling

- Tool Calls: LLM-initiated function invocations with structured arguments
- Tool Results: Responses from tool execution returned to LLM
- ID Matching: Ensuring tool calls and results are properly paired
- Error Handling: Tool errors returned to LLM for recovery
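For illustration, a matched call/result pair using the `ToolPart.call` and `ToolPart.result` constructors that appear in the mapper and error-handling examples later in this document; the shared `id` is what pairs them:

```dart
// Fragment: ToolPart comes from the compatibility layer, as used in
// the mapper and error-handling examples below.
final call = ToolPart.call(
  id: 'call_abc123',
  name: 'get_weather',
  arguments: {'city': 'Boston'},
);

final result = ToolPart.result(
  id: 'call_abc123', // must match the call's id for proper pairing
  name: 'get_weather',
  result: '{"tempF": 62}',
);
```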
## Provider Capabilities

| Provider | Streaming | Tools | Tool IDs | Streaming Format |
|---|---|---|---|---|
| OpenAI | ✅ | ✅ | ✅ | Partial chunks |
| OpenAI Responses | ✅ | ✅ | ✅ | Event-based (stateful) |
| OpenRouter | ✅ | ✅ | ✅ | OpenAI-compatible |
| Anthropic | ✅ | ✅ | ✅ | Event-based |
| Google | ✅ | ✅ | ❌ | Complete chunks |
| Ollama | ✅ | ✅ | ❌ | Complete chunks |
| Mistral | ✅ | ❌ | N/A | Text only |
| Cohere | ✅ | ✅ | ✅ | Custom format |
| Together | ✅ | ✅ | ✅ | OpenAI-compatible |
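Downstream code can branch on these capabilities rather than on provider names. A hypothetical sketch of the tool-ID fallback the table implies; `supportsToolIds` is an illustrative flag, not necessarily a real `ProviderCaps` member:

```dart
import 'package:uuid/uuid.dart';

/// Use the provider's tool-call ID when the provider supplies one;
/// otherwise generate a UUID (the Google/Ollama case in the table).
String resolveToolId(String? providerId, {required bool supportsToolIds}) =>
    (supportsToolIds && providerId != null) ? providerId : const Uuid().v4();
```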
## Streaming Patterns

- Partial chunks (OpenAI, OpenRouter, Together): Tool calls are built incrementally, with argument JSON accumulating across multiple chunks.
- Complete chunks (Google, Ollama): Each chunk contains complete tool-call information with already-parsed arguments.
- Event-based (Anthropic, OpenAI Responses): A structured event sequence with state tracked across events.
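To make the partial-chunk pattern concrete, a self-contained sketch of index-keyed argument accumulation; the `ToolCallDelta` shape is illustrative, not the library's actual delta type:

```dart
import 'dart:convert';

/// Illustrative delta shape: `index` identifies which in-progress
/// tool call a raw argument fragment belongs to.
class ToolCallDelta {
  const ToolCallDelta(this.index, this.argumentsFragment);
  final int index;
  final String argumentsFragment;
}

Map<int, Map<String, dynamic>> accumulateToolArgs(
  Iterable<ToolCallDelta> deltas,
) {
  final buffers = <int, StringBuffer>{};
  for (final d in deltas) {
    buffers.putIfAbsent(d.index, () => StringBuffer()).write(d.argumentsFragment);
  }
  // Parse only after the stream ends, when each JSON string is complete.
  return buffers.map(
    (i, b) => MapEntry(i, json.decode(b.toString()) as Map<String, dynamic>),
  );
}

void main() {
  final args = accumulateToolArgs(const [
    ToolCallDelta(0, '{"city": "Bos'),
    ToolCallDelta(0, 'ton"}'),
  ]);
  print(args[0]); // {city: Boston}
}
```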
## Orchestration Layer

The orchestration layer coordinates streaming workflows through the `StreamingOrchestrator` interface (reconstructed here to include the members shown in the extension example at the end of this document):

```dart
abstract class StreamingOrchestrator {
  /// Hint identifying the workflow this orchestrator targets
  String get providerHint;

  /// Prepare the orchestrator before streaming begins
  void initialize(StreamingState state);

  /// Process one streaming iteration against the model
  Stream<StreamingIterationResult> processIteration(
    ChatModel model,
    StreamingState state, {
    JsonSchema? outputSchema,
  });

  /// Finalize the orchestrator after streaming completes
  void finalize(StreamingState state);
}
```

### Orchestrator Selection
### Orchestrator Selection
The Agent selects the appropriate orchestrator based on request characteristics:
```dart
StreamingOrchestrator _selectOrchestrator({
  JsonSchema? outputSchema,
  List<Tool>? tools,
}) {
  // Select TypedOutputStreamingOrchestrator for structured output
  if (outputSchema != null) {
    return const TypedOutputStreamingOrchestrator();
  }

  // Default orchestrator for regular chat and tool calls
  return const DefaultStreamingOrchestrator();
}
```
### DefaultStreamingOrchestrator

Handles standard streaming patterns:
- Stream Model Response: Process chunks until stream closes
- Message Accumulation: Use MessageAccumulator strategy
- Tool Detection: Identify tool calls in consolidated message
- Tool Execution: Delegate to ToolExecutor
- Continuation Logic: Loop until no more tool calls
```dart
// Main iteration pattern
await for (final result in model.sendStream(state.conversationHistory)) {
  // Stream text chunks immediately
  if (textOutput.isNotEmpty) {
    yield StreamingIterationResult(output: textOutput, shouldContinue: true);
  }

  // Accumulate complete message
  state.accumulatedMessage = state.accumulator.accumulate(
    state.accumulatedMessage,
    result.output,
  );
}

// Process consolidated message
final consolidatedMessage =
    state.accumulator.consolidate(state.accumulatedMessage);
final toolCalls = consolidatedMessage.parts
    .whereType<ToolPart>()
    .where((p) => p.kind == ToolPartKind.call)
    .toList();

if (toolCalls.isNotEmpty) {
  // Execute tools and continue streaming
  final results = await state.executor.executeBatch(toolCalls, state.toolMap);
  // ... add results to conversation and continue
}
```

## Tool Execution Layer

Centralized tool execution with robust error handling:

```dart
abstract class ToolExecutor {
  /// Execute multiple tools sequentially
  Future<List<ToolExecutionResult>> executeBatch(
    List<ToolPart> toolCalls,
    Map<String, Tool> toolMap,
  );

  /// Execute a single tool with error handling
  Future<ToolExecutionResult> executeSingle(
    ToolPart toolCall,
    Map<String, Tool> toolMap,
  );
}
```

The `ToolExecutor` handles tool lookup, argument extraction, execution, and result consolidation, with failures captured as error results rather than thrown:
```mermaid
flowchart TD
A[Tool Calls from LLM] --> B{For Each Tool Call}
B --> C[Look up Tool in Map]
C --> D{Tool Found?}
D -->|No| E[Create Error Result]
D -->|Yes| F[Execute tool.call]
F --> G{Execution Success?}
G -->|Yes| H[Format Result as JSON]
G -->|No| I[Format Error as JSON]
E --> J[Create ToolPart.result]
H --> J
I --> J
J --> K[Add to Results]
K --> B
K --> L[Consolidate All Results]
L --> M[Create User Message]
style A fill:#f9f,stroke:#333,stroke-width:2px
style M fill:#9f9,stroke:#333,stroke-width:2px
```
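The last two steps (consolidate results, create user message) amount to wrapping every `ToolPart.result` from the round into a single user-role message. A minimal sketch; the `ChatMessage` and `ChatMessageRole` shapes are illustrative, not necessarily the library's actual API:

```dart
// Sketch: collect every ToolPart.result produced in this round into
// one user-role message, per the flowchart's final two steps.
ChatMessage consolidateToolResults(List<ToolPart> toolResultParts) =>
    ChatMessage(role: ChatMessageRole.user, parts: toolResultParts);
```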
Message accumulation across a stream proceeds as follows:

```mermaid
sequenceDiagram
participant S as Stream
participant A as Accumulator
participant C as Consolidator
participant H as History
S->>A: Chunk 1 (Text: "I'll help")
A->>A: Add TextPart
S->>A: Chunk 2 (Text: " you with")
A->>A: Append to existing TextPart
S->>A: Chunk 3 (Tool: get_weather partial)
A->>A: Store partial tool
S->>A: Chunk 4 (Tool: get_weather complete)
A->>A: Merge tool arguments
S->>C: Stream ends
C->>C: Consolidate TextParts
C->>C: Finalize ToolParts
C->>H: Add complete message
Note over C: Single TextPart: "I'll help you with"
Note over C: Complete ToolPart with parsed args
```
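The diagram's text rule (append to the existing `TextPart`, then consolidate to a single part) reduces to a join; a self-contained sketch with a local stand-in type:

```dart
/// Local stand-in for the library's TextPart, for illustration only.
class TextPart {
  const TextPart(this.text);
  final String text;
}

/// Consolidate streamed fragments into the single TextPart the
/// Consolidator emits ("I'll help you with" in the diagram).
TextPart consolidateText(List<TextPart> fragments) =>
    TextPart(fragments.map((p) => p.text).join());

void main() {
  final fragments = [TextPart("I'll help"), TextPart(' you with')];
  print(consolidateText(fragments).text); // I'll help you with
}
```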
## State Management

`StreamingState` encapsulates all mutable state during streaming operations:

```dart
class StreamingState {
  /// Conversation history being built during streaming
  final List<ChatMessage> conversationHistory;

  /// Available tools mapped by name
  final Map<String, Tool> toolMap;

  /// Provider-specific message accumulation
  final MessageAccumulator accumulator;

  /// Tool execution handler
  final ToolExecutor executor;

  /// Tool ID coordination across the conversation
  final ToolIdCoordinator toolIdCoordinator;

  /// Workflow control flags
  bool done = false;
  bool shouldPrefixNextMessage = false;
  bool isFirstChunkOfMessage = true;

  /// Current message being accumulated from the stream
  ChatMessage accumulatedMessage;

  /// Last result from the model stream
  ChatResult<ChatMessage> lastResult;
}
```

The state lifecycle looks like this:

```mermaid
stateDiagram-v2
[*] --> Initialization: Create StreamingState
Initialization --> MessageReset: Start iteration
MessageReset --> Accumulation: Stream chunks
Accumulation --> Accumulation: More chunks
Accumulation --> Consolidation: Stream ends
Consolidation --> ToolDetection: Check for tools
ToolDetection --> ToolExecution: Tools found
ToolDetection --> Complete: No tools
ToolExecution --> UpdateHistory: Add results
UpdateHistory --> ContinuationCheck: More needed?
ContinuationCheck --> MessageReset: Continue
ContinuationCheck --> Complete: Done
Complete --> [*]
```
- Initialization: Create state with conversation history and tools
- Message Reset: Clear accumulated message before each model call
- Accumulation: Build message from streaming chunks
- Consolidation: Finalize message and extract tool calls
- Tool Execution: Process tools and update conversation
- Continuation: Check if more streaming needed
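A sketch of how the Agent side might drive these transitions, combining the orchestrator interface above with the try/finally cleanup the infrastructure layer calls for; the wrapper function itself is illustrative:

```dart
// Guarantees finalize() runs even if the model stream throws mid-iteration.
Stream<StreamingIterationResult> runWithLifecycle(
  StreamingOrchestrator orchestrator,
  ChatModel model,
  StreamingState state,
) async* {
  orchestrator.initialize(state); // Initialization
  try {
    while (!state.done) {
      // Message reset, accumulation, consolidation, and tool execution
      // all happen inside processIteration; it flags state.done when
      // no further continuation is needed.
      yield* orchestrator.processIteration(model, state);
    }
  } finally {
    orchestrator.finalize(state); // Complete: guaranteed cleanup
  }
}
```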
## Implementation Details

### Streaming UX Enhancement

```dart
// Streaming UX enhancement tracking
bool _shouldPrefixNewline(StreamingState state) {
  return state.shouldPrefixNextMessage && state.isFirstChunkOfMessage;
}

// Update state after streaming text
void _updateStreamingState(StreamingState state, String textOutput) {
  if (textOutput.isNotEmpty) {
    state.isFirstChunkOfMessage = false;
  }
}

// Set UX flags after tool execution
void _setToolExecutionFlags(StreamingState state) {
  state.shouldPrefixNextMessage = true; // Next AI message needs newline prefix
}
```

### Tool ID Generation

For providers without tool IDs (Google, Ollama):

```dart
// In mapper
final toolId = const Uuid().v4();
return ToolPart.call(
  id: toolId,
  name: functionCall.name,
  arguments: functionCall.args,
);
```

```mermaid
flowchart LR
A[Provider Response] --> B{Has Tool ID?}
B -->|Yes| C[Use Provider ID]
B -->|No| D[Generate UUID]
C --> E[Create ToolPart.call]
D --> E
E --> F[LLM Processes]
F --> G[Tool Execution]
G --> H[Create ToolPart.result]
H --> I{Match IDs}
I -->|Same ID| J[Pair Call & Result]
style A fill:#f9f,stroke:#333,stroke-width:2px
style J fill:#9f9,stroke:#333,stroke-width:2px
```
### Argument Handling

All mappers handle arguments consistently:

```dart
// OpenAI mapper
// Accumulates raw JSON during streaming, parses when complete
arguments: json.decode(argumentsAccumulator.toString())

// Anthropic transformer
// Accumulates raw JSON during streaming, parses when complete
arguments: json.decode(argsJson)

// Google/Ollama mapper
// Arguments already parsed by provider
arguments: functionCall.args
```

### Tool Error Handling

Tool execution errors are included in the consolidated tool result message:

```dart
catch (error, stackTrace) {
  _logger.warning('Tool ${toolPart.name} execution failed: $error');

  // Add error result part to collection
  toolResultParts.add(
    ToolPart.result(
      id: toolPart.id,
      name: toolPart.name,
      result: json.encode({'error': error.toString()}),
    ),
  );
}
```

## Provider-Specific Details

### OpenAI

- Streaming: Partial chunks with index-based accumulation
- Tool IDs: Provided by API
- Arguments: Streamed as raw JSON string, parsed by mapper when complete
- ToolPart Creation: Only after streaming completes with parsed arguments
### OpenAI Responses

- Streaming: Event-based with stateful session management
- Tool IDs: Provided by API with a `call_id` field
- Arguments: Streamed as deltas via `function_call_arguments_delta` events
- Session Management:
  - Maintains tool execution state across session continuations
  - Stores `responseId` for resuming conversations
  - Handles pending tool outputs in session metadata
- Tool Flow Differences:
  - Validates message alternation before accepting tool results
  - Supports server-side tools (web search, file search, code interpreter)
  - Can resume mid-tool-execution with pending outputs
- Special Features:
  - Code interpreter with container management
  - Web search with staged progress events
  - File search with result telemetry
  - MCP (Model Context Protocol) tool support
- Implementation Notes:
  - Uses `OpenAIResponsesEventMapper` to handle tool-related events
  - Accumulates tool metadata in streaming state
  - Preserves tool call IDs for proper result routing
### Anthropic

- Streaming: Event-based with explicit stages
- Tool IDs: Provided by API
- Arguments: Accumulated via `InputJsonBlockDelta`
- Special: `ContentBlockStop` triggers emission
### Google

- Streaming: Complete chunks per message
- Tool IDs: Generated by mapper (UUID)
- Arguments: Provided as parsed objects
- Conversion: Mapper converts to JSON string
### Ollama

- Streaming: Complete chunks
- Tool IDs: Generated by mapper (UUID)
- Arguments: Provided as parsed objects
- Note: Both native and OpenAI-compatible endpoints
### Cohere

- Streaming: Custom format with `<|python_tag|>`
- Tool IDs: Provided by API
- Arguments: Special parsing for the `"null"` string
- Edge Case: Sends `"null"` for parameterless tools
## Testing and Validation

- Streaming Integrity: No dropped chunks or text
- Tool Accumulation: Arguments built correctly across chunks
- ID Matching: Tool calls and results properly paired
- Error Recovery: Tool errors handled gracefully
- UX Features: Newline prefixing works correctly
- Message History Validation: User/model alternation maintained
- Tool Result Consolidation: Multiple results in single message
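A hedged sketch of the ID-matching check using `package:test`; `runToolCallScenario` is a hypothetical helper, and the part shapes are the ones used throughout this document:

```dart
import 'package:test/test.dart';

void main() {
  test('tool results pair with their originating calls by id', () async {
    // Hypothetical helper: runs a prompt that triggers a tool call and
    // returns the resulting conversation history.
    final history = await runToolCallScenario();

    final parts = history.expand((m) => m.parts).whereType<ToolPart>();
    final calls = parts.where((p) => p.kind == ToolPartKind.call);
    final resultIds =
        parts.where((p) => p.kind == ToolPartKind.result).map((p) => p.id);

    // Every call must have a result carrying the same ID.
    for (final call in calls) {
      expect(resultIds, contains(call.id));
    }
  });
}
```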
Debug scripts provide additional manual verification:

```dart
// debug_streaming_tool_calls.dart
// Tests streaming with multiple tool calls
// debug_tool_accumulation.dart
// Verifies argument accumulation across chunks
```

### Edge Cases

- Empty Arguments: Some providers send `arguments: {}` (handled as an empty map)
- Parameterless Tools: Cohere sends the `"null"` string (parsed to an empty map)
- Multiple Same Tools: IDs differentiate repeated calls to the same tool
- Streaming Interruption: Partial tool calls are not exposed until complete
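The first two cases collapse into a single normalization rule applied before JSON parsing; a minimal sketch:

```dart
import 'dart:convert';

/// Normalize raw streamed arguments before parsing: null, empty
/// strings, and the literal "null" (Cohere's parameterless case)
/// all become an empty argument map.
Map<String, dynamic> normalizeArguments(String? raw) {
  final trimmed = raw?.trim();
  if (trimmed == null || trimmed.isEmpty || trimmed == 'null') {
    return <String, dynamic>{};
  }
  final decoded = json.decode(trimmed);
  return decoded is Map<String, dynamic> ? decoded : <String, dynamic>{};
}

void main() {
  print(normalizeArguments('null')); // {}
  print(normalizeArguments('{}')); // {}
  print(normalizeArguments('{"city": "Boston"}')); // {city: Boston}
}
```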
## Design Principles

- Streaming First: Optimize for real-time user experience
- Orchestrator Coordination: Complex workflows handled by specialized orchestrators
- State Encapsulation: All mutable state isolated in StreamingState
- Provider Abstraction: MessageAccumulator isolates provider-specific streaming so the Agent and orchestrators stay agnostic to provider details
- Complete Tool Calls: ToolPart only created when arguments are fully parsed
- Resource Management: Guaranteed cleanup through try/finally patterns
- Error Transparency: Tool errors returned to LLM with full context
## Benefits

- Clean Separation: Each layer has focused responsibilities
- Maintainability: 56% reduction in Agent complexity (1,091 → 475 lines)
- Testability: Each component can be tested in isolation
- Extensibility: New orchestrators and executors can be added without changing core logic
- Debugging: Clear layer boundaries make issue isolation easier
- Resource Safety: Centralized lifecycle management prevents leaks
- Provider Isolation: Quirks contained in implementation layers
- Workflow Specialization: Different orchestrators for different use cases
- Provider Agnostic: Same orchestrator works across all providers
- Streaming Optimization: Purpose-built for streaming coordination
- State Management: Clean state transitions and isolation
- UX Enhancement: Consistent streaming experience across providers
## Future Enhancements

- Parallel Tool Execution: ParallelToolExecutor for concurrent tool calls
- Custom Orchestrators: Provider-specific orchestrators for unique workflows
- Advanced State Management: Persistent state across conversations
- Performance Monitoring: Built-in metrics and monitoring hooks
- Caching Integration: Smart caching strategies in orchestration layer
## Extension Points

- New Orchestrators: Specialized workflows (e.g., multi-step reasoning)
- Tool Execution: Extensions to the ToolExecutor class
- Message Accumulators: Novel streaming patterns and optimizations
- State Managers: Advanced state persistence and recovery
- Lifecycle Hooks: Custom resource management and cleanup logic
### Example: Custom Orchestrator

```dart
class CustomStreamingOrchestrator implements StreamingOrchestrator {
  @override
  String get providerHint => 'custom';

  @override
  void initialize(StreamingState state) {
    // Custom initialization logic
  }

  @override
  Stream<StreamingIterationResult> processIteration(
    ChatModel<ChatModelOptions> model,
    StreamingState state, {
    JsonSchema? outputSchema,
  }) async* {
    // Custom streaming workflow
  }

  @override
  void finalize(StreamingState state) {
    // Custom cleanup logic
  }
}
```

### Example: Parallel Tool Execution

```dart
// Adding parallel execution to ToolExecutor
class ParallelToolExecutor extends ToolExecutor {
  @override
  Future<List<ToolExecutionResult>> executeBatch(
    List<ToolPart> toolCalls,
    Map<String, Tool> toolMap,
  ) async {
    // Execute tools in parallel using Future.wait
    final futures = toolCalls.map((call) => executeSingle(call, toolMap));
    return Future.wait(futures);
  }
}
```

### Example: Custom Message Accumulator

```dart
class OptimizedMessageAccumulator extends MessageAccumulator {
  @override
  String get providerHint => 'optimized';

  @override
  ChatMessage accumulate(ChatMessage existing, ChatMessage newChunk) {
    // Provider-specific accumulation optimizations
    return existing; // placeholder so the sketch compiles
  }

  @override
  ChatMessage consolidate(ChatMessage accumulated) {
    // Final message processing and optimization
    return accumulated; // placeholder so the sketch compiles
  }
}
```