From 2f95844de83f76e9d4d2c1642e93c91dc5903e41 Mon Sep 17 00:00:00 2001 From: Douwe Maan Date: Fri, 3 Oct 2025 23:15:45 +0000 Subject: [PATCH 1/6] Add Agent.run_stream_events() convenience method wrapping run(event_stream_handler=...) --- docs/agents.md | 88 ++++++---- docs/durable_execution/dbos.md | 2 +- docs/durable_execution/temporal.md | 2 +- docs/output.md | 2 +- pydantic_ai_slim/pydantic_ai/__init__.py | 5 + pydantic_ai_slim/pydantic_ai/_agent_graph.py | 8 +- .../pydantic_ai/agent/abstract.py | 158 +++++++++++++++++- .../pydantic_ai/durable_exec/dbos/_agent.py | 97 ++++++++++- .../durable_exec/temporal/_agent.py | 117 ++++++++++++- pydantic_ai_slim/pydantic_ai/result.py | 6 +- pydantic_ai_slim/pydantic_ai/run.py | 26 ++- ...test_temporal_agent_run_stream_events.yaml | 78 +++++++++ tests/test_dbos.py | 32 +++- tests/test_examples.py | 1 + tests/test_streaming.py | 38 +++++ tests/test_temporal.py | 53 +++++- 16 files changed, 653 insertions(+), 60 deletions(-) create mode 100644 tests/cassettes/test_temporal/test_temporal_agent_run_stream_events.yaml diff --git a/docs/agents.md b/docs/agents.md index 442e4330d1..f0fa59056d 100644 --- a/docs/agents.md +++ b/docs/agents.md @@ -61,17 +61,18 @@ print(result.output) ## Running Agents -There are four ways to run an agent: +There are five ways to run an agent: 1. [`agent.run()`][pydantic_ai.agent.AbstractAgent.run] — an async function which returns a [`RunResult`][pydantic_ai.agent.AgentRunResult] containing a completed response. 2. [`agent.run_sync()`][pydantic_ai.agent.AbstractAgent.run_sync] — a plain, synchronous function which returns a [`RunResult`][pydantic_ai.agent.AgentRunResult] containing a completed response (internally, this just calls `loop.run_until_complete(self.run())`). 3. [`agent.run_stream()`][pydantic_ai.agent.AbstractAgent.run_stream] — an async context manager which returns a [`StreamedRunResult`][pydantic_ai.result.StreamedRunResult], which contains methods to stream text and structured output as an async iterable. -4. [`agent.iter()`][pydantic_ai.Agent.iter] — a context manager which returns an [`AgentRun`][pydantic_ai.agent.AgentRun], an async-iterable over the nodes of the agent's underlying [`Graph`][pydantic_graph.graph.Graph]. +4. [`agent.run_stream_events()`][pydantic_ai.agent.AbstractAgent.run_stream_events] — an function which returns an async iterable of [`AgentStreamEvent`s][pydantic_ai.agent.AgentStreamEvent] and a [`AgentRunResultEvent`][pydantic_ai.agent.AgentRunResultEvent] containing the final run result. +5. [`agent.iter()`][pydantic_ai.Agent.iter] — a context manager which returns an [`AgentRun`][pydantic_ai.agent.AgentRun], an async iterable over the nodes of the agent's underlying [`Graph`][pydantic_graph.graph.Graph]. -Here's a simple example demonstrating the first three: +Here's a simple example demonstrating the first four: ```python {title="run_agent.py"} -from pydantic_ai import Agent +from pydantic_ai import Agent, AgentRunResultEvent, AgentStreamEvent agent = Agent('openai:gpt-4o') @@ -91,6 +92,22 @@ async def main(): #> The capital of #> The capital of the UK is #> The capital of the UK is London. 
+ + events: list[AgentStreamEvent | AgentRunResultEvent] = [] + async for event in agent.run_stream_events('What is the capital of Mexico?'): + events.append(event) + print(events) + """ + [ + PartStartEvent(index=0, part=TextPart(content='The capital of ')), + FinalResultEvent(tool_name=None, tool_call_id=None), + PartDeltaEvent(index=0, delta=TextPartDelta(content_delta='Mexico is Mexico ')), + PartDeltaEvent(index=0, delta=TextPartDelta(content_delta='City.')), + AgentRunResultEvent( + result=AgentRunResult(output='The capital of Mexico is Mexico City.') + ), + ] + """ ``` _(This example is complete, it can be run "as is" — you'll need to add `asyncio.run(main())` to run `main`)_ @@ -105,13 +122,13 @@ It also takes an optional `event_stream_handler` argument that you can use to ga The example below shows how to stream events and text output. You can also [stream structured output](output.md#streaming-structured-output). !!! note - As the `run_stream()` method will consider the first output matching the `output_type` to be the final output, + As the `run_stream()` method will consider the first output matching the [output type](output.md#structured-output) to be the final output, it will stop running the agent graph and will not execute any tool calls made by the model after this "final" output. If you want to always run the agent graph to completion and stream all events from the model's streaming response and the agent's execution of tools, - use [`agent.run()`][pydantic_ai.agent.AbstractAgent.run] with an `event_stream_handler` or [`agent.iter()`][pydantic_ai.agent.AbstractAgent.iter] instead, as described in the following sections. + use [`agent.run_stream_events()`][pydantic_ai.agent.AbstractAgent.run_stream_events] or [`agent.iter()`][pydantic_ai.agent.AbstractAgent.iter] instead, as described in the following sections. 
-```python {title="run_stream_events.py"} +```python {title="run_stream_event_stream_handler.py"} import asyncio from collections.abc import AsyncIterable from datetime import date @@ -147,30 +164,32 @@ async def weather_forecast( output_messages: list[str] = [] +async def handle_event(event: AgentStreamEvent): + if isinstance(event, PartStartEvent): + output_messages.append(f'[Request] Starting part {event.index}: {event.part!r}') + elif isinstance(event, PartDeltaEvent): + if isinstance(event.delta, TextPartDelta): + output_messages.append(f'[Request] Part {event.index} text delta: {event.delta.content_delta!r}') + elif isinstance(event.delta, ThinkingPartDelta): + output_messages.append(f'[Request] Part {event.index} thinking delta: {event.delta.content_delta!r}') + elif isinstance(event.delta, ToolCallPartDelta): + output_messages.append(f'[Request] Part {event.index} args delta: {event.delta.args_delta}') + elif isinstance(event, FunctionToolCallEvent): + output_messages.append( + f'[Tools] The LLM calls tool={event.part.tool_name!r} with args={event.part.args} (tool_call_id={event.part.tool_call_id!r})' + ) + elif isinstance(event, FunctionToolResultEvent): + output_messages.append(f'[Tools] Tool call {event.tool_call_id!r} returned => {event.result.content}') + elif isinstance(event, FinalResultEvent): + output_messages.append(f'[Result] The model starting producing a final result (tool_name={event.tool_name})') + async def event_stream_handler( ctx: RunContext, event_stream: AsyncIterable[AgentStreamEvent], ): async for event in event_stream: - if isinstance(event, PartStartEvent): - output_messages.append(f'[Request] Starting part {event.index}: {event.part!r}') - elif isinstance(event, PartDeltaEvent): - if isinstance(event.delta, TextPartDelta): - output_messages.append(f'[Request] Part {event.index} text delta: {event.delta.content_delta!r}') - elif isinstance(event.delta, ThinkingPartDelta): - output_messages.append(f'[Request] Part {event.index} thinking delta: {event.delta.content_delta!r}') - elif isinstance(event.delta, ToolCallPartDelta): - output_messages.append(f'[Request] Part {event.index} args delta: {event.delta.args_delta}') - elif isinstance(event, FunctionToolCallEvent): - output_messages.append( - f'[Tools] The LLM calls tool={event.part.tool_name!r} with args={event.part.args} (tool_call_id={event.part.tool_call_id!r})' - ) - elif isinstance(event, FunctionToolResultEvent): - output_messages.append(f'[Tools] Tool call {event.tool_call_id!r} returned => {event.result.content}') - elif isinstance(event, FinalResultEvent): - output_messages.append(f'[Result] The model starting producing a final result (tool_name={event.tool_name})') - + await handle_event(event) async def main(): user_prompt = 'What will the weather be like in Paris on Tuesday?' @@ -209,24 +228,29 @@ Like `agent.run_stream()`, [`agent.run()`][pydantic_ai.agent.AbstractAgent.run_s argument that lets you stream all events from the model's streaming response and the agent's execution of tools. Unlike `run_stream()`, it always runs the agent graph to completion even if text was received ahead of tool calls that looked like it could've been the final result. 
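
+
+As a minimal sketch, passing the handler from the example above straight to `run()` could look like this (the file name is illustrative):
+
+```python {title="run_with_event_stream_handler.py" requires="run_stream_event_stream_handler.py"}
+import asyncio
+
+from run_stream_event_stream_handler import event_stream_handler, output_messages, weather_agent
+
+
+async def main():
+    user_prompt = 'What will the weather be like in Paris on Tuesday?'
+    # The handler receives events while the run is in progress; the awaited result holds the final output.
+    run = await weather_agent.run(user_prompt, event_stream_handler=event_stream_handler)
+    output_messages.append(f'[Final Output] {run.output}')
+
+
+if __name__ == '__main__':
+    asyncio.run(main())
+```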
+For convenience, a [`agent.run_stream_events()`][pydantic_ai.agent.AbstractAgent.run_stream_events] method is also available as a wrapper around `run(event_stream_handler=...)`, which returns an async iterable of [`AgentStreamEvent`s][pydantic_ai.agent.AgentStreamEvent] and a [`AgentRunResultEvent`][pydantic_ai.agent.AgentRunResultEvent] containing the final run result. + !!! note - When used with an `event_stream_handler`, the `run()` method currently requires you to piece together the streamed text yourself from the `PartStartEvent` and subsequent `PartDeltaEvent`s instead of providing a `stream_text()` convenience method. + As they return raw events as they come in, the `run_stream_events()` and `run(event_stream_handler=...)` methods require you to piece together the streamed text and structured output yourself from the `PartStartEvent` and subsequent `PartDeltaEvent`s. To get the best of both worlds, at the expense of some additional complexity, you can use [`agent.iter()`][pydantic_ai.agent.AbstractAgent.iter] as described in the next section, which lets you [iterate over the agent graph](#iterating-over-an-agents-graph) and [stream both events and output](#streaming-all-events-and-output) at every step. -```python {title="run_events.py" requires="run_stream_events.py"} +```python {title="run_events.py" requires="run_stream_event_stream_handler.py"} import asyncio -from run_stream_events import event_stream_handler, output_messages, weather_agent +from pydantic_ai import AgentRunResultEvent + +from run_stream_event_stream_handler import handle_event, output_messages, weather_agent async def main(): user_prompt = 'What will the weather be like in Paris on Tuesday?' - run = await weather_agent.run(user_prompt, event_stream_handler=event_stream_handler) - - output_messages.append(f'[Final Output] {run.output}') - + async for event in weather_agent.run_stream_events(user_prompt): + if isinstance(event, AgentRunResultEvent): + output_messages.append(f'[Final Output] {event.result.output}') + else: + await handle_event(event) if __name__ == '__main__': asyncio.run(main()) diff --git a/docs/durable_execution/dbos.md b/docs/durable_execution/dbos.md index a938b5afb5..9e3ed0c835 100644 --- a/docs/durable_execution/dbos.md +++ b/docs/durable_execution/dbos.md @@ -127,7 +127,7 @@ DBOS checkpoints workflow inputs/outputs and step outputs into a database using ### Streaming -Because DBOS cannot stream output directly to the workflow or step call site, [`Agent.run_stream()`][pydantic_ai.Agent.run_stream] is not supported when running inside of a DBOS workflow. +Because DBOS cannot stream output directly to the workflow or step call site, [`Agent.run_stream()`][pydantic_ai.Agent.run_stream] and [`Agent.run_stream_events()`][pydantic_ai.Agent.run_stream_events] are not supported when running inside of a DBOS workflow. Instead, you can implement streaming by setting an [`event_stream_handler`][pydantic_ai.agent.EventStreamHandler] on the `Agent` or `DBOSAgent` instance and using [`DBOSAgent.run()`][pydantic_ai.durable_exec.dbos.DBOSAgent.run]. The event stream handler function will receive the agent [run context][pydantic_ai.tools.RunContext] and an async iterable of events from the model's streaming response and the agent's execution of tools. For examples, see the [streaming docs](../agents.md#streaming-all-events). 
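
+
+As a rough sketch (eliding DBOS configuration, and using illustrative names and a print-based handler), this pattern could look like:
+
+```python
+from collections.abc import AsyncIterable
+
+from dbos import DBOS
+
+from pydantic_ai import Agent, AgentStreamEvent, RunContext
+from pydantic_ai.durable_exec.dbos import DBOSAgent
+
+
+async def event_stream_handler(ctx: RunContext, event_stream: AsyncIterable[AgentStreamEvent]):
+    # Receives the run context and an async iterable of events from the model's
+    # streaming response and the agent's execution of tools.
+    async for event in event_stream:
+        print(event)  # e.g. forward each event to a queue or websocket instead
+
+
+agent = Agent('openai:gpt-4o', name='streaming_agent', event_stream_handler=event_stream_handler)
+dbos_agent = DBOSAgent(agent)
+
+
+@DBOS.workflow()
+async def answer_question(prompt: str) -> str:
+    result = await dbos_agent.run(prompt)
+    return result.output
+```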
diff --git a/docs/durable_execution/temporal.md b/docs/durable_execution/temporal.md index 841b18c179..c29e178843 100644 --- a/docs/durable_execution/temporal.md +++ b/docs/durable_execution/temporal.md @@ -177,7 +177,7 @@ If you need one or more of these attributes to be available inside activities, y ### Streaming -Because Temporal activities cannot stream output directly to the activity call site, [`Agent.run_stream()`][pydantic_ai.Agent.run_stream] and [`Agent.iter()`][pydantic_ai.Agent.iter] are not supported. +Because Temporal activities cannot stream output directly to the activity call site, [`Agent.run_stream()`][pydantic_ai.Agent.run_stream], [`Agent.run_stream_events()`][pydantic_ai.Agent.run_stream_events], and [`Agent.iter()`][pydantic_ai.Agent.iter] are not supported. Instead, you can implement streaming by setting an [`event_stream_handler`][pydantic_ai.agent.EventStreamHandler] on the `Agent` or `TemporalAgent` instance and using [`TemporalAgent.run()`][pydantic_ai.durable_exec.temporal.TemporalAgent.run] inside the workflow. The event stream handler function will receive the agent [run context][pydantic_ai.tools.RunContext] and an async iterable of events from the model's streaming response and the agent's execution of tools. For examples, see the [streaming docs](../agents.md#streaming-all-events). diff --git a/docs/output.md b/docs/output.md index 21b7ce9b21..c8dc1741c3 100644 --- a/docs/output.md +++ b/docs/output.md @@ -520,7 +520,7 @@ There two main challenges with streamed results: it will stop running the agent graph and will not execute any tool calls made by the model after this "final" output. If you want to always run the agent graph to completion and stream all events from the model's streaming response and the agent's execution of tools, - use [`agent.run()`][pydantic_ai.agent.AbstractAgent.run] with an `event_stream_handler` ([docs](agents.md#streaming-all-events)) or [`agent.iter()`][pydantic_ai.agent.AbstractAgent.iter] ([docs](agents.md#streaming-all-events-and-output)) instead. + use [`agent.run_stream_events()`][pydantic_ai.agent.AbstractAgent.run_stream_events] ([docs](agents.md#streaming-all-events)) or [`agent.iter()`][pydantic_ai.agent.AbstractAgent.iter] ([docs](agents.md#streaming-all-events-and-output)) instead. 
### Streaming Text diff --git a/pydantic_ai_slim/pydantic_ai/__init__.py b/pydantic_ai_slim/pydantic_ai/__init__.py index 37914ece89..8f6254f425 100644 --- a/pydantic_ai_slim/pydantic_ai/__init__.py +++ b/pydantic_ai_slim/pydantic_ai/__init__.py @@ -88,6 +88,7 @@ ModelProfile, ModelProfileSpec, ) +from .run import AgentRun, AgentRunResult, AgentRunResultEvent from .settings import ModelSettings from .tools import DeferredToolRequests, DeferredToolResults, RunContext, Tool, ToolApproved, ToolDefinition, ToolDenied from .toolsets import ( @@ -224,5 +225,9 @@ 'RunUsage', 'RequestUsage', 'UsageLimits', + # run + 'AgentRun', + 'AgentRunResult', + 'AgentRunResultEvent', ) __version__ = _metadata_version('pydantic_ai_slim') diff --git a/pydantic_ai_slim/pydantic_ai/_agent_graph.py b/pydantic_ai_slim/pydantic_ai/_agent_graph.py index 9919eb85b3..c76ac53882 100644 --- a/pydantic_ai_slim/pydantic_ai/_agent_graph.py +++ b/pydantic_ai_slim/pydantic_ai/_agent_graph.py @@ -87,10 +87,10 @@ class GraphAgentState: """State kept across the execution of the agent graph.""" - message_history: list[_messages.ModelMessage] - usage: _usage.RunUsage - retries: int - run_step: int + message_history: list[_messages.ModelMessage] = dataclasses.field(default_factory=list) + usage: _usage.RunUsage = dataclasses.field(default_factory=_usage.RunUsage) + retries: int = 0 + run_step: int = 0 def increment_retries(self, max_result_retries: int, error: BaseException | None = None) -> None: self.retries += 1 diff --git a/pydantic_ai_slim/pydantic_ai/agent/abstract.py b/pydantic_ai_slim/pydantic_ai/agent/abstract.py index fdd21b8065..e28c975eaa 100644 --- a/pydantic_ai_slim/pydantic_ai/agent/abstract.py +++ b/pydantic_ai_slim/pydantic_ai/agent/abstract.py @@ -1,5 +1,6 @@ from __future__ import annotations as _annotations +import asyncio import inspect from abc import ABC, abstractmethod from collections.abc import AsyncIterable, AsyncIterator, Awaitable, Callable, Iterator, Mapping, Sequence @@ -7,6 +8,7 @@ from types import FrameType from typing import TYPE_CHECKING, Any, Generic, TypeAlias, cast, overload +import anyio from typing_extensions import Self, TypeIs, TypeVar from pydantic_graph import End @@ -25,7 +27,7 @@ from .._tool_manager import ToolManager from ..output import OutputDataT, OutputSpec from ..result import AgentStream, FinalResult, StreamedRunResult -from ..run import AgentRun, AgentRunResult +from ..run import AgentRun, AgentRunResult, AgentRunResultEvent from ..settings import ModelSettings from ..tools import ( AgentDepsT, @@ -552,6 +554,160 @@ async def on_complete() -> None: if not yielded: raise exceptions.AgentRunError('Agent run finished without producing a final result') # pragma: no cover + @overload + def run_stream_events( + self, + user_prompt: str | Sequence[_messages.UserContent] | None = None, + *, + output_type: None = None, + message_history: list[_messages.ModelMessage] | None = None, + deferred_tool_results: DeferredToolResults | None = None, + model: models.Model | models.KnownModelName | str | None = None, + deps: AgentDepsT = None, + model_settings: ModelSettings | None = None, + usage_limits: _usage.UsageLimits | None = None, + usage: _usage.RunUsage | None = None, + infer_name: bool = True, + toolsets: Sequence[AbstractToolset[AgentDepsT]] | None = None, + ) -> AsyncIterator[_messages.AgentStreamEvent | AgentRunResultEvent[OutputDataT]]: ... 
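
+    # These overloads (above and below) differ only in `output_type`: by default the final
+    # `AgentRunResultEvent` is parameterized with the agent's own `OutputDataT`, while passing a
+    # custom `output_type` parameterizes it with `RunOutputDataT` instead.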
+ + @overload + def run_stream_events( + self, + user_prompt: str | Sequence[_messages.UserContent] | None = None, + *, + output_type: OutputSpec[RunOutputDataT], + message_history: list[_messages.ModelMessage] | None = None, + deferred_tool_results: DeferredToolResults | None = None, + model: models.Model | models.KnownModelName | str | None = None, + deps: AgentDepsT = None, + model_settings: ModelSettings | None = None, + usage_limits: _usage.UsageLimits | None = None, + usage: _usage.RunUsage | None = None, + infer_name: bool = True, + toolsets: Sequence[AbstractToolset[AgentDepsT]] | None = None, + ) -> AsyncIterator[_messages.AgentStreamEvent | AgentRunResultEvent[RunOutputDataT]]: ... + + def run_stream_events( + self, + user_prompt: str | Sequence[_messages.UserContent] | None = None, + *, + output_type: OutputSpec[RunOutputDataT] | None = None, + message_history: list[_messages.ModelMessage] | None = None, + deferred_tool_results: DeferredToolResults | None = None, + model: models.Model | models.KnownModelName | str | None = None, + deps: AgentDepsT = None, + model_settings: ModelSettings | None = None, + usage_limits: _usage.UsageLimits | None = None, + usage: _usage.RunUsage | None = None, + infer_name: bool = True, + toolsets: Sequence[AbstractToolset[AgentDepsT]] | None = None, + ) -> AsyncIterator[_messages.AgentStreamEvent | AgentRunResultEvent[Any]]: + """Run the agent with a user prompt in async mode and stream events from the run. + + This is a convenience method that wraps [`self.run`][pydantic_ai.agent.AbstractAgent.run] and + uses the `event_stream_handler` kwarg to get a stream of events from the run. + + Example: + ```python + from pydantic_ai import Agent + + agent = Agent('openai:gpt-4o') + + async def main(): + async for event in agent.run_stream_events('What is the capital of France?'): + print(event) + ``` + + Arguments are the same as for [`self.run`][pydantic_ai.agent.AbstractAgent.run], + except that `event_stream_handler` is now allowed. + + Args: + user_prompt: User input to start/continue the conversation. + output_type: Custom output type to use for this run, `output_type` may only be used if the agent has no + output validators since output validators would expect an argument that matches the agent's output type. + message_history: History of the conversation so far. + deferred_tool_results: Optional results for deferred tool calls in the message history. + model: Optional model to use for this run, required if `model` was not set when creating the agent. + deps: Optional dependencies to use for this run. + model_settings: Optional settings to use for this model's request. + usage_limits: Optional limits on model request count or token usage. + usage: Optional usage to start with, useful for resuming a conversation or agents used in tools. + infer_name: Whether to try to infer the agent name from the call frame if it's not set. + toolsets: Optional additional toolsets for this run. + + Returns: + An async iterable of stream events `AgentStreamEvent` and finally a `AgentRunResultEvent` with the final + run result. 
+ """ + # unfortunately this hack of returning a generator rather than defining it right here is + # required to allow overloads of this method to work in python's typing system, or at least with pyright + # or at least I couldn't make it work without + return self._run_stream_events( + user_prompt, + output_type=output_type, + message_history=message_history, + deferred_tool_results=deferred_tool_results, + model=model, + deps=deps, + model_settings=model_settings, + usage_limits=usage_limits, + usage=usage, + infer_name=infer_name, + toolsets=toolsets, + ) + + async def _run_stream_events( + self, + user_prompt: str | Sequence[_messages.UserContent] | None = None, + *, + output_type: OutputSpec[RunOutputDataT] | None = None, + message_history: list[_messages.ModelMessage] | None = None, + deferred_tool_results: DeferredToolResults | None = None, + model: models.Model | models.KnownModelName | str | None = None, + deps: AgentDepsT = None, + model_settings: ModelSettings | None = None, + usage_limits: _usage.UsageLimits | None = None, + usage: _usage.RunUsage | None = None, + infer_name: bool = True, + toolsets: Sequence[AbstractToolset[AgentDepsT]] | None = None, + ) -> AsyncIterator[_messages.AgentStreamEvent | AgentRunResultEvent[Any]]: + send_stream, receive_stream = anyio.create_memory_object_stream[ + _messages.AgentStreamEvent | AgentRunResultEvent[Any] + ]() + + async def event_stream_handler( + _: RunContext[AgentDepsT], events: AsyncIterable[_messages.AgentStreamEvent] + ) -> None: + async for event in events: + await send_stream.send(event) + + async def run_agent() -> AgentRunResult[Any]: + async with send_stream: + return await self.run( + user_prompt, + output_type=output_type, + message_history=message_history, + deferred_tool_results=deferred_tool_results, + model=model, + deps=deps, + model_settings=model_settings, + usage_limits=usage_limits, + usage=usage, + infer_name=infer_name, + toolsets=toolsets, + event_stream_handler=event_stream_handler, + ) + + task = asyncio.create_task(run_agent()) + + async with receive_stream: + async for message in receive_stream: + yield message + + result = await task + yield AgentRunResultEvent(result) + @overload def iter( self, diff --git a/pydantic_ai_slim/pydantic_ai/durable_exec/dbos/_agent.py b/pydantic_ai_slim/pydantic_ai/durable_exec/dbos/_agent.py index d7d4987d8f..89ac4ed84e 100644 --- a/pydantic_ai_slim/pydantic_ai/durable_exec/dbos/_agent.py +++ b/pydantic_ai_slim/pydantic_ai/durable_exec/dbos/_agent.py @@ -9,6 +9,7 @@ from pydantic_ai import ( AbstractToolset, + AgentRunResultEvent, _utils, messages as _messages, models, @@ -525,9 +526,8 @@ async def main(): """ if DBOS.workflow_id is not None and DBOS.step_id is None: raise UserError( - '`agent.run_stream()` cannot currently be used inside a DBOS workflow. ' - 'Set an `event_stream_handler` on the agent and use `agent.run()` instead. ' - 'Please file an issue if this is not sufficient for your use case.' + '`agent.run_stream()` cannot be used inside a DBOS workflow. ' + 'Set an `event_stream_handler` on the agent and use `agent.run()` instead.' 
) async with super().run_stream( @@ -547,6 +547,97 @@ async def main(): ) as result: yield result + @overload + def run_stream_events( + self, + user_prompt: str | Sequence[_messages.UserContent] | None = None, + *, + output_type: None = None, + message_history: list[_messages.ModelMessage] | None = None, + deferred_tool_results: DeferredToolResults | None = None, + model: models.Model | models.KnownModelName | str | None = None, + deps: AgentDepsT = None, + model_settings: ModelSettings | None = None, + usage_limits: _usage.UsageLimits | None = None, + usage: _usage.RunUsage | None = None, + infer_name: bool = True, + toolsets: Sequence[AbstractToolset[AgentDepsT]] | None = None, + ) -> AsyncIterator[_messages.AgentStreamEvent | AgentRunResultEvent[OutputDataT]]: ... + + @overload + def run_stream_events( + self, + user_prompt: str | Sequence[_messages.UserContent] | None = None, + *, + output_type: OutputSpec[RunOutputDataT], + message_history: list[_messages.ModelMessage] | None = None, + deferred_tool_results: DeferredToolResults | None = None, + model: models.Model | models.KnownModelName | str | None = None, + deps: AgentDepsT = None, + model_settings: ModelSettings | None = None, + usage_limits: _usage.UsageLimits | None = None, + usage: _usage.RunUsage | None = None, + infer_name: bool = True, + toolsets: Sequence[AbstractToolset[AgentDepsT]] | None = None, + ) -> AsyncIterator[_messages.AgentStreamEvent | AgentRunResultEvent[RunOutputDataT]]: ... + + def run_stream_events( + self, + user_prompt: str | Sequence[_messages.UserContent] | None = None, + *, + output_type: OutputSpec[RunOutputDataT] | None = None, + message_history: list[_messages.ModelMessage] | None = None, + deferred_tool_results: DeferredToolResults | None = None, + model: models.Model | models.KnownModelName | str | None = None, + deps: AgentDepsT = None, + model_settings: ModelSettings | None = None, + usage_limits: _usage.UsageLimits | None = None, + usage: _usage.RunUsage | None = None, + infer_name: bool = True, + toolsets: Sequence[AbstractToolset[AgentDepsT]] | None = None, + ) -> AsyncIterator[_messages.AgentStreamEvent | AgentRunResultEvent[Any]]: + """Run the agent with a user prompt in async mode and stream events from the run. + + This is a convenience method that wraps [`self.run`][pydantic_ai.agent.AbstractAgent.run] and + uses the `event_stream_handler` kwarg to get a stream of events from the run. + + Example: + ```python + from pydantic_ai import Agent + + agent = Agent('openai:gpt-4o') + + async def main(): + async for event in agent.run_stream_events('What is the capital of France?'): + print(event) + ``` + + Arguments are the same as for [`self.run`][pydantic_ai.agent.AbstractAgent.run], + except that `event_stream_handler` is now allowed. + + Args: + user_prompt: User input to start/continue the conversation. + output_type: Custom output type to use for this run, `output_type` may only be used if the agent has no + output validators since output validators would expect an argument that matches the agent's output type. + message_history: History of the conversation so far. + deferred_tool_results: Optional results for deferred tool calls in the message history. + model: Optional model to use for this run, required if `model` was not set when creating the agent. + deps: Optional dependencies to use for this run. + model_settings: Optional settings to use for this model's request. + usage_limits: Optional limits on model request count or token usage. 
+ usage: Optional usage to start with, useful for resuming a conversation or agents used in tools. + infer_name: Whether to try to infer the agent name from the call frame if it's not set. + toolsets: Optional additional toolsets for this run. + + Returns: + An async iterable of stream events `AgentStreamEvent` and finally a `AgentRunResultEvent` with the final + run result. + """ + raise UserError( + '`agent.run_stream_events()` cannot be used with DBOS. ' + 'Set an `event_stream_handler` on the agent and use `agent.run()` instead.' + ) + @overload def iter( self, diff --git a/pydantic_ai_slim/pydantic_ai/durable_exec/temporal/_agent.py b/pydantic_ai_slim/pydantic_ai/durable_exec/temporal/_agent.py index a87b5195a9..d08c7dff61 100644 --- a/pydantic_ai_slim/pydantic_ai/durable_exec/temporal/_agent.py +++ b/pydantic_ai_slim/pydantic_ai/durable_exec/temporal/_agent.py @@ -17,6 +17,7 @@ from pydantic_ai import ( AbstractToolset, + AgentRunResultEvent, _utils, messages as _messages, models, @@ -558,9 +559,8 @@ async def main(): """ if workflow.in_workflow(): raise UserError( - '`agent.run_stream()` cannot currently be used inside a Temporal workflow. ' - 'Set an `event_stream_handler` on the agent and use `agent.run()` instead. ' - 'Please file an issue if this is not sufficient for your use case.' + '`agent.run_stream()` cannot be used inside a Temporal workflow. ' + 'Set an `event_stream_handler` on the agent and use `agent.run()` instead.' ) async with super().run_stream( @@ -580,6 +580,112 @@ async def main(): ) as result: yield result + @overload + def run_stream_events( + self, + user_prompt: str | Sequence[_messages.UserContent] | None = None, + *, + output_type: None = None, + message_history: list[_messages.ModelMessage] | None = None, + deferred_tool_results: DeferredToolResults | None = None, + model: models.Model | models.KnownModelName | str | None = None, + deps: AgentDepsT = None, + model_settings: ModelSettings | None = None, + usage_limits: _usage.UsageLimits | None = None, + usage: _usage.RunUsage | None = None, + infer_name: bool = True, + toolsets: Sequence[AbstractToolset[AgentDepsT]] | None = None, + ) -> AsyncIterator[_messages.AgentStreamEvent | AgentRunResultEvent[OutputDataT]]: ... + + @overload + def run_stream_events( + self, + user_prompt: str | Sequence[_messages.UserContent] | None = None, + *, + output_type: OutputSpec[RunOutputDataT], + message_history: list[_messages.ModelMessage] | None = None, + deferred_tool_results: DeferredToolResults | None = None, + model: models.Model | models.KnownModelName | str | None = None, + deps: AgentDepsT = None, + model_settings: ModelSettings | None = None, + usage_limits: _usage.UsageLimits | None = None, + usage: _usage.RunUsage | None = None, + infer_name: bool = True, + toolsets: Sequence[AbstractToolset[AgentDepsT]] | None = None, + ) -> AsyncIterator[_messages.AgentStreamEvent | AgentRunResultEvent[RunOutputDataT]]: ... 
+ + def run_stream_events( + self, + user_prompt: str | Sequence[_messages.UserContent] | None = None, + *, + output_type: OutputSpec[RunOutputDataT] | None = None, + message_history: list[_messages.ModelMessage] | None = None, + deferred_tool_results: DeferredToolResults | None = None, + model: models.Model | models.KnownModelName | str | None = None, + deps: AgentDepsT = None, + model_settings: ModelSettings | None = None, + usage_limits: _usage.UsageLimits | None = None, + usage: _usage.RunUsage | None = None, + infer_name: bool = True, + toolsets: Sequence[AbstractToolset[AgentDepsT]] | None = None, + ) -> AsyncIterator[_messages.AgentStreamEvent | AgentRunResultEvent[Any]]: + """Run the agent with a user prompt in async mode and stream events from the run. + + This is a convenience method that wraps [`self.run`][pydantic_ai.agent.AbstractAgent.run] and + uses the `event_stream_handler` kwarg to get a stream of events from the run. + + Example: + ```python + from pydantic_ai import Agent + + agent = Agent('openai:gpt-4o') + + async def main(): + async for event in agent.run_stream_events('What is the capital of France?'): + print(event) + ``` + + Arguments are the same as for [`self.run`][pydantic_ai.agent.AbstractAgent.run], + except that `event_stream_handler` is now allowed. + + Args: + user_prompt: User input to start/continue the conversation. + output_type: Custom output type to use for this run, `output_type` may only be used if the agent has no + output validators since output validators would expect an argument that matches the agent's output type. + message_history: History of the conversation so far. + deferred_tool_results: Optional results for deferred tool calls in the message history. + model: Optional model to use for this run, required if `model` was not set when creating the agent. + deps: Optional dependencies to use for this run. + model_settings: Optional settings to use for this model's request. + usage_limits: Optional limits on model request count or token usage. + usage: Optional usage to start with, useful for resuming a conversation or agents used in tools. + infer_name: Whether to try to infer the agent name from the call frame if it's not set. + toolsets: Optional additional toolsets for this run. + + Returns: + An async iterable of stream events `AgentStreamEvent` and finally a `AgentRunResultEvent` with the final + run result. + """ + if workflow.in_workflow(): + raise UserError( + '`agent.run_stream_events()` cannot be used inside a Temporal workflow. ' + 'Set an `event_stream_handler` on the agent and use `agent.run()` instead.' + ) + + return super().run_stream_events( + user_prompt, + output_type=output_type, + message_history=message_history, + deferred_tool_results=deferred_tool_results, + model=model, + deps=deps, + model_settings=model_settings, + usage_limits=usage_limits, + usage=usage, + infer_name=infer_name, + toolsets=toolsets, + ) + @overload def iter( self, @@ -711,9 +817,8 @@ async def main(): if workflow.in_workflow(): if not self._temporal_overrides_active.get(): raise UserError( - '`agent.iter()` cannot currently be used inside a Temporal workflow. ' - 'Set an `event_stream_handler` on the agent and use `agent.run()` instead. ' - 'Please file an issue if this is not sufficient for your use case.' + '`agent.iter()` cannot be used inside a Temporal workflow. ' + 'Set an `event_stream_handler` on the agent and use `agent.run()` instead.' 
) if model is not None: diff --git a/pydantic_ai_slim/pydantic_ai/result.py b/pydantic_ai_slim/pydantic_ai/result.py index 37dac11f9e..e41acd1a5f 100644 --- a/pydantic_ai_slim/pydantic_ai/result.py +++ b/pydantic_ai_slim/pydantic_ai/result.py @@ -4,7 +4,7 @@ from copy import deepcopy from dataclasses import dataclass, field from datetime import datetime -from typing import Generic, cast, overload +from typing import TYPE_CHECKING, Generic, cast, overload from pydantic import ValidationError from typing_extensions import TypeVar, deprecated @@ -25,9 +25,11 @@ OutputDataT, ToolOutput, ) -from .run import AgentRunResult from .usage import RunUsage, UsageLimits +if TYPE_CHECKING: + from .run import AgentRunResult + __all__ = ( 'OutputDataT', 'OutputDataT_inv', diff --git a/pydantic_ai_slim/pydantic_ai/run.py b/pydantic_ai_slim/pydantic_ai/run.py index 34ede6d5c8..58c6a6011e 100644 --- a/pydantic_ai_slim/pydantic_ai/run.py +++ b/pydantic_ai_slim/pydantic_ai/run.py @@ -10,6 +10,7 @@ from . import ( _agent_graph, + _utils, exceptions, messages as _messages, usage as _usage, @@ -245,10 +246,12 @@ class AgentRunResult(Generic[OutputDataT]): output: OutputDataT """The output data from the agent run.""" - _output_tool_name: str | None = dataclasses.field(repr=False) - _state: _agent_graph.GraphAgentState = dataclasses.field(repr=False) - _new_message_index: int = dataclasses.field(repr=False) - _traceparent_value: str | None = dataclasses.field(repr=False) + _output_tool_name: str | None = dataclasses.field(repr=False, compare=False, default=None) + _state: _agent_graph.GraphAgentState = dataclasses.field( + repr=False, compare=False, default_factory=_agent_graph.GraphAgentState + ) + _new_message_index: int = dataclasses.field(repr=False, compare=False, default=0) + _traceparent_value: str | None = dataclasses.field(repr=False, compare=False, default=None) @overload def _traceparent(self, *, required: Literal[False]) -> str | None: ... @@ -363,3 +366,18 @@ def usage(self) -> _usage.RunUsage: def timestamp(self) -> datetime: """Return the timestamp of last response.""" return self.response.timestamp + + +@dataclasses.dataclass(repr=False) +class AgentRunResultEvent(Generic[OutputDataT]): + """An event indicating the agent run ended and containing the final result of the agent run.""" + + result: AgentRunResult[OutputDataT] + """The result of the run.""" + + _: dataclasses.KW_ONLY + + event_kind: Literal['agent_run_result'] = 'agent_run_result' + """Event type identifier, used as a discriminator.""" + + __repr__ = _utils.dataclasses_no_defaults_repr diff --git a/tests/cassettes/test_temporal/test_temporal_agent_run_stream_events.yaml b/tests/cassettes/test_temporal/test_temporal_agent_run_stream_events.yaml new file mode 100644 index 0000000000..cec7cf108c --- /dev/null +++ b/tests/cassettes/test_temporal/test_temporal_agent_run_stream_events.yaml @@ -0,0 +1,78 @@ +interactions: +- request: + headers: + accept: + - application/json + accept-encoding: + - gzip, deflate + connection: + - keep-alive + content-length: + - '144' + content-type: + - application/json + host: + - api.openai.com + method: POST + parsed_body: + messages: + - content: What is the capital of Mexico? 
+ role: user + model: gpt-4o + stream: true + stream_options: + include_usage: true + uri: https://api.openai.com/v1/chat/completions + response: + body: + string: |+ + data: {"id":"chatcmpl-CMidXKdPxbospUp4AUR74aqBOfcdf","object":"chat.completion.chunk","created":1759530835,"model":"gpt-4o-2024-08-06","service_tier":"default","system_fingerprint":"fp_cbf1785567","choices":[{"index":0,"delta":{"role":"assistant","content":"","refusal":null},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"LBj6qKSH7pm8jD"} + + data: {"id":"chatcmpl-CMidXKdPxbospUp4AUR74aqBOfcdf","object":"chat.completion.chunk","created":1759530835,"model":"gpt-4o-2024-08-06","service_tier":"default","system_fingerprint":"fp_cbf1785567","choices":[{"index":0,"delta":{"content":"The"},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"VShDp33vM7vk6"} + + data: {"id":"chatcmpl-CMidXKdPxbospUp4AUR74aqBOfcdf","object":"chat.completion.chunk","created":1759530835,"model":"gpt-4o-2024-08-06","service_tier":"default","system_fingerprint":"fp_cbf1785567","choices":[{"index":0,"delta":{"content":" capital"},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"bDyP4pbO"} + + data: {"id":"chatcmpl-CMidXKdPxbospUp4AUR74aqBOfcdf","object":"chat.completion.chunk","created":1759530835,"model":"gpt-4o-2024-08-06","service_tier":"default","system_fingerprint":"fp_cbf1785567","choices":[{"index":0,"delta":{"content":" of"},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"YALupGELE00Iw"} + + data: {"id":"chatcmpl-CMidXKdPxbospUp4AUR74aqBOfcdf","object":"chat.completion.chunk","created":1759530835,"model":"gpt-4o-2024-08-06","service_tier":"default","system_fingerprint":"fp_cbf1785567","choices":[{"index":0,"delta":{"content":" Mexico"},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"J0EmiR0Ea"} + + data: {"id":"chatcmpl-CMidXKdPxbospUp4AUR74aqBOfcdf","object":"chat.completion.chunk","created":1759530835,"model":"gpt-4o-2024-08-06","service_tier":"default","system_fingerprint":"fp_cbf1785567","choices":[{"index":0,"delta":{"content":" is"},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"lrM9iPFrZHtNh"} + + data: {"id":"chatcmpl-CMidXKdPxbospUp4AUR74aqBOfcdf","object":"chat.completion.chunk","created":1759530835,"model":"gpt-4o-2024-08-06","service_tier":"default","system_fingerprint":"fp_cbf1785567","choices":[{"index":0,"delta":{"content":" Mexico"},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"tKFAxYRHK"} + + data: {"id":"chatcmpl-CMidXKdPxbospUp4AUR74aqBOfcdf","object":"chat.completion.chunk","created":1759530835,"model":"gpt-4o-2024-08-06","service_tier":"default","system_fingerprint":"fp_cbf1785567","choices":[{"index":0,"delta":{"content":" City"},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"o4BnDIaXctv"} + + data: {"id":"chatcmpl-CMidXKdPxbospUp4AUR74aqBOfcdf","object":"chat.completion.chunk","created":1759530835,"model":"gpt-4o-2024-08-06","service_tier":"default","system_fingerprint":"fp_cbf1785567","choices":[{"index":0,"delta":{"content":"."},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"0yk0uocAIGhLVOY"} + + data: {"id":"chatcmpl-CMidXKdPxbospUp4AUR74aqBOfcdf","object":"chat.completion.chunk","created":1759530835,"model":"gpt-4o-2024-08-06","service_tier":"default","system_fingerprint":"fp_cbf1785567","choices":[{"index":0,"delta":{},"logprobs":null,"finish_reason":"stop"}],"usage":null,"obfuscation":"xoRfvFVwsC"} + + data: 
{"id":"chatcmpl-CMidXKdPxbospUp4AUR74aqBOfcdf","object":"chat.completion.chunk","created":1759530835,"model":"gpt-4o-2024-08-06","service_tier":"default","system_fingerprint":"fp_cbf1785567","choices":[],"usage":{"prompt_tokens":14,"completion_tokens":8,"total_tokens":22,"prompt_tokens_details":{"cached_tokens":0,"audio_tokens":0},"completion_tokens_details":{"reasoning_tokens":0,"audio_tokens":0,"accepted_prediction_tokens":0,"rejected_prediction_tokens":0}},"obfuscation":""} + + data: [DONE] + + headers: + access-control-expose-headers: + - X-Request-ID + alt-svc: + - h3=":443"; ma=86400 + connection: + - keep-alive + content-type: + - text/event-stream; charset=utf-8 + openai-organization: + - pydantic-28gund + openai-processing-ms: + - '233' + openai-project: + - proj_dKobscVY9YJxeEaDJen54e3d + openai-version: + - '2020-10-01' + strict-transport-security: + - max-age=31536000; includeSubDomains; preload + transfer-encoding: + - chunked + status: + code: 200 + message: OK +version: 1 +... diff --git a/tests/test_dbos.py b/tests/test_dbos.py index dfe95580b8..ff6c54f60a 100644 --- a/tests/test_dbos.py +++ b/tests/test_dbos.py @@ -2,6 +2,7 @@ import asyncio import os +import re import time import uuid from collections.abc import AsyncIterable, AsyncIterator, Generator, Iterator @@ -968,6 +969,18 @@ async def test_dbos_agent_run_stream(allow_model_requests: None): ) +async def test_dbos_agent_run_stream_events(allow_model_requests: None): + # This doesn't work because `run_stream_events` calls `run` internally, which is automatically wrapped in a DBOS workflow. + with pytest.raises( + UserError, + match=re.escape( + '`agent.run_stream_events()` cannot be used with DBOS. Set an `event_stream_handler` on the agent and use `agent.run()` instead.' + ), + ): + async for _ in simple_dbos_agent.run_stream_events('What is the capital of Mexico?'): + pass + + async def test_dbos_agent_iter(allow_model_requests: None): output: list[str] = [] async with simple_dbos_agent.iter('What is the capital of Mexico?') as run: @@ -1011,14 +1024,27 @@ async def run_stream_workflow(): with workflow_raises( UserError, snapshot( - '`agent.run_stream()` cannot currently be used inside a DBOS workflow. ' - 'Set an `event_stream_handler` on the agent and use `agent.run()` instead. ' - 'Please file an issue if this is not sufficient for your use case.' + '`agent.run_stream()` cannot be used inside a DBOS workflow. ' + 'Set an `event_stream_handler` on the agent and use `agent.run()` instead.' ), ): await run_stream_workflow() +async def test_dbos_agent_run_stream_events_in_workflow(allow_model_requests: None, dbos: DBOS): + @DBOS.workflow() + async def run_stream_events_workflow(): + return [event async for event in simple_dbos_agent.run_stream_events('What is the capital of Mexico?')] + + with workflow_raises( + UserError, + snapshot( + '`agent.run_stream_events()` cannot be used with DBOS. Set an `event_stream_handler` on the agent and use `agent.run()` instead.' + ), + ): + await run_stream_events_workflow() + + async def test_dbos_agent_iter_in_workflow(allow_model_requests: None, dbos: DBOS): # DBOS allows calling `iter` inside a workflow as a step. 
@DBOS.workflow() diff --git a/tests/test_examples.py b/tests/test_examples.py index aca1433d10..2724d6668b 100644 --- a/tests/test_examples.py +++ b/tests/test_examples.py @@ -325,6 +325,7 @@ async def call_tool( 'What is the capital of France?': 'The capital of France is Paris.', 'What is the capital of Italy?': 'The capital of Italy is Rome.', 'What is the capital of the UK?': 'The capital of the UK is London.', + 'What is the capital of Mexico?': 'The capital of Mexico is Mexico City.', 'Who was Albert Einstein?': 'Albert Einstein was a German-born theoretical physicist.', 'What was his most famous equation?': "Albert Einstein's most famous equation is (E = mc^2).", 'What is the date?': 'Hello Frank, the date today is 2032-01-02.', diff --git a/tests/test_streaming.py b/tests/test_streaming.py index 5cdfd10dbb..82aff4eeda 100644 --- a/tests/test_streaming.py +++ b/tests/test_streaming.py @@ -15,6 +15,8 @@ from pydantic_ai import ( Agent, + AgentRunResult, + AgentRunResultEvent, AgentStreamEvent, FinalResultEvent, FunctionToolCallEvent, @@ -1575,3 +1577,39 @@ async def event_stream_handler(ctx: RunContext[None], stream: AsyncIterable[Agen FinalResultEvent(tool_name=None, tool_call_id=None), ] ) + + +async def test_run_stream_events(): + m = TestModel() + + test_agent = Agent(m) + assert test_agent.name is None + + @test_agent.tool_plain + async def ret_a(x: str) -> str: + return f'{x}-apple' + + events = [event async for event in test_agent.run_stream_events('Hello')] + + assert events == snapshot( + [ + PartStartEvent( + index=0, + part=ToolCallPart(tool_name='ret_a', args={'x': 'a'}, tool_call_id=IsStr()), + ), + FunctionToolCallEvent(part=ToolCallPart(tool_name='ret_a', args={'x': 'a'}, tool_call_id=IsStr())), + FunctionToolResultEvent( + result=ToolReturnPart( + tool_name='ret_a', + content='a-apple', + tool_call_id=IsStr(), + timestamp=IsNow(tz=timezone.utc), + ) + ), + PartStartEvent(index=0, part=TextPart(content='')), + FinalResultEvent(tool_name=None, tool_call_id=None), + PartDeltaEvent(index=0, delta=TextPartDelta(content_delta='{"ret_a":')), + PartDeltaEvent(index=0, delta=TextPartDelta(content_delta='"a-apple"}')), + AgentRunResultEvent(result=AgentRunResult(output='{"ret_a":"a-apple"}')), + ] + ) diff --git a/tests/test_temporal.py b/tests/test_temporal.py index 1f54e9fab7..f6a54d56be 100644 --- a/tests/test_temporal.py +++ b/tests/test_temporal.py @@ -14,6 +14,7 @@ from pydantic_ai import ( Agent, + AgentRunResultEvent, AgentStreamEvent, BinaryImage, ExternalToolset, @@ -30,6 +31,7 @@ RetryPromptPart, RunContext, TextPart, + TextPartDelta, ToolCallPart, ToolCallPartDelta, ToolReturnPart, @@ -1124,6 +1126,25 @@ async def test_temporal_agent_run_stream(allow_model_requests: None): ) +async def test_temporal_agent_run_stream_events(allow_model_requests: None): + events = [event async for event in simple_temporal_agent.run_stream_events('What is the capital of Mexico?')] + assert events == snapshot( + [ + PartStartEvent(index=0, part=TextPart(content='')), + FinalResultEvent(tool_name=None, tool_call_id=None), + PartDeltaEvent(index=0, delta=TextPartDelta(content_delta='The')), + PartDeltaEvent(index=0, delta=TextPartDelta(content_delta=' capital')), + PartDeltaEvent(index=0, delta=TextPartDelta(content_delta=' of')), + PartDeltaEvent(index=0, delta=TextPartDelta(content_delta=' Mexico')), + PartDeltaEvent(index=0, delta=TextPartDelta(content_delta=' is')), + PartDeltaEvent(index=0, delta=TextPartDelta(content_delta=' Mexico')), + PartDeltaEvent(index=0, 
delta=TextPartDelta(content_delta=' City')), + PartDeltaEvent(index=0, delta=TextPartDelta(content_delta='.')), + AgentRunResultEvent(result=AgentRunResult(output='The capital of Mexico is Mexico City.')), + ] + ) + + async def test_temporal_agent_iter(allow_model_requests: None): output: list[str] = [] async with simple_temporal_agent.iter('What is the capital of Mexico?') as run: @@ -1192,7 +1213,7 @@ async def test_temporal_agent_run_stream_in_workflow(allow_model_requests: None, with workflow_raises( UserError, snapshot( - '`agent.run_stream()` cannot currently be used inside a Temporal workflow. Set an `event_stream_handler` on the agent and use `agent.run()` instead. Please file an issue if this is not sufficient for your use case.' + '`agent.run_stream()` cannot be used inside a Temporal workflow. Set an `event_stream_handler` on the agent and use `agent.run()` instead.' ), ): await client.execute_workflow( # pyright: ignore[reportUnknownMemberType] @@ -1203,6 +1224,34 @@ async def test_temporal_agent_run_stream_in_workflow(allow_model_requests: None, ) +@workflow.defn +class SimpleAgentWorkflowWithRunStreamEvents: + @workflow.run + async def run(self, prompt: str) -> list[AgentStreamEvent | AgentRunResultEvent]: + return [event async for event in simple_temporal_agent.run_stream_events(prompt)] + + +async def test_temporal_agent_run_stream_events_in_workflow(allow_model_requests: None, client: Client): + async with Worker( + client, + task_queue=TASK_QUEUE, + workflows=[SimpleAgentWorkflowWithRunStreamEvents], + plugins=[AgentPlugin(simple_temporal_agent)], + ): + with workflow_raises( + UserError, + snapshot( + '`agent.run_stream_events()` cannot be used inside a Temporal workflow. Set an `event_stream_handler` on the agent and use `agent.run()` instead.' + ), + ): + await client.execute_workflow( # pyright: ignore[reportUnknownMemberType] + SimpleAgentWorkflowWithRunStreamEvents.run, + args=['What is the capital of Mexico?'], + id=SimpleAgentWorkflowWithRunStreamEvents.__name__, + task_queue=TASK_QUEUE, + ) + + @workflow.defn class SimpleAgentWorkflowWithIter: @workflow.run @@ -1223,7 +1272,7 @@ async def test_temporal_agent_iter_in_workflow(allow_model_requests: None, clien with workflow_raises( UserError, snapshot( - '`agent.iter()` cannot currently be used inside a Temporal workflow. Set an `event_stream_handler` on the agent and use `agent.run()` instead. Please file an issue if this is not sufficient for your use case.' + '`agent.iter()` cannot be used inside a Temporal workflow. Set an `event_stream_handler` on the agent and use `agent.run()` instead.' ), ): await client.execute_workflow( # pyright: ignore[reportUnknownMemberType] From 08d0a2af93923e979cecdfb001ba7e713fe6911f Mon Sep 17 00:00:00 2001 From: Douwe Maan Date: Fri, 3 Oct 2025 23:19:21 +0000 Subject: [PATCH 2/6] typo --- docs/agents.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/agents.md b/docs/agents.md index f0fa59056d..c29c1df14c 100644 --- a/docs/agents.md +++ b/docs/agents.md @@ -66,7 +66,7 @@ There are five ways to run an agent: 1. [`agent.run()`][pydantic_ai.agent.AbstractAgent.run] — an async function which returns a [`RunResult`][pydantic_ai.agent.AgentRunResult] containing a completed response. 2. [`agent.run_sync()`][pydantic_ai.agent.AbstractAgent.run_sync] — a plain, synchronous function which returns a [`RunResult`][pydantic_ai.agent.AgentRunResult] containing a completed response (internally, this just calls `loop.run_until_complete(self.run())`). 3. 
[`agent.run_stream()`][pydantic_ai.agent.AbstractAgent.run_stream] — an async context manager which returns a [`StreamedRunResult`][pydantic_ai.result.StreamedRunResult], which contains methods to stream text and structured output as an async iterable. -4. [`agent.run_stream_events()`][pydantic_ai.agent.AbstractAgent.run_stream_events] — an function which returns an async iterable of [`AgentStreamEvent`s][pydantic_ai.agent.AgentStreamEvent] and a [`AgentRunResultEvent`][pydantic_ai.agent.AgentRunResultEvent] containing the final run result. +4. [`agent.run_stream_events()`][pydantic_ai.agent.AbstractAgent.run_stream_events] — a function which returns an async iterable of [`AgentStreamEvent`s][pydantic_ai.agent.AgentStreamEvent] and a [`AgentRunResultEvent`][pydantic_ai.agent.AgentRunResultEvent] containing the final run result. 5. [`agent.iter()`][pydantic_ai.Agent.iter] — a context manager which returns an [`AgentRun`][pydantic_ai.agent.AgentRun], an async iterable over the nodes of the agent's underlying [`Graph`][pydantic_graph.graph.Graph]. Here's a simple example demonstrating the first four: From 2a30ff1ffe8b950073deb70c29f84b50a7016472 Mon Sep 17 00:00:00 2001 From: Douwe Maan Date: Fri, 3 Oct 2025 23:20:27 +0000 Subject: [PATCH 3/6] fix links --- docs/agents.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/agents.md b/docs/agents.md index c29c1df14c..cd917feafc 100644 --- a/docs/agents.md +++ b/docs/agents.md @@ -66,7 +66,7 @@ There are five ways to run an agent: 1. [`agent.run()`][pydantic_ai.agent.AbstractAgent.run] — an async function which returns a [`RunResult`][pydantic_ai.agent.AgentRunResult] containing a completed response. 2. [`agent.run_sync()`][pydantic_ai.agent.AbstractAgent.run_sync] — a plain, synchronous function which returns a [`RunResult`][pydantic_ai.agent.AgentRunResult] containing a completed response (internally, this just calls `loop.run_until_complete(self.run())`). 3. [`agent.run_stream()`][pydantic_ai.agent.AbstractAgent.run_stream] — an async context manager which returns a [`StreamedRunResult`][pydantic_ai.result.StreamedRunResult], which contains methods to stream text and structured output as an async iterable. -4. [`agent.run_stream_events()`][pydantic_ai.agent.AbstractAgent.run_stream_events] — a function which returns an async iterable of [`AgentStreamEvent`s][pydantic_ai.agent.AgentStreamEvent] and a [`AgentRunResultEvent`][pydantic_ai.agent.AgentRunResultEvent] containing the final run result. +4. [`agent.run_stream_events()`][pydantic_ai.agent.AbstractAgent.run_stream_events] — a function which returns an async iterable of [`AgentStreamEvent`s][pydantic_ai.messages.AgentStreamEvent] and a [`AgentRunResultEvent`][pydantic_ai.run.AgentRunResultEvent] containing the final run result. 5. [`agent.iter()`][pydantic_ai.Agent.iter] — a context manager which returns an [`AgentRun`][pydantic_ai.agent.AgentRun], an async iterable over the nodes of the agent's underlying [`Graph`][pydantic_graph.graph.Graph]. Here's a simple example demonstrating the first four: @@ -228,7 +228,7 @@ Like `agent.run_stream()`, [`agent.run()`][pydantic_ai.agent.AbstractAgent.run_s argument that lets you stream all events from the model's streaming response and the agent's execution of tools. Unlike `run_stream()`, it always runs the agent graph to completion even if text was received ahead of tool calls that looked like it could've been the final result. 
-For convenience, a [`agent.run_stream_events()`][pydantic_ai.agent.AbstractAgent.run_stream_events] method is also available as a wrapper around `run(event_stream_handler=...)`, which returns an async iterable of [`AgentStreamEvent`s][pydantic_ai.agent.AgentStreamEvent] and a [`AgentRunResultEvent`][pydantic_ai.agent.AgentRunResultEvent] containing the final run result. +For convenience, a [`agent.run_stream_events()`][pydantic_ai.agent.AbstractAgent.run_stream_events] method is also available as a wrapper around `run(event_stream_handler=...)`, which returns an async iterable of [`AgentStreamEvent`s][pydantic_ai.messages.AgentStreamEvent] and a [`AgentRunResultEvent`][pydantic_ai.run.AgentRunResultEvent] containing the final run result. !!! note As they return raw events as they come in, the `run_stream_events()` and `run(event_stream_handler=...)` methods require you to piece together the streamed text and structured output yourself from the `PartStartEvent` and subsequent `PartDeltaEvent`s. From 79d347dbdeeb94e9f4e06911c2f296b76eace353 Mon Sep 17 00:00:00 2001 From: Douwe Maan Date: Fri, 3 Oct 2025 23:24:54 +0000 Subject: [PATCH 4/6] add pydantic_ai.run to api docs --- docs/api/run.md | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 docs/api/run.md diff --git a/docs/api/run.md b/docs/api/run.md new file mode 100644 index 0000000000..5ab4d423fa --- /dev/null +++ b/docs/api/run.md @@ -0,0 +1,3 @@ +# `pydantic_ai.run` + +::: pydantic_ai.run From 325f6f9d38c4e41cc0b2361e10a5e724f45a7826 Mon Sep 17 00:00:00 2001 From: Douwe Maan Date: Fri, 3 Oct 2025 23:27:44 +0000 Subject: [PATCH 5/6] fix docstring examples --- pydantic_ai_slim/pydantic_ai/agent/abstract.py | 16 ++++++++++++++-- .../pydantic_ai/durable_exec/dbos/_agent.py | 16 ++++++++++++++-- .../pydantic_ai/durable_exec/temporal/_agent.py | 16 ++++++++++++++-- 3 files changed, 42 insertions(+), 6 deletions(-) diff --git a/pydantic_ai_slim/pydantic_ai/agent/abstract.py b/pydantic_ai_slim/pydantic_ai/agent/abstract.py index e28c975eaa..c5d59f7561 100644 --- a/pydantic_ai_slim/pydantic_ai/agent/abstract.py +++ b/pydantic_ai_slim/pydantic_ai/agent/abstract.py @@ -610,13 +610,25 @@ def run_stream_events( Example: ```python - from pydantic_ai import Agent + from pydantic_ai import Agent, AgentRunResultEvent, AgentStreamEvent agent = Agent('openai:gpt-4o') async def main(): + events: list[AgentStreamEvent | AgentRunResultEvent] = [] async for event in agent.run_stream_events('What is the capital of France?'): - print(event) + events.append(event) + print(events) + ''' + [ + PartStartEvent(index=0, part=TextPart(content='The capital of ')), + FinalResultEvent(tool_name=None, tool_call_id=None), + PartDeltaEvent(index=0, delta=TextPartDelta(content_delta='France is Paris. ')), + AgentRunResultEvent( + result=AgentRunResult(output='The capital of France is Paris. 
') + ), + ] + ''' ``` Arguments are the same as for [`self.run`][pydantic_ai.agent.AbstractAgent.run], diff --git a/pydantic_ai_slim/pydantic_ai/durable_exec/dbos/_agent.py b/pydantic_ai_slim/pydantic_ai/durable_exec/dbos/_agent.py index 89ac4ed84e..eb609cfc4a 100644 --- a/pydantic_ai_slim/pydantic_ai/durable_exec/dbos/_agent.py +++ b/pydantic_ai_slim/pydantic_ai/durable_exec/dbos/_agent.py @@ -603,13 +603,25 @@ def run_stream_events( Example: ```python - from pydantic_ai import Agent + from pydantic_ai import Agent, AgentRunResultEvent, AgentStreamEvent agent = Agent('openai:gpt-4o') async def main(): + events: list[AgentStreamEvent | AgentRunResultEvent] = [] async for event in agent.run_stream_events('What is the capital of France?'): - print(event) + events.append(event) + print(events) + ''' + [ + PartStartEvent(index=0, part=TextPart(content='The capital of ')), + FinalResultEvent(tool_name=None, tool_call_id=None), + PartDeltaEvent(index=0, delta=TextPartDelta(content_delta='France is Paris. ')), + AgentRunResultEvent( + result=AgentRunResult(output='The capital of France is Paris. ') + ), + ] + ''' ``` Arguments are the same as for [`self.run`][pydantic_ai.agent.AbstractAgent.run], diff --git a/pydantic_ai_slim/pydantic_ai/durable_exec/temporal/_agent.py b/pydantic_ai_slim/pydantic_ai/durable_exec/temporal/_agent.py index d08c7dff61..5338f228a9 100644 --- a/pydantic_ai_slim/pydantic_ai/durable_exec/temporal/_agent.py +++ b/pydantic_ai_slim/pydantic_ai/durable_exec/temporal/_agent.py @@ -636,13 +636,25 @@ def run_stream_events( Example: ```python - from pydantic_ai import Agent + from pydantic_ai import Agent, AgentRunResultEvent, AgentStreamEvent agent = Agent('openai:gpt-4o') async def main(): + events: list[AgentStreamEvent | AgentRunResultEvent] = [] async for event in agent.run_stream_events('What is the capital of France?'): - print(event) + events.append(event) + print(events) + ''' + [ + PartStartEvent(index=0, part=TextPart(content='The capital of ')), + FinalResultEvent(tool_name=None, tool_call_id=None), + PartDeltaEvent(index=0, delta=TextPartDelta(content_delta='France is Paris. ')), + AgentRunResultEvent( + result=AgentRunResult(output='The capital of France is Paris. ') + ), + ] + ''' ``` Arguments are the same as for [`self.run`][pydantic_ai.agent.AbstractAgent.run], From 313728cbb98ace36ffb2005eaccc5ab3f58bbf38 Mon Sep 17 00:00:00 2001 From: Douwe Maan Date: Fri, 3 Oct 2025 23:31:04 +0000 Subject: [PATCH 6/6] add api/run to docs nav --- mkdocs.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/mkdocs.yml b/mkdocs.yml index 58c60a717d..b35189a8ab 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -131,6 +131,7 @@ nav: - api/profiles.md - api/providers.md - api/retries.md + - api/run.md - pydantic_evals: - api/pydantic_evals/dataset.md - api/pydantic_evals/evaluators.md