Merged
88 changes: 56 additions & 32 deletions docs/agents.md
@@ -61,17 +61,18 @@ print(result.output)

## Running Agents

There are four ways to run an agent:
There are five ways to run an agent:

1. [`agent.run()`][pydantic_ai.agent.AbstractAgent.run] — an async function which returns a [`RunResult`][pydantic_ai.agent.AgentRunResult] containing a completed response.
2. [`agent.run_sync()`][pydantic_ai.agent.AbstractAgent.run_sync] — a plain, synchronous function which returns a [`RunResult`][pydantic_ai.agent.AgentRunResult] containing a completed response (internally, this just calls `loop.run_until_complete(self.run())`).
3. [`agent.run_stream()`][pydantic_ai.agent.AbstractAgent.run_stream] — an async context manager which returns a [`StreamedRunResult`][pydantic_ai.result.StreamedRunResult], which contains methods to stream text and structured output as an async iterable.
4. [`agent.iter()`][pydantic_ai.Agent.iter] — a context manager which returns an [`AgentRun`][pydantic_ai.agent.AgentRun], an async-iterable over the nodes of the agent's underlying [`Graph`][pydantic_graph.graph.Graph].
4. [`agent.run_stream_events()`][pydantic_ai.agent.AbstractAgent.run_stream_events] — a function which returns an async iterable of [`AgentStreamEvent`s][pydantic_ai.messages.AgentStreamEvent] and an [`AgentRunResultEvent`][pydantic_ai.run.AgentRunResultEvent] containing the final run result.
5. [`agent.iter()`][pydantic_ai.Agent.iter] — a context manager which returns an [`AgentRun`][pydantic_ai.agent.AgentRun], an async iterable over the nodes of the agent's underlying [`Graph`][pydantic_graph.graph.Graph].

Here's a simple example demonstrating the first three:
Here's a simple example demonstrating the first four:

```python {title="run_agent.py"}
from pydantic_ai import Agent
from pydantic_ai import Agent, AgentRunResultEvent, AgentStreamEvent

agent = Agent('openai:gpt-4o')

@@ -91,6 +92,22 @@ async def main():
#> The capital of
#> The capital of the UK is
#> The capital of the UK is London.

events: list[AgentStreamEvent | AgentRunResultEvent] = []
async for event in agent.run_stream_events('What is the capital of Mexico?'):
events.append(event)
print(events)
"""
[
PartStartEvent(index=0, part=TextPart(content='The capital of ')),
FinalResultEvent(tool_name=None, tool_call_id=None),
PartDeltaEvent(index=0, delta=TextPartDelta(content_delta='Mexico is Mexico ')),
PartDeltaEvent(index=0, delta=TextPartDelta(content_delta='City.')),
AgentRunResultEvent(
result=AgentRunResult(output='The capital of Mexico is Mexico City.')
),
]
"""
```

_(This example is complete, it can be run "as is" — you'll need to add `asyncio.run(main())` to run `main`)_
Expand All @@ -105,13 +122,13 @@ It also takes an optional `event_stream_handler` argument that you can use to ga
The example below shows how to stream events and text output. You can also [stream structured output](output.md#streaming-structured-output).

!!! note
As the `run_stream()` method will consider the first output matching the `output_type` to be the final output,
As the `run_stream()` method will consider the first output matching the [output type](output.md#structured-output) to be the final output,
it will stop running the agent graph and will not execute any tool calls made by the model after this "final" output.

If you want to always run the agent graph to completion and stream all events from the model's streaming response and the agent's execution of tools,
use [`agent.run()`][pydantic_ai.agent.AbstractAgent.run] with an `event_stream_handler` or [`agent.iter()`][pydantic_ai.agent.AbstractAgent.iter] instead, as described in the following sections.
use [`agent.run_stream_events()`][pydantic_ai.agent.AbstractAgent.run_stream_events] or [`agent.iter()`][pydantic_ai.agent.AbstractAgent.iter] instead, as described in the following sections.

```python {title="run_stream_events.py"}
```python {title="run_stream_event_stream_handler.py"}
import asyncio
from collections.abc import AsyncIterable
from datetime import date
@@ -147,30 +164,32 @@

output_messages: list[str] = []

async def handle_event(event: AgentStreamEvent):
if isinstance(event, PartStartEvent):
output_messages.append(f'[Request] Starting part {event.index}: {event.part!r}')
elif isinstance(event, PartDeltaEvent):
if isinstance(event.delta, TextPartDelta):
output_messages.append(f'[Request] Part {event.index} text delta: {event.delta.content_delta!r}')
elif isinstance(event.delta, ThinkingPartDelta):
output_messages.append(f'[Request] Part {event.index} thinking delta: {event.delta.content_delta!r}')
elif isinstance(event.delta, ToolCallPartDelta):
output_messages.append(f'[Request] Part {event.index} args delta: {event.delta.args_delta}')
elif isinstance(event, FunctionToolCallEvent):
output_messages.append(
f'[Tools] The LLM calls tool={event.part.tool_name!r} with args={event.part.args} (tool_call_id={event.part.tool_call_id!r})'
)
elif isinstance(event, FunctionToolResultEvent):
output_messages.append(f'[Tools] Tool call {event.tool_call_id!r} returned => {event.result.content}')
elif isinstance(event, FinalResultEvent):
output_messages.append(f'[Result] The model started producing a final result (tool_name={event.tool_name})')


async def event_stream_handler(
ctx: RunContext,
event_stream: AsyncIterable[AgentStreamEvent],
):
async for event in event_stream:
if isinstance(event, PartStartEvent):
output_messages.append(f'[Request] Starting part {event.index}: {event.part!r}')
elif isinstance(event, PartDeltaEvent):
if isinstance(event.delta, TextPartDelta):
output_messages.append(f'[Request] Part {event.index} text delta: {event.delta.content_delta!r}')
elif isinstance(event.delta, ThinkingPartDelta):
output_messages.append(f'[Request] Part {event.index} thinking delta: {event.delta.content_delta!r}')
elif isinstance(event.delta, ToolCallPartDelta):
output_messages.append(f'[Request] Part {event.index} args delta: {event.delta.args_delta}')
elif isinstance(event, FunctionToolCallEvent):
output_messages.append(
f'[Tools] The LLM calls tool={event.part.tool_name!r} with args={event.part.args} (tool_call_id={event.part.tool_call_id!r})'
)
elif isinstance(event, FunctionToolResultEvent):
output_messages.append(f'[Tools] Tool call {event.tool_call_id!r} returned => {event.result.content}')
elif isinstance(event, FinalResultEvent):
output_messages.append(f'[Result] The model started producing a final result (tool_name={event.tool_name})')

await handle_event(event)

async def main():
user_prompt = 'What will the weather be like in Paris on Tuesday?'
@@ -209,24 +228,29 @@ Like `agent.run_stream()`, [`agent.run()`][pydantic_ai.agent.AbstractAgent.run_s
argument that lets you stream all events from the model's streaming response and the agent's execution of tools.
Unlike `run_stream()`, it always runs the agent graph to completion, even if text that looks like it could be the final result was received ahead of tool calls.

For convenience, an [`agent.run_stream_events()`][pydantic_ai.agent.AbstractAgent.run_stream_events] method is also available as a wrapper around `run(event_stream_handler=...)`, which returns an async iterable of [`AgentStreamEvent`s][pydantic_ai.messages.AgentStreamEvent] and an [`AgentRunResultEvent`][pydantic_ai.run.AgentRunResultEvent] containing the final run result.

!!! note
When used with an `event_stream_handler`, the `run()` method currently requires you to piece together the streamed text yourself from the `PartStartEvent` and subsequent `PartDeltaEvent`s instead of providing a `stream_text()` convenience method.
Because they return raw events as they come in, the `run_stream_events()` and `run(event_stream_handler=...)` methods require you to piece together the streamed text and structured output yourself from the `PartStartEvent` and subsequent `PartDeltaEvent`s.
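The accumulation itself is straightforward: key the growing content by part index, seed it from each `PartStartEvent`, and append each `TextPartDelta`. A minimal sketch of the pattern, using simplified stand-in event and part classes rather than the real pydantic_ai types (which carry more fields):

```python
from dataclasses import dataclass


# Simplified stand-ins for pydantic_ai's event/part types, just enough
# to illustrate the accumulation pattern.
@dataclass
class TextPart:
    content: str


@dataclass
class TextPartDelta:
    content_delta: str


@dataclass
class PartStartEvent:
    index: int
    part: TextPart


@dataclass
class PartDeltaEvent:
    index: int
    delta: TextPartDelta


def accumulate_text(events) -> str:
    """Rebuild the streamed text from start/delta events, keyed by part index."""
    parts: dict[int, str] = {}
    for event in events:
        if isinstance(event, PartStartEvent):
            parts[event.index] = event.part.content
        elif isinstance(event, PartDeltaEvent):
            parts[event.index] = parts.get(event.index, '') + event.delta.content_delta
    return ''.join(parts[i] for i in sorted(parts))


events = [
    PartStartEvent(index=0, part=TextPart(content='The capital of ')),
    PartDeltaEvent(index=0, delta=TextPartDelta(content_delta='Mexico is ')),
    PartDeltaEvent(index=0, delta=TextPartDelta(content_delta='Mexico City.')),
]
print(accumulate_text(events))
#> The capital of Mexico is Mexico City.
```

The same loop works whether the events arrive from `run_stream_events()` or inside an `event_stream_handler`; only the iteration (`async for`) changes.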

To get the best of both worlds, at the expense of some additional complexity, you can use [`agent.iter()`][pydantic_ai.agent.AbstractAgent.iter] as described in the next section, which lets you [iterate over the agent graph](#iterating-over-an-agents-graph) and [stream both events and output](#streaming-all-events-and-output) at every step.

```python {title="run_events.py" requires="run_stream_events.py"}
```python {title="run_events.py" requires="run_stream_event_stream_handler.py"}
import asyncio

from run_stream_events import event_stream_handler, output_messages, weather_agent
from pydantic_ai import AgentRunResultEvent

from run_stream_event_stream_handler import handle_event, output_messages, weather_agent


async def main():
user_prompt = 'What will the weather be like in Paris on Tuesday?'

run = await weather_agent.run(user_prompt, event_stream_handler=event_stream_handler)

output_messages.append(f'[Final Output] {run.output}')

async for event in weather_agent.run_stream_events(user_prompt):
if isinstance(event, AgentRunResultEvent):
output_messages.append(f'[Final Output] {event.result.output}')
else:
await handle_event(event)

if __name__ == '__main__':
asyncio.run(main())
3 changes: 3 additions & 0 deletions docs/api/run.md
@@ -0,0 +1,3 @@
# `pydantic_ai.run`

::: pydantic_ai.run
2 changes: 1 addition & 1 deletion docs/durable_execution/dbos.md
@@ -127,7 +127,7 @@ DBOS checkpoints workflow inputs/outputs and step outputs into a database using

### Streaming

Because DBOS cannot stream output directly to the workflow or step call site, [`Agent.run_stream()`][pydantic_ai.Agent.run_stream] is not supported when running inside of a DBOS workflow.
Because DBOS cannot stream output directly to the workflow or step call site, [`Agent.run_stream()`][pydantic_ai.Agent.run_stream] and [`Agent.run_stream_events()`][pydantic_ai.Agent.run_stream_events] are not supported when running inside of a DBOS workflow.

Instead, you can implement streaming by setting an [`event_stream_handler`][pydantic_ai.agent.EventStreamHandler] on the `Agent` or `DBOSAgent` instance and using [`DBOSAgent.run()`][pydantic_ai.durable_exec.dbos.DBOSAgent.run].
The event stream handler function will receive the agent [run context][pydantic_ai.tools.RunContext] and an async iterable of events from the model's streaming response and the agent's execution of tools. For examples, see the [streaming docs](../agents.md#streaming-all-events).
2 changes: 1 addition & 1 deletion docs/durable_execution/temporal.md
@@ -177,7 +177,7 @@ If you need one or more of these attributes to be available inside activities, y

### Streaming

Because Temporal activities cannot stream output directly to the activity call site, [`Agent.run_stream()`][pydantic_ai.Agent.run_stream] and [`Agent.iter()`][pydantic_ai.Agent.iter] are not supported.
Because Temporal activities cannot stream output directly to the activity call site, [`Agent.run_stream()`][pydantic_ai.Agent.run_stream], [`Agent.run_stream_events()`][pydantic_ai.Agent.run_stream_events], and [`Agent.iter()`][pydantic_ai.Agent.iter] are not supported.

Instead, you can implement streaming by setting an [`event_stream_handler`][pydantic_ai.agent.EventStreamHandler] on the `Agent` or `TemporalAgent` instance and using [`TemporalAgent.run()`][pydantic_ai.durable_exec.temporal.TemporalAgent.run] inside the workflow.
The event stream handler function will receive the agent [run context][pydantic_ai.tools.RunContext] and an async iterable of events from the model's streaming response and the agent's execution of tools. For examples, see the [streaming docs](../agents.md#streaming-all-events).
2 changes: 1 addition & 1 deletion docs/output.md
@@ -520,7 +520,7 @@ There are two main challenges with streamed results:
it will stop running the agent graph and will not execute any tool calls made by the model after this "final" output.

If you want to always run the agent graph to completion and stream all events from the model's streaming response and the agent's execution of tools,
use [`agent.run()`][pydantic_ai.agent.AbstractAgent.run] with an `event_stream_handler` ([docs](agents.md#streaming-all-events)) or [`agent.iter()`][pydantic_ai.agent.AbstractAgent.iter] ([docs](agents.md#streaming-all-events-and-output)) instead.
use [`agent.run_stream_events()`][pydantic_ai.agent.AbstractAgent.run_stream_events] ([docs](agents.md#streaming-all-events)) or [`agent.iter()`][pydantic_ai.agent.AbstractAgent.iter] ([docs](agents.md#streaming-all-events-and-output)) instead.

### Streaming Text

1 change: 1 addition & 0 deletions mkdocs.yml
@@ -131,6 +131,7 @@ nav:
- api/profiles.md
- api/providers.md
- api/retries.md
- api/run.md
- pydantic_evals:
- api/pydantic_evals/dataset.md
- api/pydantic_evals/evaluators.md
5 changes: 5 additions & 0 deletions pydantic_ai_slim/pydantic_ai/__init__.py
@@ -88,6 +88,7 @@
ModelProfile,
ModelProfileSpec,
)
from .run import AgentRun, AgentRunResult, AgentRunResultEvent
from .settings import ModelSettings
from .tools import DeferredToolRequests, DeferredToolResults, RunContext, Tool, ToolApproved, ToolDefinition, ToolDenied
from .toolsets import (
@@ -224,5 +225,9 @@
'RunUsage',
'RequestUsage',
'UsageLimits',
# run
'AgentRun',
'AgentRunResult',
'AgentRunResultEvent',
)
__version__ = _metadata_version('pydantic_ai_slim')
8 changes: 4 additions & 4 deletions pydantic_ai_slim/pydantic_ai/_agent_graph.py
@@ -87,10 +87,10 @@
class GraphAgentState:
"""State kept across the execution of the agent graph."""

message_history: list[_messages.ModelMessage]
usage: _usage.RunUsage
retries: int
run_step: int
message_history: list[_messages.ModelMessage] = dataclasses.field(default_factory=list)
usage: _usage.RunUsage = dataclasses.field(default_factory=_usage.RunUsage)
retries: int = 0
run_step: int = 0

def increment_retries(self, max_result_retries: int, error: BaseException | None = None) -> None:
self.retries += 1
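The switch to `default_factory` in `GraphAgentState` avoids the classic mutable-default pitfall: a plain mutable default would be shared across instances, and `dataclasses` rejects it outright at class creation time. A minimal illustration with a stand-in class (not the real `GraphAgentState`):

```python
from dataclasses import dataclass, field


@dataclass
class GraphState:
    # A bare `messages: list[str] = []` would raise ValueError when the
    # class is defined; default_factory gives each instance its own list.
    messages: list[str] = field(default_factory=list)
    run_step: int = 0


a = GraphState()
b = GraphState()
a.messages.append('hello')
print(a.messages, b.messages)
#> ['hello'] []
```

With the defaults in place, `GraphAgentState()` can now be constructed with no arguments, which is what allows the new run entry points to create fresh state without spelling out every field.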