Skip to content

Commit 2ffc1de

Browse files
committed
Correctly close streams, better handle edge cases
1 parent f36666d commit 2ffc1de

File tree

2 files changed

+53
-40
lines changed

2 files changed

+53
-40
lines changed

src/mcp_grafana/__init__.py

+6-9
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
import anyio
55
import uvicorn
66
from mcp.server import FastMCP
7-
from starlette.requests import Request
87

98
from .tools import add_tools
109

@@ -18,14 +17,14 @@ class Transport(enum.StrEnum):
1817
class GrafanaMCP(FastMCP):
1918
async def run_http_async(self) -> None:
2019
from starlette.applications import Starlette
21-
from starlette.routing import Route
20+
from starlette.routing import Mount
2221

2322
from .transports.http import handle_message
2423

25-
async def handle_http(request: Request):
26-
async with handle_message(
27-
request.scope, request.receive, request._send
28-
) as (
24+
async def handle_http(scope, receive, send):
25+
if scope["type"] != "http":
26+
raise ValueError("Expected HTTP request")
27+
async with handle_message(scope, receive, send) as (
2928
read_stream,
3029
write_stream,
3130
):
@@ -37,9 +36,7 @@ async def handle_http(request: Request):
3736

3837
starlette_app = Starlette(
3938
debug=self.settings.debug,
40-
routes=[
41-
Route("/mcp", endpoint=handle_http, methods=["POST"]),
42-
],
39+
routes=[Mount("/", app=handle_http)],
4340
)
4441

4542
config = uvicorn.Config(

src/mcp_grafana/transports/http.py

+47-31
Original file line numberDiff line numberDiff line change
@@ -113,39 +113,55 @@ async def handle_message(scope: Scope, receive: Receive, send: Send):
113113
read_stream, read_stream_writer, write_stream, write_stream_reader = make_streams()
114114

115115
async def handle_post_message():
116-
request = Request(scope, receive)
117116
try:
118-
json = await request.json()
119-
except JSONDecodeError as err:
120-
logger.error(f"Failed to parse message: {err}")
121-
response = Response("Could not parse message", status_code=400)
122-
await response(scope, receive, send)
123-
return
124-
try:
125-
client_message = types.JSONRPCMessage.model_validate(json)
126-
logger.debug(f"Validated client message: {client_message}")
127-
except ValidationError as err:
128-
logger.error(f"Failed to parse message: {err}")
129-
response = Response("Could not parse message", status_code=400)
117+
request = Request(scope, receive)
118+
if request.method != "POST":
119+
response = Response("Method not allowed", status_code=405)
120+
await response(scope, receive, send)
121+
return
122+
if scope["path"] != "/mcp":
123+
response = Response("Not found", status_code=404)
124+
await response(scope, receive, send)
125+
return
126+
try:
127+
json = await request.json()
128+
except JSONDecodeError as err:
129+
logger.error(f"Failed to parse message: {err}")
130+
response = Response("Could not parse message", status_code=400)
131+
await response(scope, receive, send)
132+
return
133+
134+
try:
135+
client_message = types.JSONRPCMessage.model_validate(json)
136+
logger.debug(f"Validated client message: {client_message}")
137+
except ValidationError as err:
138+
logger.error(f"Failed to parse message: {err}")
139+
response = Response("Could not parse message", status_code=400)
140+
await response(scope, receive, send)
141+
return
142+
143+
# As part of the MCP spec we need to initialize first.
144+
# In a stateful flow (e.g. stdio or sse transports) the client would
145+
# send an initialize request to the server, and the server would send
146+
# a response back to the client. In this case we're trying to be stateless,
147+
# so we'll handle the initialization ourselves.
148+
logger.debug("Initializing server")
149+
await initialize(read_stream_writer, write_stream_reader)
150+
151+
# Alright, now we can send the client message.
152+
logger.debug("Sending client message")
153+
await read_stream_writer.send(client_message)
154+
155+
# Wait for the server's response, and forward it to the client.
156+
server_message = await write_stream_reader.receive()
157+
obj = server_message.model_dump(
158+
by_alias=True, mode="json", exclude_none=True
159+
)
160+
response = JSONResponse(obj)
130161
await response(scope, receive, send)
131-
return
132-
133-
# As part of the MCP spec we need to initialize first.
134-
# In a stateful flow (e.g. stdio or sse transports) the client would
135-
# send an initialize request to the server, and the server would send
136-
# a response back to the client. In this case we're trying to be stateless,
137-
# so we'll handle the initialization ourselves.
138-
logger.debug("Initializing server")
139-
await initialize(read_stream_writer, write_stream_reader)
140-
141-
# Alright, now we can send the client message.
142-
logger.debug("Sending client message")
143-
await read_stream_writer.send(client_message)
144-
# Wait for the server's response, and forward it to the client.
145-
server_message = await write_stream_reader.receive()
146-
obj = server_message.model_dump(by_alias=True, mode="json", exclude_none=True)
147-
response = JSONResponse(obj)
148-
await response(scope, receive, send)
162+
finally:
163+
await read_stream_writer.aclose()
164+
await write_stream_reader.aclose()
149165

150166
async with anyio.create_task_group() as tg:
151167
tg.start_soon(handle_post_message)

0 commit comments

Comments
 (0)