Add low_level public API #1599

Open · wants to merge 2 commits into `main`

3 changes: 3 additions & 0 deletions docs/api/low_level.md
@@ -0,0 +1,3 @@
# `pydantic_ai.low_level`

::: pydantic_ai.low_level
1 change: 1 addition & 0 deletions mkdocs.yml
@@ -67,6 +67,7 @@ nav:
 - api/usage.md
 - api/mcp.md
 - api/format_as_xml.md
+- api/low_level.md
 - api/models/base.md
 - api/models/openai.md
 - api/models/anthropic.md
4 changes: 2 additions & 2 deletions pydantic_ai_slim/pydantic_ai/agent.py
@@ -99,7 +99,7 @@ class Agent(Generic[AgentDepsT, OutputDataT]):
     model: models.Model | models.KnownModelName | str | None
     """The default model configured for this agent.

-    We allow str here since the actual list of allowed models changes frequently.
+    We allow `str` here since the actual list of allowed models changes frequently.
     """

     name: str | None
@@ -224,7 +224,7 @@ def __init__(

         Args:
             model: The default model to use for this agent, if not provided,
-                you must provide the model when calling it. We allow str here since the actual list of allowed models changes frequently.
+                you must provide the model when calling it. We allow `str` here since the actual list of allowed models changes frequently.
             output_type: The type of the output data, used to validate the data returned by the model,
                 defaults to `str`.
             instructions: Instructions to use for this agent, you can also register instructions via a function with
190 changes: 190 additions & 0 deletions pydantic_ai_slim/pydantic_ai/low_level.py
@@ -0,0 +1,190 @@
"""Low-level methods to make requests directly to models with minimal abstraction.

These methods allow you to make requests to LLMs where the only abstraction is input and output schema
translation, so you can use all models through the same API.

These methods are thin wrappers around [`Model`][pydantic_ai.models.Model] implementations.
"""

from __future__ import annotations as _annotations

from collections.abc import AsyncIterator
from contextlib import asynccontextmanager

from pydantic_graph._utils import get_event_loop as _get_event_loop

from . import messages, models, settings, usage


async def model_request(
    model: models.Model | models.KnownModelName | str,
    messages: list[messages.ModelMessage],
    *,
    model_settings: settings.ModelSettings | None = None,
    model_request_parameters: models.ModelRequestParameters | None = None,
) -> tuple[messages.ModelResponse, usage.Usage]:
"""Make a non-streamed request to a model.

This method is roughly equivalent to [`Agent.run`][pydantic_ai.Agent.run].

```py title="model_request_example.py"
from pydantic_ai.low_level import model_request
from pydantic_ai.messages import ModelRequest


async def main():
model_response, request_usage = await model_request(
'anthropic:claude-3-5-haiku-latest',
[ModelRequest.user_text_prompt('What is the capital of France?')] # (1)!
)
print(model_response)
'''
ModelResponse(
parts=[TextPart(content='Paris', part_kind='text')],
model_name='claude-3-5-haiku-latest',
timestamp=datetime.datetime(...),
kind='response',
)
'''
print(request_usage)
'''
Usage(
requests=0, request_tokens=56, response_tokens=1, total_tokens=57, details=None
)
'''
```

1. See [`ModelRequest.user_text_prompt`][pydantic_ai.messages.ModelRequest.user_text_prompt] for details.

Then

Args:
model: The model to make a request to. We allow `str` here since the actual list of allowed models changes frequently.
messages: Messages to send to the model
model_settings: optional model settings
model_request_parameters: optional model request parameters

Returns:
The model response and token usage associated with the request.
"""
model_instance = models.infer_model(model)
return await model_instance.request(
messages,
model_settings,
model_request_parameters or models.ModelRequestParameters(),
)


def model_request_sync(
    model: models.Model | models.KnownModelName | str,
    messages: list[messages.ModelMessage],
    *,
    model_settings: settings.ModelSettings | None = None,
    model_request_parameters: models.ModelRequestParameters | None = None,
) -> tuple[messages.ModelResponse, usage.Usage]:
"""Make a Synchronous, non-streamed request to a model.

This is a convenience method that wraps [`model_request`][pydantic_ai.low_level.model_request] with
`loop.run_until_complete(...)`. You therefore can't use this method inside async code or if there's an active event loop.

This method is roughly equivalent to [`Agent.run_sync`][pydantic_ai.Agent.run_sync].


```py title="model_request_sync_example.py"
from pydantic_ai.low_level import model_request_sync
from pydantic_ai.messages import ModelRequest

model_response, _ = model_request_sync(
'anthropic:claude-3-5-haiku-latest',
[ModelRequest.user_text_prompt('What is the capital of France?')]
)
print(model_response)
'''
ModelResponse(
parts=[TextPart(content='Paris', part_kind='text')],
model_name='claude-3-5-haiku-latest',
timestamp=datetime.datetime(...),
kind='response',
)
'''
```

Args:
model: The model to make a request to. We allow `str` here since the actual list of allowed models changes frequently.
messages: Messages to send to the model
model_settings: optional model settings
model_request_parameters: optional model request parameters

Returns:
The model response and token usage associated with the request.
"""
return _get_event_loop().run_until_complete(
model_request(model, messages, model_settings=model_settings, model_request_parameters=model_request_parameters)
)


@asynccontextmanager
async def model_request_stream(
    model: models.Model | models.KnownModelName | str,
    messages: list[messages.ModelMessage],
    *,
    model_settings: settings.ModelSettings | None = None,
    model_request_parameters: models.ModelRequestParameters | None = None,
) -> AsyncIterator[models.StreamedResponse]:
"""Make a streamed request to a model.

This method is roughly equivalent to [`Agent.run_stream`][pydantic_ai.Agent.run_stream].

```py {title="model_request_stream_example.py"}

from pydantic_ai.low_level import model_request_stream
from pydantic_ai.messages import ModelRequest


async def main():
messages = [ModelRequest.user_text_prompt('Who was Albert Einstein?')]
async with model_request_stream( 'openai:gpt-4.1-mini', messages) as stream:
chunks = []
async for chunk in stream:
chunks.append(chunk)
print(chunks)
'''
[
PartStartEvent(
index=0,
part=TextPart(content='Albert Einstein was ', part_kind='text'),
event_kind='part_start',
),
PartDeltaEvent(
index=0,
delta=TextPartDelta(
content_delta='a German-born theoretical ', part_delta_kind='text'
),
event_kind='part_delta',
),
PartDeltaEvent(
index=0,
delta=TextPartDelta(content_delta='physicist.', part_delta_kind='text'),
event_kind='part_delta',
),
]
'''
```

Args:
model: The model to make a request to. We allow `str` here since the actual list of allowed models changes frequently.
messages: Messages to send to the model
model_settings: optional model settings
model_request_parameters: optional model request parameters

Returns:
A [stream response][pydantic_ai.models.StreamedResponse] async context manager.
"""
model_instance = models.infer_model(model)
stream_cxt_mgr = model_instance.request_stream(
messages,
model_settings,
model_request_parameters or models.ModelRequestParameters(),
)
async with stream_cxt_mgr as streamed_response:
yield streamed_response
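
For a sense of how the optional `model_settings` argument threads through these wrappers, here is a minimal sketch. It assumes the standard `ModelSettings` keys `max_tokens` and `temperature`; the `'test'` model is the built-in test model also used in the tests below, so it runs without credentials:

```python
from pydantic_ai.low_level import model_request_sync
from pydantic_ai.messages import ModelRequest
from pydantic_ai.settings import ModelSettings

# Illustrative settings: cap the response length and make sampling deterministic.
model_response, request_usage = model_request_sync(
    'test',
    [ModelRequest.user_text_prompt('What is the capital of France?')],
    model_settings=ModelSettings(max_tokens=100, temperature=0.0),
)
print(model_response.parts)
print(request_usage)
```

The settings are handed straight through to the model, matching the positional `model_settings` argument in the implementation above.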
5 changes: 5 additions & 0 deletions pydantic_ai_slim/pydantic_ai/messages.py
@@ -470,6 +470,11 @@ class ModelRequest:
     kind: Literal['request'] = 'request'
     """Message type identifier, this is available on all parts as a discriminator."""

+    @classmethod
+    def user_text_prompt(cls, user_prompt: str, *, instructions: str | None = None) -> ModelRequest:
+        """Create a `ModelRequest` with a single user prompt as text."""
+        return cls(parts=[UserPromptPart(user_prompt)], instructions=instructions)
+

 @dataclass
 class TextPart:
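
Since this helper is the building block every example above relies on, a quick sketch of what it expands to may help; both forms below should be equivalent, given that `UserPromptPart` takes the prompt text as its first positional argument (which is exactly what the method body does):

```python
from pydantic_ai.messages import ModelRequest, UserPromptPart

# The new convenience constructor.
request = ModelRequest.user_text_prompt(
    'What is the capital of France?',
    instructions='Answer in one word.',
)

# The manual construction it wraps.
manual = ModelRequest(
    parts=[UserPromptPart('What is the capital of France?')],
    instructions='Answer in one word.',
)
```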
6 changes: 3 additions & 3 deletions pydantic_ai_slim/pydantic_ai/models/__init__.py
@@ -264,9 +264,9 @@
 class ModelRequestParameters:
     """Configuration for an agent's request to a model, specifically related to tools and output handling."""

-    function_tools: list[ToolDefinition]
-    allow_text_output: bool
-    output_tools: list[ToolDefinition]
+    function_tools: list[ToolDefinition] = field(default_factory=list)
+    allow_text_output: bool = True
+    output_tools: list[ToolDefinition] = field(default_factory=list)


 class Model(ABC):
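
With every field defaulted, a bare `ModelRequestParameters()` is now valid, which is what lets the wrappers above fall back to `models.ModelRequestParameters()` when the caller passes nothing; callers override only what they need. A short sketch (the `get_weather` tool name is hypothetical):

```python
from pydantic_ai.models import ModelRequestParameters
from pydantic_ai.tools import ToolDefinition

# A bare instance: no tools, plain-text output allowed.
params = ModelRequestParameters()
assert params.function_tools == []
assert params.allow_text_output

# Override only the fields a request needs, as in the tool-call test below.
tool_params = ModelRequestParameters(
    function_tools=[
        ToolDefinition(name='get_weather', description='', parameters_json_schema={}),
    ],
    allow_text_output=False,
)
```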
79 changes: 79 additions & 0 deletions tests/test_low_level.py
@@ -0,0 +1,79 @@
from datetime import timezone

import pytest
from inline_snapshot import snapshot

from pydantic_ai.low_level import model_request, model_request_stream, model_request_sync
from pydantic_ai.messages import (
    ModelRequest,
    ModelResponse,
    PartDeltaEvent,
    PartStartEvent,
    TextPart,
    TextPartDelta,
    ToolCallPart,
)
from pydantic_ai.models import ModelRequestParameters
from pydantic_ai.tools import ToolDefinition
from pydantic_ai.usage import Usage

from .conftest import IsNow, IsStr

pytestmark = pytest.mark.anyio


async def test_model_request():
    model_response, request_usage = await model_request('test', [ModelRequest.user_text_prompt('x')])
    assert model_response == snapshot(
        ModelResponse(
            parts=[TextPart(content='success (no tool calls)')],
            model_name='test',
            timestamp=IsNow(tz=timezone.utc),
        )
    )
    assert request_usage == snapshot(Usage(request_tokens=51, response_tokens=4, total_tokens=55))


async def test_model_request_tool_call():
    model_response, request_usage = await model_request(
        'test',
        [ModelRequest.user_text_prompt('x')],
        model_request_parameters=ModelRequestParameters(
            function_tools=[ToolDefinition(name='tool_name', description='', parameters_json_schema={})],
            allow_text_output=False,
        ),
    )
    assert model_response == snapshot(
        ModelResponse(
            parts=[ToolCallPart(tool_name='tool_name', args='a', tool_call_id=IsStr(regex='pyd_ai_.*'))],
            model_name='test',
            timestamp=IsNow(tz=timezone.utc),
        )
    )
    assert request_usage == snapshot(Usage(request_tokens=51, response_tokens=2, total_tokens=53))


def test_model_request_sync():
    model_response, request_usage = model_request_sync('test', [ModelRequest.user_text_prompt('x')])
    assert model_response == snapshot(
        ModelResponse(
            parts=[TextPart(content='success (no tool calls)')],
            model_name='test',
            timestamp=IsNow(tz=timezone.utc),
        )
    )
    assert request_usage == snapshot(Usage(request_tokens=51, response_tokens=4, total_tokens=55))


async def test_model_request_stream():
    async with model_request_stream('test', [ModelRequest.user_text_prompt('x')]) as stream:
        chunks = [chunk async for chunk in stream]
    assert chunks == snapshot(
        [
            PartStartEvent(index=0, part=TextPart(content='')),
            PartDeltaEvent(index=0, delta=TextPartDelta(content_delta='success ')),
            PartDeltaEvent(index=0, delta=TextPartDelta(content_delta='(no ')),
            PartDeltaEvent(index=0, delta=TextPartDelta(content_delta='tool ')),
            PartDeltaEvent(index=0, delta=TextPartDelta(content_delta='calls)')),
        ]
    )