diff --git a/docs/api/low_level.md b/docs/api/low_level.md
new file mode 100644
index 000000000..8ed6ffb70
--- /dev/null
+++ b/docs/api/low_level.md
@@ -0,0 +1,3 @@
+# `pydantic_ai.low_level`
+
+::: pydantic_ai.low_level
diff --git a/docs/low_level.md b/docs/low_level.md
new file mode 100644
index 000000000..64004554f
--- /dev/null
+++ b/docs/low_level.md
@@ -0,0 +1,136 @@
+# Low-Level Model Requests
+
+The low-level module provides direct access to language models with minimal abstraction. These functions make requests to LLMs where the only abstraction is input and output schema translation, so you can use the same API across all models.
+
+These functions are thin wrappers around the [`Model`][pydantic_ai.models.Model] implementations, offering a simpler interface when you don't need the full functionality of an [`Agent`][pydantic_ai.Agent].
+
+The following functions are available:
+
+- [`model_request`][pydantic_ai.low_level.model_request]: Make a non-streamed async request to a model
+- [`model_request_sync`][pydantic_ai.low_level.model_request_sync]: Make a synchronous non-streamed request to a model
+- [`model_request_stream`][pydantic_ai.low_level.model_request_stream]: Make a streamed async request to a model
+
+## Basic Example
+
+Here's a simple example demonstrating how to use the low-level API to make a basic request:
+
+```python title="low_level_basic.py"
+from pydantic_ai.low_level import model_request_sync
+from pydantic_ai.messages import ModelRequest
+
+# Make a synchronous request to the model
+model_response, usage_info = model_request_sync(
+    'anthropic:claude-3-5-haiku-latest',
+    [ModelRequest.user_text_prompt('What is the capital of France?')]
+)
+
+print(model_response.parts[0].content)
+#> Paris
+print(usage_info)
+"""
+Usage(requests=0, request_tokens=56, response_tokens=1, total_tokens=57, details=None)
+"""
+```
+
+## Advanced Example with Tool Calling
+
+You can also use the low-level API to work with function/tool calling.
+
+Even here, you can use Pydantic to generate the JSON schema for the tool:
+
+```python
+from pydantic import BaseModel
+from typing_extensions import Literal
+
+from pydantic_ai.low_level import model_request
+from pydantic_ai.messages import ModelRequest
+from pydantic_ai.models import ModelRequestParameters
+from pydantic_ai.tools import ToolDefinition
+
+
+class Divide(BaseModel):
+    """Divide two numbers."""
+
+    numerator: float
+    denominator: float
+    on_inf: Literal['error', 'infinity'] = 'infinity'
+
+
+async def main():
+    # Make a request to the model with tool access
+    model_response, request_usage = await model_request(
+        'openai:gpt-4.1-nano',
+        [ModelRequest.user_text_prompt('What is 123 / 456?')],
+        model_request_parameters=ModelRequestParameters(
+            function_tools=[
+                ToolDefinition(
+                    name=Divide.__name__.lower(),
+                    description=Divide.__doc__ or '',
+                    parameters_json_schema=Divide.model_json_schema(),
+                )
+            ],
+            allow_text_output=True,  # Allow model to either use tools or respond directly
+        ),
+    )
+    print(model_response)
+    """
+    ModelResponse(
+        parts=[
+            ToolCallPart(
+                tool_name='divide',
+                args={'numerator': '123', 'denominator': '456'},
+                tool_call_id='pyd_ai_2e0e396768a14fe482df90a29a78dc7b',
+                part_kind='tool-call',
+            )
+        ],
+        model_name='gpt-4.1-nano',
+        timestamp=datetime.datetime(...),
+        kind='response',
+    )
+    """
+    print(request_usage)
+    """
+    Usage(
+        requests=0, request_tokens=55, response_tokens=7, total_tokens=62, details=None
+    )
+    """
+```
+
+## When to Use the Low-Level API vs `Agent`
+
+The low-level API is ideal when:
+
+1. You need more direct control over model interactions
+2. You want to implement custom behavior around model requests
+3. You're building your own abstractions on top of model interactions
+
+For most application use cases, the higher-level [`Agent`][pydantic_ai.Agent] API provides a more convenient interface with additional features such as built-in tool execution, structured output parsing, and more.
+
+## OpenTelemetry Instrumentation
+
+As with [agents][pydantic_ai.Agent], you can enable OpenTelemetry/logfire instrumentation with just a few extra lines:
+
+```python {title="low_level_instrumented.py" hl_lines="1 6 7"}
+import logfire
+
+from pydantic_ai.low_level import model_request_sync
+from pydantic_ai.messages import ModelRequest
+
+logfire.configure()
+logfire.instrument_pydantic_ai()
+
+# Make a synchronous request to the model
+model_response, usage_info = model_request_sync(
+    'anthropic:claude-3-5-haiku-latest',
+    [ModelRequest.user_text_prompt('What is the capital of France?')]
+)
+
+print(model_response.parts[0].content)
+#> Paris
+print(usage_info)
+"""
+Usage(requests=0, request_tokens=56, response_tokens=1, total_tokens=57, details=None)
+"""
+```
+
+See [Debugging and Monitoring](logfire.md) for more details.
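+
+## Streaming Example
+
+`model_request_stream` works the same way but returns an async context manager that yields a [`StreamedResponse`][pydantic_ai.models.StreamedResponse]. The following is a minimal sketch adapted from the example in the [`model_request_stream`][pydantic_ai.low_level.model_request_stream] API reference; the exact chunking of the streamed events will vary by model:
+
+```python title="low_level_stream.py"
+from pydantic_ai.low_level import model_request_stream
+from pydantic_ai.messages import ModelRequest
+
+
+async def main():
+    messages = [ModelRequest.user_text_prompt('Who was Albert Einstein?')]
+    async with model_request_stream('openai:gpt-4.1-mini', messages) as stream:
+        async for chunk in stream:
+            # Each chunk is a PartStartEvent or PartDeltaEvent
+            print(chunk)
+```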
diff --git a/mkdocs.yml b/mkdocs.yml
index 02cf3e703..d137a63bf 100644
--- a/mkdocs.yml
+++ b/mkdocs.yml
@@ -38,6 +38,7 @@ nav:
     - graph.md
     - evals.md
     - input.md
+    - low_level.md
     - MCP:
         - mcp/index.md
         - mcp/client.md
@@ -67,6 +68,7 @@ nav:
     - api/usage.md
     - api/mcp.md
     - api/format_as_xml.md
+    - api/low_level.md
    - api/models/base.md
     - api/models/openai.md
     - api/models/anthropic.md
@@ -208,6 +210,7 @@ plugins:
           # 3 because docs are in pages with an H2 just above them
           heading_level: 3
         import:
+          - url: https://logfire.pydantic.dev/docs/objects.inv
           - url: https://docs.python.org/3/objects.inv
           - url: https://docs.pydantic.dev/latest/objects.inv
           - url: https://dirty-equals.helpmanual.io/latest/objects.inv
diff --git a/pydantic_ai_slim/pydantic_ai/agent.py b/pydantic_ai_slim/pydantic_ai/agent.py
index 2dc880fda..669e1f249 100644
--- a/pydantic_ai_slim/pydantic_ai/agent.py
+++ b/pydantic_ai_slim/pydantic_ai/agent.py
@@ -27,7 +27,7 @@
     result,
     usage as _usage,
 )
-from .models.instrumented import InstrumentationSettings, InstrumentedModel
+from .models.instrumented import InstrumentationSettings, InstrumentedModel, instrument_model
 from .result import FinalResult, OutputDataT, StreamedRunResult, ToolOutput
 from .settings import ModelSettings, merge_model_settings
 from .tools import (
@@ -99,7 +99,7 @@ class Agent(Generic[AgentDepsT, OutputDataT]):
     model: models.Model | models.KnownModelName | str | None
     """The default model configured for this agent.
 
-    We allow str here since the actual list of allowed models changes frequently.
+    We allow `str` here since the actual list of allowed models changes frequently.
     """
 
     name: str | None
@@ -224,7 +224,7 @@ def __init__(
         Args:
             model: The default model to use for this agent, if not provided,
-                you must provide the model when calling it. We allow str here since the actual list of allowed models changes frequently.
+                you must provide the model when calling it. We allow `str` here since the actual list of allowed models changes frequently.
             output_type: The type of the output data, used to validate the data returned by the model,
                 defaults to `str`.
             instructions: Instructions to use for this agent, you can also register instructions via a function with
@@ -1512,13 +1512,7 @@ def _get_model(self, model: models.Model | models.KnownModelName | str | None) -
         if instrument is None:
             instrument = self._instrument_default
 
-        if instrument and not isinstance(model_, InstrumentedModel):
-            if instrument is True:
-                instrument = InstrumentationSettings()
-
-            model_ = InstrumentedModel(model_, instrument)
-
-        return model_
+        return instrument_model(model_, instrument)
 
     def _get_deps(self: Agent[T, OutputDataT], deps: T) -> T:
         """Get deps for a run.
diff --git a/pydantic_ai_slim/pydantic_ai/low_level.py b/pydantic_ai_slim/pydantic_ai/low_level.py
new file mode 100644
index 000000000..8da449dec
--- /dev/null
+++ b/pydantic_ai_slim/pydantic_ai/low_level.py
@@ -0,0 +1,220 @@
+"""Low-level methods to make requests directly to models with minimal abstraction.
+
+These methods allow you to make requests to LLMs where the only abstraction is input and output schema
+translation, so you can use the same API across all models.
+
+These methods are thin wrappers around [`Model`][pydantic_ai.models.Model] implementations.
+"""
+
+from __future__ import annotations as _annotations
+
+from collections.abc import AsyncIterator
+from contextlib import asynccontextmanager
+
+from pydantic_graph._utils import get_event_loop as _get_event_loop
+
+from . import agent, messages, models, settings, usage
+from .models import instrumented as instrumented_models
+
+__all__ = 'model_request', 'model_request_sync', 'model_request_stream'
+
+
+async def model_request(
+    model: models.Model | models.KnownModelName | str,
+    messages: list[messages.ModelMessage],
+    *,
+    model_settings: settings.ModelSettings | None = None,
+    model_request_parameters: models.ModelRequestParameters | None = None,
+    instrument: instrumented_models.InstrumentationSettings | bool | None = None,
+) -> tuple[messages.ModelResponse, usage.Usage]:
+    """Make a non-streamed request to a model.
+
+    This method is roughly equivalent to [`Agent.run`][pydantic_ai.Agent.run].
+
+    ```py title="model_request_example.py"
+    from pydantic_ai.low_level import model_request
+    from pydantic_ai.messages import ModelRequest
+
+
+    async def main():
+        model_response, request_usage = await model_request(
+            'anthropic:claude-3-5-haiku-latest',
+            [ModelRequest.user_text_prompt('What is the capital of France?')]  # (1)!
+        )
+        print(model_response)
+        '''
+        ModelResponse(
+            parts=[TextPart(content='Paris', part_kind='text')],
+            model_name='claude-3-5-haiku-latest',
+            timestamp=datetime.datetime(...),
+            kind='response',
+        )
+        '''
+        print(request_usage)
+        '''
+        Usage(
+            requests=0, request_tokens=56, response_tokens=1, total_tokens=57, details=None
+        )
+        '''
+    ```
+
+    1. See [`ModelRequest.user_text_prompt`][pydantic_ai.messages.ModelRequest.user_text_prompt] for details.
+
+    Args:
+        model: The model to make a request to. We allow `str` here since the actual list of allowed models changes frequently.
+        messages: Messages to send to the model.
+        model_settings: Optional model settings.
+        model_request_parameters: Optional model request parameters.
+        instrument: Whether to instrument the request with OpenTelemetry/logfire; if `None`, the value from
+            [`logfire.instrument_pydantic_ai`][logfire.Logfire.instrument_pydantic_ai] is used.
+
+    Returns:
+        The model response and token usage associated with the request.
+    """
+    model_instance = _prepare_model(model, instrument)
+    return await model_instance.request(
+        messages,
+        model_settings,
+        model_request_parameters or models.ModelRequestParameters(),
+    )
+
+
+def model_request_sync(
+    model: models.Model | models.KnownModelName | str,
+    messages: list[messages.ModelMessage],
+    *,
+    model_settings: settings.ModelSettings | None = None,
+    model_request_parameters: models.ModelRequestParameters | None = None,
+    instrument: instrumented_models.InstrumentationSettings | bool | None = None,
+) -> tuple[messages.ModelResponse, usage.Usage]:
+    """Make a synchronous, non-streamed request to a model.
+
+    This is a convenience method that wraps [`model_request`][pydantic_ai.low_level.model_request] with
+    `loop.run_until_complete(...)`. You therefore can't use this method inside async code or if there's an active event loop.
+
+    This method is roughly equivalent to [`Agent.run_sync`][pydantic_ai.Agent.run_sync].
+
+    ```py title="model_request_sync_example.py"
+    from pydantic_ai.low_level import model_request_sync
+    from pydantic_ai.messages import ModelRequest
+
+    model_response, _ = model_request_sync(
+        'anthropic:claude-3-5-haiku-latest',
+        [ModelRequest.user_text_prompt('What is the capital of France?')]
+    )
+    print(model_response)
+    '''
+    ModelResponse(
+        parts=[TextPart(content='Paris', part_kind='text')],
+        model_name='claude-3-5-haiku-latest',
+        timestamp=datetime.datetime(...),
+        kind='response',
+    )
+    '''
+    ```
+
+    Args:
+        model: The model to make a request to. We allow `str` here since the actual list of allowed models changes frequently.
+        messages: Messages to send to the model.
+        model_settings: Optional model settings.
+        model_request_parameters: Optional model request parameters.
+        instrument: Whether to instrument the request with OpenTelemetry/logfire; if `None`, the value from
+            [`logfire.instrument_pydantic_ai`][logfire.Logfire.instrument_pydantic_ai] is used.
+
+    Returns:
+        The model response and token usage associated with the request.
+    """
+    return _get_event_loop().run_until_complete(
+        model_request(
+            model,
+            messages,
+            model_settings=model_settings,
+            model_request_parameters=model_request_parameters,
+            instrument=instrument,
+        )
+    )
+
+
+@asynccontextmanager
+async def model_request_stream(
+    model: models.Model | models.KnownModelName | str,
+    messages: list[messages.ModelMessage],
+    *,
+    model_settings: settings.ModelSettings | None = None,
+    model_request_parameters: models.ModelRequestParameters | None = None,
+    instrument: instrumented_models.InstrumentationSettings | bool | None = None,
+) -> AsyncIterator[models.StreamedResponse]:
+    """Make a streamed async request to a model.
+
+    This method is roughly equivalent to [`Agent.run_stream`][pydantic_ai.Agent.run_stream].
+
+    ```py title="model_request_stream_example.py"
+    from pydantic_ai.low_level import model_request_stream
+    from pydantic_ai.messages import ModelRequest
+
+
+    async def main():
+        messages = [ModelRequest.user_text_prompt('Who was Albert Einstein?')]
+        async with model_request_stream('openai:gpt-4.1-mini', messages) as stream:
+            chunks = []
+            async for chunk in stream:
+                chunks.append(chunk)
+            print(chunks)
+            '''
+            [
+                PartStartEvent(
+                    index=0,
+                    part=TextPart(content='Albert Einstein was ', part_kind='text'),
+                    event_kind='part_start',
+                ),
+                PartDeltaEvent(
+                    index=0,
+                    delta=TextPartDelta(
+                        content_delta='a German-born theoretical ', part_delta_kind='text'
+                    ),
+                    event_kind='part_delta',
+                ),
+                PartDeltaEvent(
+                    index=0,
+                    delta=TextPartDelta(content_delta='physicist.', part_delta_kind='text'),
+                    event_kind='part_delta',
+                ),
+            ]
+            '''
+    ```
+
+    Args:
+        model: The model to make a request to. We allow `str` here since the actual list of allowed models changes frequently.
+        messages: Messages to send to the model.
+        model_settings: Optional model settings.
+        model_request_parameters: Optional model request parameters.
+        instrument: Whether to instrument the request with OpenTelemetry/logfire; if `None`, the value from
+            [`logfire.instrument_pydantic_ai`][logfire.Logfire.instrument_pydantic_ai] is used.
+
+    Returns:
+        A [stream response][pydantic_ai.models.StreamedResponse] async context manager.
+ """ + model_instance = _prepare_model(model, instrument) + stream_cxt_mgr = model_instance.request_stream( + messages, + model_settings, + model_request_parameters or models.ModelRequestParameters(), + ) + async with stream_cxt_mgr as streamed_response: + yield streamed_response + + +def _prepare_model( + model: models.Model | models.KnownModelName | str, + instrument: instrumented_models.InstrumentationSettings | bool | None, +) -> models.Model: + model_instance = models.infer_model(model) + + if instrument is None: + instrument = agent.Agent._instrument_default # pyright: ignore[reportPrivateUsage] + + return instrumented_models.instrument_model(model_instance, instrument) diff --git a/pydantic_ai_slim/pydantic_ai/messages.py b/pydantic_ai_slim/pydantic_ai/messages.py index 48eb82f65..fba79426f 100644 --- a/pydantic_ai_slim/pydantic_ai/messages.py +++ b/pydantic_ai_slim/pydantic_ai/messages.py @@ -470,6 +470,11 @@ class ModelRequest: kind: Literal['request'] = 'request' """Message type identifier, this is available on all parts as a discriminator.""" + @classmethod + def user_text_prompt(cls, user_prompt: str, *, instructions: str | None = None) -> ModelRequest: + """Create a `ModelRequest` with a single user prompt as text.""" + return cls(parts=[UserPromptPart(user_prompt)], instructions=instructions) + @dataclass class TextPart: diff --git a/pydantic_ai_slim/pydantic_ai/models/__init__.py b/pydantic_ai_slim/pydantic_ai/models/__init__.py index 6e842e3b4..4517d93be 100644 --- a/pydantic_ai_slim/pydantic_ai/models/__init__.py +++ b/pydantic_ai_slim/pydantic_ai/models/__init__.py @@ -264,9 +264,9 @@ class ModelRequestParameters: """Configuration for an agent's request to a model, specifically related to tools and output handling.""" - function_tools: list[ToolDefinition] - allow_text_output: bool - output_tools: list[ToolDefinition] + function_tools: list[ToolDefinition] = field(default_factory=list) + allow_text_output: bool = True + output_tools: list[ToolDefinition] = field(default_factory=list) class Model(ABC): diff --git a/pydantic_ai_slim/pydantic_ai/models/instrumented.py b/pydantic_ai_slim/pydantic_ai/models/instrumented.py index 20fa568e9..69df21b5c 100644 --- a/pydantic_ai_slim/pydantic_ai/models/instrumented.py +++ b/pydantic_ai_slim/pydantic_ai/models/instrumented.py @@ -27,6 +27,8 @@ from . import KnownModelName, Model, ModelRequestParameters, StreamedResponse from .wrapper import WrapperModel +__all__ = 'instrument_model', 'InstrumentationSettings', 'InstrumentedModel' + MODEL_SETTING_ATTRIBUTES: tuple[ Literal[ 'max_tokens', @@ -49,6 +51,17 @@ ANY_ADAPTER = TypeAdapter[Any](Any) +def instrument_model(model: Model, instrument: InstrumentationSettings | bool) -> Model: + """Instrument a model with OpenTelemetry/logfire.""" + if instrument and not isinstance(model, InstrumentedModel): + if instrument is True: + instrument = InstrumentationSettings() + + model = InstrumentedModel(model, instrument) + + return model + + @dataclass(init=False) class InstrumentationSettings: """Options for instrumenting models and agents with OpenTelemetry. 
diff --git a/tests/test_examples.py b/tests/test_examples.py
index a0591e211..66d6a9d1a 100644
--- a/tests/test_examples.py
+++ b/tests/test_examples.py
@@ -387,6 +387,11 @@ async def list_tools() -> list[None]:
             'steps': ['Mix the ingredients', 'Bake at 350°F for 30 minutes'],
         },
     ),
+    'What is 123 / 456?': ToolCallPart(
+        tool_name='divide',
+        args={'numerator': '123', 'denominator': '456'},
+        tool_call_id='pyd_ai_2e0e396768a14fe482df90a29a78dc7b',
+    ),
 }
 
 tool_responses: dict[tuple[str, str], str] = {
diff --git a/tests/test_low_level.py b/tests/test_low_level.py
new file mode 100644
index 000000000..be4accfb1
--- /dev/null
+++ b/tests/test_low_level.py
@@ -0,0 +1,115 @@
+from contextlib import contextmanager
+from datetime import timezone
+
+import pytest
+from inline_snapshot import snapshot
+
+from pydantic_ai import Agent
+from pydantic_ai.low_level import (
+    _prepare_model,  # pyright: ignore[reportPrivateUsage]
+    model_request,
+    model_request_stream,
+    model_request_sync,
+)
+from pydantic_ai.messages import (
+    ModelRequest,
+    ModelResponse,
+    PartDeltaEvent,
+    PartStartEvent,
+    TextPart,
+    TextPartDelta,
+    ToolCallPart,
+)
+from pydantic_ai.models import ModelRequestParameters
+from pydantic_ai.models.instrumented import InstrumentedModel
+from pydantic_ai.models.test import TestModel
+from pydantic_ai.tools import ToolDefinition
+from pydantic_ai.usage import Usage
+
+from .conftest import IsNow, IsStr
+
+pytestmark = pytest.mark.anyio
+
+
+async def test_model_request():
+    model_response, request_usage = await model_request('test', [ModelRequest.user_text_prompt('x')])
+    assert model_response == snapshot(
+        ModelResponse(
+            parts=[TextPart(content='success (no tool calls)')],
+            model_name='test',
+            timestamp=IsNow(tz=timezone.utc),
+        )
+    )
+    assert request_usage == snapshot(Usage(request_tokens=51, response_tokens=4, total_tokens=55))
+
+
+async def test_model_request_tool_call():
+    model_response, request_usage = await model_request(
+        'test',
+        [ModelRequest.user_text_prompt('x')],
+        model_request_parameters=ModelRequestParameters(
+            function_tools=[ToolDefinition(name='tool_name', description='', parameters_json_schema={})],
+            allow_text_output=False,
+        ),
+    )
+    assert model_response == snapshot(
+        ModelResponse(
+            parts=[ToolCallPart(tool_name='tool_name', args='a', tool_call_id=IsStr(regex='pyd_ai_.*'))],
+            model_name='test',
+            timestamp=IsNow(tz=timezone.utc),
+        )
+    )
+    assert request_usage == snapshot(Usage(request_tokens=51, response_tokens=2, total_tokens=53))
+
+
+def test_model_request_sync():
+    model_response, request_usage = model_request_sync('test', [ModelRequest.user_text_prompt('x')])
+    assert model_response == snapshot(
+        ModelResponse(
+            parts=[TextPart(content='success (no tool calls)')],
+            model_name='test',
+            timestamp=IsNow(tz=timezone.utc),
+        )
+    )
+    assert request_usage == snapshot(Usage(request_tokens=51, response_tokens=4, total_tokens=55))
+
+
+async def test_model_request_stream():
+    async with model_request_stream('test', [ModelRequest.user_text_prompt('x')]) as stream:
+        chunks = [chunk async for chunk in stream]
+        assert chunks == snapshot(
+            [
+                PartStartEvent(index=0, part=TextPart(content='')),
+                PartDeltaEvent(index=0, delta=TextPartDelta(content_delta='success ')),
+                PartDeltaEvent(index=0, delta=TextPartDelta(content_delta='(no ')),
+                PartDeltaEvent(index=0, delta=TextPartDelta(content_delta='tool ')),
+                PartDeltaEvent(index=0, delta=TextPartDelta(content_delta='calls)')),
+            ]
+        )
+
+
+@contextmanager
+def set_instrument_default(value: bool):
+    """Context manager to temporarily set the default instrumentation value."""
+    initial_value = Agent._instrument_default  # pyright: ignore[reportPrivateUsage]
+    try:
+        Agent._instrument_default = value  # pyright: ignore[reportPrivateUsage]
+        yield
+    finally:
+        Agent._instrument_default = initial_value  # pyright: ignore[reportPrivateUsage]
+
+
+def test_prepare_model():
+    with set_instrument_default(False):
+        model = _prepare_model('test', None)
+        assert isinstance(model, TestModel)
+
+        model = _prepare_model('test', True)
+        assert isinstance(model, InstrumentedModel)
+
+    with set_instrument_default(True):
+        model = _prepare_model('test', None)
+        assert isinstance(model, InstrumentedModel)
+
+        model = _prepare_model('test', False)
+        assert isinstance(model, TestModel)
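
One detail not exercised by the docs or tests above: `ModelRequest.user_text_prompt` also accepts an `instructions` keyword, per the signature added in `messages.py`. A minimal sketch, assuming `ModelRequest.instructions` is the field the helper sets (as the implementation suggests):

```python
from pydantic_ai.messages import ModelRequest

# Build a single-turn request that also carries instructions
request = ModelRequest.user_text_prompt(
    'What is the capital of France?',
    instructions='Answer with the city name only.',
)
assert request.instructions == 'Answer with the city name only.'
```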