From 3214e7bf9d4bdee4c7d58d07e6bf5f9eb248ebd5 Mon Sep 17 00:00:00 2001 From: aagarwal25 Date: Sat, 3 May 2025 02:05:10 +0530 Subject: [PATCH 1/6] feat: Streamable HTTP support --- pyproject.toml | 2 +- src/agents/mcp/__init__.py | 4 ++ src/agents/mcp/server.py | 80 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 85 insertions(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 22b028ae..e3adde75 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -13,7 +13,7 @@ dependencies = [ "typing-extensions>=4.12.2, <5", "requests>=2.0, <3", "types-requests>=2.0, <3", - "mcp>=1.6.0, <2; python_version >= '3.10'", + "mcp>=1.7.1, <2; python_version >= '3.10'", ] classifiers = [ "Typing :: Typed", diff --git a/src/agents/mcp/__init__.py b/src/agents/mcp/__init__.py index 1a72a89f..90da9e5d 100644 --- a/src/agents/mcp/__init__.py +++ b/src/agents/mcp/__init__.py @@ -5,6 +5,8 @@ MCPServerSseParams, MCPServerStdio, MCPServerStdioParams, + MCPServerStreamableHttp, + MCPServerStreamableHttpParams ) except ImportError: pass @@ -17,5 +19,7 @@ "MCPServerSseParams", "MCPServerStdio", "MCPServerStdioParams", + "MCPServerStreamableHttp", + "MCPServerStreamableHttpParams", "MCPUtil", ] diff --git a/src/agents/mcp/server.py b/src/agents/mcp/server.py index 9916c92b..2474bc84 100644 --- a/src/agents/mcp/server.py +++ b/src/agents/mcp/server.py @@ -10,6 +10,7 @@ from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream from mcp import ClientSession, StdioServerParameters, Tool as MCPTool, stdio_client from mcp.client.sse import sse_client +from mcp.client.streamable_http import streamablehttp_client from mcp.types import CallToolResult, JSONRPCMessage from typing_extensions import NotRequired, TypedDict @@ -318,3 +319,82 @@ def create_streams( def name(self) -> str: """A readable name for the server.""" return self._name + + +class MCPServerStreamableHttpParams(TypedDict): + """Mirrors the params in`mcp.client.streamable_http.streamablehttp_client`.""" + + url: str + """The URL of the server.""" + + headers: NotRequired[dict[str, str]] + """The headers to send to the server.""" + + timeout: NotRequired[float] + """The timeout for the HTTP request. Defaults to 5 seconds.""" + + sse_read_timeout: NotRequired[float] + """The timeout for the SSE connection, in seconds. Defaults to 5 minutes.""" + + terminate_on_close: bool + """Terminate on close""" + + +class MCPServerStreamableHttp(_MCPServerWithClientSession): + """MCP server implementation that uses the Streamable HTTP transport. See the [spec] + (https://modelcontextprotocol.io/specification/2025-03-26/basic/transports#streamable-http) + for details. + """ + + def __init__( + self, + params: MCPServerStreamableHttpParams, + cache_tools_list: bool = False, + name: str | None = None, + client_session_timeout_seconds: float | None = 5, + ): + """Create a new MCP server based on the Streamable HTTP transport. + + Args: + params: The params that configure the server. This includes the URL of the server, + the headers to send to the server, the timeout for the HTTP request, and the + timeout for the Streamable HTTP connection and whether we need to terminate on close. + + cache_tools_list: Whether to cache the tools list. If `True`, the tools list will be + cached and only fetched from the server once. If `False`, the tools list will be + fetched from the server on each call to `list_tools()`. The cache can be + invalidated by calling `invalidate_tools_cache()`. You should set this to `True` + if you know the server will not change its tools list, because it can drastically + improve latency (by avoiding a round-trip to the server every time). + + name: A readable name for the server. If not provided, we'll create one from the + URL. + + client_session_timeout_seconds: the read timeout passed to the MCP ClientSession. + """ + super().__init__(cache_tools_list, client_session_timeout_seconds) + + self.params = params + self._name = name or f"sse: {self.params['url']}" + + def create_streams( + self, + ) -> AbstractAsyncContextManager[ + tuple[ + MemoryObjectReceiveStream[JSONRPCMessage | Exception], + MemoryObjectSendStream[JSONRPCMessage], + ] + ]: + """Create the streams for the server.""" + return streamablehttp_client( + url=self.params["url"], + headers=self.params.get("headers", None), + timeout=self.params.get("timeout", 5), + sse_read_timeout=self.params.get("sse_read_timeout", 60 * 5), + terminate_on_close=self.params.get("terminate_on_close", True) + ) + + @property + def name(self) -> str: + """A readable name for the server.""" + return self._name From 9aa32f507b629c54ca005a21cdcbb109cfa66caf Mon Sep 17 00:00:00 2001 From: aagarwal25 Date: Sat, 3 May 2025 02:07:35 +0530 Subject: [PATCH 2/6] feat: Streamable HTTP support --- src/agents/mcp/server.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/agents/mcp/server.py b/src/agents/mcp/server.py index 2474bc84..b549ee86 100644 --- a/src/agents/mcp/server.py +++ b/src/agents/mcp/server.py @@ -336,7 +336,7 @@ class MCPServerStreamableHttpParams(TypedDict): sse_read_timeout: NotRequired[float] """The timeout for the SSE connection, in seconds. Defaults to 5 minutes.""" - terminate_on_close: bool + terminate_on_close: NotRequired[bool] """Terminate on close""" From e7eb306bc012e248643c2cdf25a21cac23c72bf8 Mon Sep 17 00:00:00 2001 From: aagarwal25 Date: Sat, 3 May 2025 02:08:50 +0530 Subject: [PATCH 3/6] feat: Streamable HTTP support --- src/agents/mcp/server.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/agents/mcp/server.py b/src/agents/mcp/server.py index b549ee86..ebd4a626 100644 --- a/src/agents/mcp/server.py +++ b/src/agents/mcp/server.py @@ -375,7 +375,7 @@ def __init__( super().__init__(cache_tools_list, client_session_timeout_seconds) self.params = params - self._name = name or f"sse: {self.params['url']}" + self._name = name or f"streamable_http: {self.params['url']}" def create_streams( self, From e50bd99f98acf557c68934c9b018eeeca6d700e7 Mon Sep 17 00:00:00 2001 From: aagarwal25 Date: Fri, 9 May 2025 10:23:31 +0530 Subject: [PATCH 4/6] feat: Streamable HTTP support --- pyproject.toml | 2 +- src/agents/mcp/server.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index e3adde75..87a707d3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -13,7 +13,7 @@ dependencies = [ "typing-extensions>=4.12.2, <5", "requests>=2.0, <3", "types-requests>=2.0, <3", - "mcp>=1.7.1, <2; python_version >= '3.10'", + "mcp>=1.8.0, <2; python_version >= '3.10'", ] classifiers = [ "Typing :: Typed", diff --git a/src/agents/mcp/server.py b/src/agents/mcp/server.py index ebd4a626..50c4698a 100644 --- a/src/agents/mcp/server.py +++ b/src/agents/mcp/server.py @@ -389,7 +389,7 @@ def create_streams( return streamablehttp_client( url=self.params["url"], headers=self.params.get("headers", None), - timeout=self.params.get("timeout", 5), + timeout=self.params.get("timeout", 30), sse_read_timeout=self.params.get("sse_read_timeout", 60 * 5), terminate_on_close=self.params.get("terminate_on_close", True) ) From 8794eb8ff3e6789fd4ac88d62f34dabe0162031f Mon Sep 17 00:00:00 2001 From: aagarwal25 Date: Fri, 9 May 2025 19:10:02 +0530 Subject: [PATCH 5/6] feat: Streamable HTTP support --- src/agents/mcp/server.py | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/src/agents/mcp/server.py b/src/agents/mcp/server.py index 50c4698a..740169b1 100644 --- a/src/agents/mcp/server.py +++ b/src/agents/mcp/server.py @@ -106,7 +106,14 @@ async def connect(self): """Connect to the server.""" try: transport = await self.exit_stack.enter_async_context(self.create_streams()) - read, write = transport + # Handle different transport return values + if len(transport) == 3: + # streamablehttp_client returns (read, write, get_session_id) + read, write, _ = transport + else: + # sse_client returns (read, write) + read, write = transport + session = await self.exit_stack.enter_async_context( ClientSession( read, @@ -330,10 +337,10 @@ class MCPServerStreamableHttpParams(TypedDict): headers: NotRequired[dict[str, str]] """The headers to send to the server.""" - timeout: NotRequired[float] + timeout: NotRequired[timedelta] """The timeout for the HTTP request. Defaults to 5 seconds.""" - sse_read_timeout: NotRequired[float] + sse_read_timeout: NotRequired[timedelta] """The timeout for the SSE connection, in seconds. Defaults to 5 minutes.""" terminate_on_close: NotRequired[bool] @@ -389,8 +396,8 @@ def create_streams( return streamablehttp_client( url=self.params["url"], headers=self.params.get("headers", None), - timeout=self.params.get("timeout", 30), - sse_read_timeout=self.params.get("sse_read_timeout", 60 * 5), + timeout=self.params.get("timeout", timedelta(seconds=30)), + sse_read_timeout=self.params.get("sse_read_timeout", timedelta(seconds=60 * 5)), terminate_on_close=self.params.get("terminate_on_close", True) ) From ed5879223fd030d5223559497d05a7702051a234 Mon Sep 17 00:00:00 2001 From: aagarwal25 Date: Fri, 9 May 2025 19:10:14 +0530 Subject: [PATCH 6/6] feat: adding Streamable HTTP example --- examples/mcp/streamablehttp_example/README.md | 13 +++ examples/mcp/streamablehttp_example/main.py | 83 +++++++++++++++++++ examples/mcp/streamablehttp_example/server.py | 33 ++++++++ 3 files changed, 129 insertions(+) create mode 100644 examples/mcp/streamablehttp_example/README.md create mode 100644 examples/mcp/streamablehttp_example/main.py create mode 100644 examples/mcp/streamablehttp_example/server.py diff --git a/examples/mcp/streamablehttp_example/README.md b/examples/mcp/streamablehttp_example/README.md new file mode 100644 index 00000000..a07fe19b --- /dev/null +++ b/examples/mcp/streamablehttp_example/README.md @@ -0,0 +1,13 @@ +# MCP Streamable HTTP Example + +This example uses a local Streamable HTTP server in [server.py](server.py). + +Run the example via: + +``` +uv run python examples/mcp/streamablehttp_example/main.py +``` + +## Details + +The example uses the `MCPServerStreamableHttp` class from `agents.mcp`. The server runs in a sub-process at `https://localhost:8000/mcp`. diff --git a/examples/mcp/streamablehttp_example/main.py b/examples/mcp/streamablehttp_example/main.py new file mode 100644 index 00000000..cc95e798 --- /dev/null +++ b/examples/mcp/streamablehttp_example/main.py @@ -0,0 +1,83 @@ +import asyncio +import os +import shutil +import subprocess +import time +from typing import Any + +from agents import Agent, Runner, gen_trace_id, trace +from agents.mcp import MCPServer, MCPServerStreamableHttp +from agents.model_settings import ModelSettings + + +async def run(mcp_server: MCPServer): + agent = Agent( + name="Assistant", + instructions="Use the tools to answer the questions.", + mcp_servers=[mcp_server], + model_settings=ModelSettings(tool_choice="required"), + ) + + # Use the `add` tool to add two numbers + message = "Add these numbers: 7 and 22." + print(f"Running: {message}") + result = await Runner.run(starting_agent=agent, input=message) + print(result.final_output) + + # Run the `get_weather` tool + message = "What's the weather in Tokyo?" + print(f"\n\nRunning: {message}") + result = await Runner.run(starting_agent=agent, input=message) + print(result.final_output) + + # Run the `get_secret_word` tool + message = "What's the secret word?" + print(f"\n\nRunning: {message}") + result = await Runner.run(starting_agent=agent, input=message) + print(result.final_output) + + +async def main(): + async with MCPServerStreamableHttp( + name="Streamable HTTP Python Server", + params={ + "url": "http://localhost:8000/mcp", + }, + ) as server: + trace_id = gen_trace_id() + with trace(workflow_name="Streamable HTTP Example", trace_id=trace_id): + print(f"View trace: https://platform.openai.com/traces/trace?trace_id={trace_id}\n") + await run(server) + + +if __name__ == "__main__": + # Let's make sure the user has uv installed + if not shutil.which("uv"): + raise RuntimeError( + "uv is not installed. Please install it: https://docs.astral.sh/uv/getting-started/installation/" + ) + + # We'll run the Streamable HTTP server in a subprocess. Usually this would be a remote server, but for this + # demo, we'll run it locally at http://localhost:8000/mcp + process: subprocess.Popen[Any] | None = None + try: + this_dir = os.path.dirname(os.path.abspath(__file__)) + server_file = os.path.join(this_dir, "server.py") + + print("Starting Streamable HTTP server at http://localhost:8000/mcp ...") + + # Run `uv run server.py` to start the Streamable HTTP server + process = subprocess.Popen(["uv", "run", server_file]) + # Give it 3 seconds to start + time.sleep(3) + + print("Streamable HTTP server started. Running example...\n\n") + except Exception as e: + print(f"Error starting Streamable HTTP server: {e}") + exit(1) + + try: + asyncio.run(main()) + finally: + if process: + process.terminate() diff --git a/examples/mcp/streamablehttp_example/server.py b/examples/mcp/streamablehttp_example/server.py new file mode 100644 index 00000000..d8f83965 --- /dev/null +++ b/examples/mcp/streamablehttp_example/server.py @@ -0,0 +1,33 @@ +import random + +import requests +from mcp.server.fastmcp import FastMCP + +# Create server +mcp = FastMCP("Echo Server") + + +@mcp.tool() +def add(a: int, b: int) -> int: + """Add two numbers""" + print(f"[debug-server] add({a}, {b})") + return a + b + + +@mcp.tool() +def get_secret_word() -> str: + print("[debug-server] get_secret_word()") + return random.choice(["apple", "banana", "cherry"]) + + +@mcp.tool() +def get_current_weather(city: str) -> str: + print(f"[debug-server] get_current_weather({city})") + + endpoint = "https://wttr.in" + response = requests.get(f"{endpoint}/{city}") + return response.text + + +if __name__ == "__main__": + mcp.run(transport="streamable-http")