
Anthropic Claude is not working in heterogeneous mode with run_stream #860

Closed
WorldInnovationsDepartment opened this issue Feb 6, 2025 · 2 comments

Comments


WorldInnovationsDepartment commented Feb 6, 2025

While testing the Agent with different models, I noticed that in v0.0.22 the Agent does not process tool outputs with Claude 3.5 Sonnet when streaming is turned on. The same code works just fine with streaming turned off.
I can add that tool calls work with streaming for GPT-4o, o3, and Gemini 2.0 Pro Experimental.

example (example_agent_setup.py):

"""
Example configuration for a search-enabled AI agent using pydantic_ai.

This module demonstrates how to set up an agent with multiple model backends
and custom search functionality integration.
"""

from __future__ import annotations

import os

from dotenv import load_dotenv
from pydantic_ai import Agent, RunContext
from pydantic_ai.models.openai import OpenAIModel
from pydantic_ai.models.gemini import GeminiModel
from pydantic_ai.models.anthropic import AnthropicModel


# Load environment variables
load_dotenv()

# Initialize model settings (LLM_MODEL should be an Anthropic model id here)
llm = os.getenv('LLM_MODEL', 'claude-3-5-sonnet-latest')

model = AnthropicModel(llm)

# Initialize agent with configuration.
# SearchDeps, search_meetings and search are defined elsewhere in this module.
agent = Agent(
    model,
    name='Search Agent',
    system_prompt='You are a helpful search assistant.',
    end_strategy='exhaustive',
    deps_type=SearchDeps,
    tools=[
        search_meetings,
        search,
    ],
    retries=2
)

main.py:

"""
Console interface example for pydantic_ai agents.
Demonstrates how to create an interactive CLI chat interface with streaming support.
"""

from __future__ import annotations

from typing import List
import asyncio
from dotenv import load_dotenv

from pydantic_ai.messages import (
    ModelMessage,
    ModelRequest,
    ModelResponse,
    TextPart,
    UserPromptPart,
)
from pydantic_ai.models import ModelSettings

from example_agent_setup import agent, SearchTools, SearchDeps


# Load environment variables
load_dotenv()


class Console:
    """Interactive console interface for chatting with an AI agent."""

    def __init__(self, streaming: bool = True):
        """Initialize the console interface.
        
        Args:
            streaming: Whether to use streaming mode for responses
        """
        self.messages: List[ModelMessage] = []
        self.streaming = streaming

    async def chat(self):
        """Run the interactive chat session."""
        print('AI Agent Console (type "exit" to exit)')
        print('Enter your message:')
        
        # Initialize search tools and model settings
        search_tools = SearchTools(
            search_clients={},  # Configure with your search clients
            user_id='example-user'
        )
        model_settings = ModelSettings(
            temperature=0.3,
            top_p=0.7,
        )

        try:
            while True:
                user_input = input('> ').strip()
                if user_input.lower() == 'exit':
                    break

                # Store the user message
                self.messages.append(
                    ModelRequest(parts=[UserPromptPart(content=user_input)])
                )

                if self.streaming:
                    await self._handle_streaming_response(
                        user_input, search_tools, model_settings
                    )
                else:
                    await self._handle_normal_response(
                        user_input, search_tools, model_settings
                    )

        except Exception as e:
            print(f'Error during chat: {str(e)}')

    async def _handle_streaming_response(
        self, user_input: str, search_tools: SearchTools, model_settings: ModelSettings
    ):
        """Handle streaming response from the agent.
        
        Args:
            user_input: User's input text
            search_tools: Initialized search tools
            model_settings: Model configuration settings
        """
        async with agent.run_stream(
            user_input,
            message_history=self.messages,
            deps=SearchDeps(tools=search_tools),
            model_settings=model_settings
        ) as result:
            response_content = ''
            async for chunk in result.stream_text(delta=True):
                response_content += chunk
                print(chunk, end='', flush=True)
            print()  # New line after response
            
            # Store intermediary messages (e.g., tool calls and responses)
            filtered_messages = [
                msg for msg in result.new_messages()
                if not (hasattr(msg, 'parts') and
                       any(part.part_kind in ('user-prompt', 'text')
                           for part in msg.parts))
            ]
            self.messages.extend(filtered_messages)

            # Store the final response
            self.messages.append(
                ModelResponse(parts=[TextPart(content=response_content)])
            )
            
            print(result.usage())

    async def _handle_normal_response(
        self, user_input: str, search_tools: SearchTools, model_settings: ModelSettings
    ):
        """Handle non-streaming response from the agent.
        
        Args:
            user_input: User's input text
            search_tools: Initialized search tools
            model_settings: Model configuration settings
        """
        result = await agent.run(
            user_input,
            message_history=self.messages,
            deps=SearchDeps(tools=search_tools),
            model_settings=model_settings
        )
        print(result.data)

        # Store intermediary messages
        filtered_messages = [
            msg for msg in result.new_messages()
            if not (hasattr(msg, 'parts') and
                   any(part.part_kind in ('user-prompt', 'text')
                       for part in msg.parts))
        ]
        self.messages.extend(filtered_messages)

        # Store the final response
        self.messages.append(
            ModelResponse(parts=[TextPart(content=result.data)])
        )
        
        print(result.usage())


async def main():
    """Main entry point for the Console application."""
    await Console(streaming=True).chat()


if __name__ == '__main__':
    asyncio.run(main())

As you can see, I am running a query that always triggers the search tool, but after the text message there is no processing of the tool output:
[screenshot attached]

dmontagu (Contributor) commented Feb 12, 2025

I'm hoping to make some major improvements to streaming in the near future in #833, which I think will help resolve this misbehavior. Once #833 is merged (which may admittedly take a few more work days, as it's somewhat involved), if the problem isn't immediately solved, or solved by adapting to the new streaming APIs, fixing it will be a high priority for us.

(Ultimately, today's misbehavior comes from allowing str as a final output while also wanting tool calls. #833 changes the streaming API so that you can iterate over all model requests/responses, even those that aren't "final", giving you the full stream of response data from the model; that should make it possible to implement this functionality the way you are trying to.)

Kludex (Member) commented Feb 28, 2025

@dmontagu's PRs landed in the last release.

Please check the new Agent().iter() API here: https://ai.pydantic.dev/agents/#iterating-over-an-agents-graph.

Let us know if that solves your issue. 🙏

Kludex closed this as completed on Feb 28, 2025