diff --git a/README.md b/README.md index f468969b..a305dca4 100644 --- a/README.md +++ b/README.md @@ -14,6 +14,7 @@ - [Running Your Server](#running-your-server) - [stdio](#stdio) - [Streamable HTTP](#streamable-http) + - [Using with Node.js HTTP Frameworks (Fastify, Express, etc.)](#using-with-nodejs-http-frameworks-fastify-express-etc) - [Testing and Debugging](#testing-and-debugging) - [Examples](#examples) - [Echo Server](#echo-server) @@ -386,6 +387,55 @@ This stateless approach is useful for: - RESTful scenarios where each request is independent - Horizontally scaled deployments without shared session state +### Using with Node.js HTTP Frameworks (Fastify, Express, etc.) + +The `StreamableHTTPServerTransport` works directly with Node.js's native `IncomingMessage` and `ServerResponse` objects. To simplify integration with common Node.js web frameworks like Fastify or Express, the SDK provides the `RawHttpServerAdapter`. This adapter wraps `StreamableHTTPServerTransport` and exposes a `handleNodeRequest` method. + +The `RawHttpServerAdapter` expects your framework's request and response objects to provide access to the underlying Node.js `request.raw` and `response.raw` objects. It also expects that your framework can provide the pre-parsed request body (e.g., `request.body`). + +**Key features:** +- Implements the `Transport` interface, so it can be directly passed to `McpServer.connect()`. +- Delegates to an internal `StreamableHTTPServerTransport` instance. +- Simplifies request/response handling by working with the raw Node.js objects exposed by many frameworks. + +**Example with a Fastify-like setup:** + +```typescript +import { McpServer, StreamableHTTPServerTransportOptions } from "@modelcontextprotocol/sdk/server"; // Or specific paths +import { RawHttpServerAdapter } from "@modelcontextprotocol/sdk/server/raw-http-adapter"; // Adjust path as needed +import { randomUUID } from "node:crypto"; +// import YourFramework from 'your-framework'; // e.g., Fastify or Express + +// const frameworkApp = YourFramework(); +// const mcpServer = new McpServer({ name: "MyFrameworkMCPServer", version: "1.0.0" }); + +const mcpAdapterOptions: StreamableHTTPServerTransportOptions = { + sessionIdGenerator: () => randomUUID(), + // enableJsonResponse: true, // Useful for POSTs if SSE is not desired for responses +}; +const mcpAdapter = new RawHttpServerAdapter(mcpAdapterOptions); + +await mcpServer.connect(mcpAdapter); + +// In your framework's route handler for the MCP endpoint (e.g., /mcp) +// frameworkApp.all('/mcp', async (request, reply) => { +// try { +// await mcpAdapter.handleNodeRequest( +// { raw: request.raw, body: request.body }, // From your framework +// { raw: reply.raw } // From your framework +// ); +// // IMPORTANT for SSE: Do not let your framework automatically end the response here. +// // The mcpAdapter will manage the response stream (reply.raw). +// } catch (error) { +// // Handle error +// if (!reply.raw.headersSent) { /* send error response via reply.raw */ } +// } +// }); + +// frameworkApp.listen({ port: 3000 }); +``` +Refer to the example in `src/examples/server/simpleFastifyServer.ts` for a runnable demonstration with Fastify. + ### Testing and Debugging To test your server, you can use the [MCP Inspector](https://github.com/modelcontextprotocol/inspector). See its README for more information. diff --git a/src/examples/server/simpleFastifyServer.ts b/src/examples/server/simpleFastifyServer.ts new file mode 100644 index 00000000..cb075ceb --- /dev/null +++ b/src/examples/server/simpleFastifyServer.ts @@ -0,0 +1,211 @@ +import Fastify from "fastify"; +import { McpServer } from "../../server/mcp.js"; +import { StreamableHTTPServerTransportOptions } from "../../server/streamableHttp.js"; +import { RawHttpServerAdapter } from "../../server/raw-http-adapter.js"; +import { randomUUID } from "node:crypto"; +import { z } from "zod"; +import { CallToolResult, GetPromptResult, ReadResourceResult } from '../../types.js'; + +async function runFastifyMCPServer() { + const fastify = Fastify({ logger: true }); + + const mcpAdapterOptions: StreamableHTTPServerTransportOptions = { + sessionIdGenerator: () => randomUUID(), + // For true JSON request/response (non-SSE) for POSTs, uncomment the next line. + // Otherwise, POST requests that expect a response will use an SSE stream. + // enableJsonResponse: true, + }; + const mcpAdapter = new RawHttpServerAdapter(mcpAdapterOptions); + + const mcpServer = new McpServer({ + name: "simple-streamable-http-server", + version: "1.0.0", + }, { capabilities: { logging: {} } }); + + // === Tools, Resource, and Prompt from simpleStreamableHttp.ts === + + // Tool: greet + mcpServer.tool( + 'greet', + 'A simple greeting tool', + { + name: z.string().describe('Name to greet'), + }, + async ({ name }): Promise => { + return { + content: [ + { + type: 'text', + text: `Hello, ${name}!`, + }, + ], + }; + } + ); + + // Tool: multi-greet + mcpServer.tool( + 'multi-greet', + 'A tool that sends different greetings with delays between them', + { + name: z.string().describe('Name to greet'), + }, + { + title: 'Multiple Greeting Tool', + readOnlyHint: true, + openWorldHint: false + }, + async ({ name }, { sendNotification }): Promise => { + const sleep = (ms: number) => new Promise(resolve => setTimeout(resolve, ms)); + + await sendNotification({ + method: "notifications/message", + params: { level: "debug", data: `Starting multi-greet for ${name}` } + }); + + await sleep(1000); // Wait 1 second before first greeting + + await sendNotification({ + method: "notifications/message", + params: { level: "info", data: `Sending first greeting to ${name}` } + }); + + await sleep(1000); // Wait another second before second greeting + + await sendNotification({ + method: "notifications/message", + params: { level: "info", data: `Sending second greeting to ${name}` } + }); + + return { + content: [ + { + type: 'text', + text: `Good morning, ${name}!`, + } + ], + }; + } + ); + + // Tool: start-notification-stream + mcpServer.tool( + 'start-notification-stream', + 'Starts sending periodic notifications for testing resumability', + { + interval: z.number().describe('Interval in milliseconds between notifications').default(100), + count: z.number().describe('Number of notifications to send (0 for 100)').default(50), + }, + async ({ interval, count }, { sendNotification }): Promise => { + const sleep = (ms: number) => new Promise(resolve => setTimeout(resolve, ms)); + let counter = 0; + + while (count === 0 || counter < count) { + counter++; + try { + await sendNotification({ + method: "notifications/message", + params: { + level: "info", + data: `Periodic notification #${counter} at ${new Date().toISOString()}` + } + }); + } + catch (error) { + console.error("Error sending notification:", error); + } + await sleep(interval); + } + + return { + content: [ + { + type: 'text', + text: `Started sending periodic notifications every ${interval}ms`, + } + ], + }; + } + ); + + // Resource: greeting-resource + mcpServer.resource( + 'greeting-resource', + 'https://example.com/greetings/default', + { mimeType: 'text/plain' }, + async (): Promise => { + return { + contents: [ + { + uri: 'https://example.com/greetings/default', + text: 'Hello, world!', + }, + ], + }; + } + ); + + // Prompt: greeting-template + mcpServer.prompt( + 'greeting-template', + 'A simple greeting prompt template', + { + name: z.string().describe('Name to include in greeting'), + }, + async ({ name }): Promise => { + return { + messages: [ + { + role: 'user', + content: { + type: 'text', + text: `Please greet ${name} in a friendly manner.`, + }, + }, + ], + }; + } + ); + + // === End of copied tools, resource, and prompt === + + await mcpServer.connect(mcpAdapter); + + // Register a catch-all route for the /mcp endpoint + fastify.all("/mcp", async (request, reply) => { + try { + await mcpAdapter.handleNodeRequest( + { raw: request.raw, body: request.body }, + { raw: reply.raw } + ); + // IMPORTANT for SSE: Do not let Fastify automatically end the response here. + // The mcpAdapter (via StreamableHTTPServerTransport) will manage the response stream (reply.raw). + } catch (error) { + fastify.log.error(error, "Error in MCP request handler"); + if (!reply.raw.headersSent) { + reply.raw.writeHead(500, { "Content-Type": "application/json" }); + reply.raw.end( + JSON.stringify({ + jsonrpc: "2.0", + error: { code: -32000, message: "Internal Server Error" }, + id: null, + }) + ); + } + } + }); + + try { + const address = await fastify.listen({ port: 3000, host: "0.0.0.0" }); + fastify.log.info(`MCP Server (Fastify with tools from simpleStreamableHttp) listening on ${address}`); + fastify.log.info(`MCP endpoint available at POST ${address}/mcp`); + fastify.log.info(`Available tools: greet, multi-greet, start-notification-stream`); + fastify.log.info(`Available resource: GET https://example.com/greetings/default (via MCP readResource)`); + fastify.log.info(`Available prompt: greeting-template`); + } catch (err) { + fastify.log.error(err); + process.exit(1); + } +} + +runFastifyMCPServer(); diff --git a/src/server/raw-http-adapter.test.ts b/src/server/raw-http-adapter.test.ts new file mode 100644 index 00000000..a786e237 --- /dev/null +++ b/src/server/raw-http-adapter.test.ts @@ -0,0 +1,197 @@ +import { RawHttpServerAdapter } from "./raw-http-adapter.js"; +import { + StreamableHTTPServerTransport, + StreamableHTTPServerTransportOptions, +} from "./streamableHttp.js"; +import type { IncomingMessage, ServerResponse } from "node:http"; +import type { JSONRPCMessage, RequestId } from "../types.js"; + +const mockHandleRequest = jest.fn(); +const mockSend = jest.fn(); +const mockStart = jest.fn(); +const mockClose = jest.fn(); + +let mockTransportOnMessage: ((message: JSONRPCMessage) => void) | undefined; +let mockTransportOnError: ((error: Error) => void) | undefined; +let mockTransportOnClose: (() => void) | undefined; +let mockTransportSessionId: string | undefined; + +jest.mock("./streamableHttp.js", () => ({ + StreamableHTTPServerTransport: jest + .fn() + .mockImplementation((options: StreamableHTTPServerTransportOptions) => { + // Allow setting these callbacks from the adapter + return { + start: mockStart, + close: mockClose, + send: mockSend, + handleRequest: mockHandleRequest, + get sessionId() { + return mockTransportSessionId; + }, + set sessionId(id: string | undefined) { + mockTransportSessionId = id; + }, + set onmessage(fn: ((message: JSONRPCMessage) => void) | undefined) { + mockTransportOnMessage = fn; + }, + get onmessage() { + return mockTransportOnMessage ?? (() => {}); + }, + set onerror(fn: ((error: Error) => void) | undefined) { + mockTransportOnError = fn; + }, + get onerror() { + return mockTransportOnError ?? ((_error: Error) => {}); + }, + set onclose(fn: (() => void) | undefined) { + mockTransportOnClose = fn; + }, + get onclose() { + return mockTransportOnClose ?? (() => {}); + }, + // Store options to assert them if needed + _options: options, + }; + }), +})); + +const MockedStreamableHTTPServerTransport = + StreamableHTTPServerTransport as jest.MockedClass< + typeof StreamableHTTPServerTransport + >; + +describe("RawHttpServerAdapter", () => { + let adapter: RawHttpServerAdapter; + const mockOptions: StreamableHTTPServerTransportOptions = { + sessionIdGenerator: () => "test-session-id", + }; + + beforeEach(() => { + MockedStreamableHTTPServerTransport.mockClear(); + mockHandleRequest.mockClear(); + mockSend.mockClear(); + mockStart.mockClear(); + mockClose.mockClear(); + mockTransportOnMessage = undefined; + mockTransportOnError = undefined; + mockTransportOnClose = undefined; + mockTransportSessionId = undefined; + + adapter = new RawHttpServerAdapter(mockOptions); + }); + + it("should instantiate StreamableHTTPServerTransport with options", () => { + expect(MockedStreamableHTTPServerTransport).toHaveBeenCalledTimes(1); + expect(MockedStreamableHTTPServerTransport).toHaveBeenCalledWith( + mockOptions + ); + }); + + describe("constructor callback and sessionId forwarding", () => { + it("should forward onmessage from transport to adapter", () => { + const adapterOnMessage = jest.fn(); + adapter.onmessage = adapterOnMessage; + + const testMessage: JSONRPCMessage = { jsonrpc: "2.0", method: "test" }; + mockTransportSessionId = "session-from-transport"; + if (mockTransportOnMessage) { + mockTransportOnMessage(testMessage); + } + + expect(adapterOnMessage).toHaveBeenCalledWith(testMessage); + expect(adapter.sessionId).toBe("session-from-transport"); + }); + + it("should forward onerror from transport to adapter", () => { + const adapterOnError = jest.fn(); + adapter.onerror = adapterOnError; + + const testError = new Error("test error"); + if (mockTransportOnError) { + mockTransportOnError(testError); + } + + expect(adapterOnError).toHaveBeenCalledWith(testError); + }); + + it("should forward onclose from transport to adapter and clear sessionId", () => { + const adapterOnClose = jest.fn(); + adapter.onclose = adapterOnClose; + adapter.sessionId = "initial-session"; + + if (mockTransportOnClose) { + mockTransportOnClose(); + } + + expect(adapterOnClose).toHaveBeenCalledTimes(1); + expect(adapter.sessionId).toBeUndefined(); + }); + + it("should initialize sessionId from transport if available on construction", () => { + mockTransportSessionId = "pre-existing-session"; + const newAdapter = new RawHttpServerAdapter(mockOptions); + expect(newAdapter.sessionId).toBe("pre-existing-session"); + }); + }); + + describe("start()", () => { + it("should call mcpTransport.start() and update sessionId", async () => { + mockStart.mockImplementation(async () => { + mockTransportSessionId = "started-session"; + }); + await adapter.start(); + expect(mockStart).toHaveBeenCalledTimes(1); + expect(adapter.sessionId).toBe("started-session"); + }); + }); + + describe("close()", () => { + it("should call mcpTransport.close() and clear sessionId", async () => { + adapter.sessionId = "active-session"; + mockClose.mockImplementation(async () => { + mockTransportSessionId = undefined; + }); + await adapter.close(); + expect(mockClose).toHaveBeenCalledTimes(1); + expect(adapter.sessionId).toBeUndefined(); + }); + }); + + describe("send()", () => { + it("should call mcpTransport.send() with message and options, and update sessionId", async () => { + const message: JSONRPCMessage = { + jsonrpc: "2.0", + method: "notify", + params: { data: 1 }, + }; + const options = { relatedRequestId: "req-1" as RequestId }; + mockSend.mockImplementation(async () => { + mockTransportSessionId = "session-after-send"; + }); + + await adapter.send(message, options); + expect(mockSend).toHaveBeenCalledWith(message, options); + expect(adapter.sessionId).toBe("session-after-send"); + }); + }); + + describe("handleNodeRequest()", () => { + const mockReq = { raw: {} as IncomingMessage, body: { data: "test_body" } }; + const mockRes = { raw: {} as ServerResponse }; + + it("should call mcpTransport.handleRequest() with raw request, response, and body", async () => { + mockHandleRequest.mockImplementation(async () => { + mockTransportSessionId = "session-after-handle"; + }); + + await adapter.handleNodeRequest(mockReq, mockRes); + expect(mockHandleRequest).toHaveBeenCalledWith( + mockReq.raw, + mockRes.raw, + mockReq.body + ); + expect(adapter.sessionId).toBe("session-after-handle"); + }); + }); +}); diff --git a/src/server/raw-http-adapter.ts b/src/server/raw-http-adapter.ts new file mode 100644 index 00000000..4c84c179 --- /dev/null +++ b/src/server/raw-http-adapter.ts @@ -0,0 +1,97 @@ +import { IncomingMessage, ServerResponse } from "node:http"; +import { + StreamableHTTPServerTransport, + StreamableHTTPServerTransportOptions, +} from "./streamableHttp.js"; +import { Transport } from "../shared/transport.js"; +import { JSONRPCMessage, RequestId } from "../types.js"; + +// Container for a Node.js IncomingMessage, optionally with a pre-parsed body. +interface NodeRequestContainer { + raw: IncomingMessage; + body?: unknown; // Pre-parsed body by the framework (e.g., Fastify's request.body) +} + +// Container for a Node.js ServerResponse. +interface NodeResponseContainer { + raw: ServerResponse; +} + +/** + * An adapter to use StreamableHTTPServerTransport with Node.js HTTP frameworks + * that provide access to the raw Node.js request and response objects (e.g., Fastify, Express). + * This adapter implements the MCP Transport interface, allowing it to be + * used directly with `McpServer.connect()`. + */ +export class RawHttpServerAdapter implements Transport { + private mcpTransport: StreamableHTTPServerTransport; + + // Transport interface properties + onmessage?: (message: JSONRPCMessage) => void; + onerror?: (error: Error) => void; + onclose?: () => void; + sessionId?: string; + + constructor(options: StreamableHTTPServerTransportOptions) { + this.mcpTransport = new StreamableHTTPServerTransport(options); + + this.mcpTransport.onmessage = (msg) => { + this.sessionId = this.mcpTransport.sessionId; + this.onmessage?.(msg); + }; + this.mcpTransport.onerror = (err) => this.onerror?.(err); + this.mcpTransport.onclose = () => { + this.sessionId = undefined; + this.onclose?.(); + }; + this.sessionId = this.mcpTransport.sessionId; + } + + /** + * Starts the transport. For StreamableHTTPServerTransport, this is largely a no-op + * as connections are request-driven, but it fulfills the Transport interface. + */ + public async start(): Promise { + await this.mcpTransport.start(); + this.sessionId = this.mcpTransport.sessionId; + } + + /** + * Closes the transport and cleans up any resources. + */ + public async close(): Promise { + await this.mcpTransport.close(); + this.sessionId = undefined; + } + + /** + * Sends a JSONRPCMessage through the transport. + * @param message The message to send. + * @param options Optional parameters, like relatedRequestId for responses. + */ + public async send( + message: JSONRPCMessage, + options?: { relatedRequestId?: RequestId } + ): Promise { + await this.mcpTransport.send(message, options); + this.sessionId = this.mcpTransport.sessionId; + } + + /** + * Handles an incoming HTTP request from a Node.js-based framework + * and delegates it to the StreamableHTTPServerTransport. + * @param requestContainer An object containing the raw Node.js IncomingMessage and optionally a pre-parsed body. + * @param responseContainer An object containing the raw Node.js ServerResponse. + */ + public async handleNodeRequest( + requestContainer: NodeRequestContainer, + responseContainer: NodeResponseContainer + ): Promise { + await this.mcpTransport.handleRequest( + requestContainer.raw, + responseContainer.raw, + requestContainer.body + ); + this.sessionId = this.mcpTransport.sessionId; + } +}