From cf794de3acb6563475ff7a2d2fc7082d77a7f598 Mon Sep 17 00:00:00 2001 From: MQ Date: Thu, 11 Sep 2025 10:34:08 +0200 Subject: [PATCH 1/2] fix: prevent streamable http wite after end crashing the node process --- src/server/streamableHttp.ts | 75 +++++++++++++++++++++++++++++++++--- 1 file changed, 70 insertions(+), 5 deletions(-) diff --git a/src/server/streamableHttp.ts b/src/server/streamableHttp.ts index 3bf84e430..e96c5894f 100644 --- a/src/server/streamableHttp.ts +++ b/src/server/streamableHttp.ts @@ -303,6 +303,15 @@ export class StreamableHTTPServerTransport implements Transport { res.on("close", () => { this._streamMapping.delete(this._standaloneSseStreamId); }); + + // Add error handler for standalone SSE stream + res.on("error", (error) => { + if ((error as NodeJS.ErrnoException).code === 'ERR_STREAM_WRITE_AFTER_END') { + this._streamMapping.delete(this._standaloneSseStreamId); + } else { + this.onerror?.(error as Error); + } + }); } /** @@ -327,6 +336,11 @@ export class StreamableHTTPServerTransport implements Transport { const streamId = await this._eventStore?.replayEventsAfter(lastEventId, { send: async (eventId: string, message: JSONRPCMessage) => { + // Check stream state before writing + if (res.destroyed || res.writableEnded || !res.writable) { + res.end(); + return; + } if (!this.writeSSEEvent(res, message, eventId)) { this.onerror?.(new Error("Failed replay events")); res.end(); @@ -334,6 +348,15 @@ export class StreamableHTTPServerTransport implements Transport { } }); this._streamMapping.set(streamId, res); + + // Add error handler for replay stream + res.on("error", (error) => { + if ((error as NodeJS.ErrnoException).code === 'ERR_STREAM_WRITE_AFTER_END') { + this._streamMapping.delete(streamId); + } else { + this.onerror?.(error as Error); + } + }); } catch (error) { this.onerror?.(error as Error); } @@ -343,6 +366,11 @@ export class StreamableHTTPServerTransport implements Transport { * Writes an event to the SSE stream with proper formatting */ private writeSSEEvent(res: ServerResponse, message: JSONRPCMessage, eventId?: string): boolean { + // Check if stream is still writable to prevent ERR_STREAM_WRITE_AFTER_END + if (res.destroyed || res.writableEnded || !res.writable) { + return false; + } + let eventData = `event: message\n`; // Include event ID if provided - this is important for resumability if (eventId) { @@ -350,7 +378,14 @@ export class StreamableHTTPServerTransport implements Transport { } eventData += `data: ${JSON.stringify(message)}\n\n`; - return res.write(eventData); + try { + return res.write(eventData); + } catch (error) { + if ((error as NodeJS.ErrnoException).code === 'ERR_STREAM_WRITE_AFTER_END') { + return false; + } + throw error; + } } /** @@ -509,10 +544,12 @@ export class StreamableHTTPServerTransport implements Transport { } // Store the response for this request to send messages back through this connection // We need to track by request ID to maintain the connection + const requestIds: RequestId[] = []; for (const message of messages) { if (isJSONRPCRequest(message)) { this._streamMapping.set(streamId, res); this._requestToStreamMapping.set(message.id, streamId); + requestIds.push(message.id); } } // Set up close handler for client disconnects @@ -520,6 +557,19 @@ export class StreamableHTTPServerTransport implements Transport { this._streamMapping.delete(streamId); }); + // Add error handler for stream write errors + res.on("error", (error) => { + if ((error as NodeJS.ErrnoException).code === 'ERR_STREAM_WRITE_AFTER_END') { + this._streamMapping.delete(streamId); + // Clean up all request mappings for this stream + for (const reqId of requestIds) { + this._requestToStreamMapping.delete(reqId); + } + } else { + this.onerror?.(error as Error); + } + }); + // handle each message for (const message of messages) { this.onmessage?.(message, { authInfo, requestInfo }); @@ -681,7 +731,11 @@ export class StreamableHTTPServerTransport implements Transport { } // Send the message to the standalone SSE stream - this.writeSSEEvent(standaloneSse, message, eventId); + const writeSuccess = this.writeSSEEvent(standaloneSse, message, eventId); + if (!writeSuccess) { + // Clean up if write failed + this._streamMapping.delete(this._standaloneSseStreamId); + } return; } @@ -692,6 +746,14 @@ export class StreamableHTTPServerTransport implements Transport { throw new Error(`No connection established for request ID: ${String(requestId)}`); } + // Check if response stream is still valid and writable + if (!response || response.destroyed || response.writableEnded || !response.writable) { + // Clean up mappings for ended streams + this._streamMapping.delete(streamId); + this._requestToStreamMapping.delete(requestId); + return; + } + if (!this._enableJsonResponse) { // For SSE responses, generate event ID if event store is provided let eventId: string | undefined; @@ -699,9 +761,12 @@ export class StreamableHTTPServerTransport implements Transport { if (this._eventStore) { eventId = await this._eventStore.storeEvent(streamId, message); } - if (response) { - // Write the event to the response stream - this.writeSSEEvent(response, message, eventId); + // Write the event to the response stream (now safe due to validation above) + const writeSuccess = this.writeSSEEvent(response, message, eventId); + if (!writeSuccess) { + // Clean up if write failed + this._streamMapping.delete(streamId); + this._requestToStreamMapping.delete(requestId); } } From dea1ecca84c3eff0767a446f7162d41f5a62ab9c Mon Sep 17 00:00:00 2001 From: MQ Date: Mon, 15 Sep 2025 19:00:12 +0200 Subject: [PATCH 2/2] simplify the fix --- src/server/streamableHttp.ts | 66 +++++------------------------------- 1 file changed, 8 insertions(+), 58 deletions(-) diff --git a/src/server/streamableHttp.ts b/src/server/streamableHttp.ts index e96c5894f..c0da91704 100644 --- a/src/server/streamableHttp.ts +++ b/src/server/streamableHttp.ts @@ -306,11 +306,7 @@ export class StreamableHTTPServerTransport implements Transport { // Add error handler for standalone SSE stream res.on("error", (error) => { - if ((error as NodeJS.ErrnoException).code === 'ERR_STREAM_WRITE_AFTER_END') { - this._streamMapping.delete(this._standaloneSseStreamId); - } else { - this.onerror?.(error as Error); - } + this.onerror?.(error as Error); }); } @@ -336,11 +332,6 @@ export class StreamableHTTPServerTransport implements Transport { const streamId = await this._eventStore?.replayEventsAfter(lastEventId, { send: async (eventId: string, message: JSONRPCMessage) => { - // Check stream state before writing - if (res.destroyed || res.writableEnded || !res.writable) { - res.end(); - return; - } if (!this.writeSSEEvent(res, message, eventId)) { this.onerror?.(new Error("Failed replay events")); res.end(); @@ -351,11 +342,7 @@ export class StreamableHTTPServerTransport implements Transport { // Add error handler for replay stream res.on("error", (error) => { - if ((error as NodeJS.ErrnoException).code === 'ERR_STREAM_WRITE_AFTER_END') { - this._streamMapping.delete(streamId); - } else { - this.onerror?.(error as Error); - } + this.onerror?.(error as Error); }); } catch (error) { this.onerror?.(error as Error); @@ -366,11 +353,6 @@ export class StreamableHTTPServerTransport implements Transport { * Writes an event to the SSE stream with proper formatting */ private writeSSEEvent(res: ServerResponse, message: JSONRPCMessage, eventId?: string): boolean { - // Check if stream is still writable to prevent ERR_STREAM_WRITE_AFTER_END - if (res.destroyed || res.writableEnded || !res.writable) { - return false; - } - let eventData = `event: message\n`; // Include event ID if provided - this is important for resumability if (eventId) { @@ -378,14 +360,7 @@ export class StreamableHTTPServerTransport implements Transport { } eventData += `data: ${JSON.stringify(message)}\n\n`; - try { - return res.write(eventData); - } catch (error) { - if ((error as NodeJS.ErrnoException).code === 'ERR_STREAM_WRITE_AFTER_END') { - return false; - } - throw error; - } + return res.write(eventData); } /** @@ -544,12 +519,10 @@ export class StreamableHTTPServerTransport implements Transport { } // Store the response for this request to send messages back through this connection // We need to track by request ID to maintain the connection - const requestIds: RequestId[] = []; for (const message of messages) { if (isJSONRPCRequest(message)) { this._streamMapping.set(streamId, res); this._requestToStreamMapping.set(message.id, streamId); - requestIds.push(message.id); } } // Set up close handler for client disconnects @@ -559,15 +532,7 @@ export class StreamableHTTPServerTransport implements Transport { // Add error handler for stream write errors res.on("error", (error) => { - if ((error as NodeJS.ErrnoException).code === 'ERR_STREAM_WRITE_AFTER_END') { - this._streamMapping.delete(streamId); - // Clean up all request mappings for this stream - for (const reqId of requestIds) { - this._requestToStreamMapping.delete(reqId); - } - } else { - this.onerror?.(error as Error); - } + this.onerror?.(error as Error); }); // handle each message @@ -731,11 +696,7 @@ export class StreamableHTTPServerTransport implements Transport { } // Send the message to the standalone SSE stream - const writeSuccess = this.writeSSEEvent(standaloneSse, message, eventId); - if (!writeSuccess) { - // Clean up if write failed - this._streamMapping.delete(this._standaloneSseStreamId); - } + this.writeSSEEvent(standaloneSse, message, eventId); return; } @@ -746,14 +707,6 @@ export class StreamableHTTPServerTransport implements Transport { throw new Error(`No connection established for request ID: ${String(requestId)}`); } - // Check if response stream is still valid and writable - if (!response || response.destroyed || response.writableEnded || !response.writable) { - // Clean up mappings for ended streams - this._streamMapping.delete(streamId); - this._requestToStreamMapping.delete(requestId); - return; - } - if (!this._enableJsonResponse) { // For SSE responses, generate event ID if event store is provided let eventId: string | undefined; @@ -761,12 +714,9 @@ export class StreamableHTTPServerTransport implements Transport { if (this._eventStore) { eventId = await this._eventStore.storeEvent(streamId, message); } - // Write the event to the response stream (now safe due to validation above) - const writeSuccess = this.writeSSEEvent(response, message, eventId); - if (!writeSuccess) { - // Clean up if write failed - this._streamMapping.delete(streamId); - this._requestToStreamMapping.delete(requestId); + if (response) { + // Write the event to the response stream + this.writeSSEEvent(response, message, eventId); } }