Skip to content

Commit f36666d

Browse files
committed
feat: add PoC of stateless HTTP transport
This is an implementation of a stateless transport over HTTP for MCP. It's not part of the official spec, but something similar will probably be added in the future. Functionally, it just presents a single HTTP endpoint (conventionally at `/mcp`) that accepts JSONRPC messages as POST bodies, and returns JSONRPC messages as responses. Each inbound message is validated and gets its own 'server' instance. The handler then takes care of initializing the server (which in a stateless protocol is the responsibility of the client), then forwards the inbound message to the server, and returns the server's response. We could easily add middleware for things like authentication, similar to in #21. This also comes with a 'client' implementation which is similar to the stdio and sse clients in the official SDK and returns a pair of read/write streams for use with a `ClientSession`. This is useful for testing or writing example MCP clients. The idea is that this transport will be compatible with the [HTTP transport in the unofficial Go SDK][go-sdk], so we can start building examples of usage. Notably this client also supports custom headers, which will be useful for authentication. [go-sdk]: https://github.com/metoro-io/mcp-golang/blob/main/examples/http_example/README.md
1 parent 23ab161 commit f36666d

File tree

6 files changed

+402
-9
lines changed

6 files changed

+402
-9
lines changed

pyproject.toml

+4
Original file line numberDiff line numberDiff line change
@@ -34,3 +34,7 @@ build-backend = "hatchling.build"
3434

3535
[tool.pytest.ini_options]
3636
asyncio_default_fixture_loop_scope = "session"
37+
38+
[tool.ruff]
39+
line-length = 88
40+
target-version = "py310"

src/mcp_grafana/__init__.py

+67-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,73 @@
1+
import enum
2+
from typing import Literal
3+
4+
import anyio
5+
import uvicorn
16
from mcp.server import FastMCP
7+
from starlette.requests import Request
28

39
from .tools import add_tools
410

11+
12+
class Transport(enum.StrEnum):
13+
http = "http"
14+
stdio = "stdio"
15+
sse = "sse"
16+
17+
18+
class GrafanaMCP(FastMCP):
19+
async def run_http_async(self) -> None:
20+
from starlette.applications import Starlette
21+
from starlette.routing import Route
22+
23+
from .transports.http import handle_message
24+
25+
async def handle_http(request: Request):
26+
async with handle_message(
27+
request.scope, request.receive, request._send
28+
) as (
29+
read_stream,
30+
write_stream,
31+
):
32+
await self._mcp_server.run(
33+
read_stream,
34+
write_stream,
35+
self._mcp_server.create_initialization_options(),
36+
)
37+
38+
starlette_app = Starlette(
39+
debug=self.settings.debug,
40+
routes=[
41+
Route("/mcp", endpoint=handle_http, methods=["POST"]),
42+
],
43+
)
44+
45+
config = uvicorn.Config(
46+
starlette_app,
47+
host=self.settings.host,
48+
port=self.settings.port,
49+
log_level=self.settings.log_level.lower(),
50+
)
51+
server = uvicorn.Server(config)
52+
await server.serve()
53+
54+
def run(self, transport: Literal["http", "stdio", "sse"] = "stdio") -> None:
55+
"""Run the FastMCP server. Note this is a synchronous function.
56+
57+
Args:
58+
transport: Transport protocol to use ("stdio" or "sse")
59+
"""
60+
if transport not in Transport:
61+
raise ValueError(f"Unknown transport: {transport}")
62+
63+
if transport == "stdio":
64+
anyio.run(self.run_stdio_async)
65+
elif transport == "sse":
66+
anyio.run(self.run_sse_async)
67+
else:
68+
anyio.run(self.run_http_async)
69+
70+
571
# Create an MCP server
6-
mcp = FastMCP("Grafana", log_level="DEBUG")
72+
mcp = GrafanaMCP("Grafana", log_level="DEBUG")
773
add_tools(mcp)

src/mcp_grafana/cli.py

+1-8
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,10 @@
1-
import enum
2-
31
import typer
42

5-
from . import mcp
3+
from . import mcp, Transport
64

75
app = typer.Typer()
86

97

10-
class Transport(enum.StrEnum):
11-
stdio = "stdio"
12-
sse = "sse"
13-
14-
158
@app.command()
169
def run(transport: Transport = Transport.stdio):
1710
mcp.run(transport.value)

src/mcp_grafana/transports/http.py

+182
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,182 @@
1+
import logging
2+
from contextlib import asynccontextmanager
3+
from json import JSONDecodeError
4+
from typing import Any, Tuple
5+
6+
import anyio
7+
from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream
8+
import httpx
9+
from mcp import types
10+
from pydantic import ValidationError
11+
from starlette.requests import Request
12+
from starlette.responses import JSONResponse, Response
13+
from starlette.types import Receive, Scope, Send
14+
15+
16+
logger = logging.getLogger(__name__)
17+
18+
19+
ReadStream = MemoryObjectReceiveStream[types.JSONRPCMessage | Exception]
20+
ReadStreamWriter = MemoryObjectSendStream[types.JSONRPCMessage | Exception]
21+
WriteStream = MemoryObjectSendStream[types.JSONRPCMessage]
22+
WriteStreamReader = MemoryObjectReceiveStream[types.JSONRPCMessage]
23+
24+
25+
def make_streams() -> Tuple[
26+
ReadStream, ReadStreamWriter, WriteStream, WriteStreamReader
27+
]:
28+
read_stream: MemoryObjectReceiveStream[types.JSONRPCMessage | Exception]
29+
read_stream_writer: MemoryObjectSendStream[types.JSONRPCMessage | Exception]
30+
31+
write_stream: MemoryObjectSendStream[types.JSONRPCMessage]
32+
write_stream_reader: MemoryObjectReceiveStream[types.JSONRPCMessage]
33+
34+
read_stream_writer, read_stream = anyio.create_memory_object_stream(0)
35+
write_stream, write_stream_reader = anyio.create_memory_object_stream(0)
36+
return read_stream, read_stream_writer, write_stream, write_stream_reader
37+
38+
39+
async def initialize(
40+
read_stream_writer: ReadStreamWriter,
41+
write_stream_reader: WriteStreamReader,
42+
):
43+
"""
44+
Initialize the MCP server for this request.
45+
46+
In a stateful transport (e.g. stdio or sse) the client would
47+
send an initialize request to the server, and the server would send
48+
an 'initialized' response back to the client.
49+
50+
In the HTTP transport we're trying to be stateless, so we'll have to
51+
handle the initialization ourselves.
52+
53+
This function handles that initialization by sending the required
54+
messages to the server and consuming the response.
55+
"""
56+
# First construct the initialize request.
57+
initialize_request = types.InitializeRequest(
58+
method="initialize",
59+
params=types.InitializeRequestParams(
60+
protocolVersion=types.LATEST_PROTOCOL_VERSION,
61+
capabilities=types.ClientCapabilities(
62+
experimental=None,
63+
roots=None,
64+
sampling=None,
65+
),
66+
# TODO: get the name and version from the package metadata.
67+
clientInfo=types.Implementation(name="mcp-grafana", version="0.1.2"),
68+
),
69+
)
70+
initialize_request = types.JSONRPCRequest(
71+
jsonrpc="2.0",
72+
id=1,
73+
**initialize_request.model_dump(by_alias=True, mode="json"),
74+
)
75+
# Send it to the server.
76+
await read_stream_writer.send(types.JSONRPCMessage(initialize_request))
77+
# We can ignore the response since we're not sending it back to the client.
78+
await write_stream_reader.receive()
79+
80+
# Next we need to notify the server that we're initialized.
81+
initialize_notification = types.JSONRPCNotification(
82+
jsonrpc="2.0",
83+
**types.ClientNotification(
84+
types.InitializedNotification(method="notifications/initialized"),
85+
).model_dump(by_alias=True, mode="json"),
86+
)
87+
await read_stream_writer.send(types.JSONRPCMessage(initialize_notification))
88+
# Notifications don't have a response, so we don't need to await the
89+
# write stream reader.
90+
91+
92+
@asynccontextmanager
93+
async def handle_message(scope: Scope, receive: Receive, send: Send):
94+
"""
95+
ASGI application for handling MCP messages using the stateless HTTP transport.
96+
97+
This function is called for each incoming message. It creates a new
98+
stream for reading and writing messages, which will be used by the
99+
MCP server, and handles:
100+
101+
- decoding the client message from JSON into internal types
102+
- validating the client message
103+
- initializing the MCP server, which must be done on every request
104+
(since this is a stateless transport)
105+
- sending the client message to the MCP server
106+
- receiving the server's response
107+
- encoding the server's response into JSON and sending it back to the client
108+
109+
The returned read and write streams are intended to be passed to
110+
`mcp.server.lowlevel.Server.run()` as the `read_stream` and `write_stream`
111+
arguments.
112+
"""
113+
read_stream, read_stream_writer, write_stream, write_stream_reader = make_streams()
114+
115+
async def handle_post_message():
116+
request = Request(scope, receive)
117+
try:
118+
json = await request.json()
119+
except JSONDecodeError as err:
120+
logger.error(f"Failed to parse message: {err}")
121+
response = Response("Could not parse message", status_code=400)
122+
await response(scope, receive, send)
123+
return
124+
try:
125+
client_message = types.JSONRPCMessage.model_validate(json)
126+
logger.debug(f"Validated client message: {client_message}")
127+
except ValidationError as err:
128+
logger.error(f"Failed to parse message: {err}")
129+
response = Response("Could not parse message", status_code=400)
130+
await response(scope, receive, send)
131+
return
132+
133+
# As part of the MCP spec we need to initialize first.
134+
# In a stateful flow (e.g. stdio or sse transports) the client would
135+
# send an initialize request to the server, and the server would send
136+
# a response back to the client. In this case we're trying to be stateless,
137+
# so we'll handle the initialization ourselves.
138+
logger.debug("Initializing server")
139+
await initialize(read_stream_writer, write_stream_reader)
140+
141+
# Alright, now we can send the client message.
142+
logger.debug("Sending client message")
143+
await read_stream_writer.send(client_message)
144+
# Wait for the server's response, and forward it to the client.
145+
server_message = await write_stream_reader.receive()
146+
obj = server_message.model_dump(by_alias=True, mode="json", exclude_none=True)
147+
response = JSONResponse(obj)
148+
await response(scope, receive, send)
149+
150+
async with anyio.create_task_group() as tg:
151+
tg.start_soon(handle_post_message)
152+
yield (read_stream, write_stream)
153+
154+
155+
@asynccontextmanager
156+
async def http_client(url: str, headers: dict[str, Any] | None = None):
157+
read_stream, read_stream_writer, write_stream, write_stream_reader = make_streams()
158+
159+
async with anyio.create_task_group() as tg:
160+
try:
161+
162+
async def http_rw():
163+
logger.debug("Waiting for request body")
164+
body = await write_stream_reader.receive()
165+
166+
logger.debug(f"Connecting to HTTP endpoint: {url}")
167+
async with httpx.AsyncClient(headers=headers) as client:
168+
response = await client.post(
169+
url, content=body.model_dump_json(by_alias=True)
170+
)
171+
logger.debug(f"Received response: {response.status_code}")
172+
message = types.JSONRPCMessage.model_validate_json(response.content)
173+
await read_stream_writer.send(message)
174+
175+
tg.start_soon(http_rw)
176+
try:
177+
yield read_stream, write_stream
178+
finally:
179+
tg.cancel_scope.cancel()
180+
finally:
181+
await read_stream_writer.aclose()
182+
await write_stream_reader.aclose()

tests/conftest.py

+8
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
import pytest
2+
3+
14
def pytest_addoption(parser):
25
"""
36
Add command line options for integration and cloud tests.
@@ -29,3 +32,8 @@ def pytest_addoption(parser):
2932
default=False,
3033
help="enable cloud integration tests",
3134
)
35+
36+
37+
@pytest.fixture
38+
def anyio_backend():
39+
return "asyncio"

0 commit comments

Comments
 (0)