Skip to content

feat: Streamable HTTP support #643

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions examples/mcp/streamablehttp_example/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# MCP Streamable HTTP Example

This example uses a local Streamable HTTP server in [server.py](server.py).

Run the example via:

```
uv run python examples/mcp/streamablehttp_example/main.py
```

## Details

The example uses the `MCPServerStreamableHttp` class from `agents.mcp`. The server runs in a sub-process at `http://localhost:8000/mcp`.
83 changes: 83 additions & 0 deletions examples/mcp/streamablehttp_example/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
import asyncio
import os
import shutil
import subprocess
import time
from typing import Any

from agents import Agent, Runner, gen_trace_id, trace
from agents.mcp import MCPServer, MCPServerStreamableHttp
from agents.model_settings import ModelSettings


async def run(mcp_server: MCPServer):
    """Run a few sample prompts against an agent connected to `mcp_server`.

    The prompts exercise the `add`, `get_current_weather`, and `get_secret_word`
    tools exposed by the example server in server.py.
    """
    agent = Agent(
        name="Assistant",
        instructions="Use the tools to answer the questions.",
        mcp_servers=[mcp_server],
        # Force a tool call on every turn so the example always exercises MCP.
        model_settings=ModelSettings(tool_choice="required"),
    )

    # Use the `add` tool to add two numbers
    message = "Add these numbers: 7 and 22."
    print(f"Running: {message}")
    result = await Runner.run(starting_agent=agent, input=message)
    print(result.final_output)

    # Run the `get_current_weather` tool
    message = "What's the weather in Tokyo?"
    print(f"\n\nRunning: {message}")
    result = await Runner.run(starting_agent=agent, input=message)
    print(result.final_output)

    # Run the `get_secret_word` tool
    message = "What's the secret word?"
    print(f"\n\nRunning: {message}")
    result = await Runner.run(starting_agent=agent, input=message)
    print(result.final_output)


async def main():
    """Connect to the local Streamable HTTP MCP server, trace the run, and execute the demo."""
    workflow = "Streamable HTTP Example"
    server_params = {"url": "http://localhost:8000/mcp"}
    async with MCPServerStreamableHttp(
        name="Streamable HTTP Python Server",
        params=server_params,
    ) as server:
        trace_id = gen_trace_id()
        with trace(workflow_name=workflow, trace_id=trace_id):
            print(f"View trace: https://platform.openai.com/traces/trace?trace_id={trace_id}\n")
            await run(server)


if __name__ == "__main__":
    # Let's make sure the user has uv installed
    if not shutil.which("uv"):
        raise RuntimeError(
            "uv is not installed. Please install it: https://docs.astral.sh/uv/getting-started/installation/"
        )

    # We'll run the Streamable HTTP server in a subprocess. Usually this would be a remote server, but for this
    # demo, we'll run it locally at http://localhost:8000/mcp
    process: subprocess.Popen[Any] | None = None
    try:
        this_dir = os.path.dirname(os.path.abspath(__file__))
        server_file = os.path.join(this_dir, "server.py")

        print("Starting Streamable HTTP server at http://localhost:8000/mcp ...")

        # Run `uv run server.py` to start the Streamable HTTP server
        process = subprocess.Popen(["uv", "run", server_file])
        # Give it 3 seconds to start before the client tries to connect
        time.sleep(3)

        print("Streamable HTTP server started. Running example...\n\n")
    except Exception as e:
        print(f"Error starting Streamable HTTP server: {e}")
        # `exit()` is injected by the `site` module and not guaranteed in all
        # runtimes; raising SystemExit is the portable equivalent.
        raise SystemExit(1) from e

    try:
        asyncio.run(main())
    finally:
        if process:
            process.terminate()
            # Reap the child so we don't leave a zombie (or an orphan still
            # bound to port 8000) behind; escalate to kill if it won't exit.
            try:
                process.wait(timeout=5)
            except subprocess.TimeoutExpired:
                process.kill()
                process.wait()
33 changes: 33 additions & 0 deletions examples/mcp/streamablehttp_example/server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import random

import requests
from mcp.server.fastmcp import FastMCP

# Create the FastMCP server instance; the `@mcp.tool()` decorators below
# register each function as an MCP tool on this server.
mcp = FastMCP("Echo Server")


@mcp.tool()
def add(a: int, b: int) -> int:
    """Return the sum of the two integers."""
    print(f"[debug-server] add({a}, {b})")
    total = a + b
    return total


@mcp.tool()
def get_secret_word() -> str:
    """Return one randomly chosen secret word."""
    print("[debug-server] get_secret_word()")
    words = ["apple", "banana", "cherry"]
    return random.choice(words)


@mcp.tool()
def get_current_weather(city: str) -> str:
    """Fetch the current weather for `city` from wttr.in and return the raw response body.

    Raises `requests.Timeout` if wttr.in does not respond within 10 seconds.
    """
    print(f"[debug-server] get_current_weather({city})")

    endpoint = "https://wttr.in"
    # Without an explicit timeout, requests.get can block indefinitely if the
    # upstream stalls, hanging the MCP tool call.
    response = requests.get(f"{endpoint}/{city}", timeout=10)
    return response.text


if __name__ == "__main__":
    # Serve the registered tools over the Streamable HTTP transport; the
    # example client in main.py connects at http://localhost:8000/mcp.
    mcp.run(transport="streamable-http")
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ dependencies = [
"typing-extensions>=4.12.2, <5",
"requests>=2.0, <3",
"types-requests>=2.0, <3",
"mcp>=1.6.0, <2; python_version >= '3.10'",
"mcp>=1.8.0, <2; python_version >= '3.10'",
]
classifiers = [
"Typing :: Typed",
Expand Down
4 changes: 4 additions & 0 deletions src/agents/mcp/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
MCPServerSseParams,
MCPServerStdio,
MCPServerStdioParams,
MCPServerStreamableHttp,
MCPServerStreamableHttpParams
)
except ImportError:
pass
Expand All @@ -17,5 +19,7 @@
"MCPServerSseParams",
"MCPServerStdio",
"MCPServerStdioParams",
"MCPServerStreamableHttp",
"MCPServerStreamableHttpParams",
"MCPUtil",
]
89 changes: 88 additions & 1 deletion src/agents/mcp/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream
from mcp import ClientSession, StdioServerParameters, Tool as MCPTool, stdio_client
from mcp.client.sse import sse_client
from mcp.client.streamable_http import GetSessionIdCallback, streamablehttp_client
from mcp.types import CallToolResult, JSONRPCMessage
from typing_extensions import NotRequired, TypedDict

Expand Down Expand Up @@ -105,7 +106,14 @@ async def connect(self):
"""Connect to the server."""
try:
transport = await self.exit_stack.enter_async_context(self.create_streams())
read, write = transport
# Handle different transport return values
if len(transport) == 3:
# streamablehttp_client returns (read, write, get_session_id)
read, write, _ = transport
else:
# sse_client returns (read, write)
read, write = transport

session = await self.exit_stack.enter_async_context(
ClientSession(
read,
Expand Down Expand Up @@ -318,3 +326,82 @@ def create_streams(
def name(self) -> str:
"""A readable name for the server."""
return self._name


class MCPServerStreamableHttpParams(TypedDict):
    """Mirrors the params in `mcp.client.streamable_http.streamablehttp_client`."""

    url: str
    """The URL of the server."""

    headers: NotRequired[dict[str, str]]
    """The headers to send to the server."""

    timeout: NotRequired[timedelta]
    """The timeout for the HTTP request. Defaults to 30 seconds (see
    `MCPServerStreamableHttp.create_streams`)."""

    sse_read_timeout: NotRequired[timedelta]
    """The timeout for reading from the SSE stream. Defaults to 5 minutes."""

    terminate_on_close: NotRequired[bool]
    """Whether to terminate the HTTP session when the client closes. Defaults to True."""


class MCPServerStreamableHttp(_MCPServerWithClientSession):
    """MCP server implementation that uses the Streamable HTTP transport. See the [spec]
    (https://modelcontextprotocol.io/specification/2025-03-26/basic/transports#streamable-http)
    for details.
    """

    def __init__(
        self,
        params: MCPServerStreamableHttpParams,
        cache_tools_list: bool = False,
        name: str | None = None,
        client_session_timeout_seconds: float | None = 5,
    ):
        """Create a new MCP server based on the Streamable HTTP transport.

        Args:
            params: The params that configure the server. This includes the URL of the server,
                the headers to send to the server, the timeout for the HTTP request, the
                timeout for the Streamable HTTP connection, and whether we need to terminate
                on close.

            cache_tools_list: Whether to cache the tools list. If `True`, the tools list will be
                cached and only fetched from the server once. If `False`, the tools list will be
                fetched from the server on each call to `list_tools()`. The cache can be
                invalidated by calling `invalidate_tools_cache()`. You should set this to `True`
                if you know the server will not change its tools list, because it can drastically
                improve latency (by avoiding a round-trip to the server every time).

            name: A readable name for the server. If not provided, we'll create one from the
                URL.

            client_session_timeout_seconds: the read timeout passed to the MCP ClientSession.
        """
        super().__init__(cache_tools_list, client_session_timeout_seconds)

        self.params = params
        self._name = name or f"streamable_http: {self.params['url']}"

    def create_streams(
        self,
    ) -> AbstractAsyncContextManager[
        tuple[
            MemoryObjectReceiveStream[JSONRPCMessage | Exception],
            MemoryObjectSendStream[JSONRPCMessage],
            GetSessionIdCallback,
        ]
    ]:
        """Create the streams for the server.

        Note: unlike `sse_client`, `streamablehttp_client` yields a third element — a
        callback returning the negotiated session id — which `connect()` discards.
        """
        return streamablehttp_client(
            url=self.params["url"],
            headers=self.params.get("headers", None),
            # Defaults mirror streamablehttp_client: 30s HTTP timeout, 5min SSE read timeout.
            timeout=self.params.get("timeout", timedelta(seconds=30)),
            sse_read_timeout=self.params.get("sse_read_timeout", timedelta(seconds=60 * 5)),
            terminate_on_close=self.params.get("terminate_on_close", True),
        )

    @property
    def name(self) -> str:
        """A readable name for the server."""
        return self._name