Skip to content

Commit 6368bfa

Browse files
fix: revert GET stream event storage, add POST SSE polling test
- Revert send() changes for GET streams: only store events when stream is active (not part of SEP-1699 scope) - Add POST SSE polling test that demonstrates full reconnection flow: client sends POST, gets priming event, server closes stream, client reconnects with Last-Event-ID, events are replayed
1 parent 7439f48 commit 6368bfa

File tree

2 files changed

+107
-7
lines changed

2 files changed

+107
-7
lines changed

src/server/streamableHttp.test.ts

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1771,6 +1771,107 @@ describe('StreamableHTTPServerTransport POST SSE priming events', () => {
17711771
// Clean up - resolve the tool promise
17721772
toolResolve!();
17731773
});
1774+
1775+
it('should support POST SSE polling with client reconnection', async () => {
1776+
const result = await createTestServer({
1777+
sessionIdGenerator: () => randomUUID(),
1778+
eventStore: createEventStore(),
1779+
retryInterval: 1000
1780+
});
1781+
server = result.server;
1782+
transport = result.transport;
1783+
baseUrl = result.baseUrl;
1784+
mcpServer = result.mcpServer;
1785+
1786+
// Track tool execution state
1787+
let toolResolve: () => void;
1788+
const toolPromise = new Promise<void>(resolve => {
1789+
toolResolve = resolve;
1790+
});
1791+
1792+
// Register a tool that sends progress and then completes
1793+
mcpServer.tool('polling-tool', 'A tool for polling test', {}, async () => {
1794+
// Wait for test to signal completion
1795+
await toolPromise;
1796+
return { content: [{ type: 'text', text: 'Final result' }] };
1797+
});
1798+
1799+
// Initialize to get session ID
1800+
const initResponse = await sendPostRequest(baseUrl, TEST_MESSAGES.initialize);
1801+
sessionId = initResponse.headers.get('mcp-session-id') as string;
1802+
expect(sessionId).toBeDefined();
1803+
1804+
// Send a POST request for the tool
1805+
const toolCallRequest: JSONRPCMessage = {
1806+
jsonrpc: '2.0',
1807+
id: 100,
1808+
method: 'tools/call',
1809+
params: { name: 'polling-tool', arguments: {} }
1810+
};
1811+
1812+
const postResponse = await fetch(baseUrl, {
1813+
method: 'POST',
1814+
headers: {
1815+
'Content-Type': 'application/json',
1816+
Accept: 'text/event-stream, application/json',
1817+
'mcp-session-id': sessionId,
1818+
'mcp-protocol-version': '2025-03-26'
1819+
},
1820+
body: JSON.stringify(toolCallRequest)
1821+
});
1822+
1823+
expect(postResponse.status).toBe(200);
1824+
expect(postResponse.headers.get('content-type')).toBe('text/event-stream');
1825+
1826+
const reader = postResponse.body?.getReader();
1827+
1828+
// Read the priming event and extract event ID
1829+
const { value: primingValue } = await reader!.read();
1830+
const primingText = new TextDecoder().decode(primingValue);
1831+
expect(primingText).toContain('id: ');
1832+
expect(primingText).toContain('retry: 1000');
1833+
1834+
// Extract the priming event ID
1835+
const primingIdMatch = primingText.match(/id: ([^\n]+)/);
1836+
expect(primingIdMatch).toBeTruthy();
1837+
const primingEventId = primingIdMatch![1];
1838+
1839+
// Server closes the stream to trigger polling
1840+
transport.closeSSEStream(100);
1841+
1842+
// Verify stream is closed
1843+
const { done } = await reader!.read();
1844+
expect(done).toBe(true);
1845+
1846+
// Now complete the tool - this will store the result event
1847+
toolResolve!();
1848+
1849+
// Give the tool time to complete and store the result
1850+
await new Promise(resolve => setTimeout(resolve, 50));
1851+
1852+
// Client reconnects with Last-Event-ID to get missed events
1853+
const reconnectResponse = await fetch(baseUrl, {
1854+
method: 'GET',
1855+
headers: {
1856+
Accept: 'text/event-stream',
1857+
'mcp-session-id': sessionId,
1858+
'mcp-protocol-version': '2025-03-26',
1859+
'last-event-id': primingEventId
1860+
}
1861+
});
1862+
1863+
expect(reconnectResponse.status).toBe(200);
1864+
expect(reconnectResponse.headers.get('content-type')).toBe('text/event-stream');
1865+
1866+
// Read the replayed events
1867+
const reconnectReader = reconnectResponse.body?.getReader();
1868+
const { value: replayValue } = await reconnectReader!.read();
1869+
const replayText = new TextDecoder().decode(replayValue);
1870+
1871+
// Should receive the tool result that was stored after stream was closed
1872+
expect(replayText).toContain('Final result');
1873+
expect(replayText).toContain('"id":100');
1874+
});
17741875
});
17751876

17761877
// Test onsessionclosed callback

src/server/streamableHttp.ts

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -774,19 +774,18 @@ export class StreamableHTTPServerTransport implements Transport {
774774
throw new Error('Cannot send a response on a standalone SSE stream unless resuming a previous client request');
775775
}
776776

777+
const standaloneSse = this._streamMapping.get(this._standaloneSseStreamId);
778+
if (standaloneSse === undefined) {
779+
// No active stream - nothing to do
780+
return;
781+
}
782+
777783
// Generate and store event ID if event store is provided
778-
// Store even if no active stream - events will be replayed on reconnection
779784
let eventId: string | undefined;
780785
if (this._eventStore) {
781786
eventId = await this._eventStore.storeEvent(this._standaloneSseStreamId, message);
782787
}
783788

784-
const standaloneSse = this._streamMapping.get(this._standaloneSseStreamId);
785-
if (standaloneSse === undefined) {
786-
// No active stream - message stored for replay, nothing more to do
787-
return;
788-
}
789-
790789
// Send the message to the standalone SSE stream
791790
this.writeSSEEvent(standaloneSse, message, eventId);
792791
return;

0 commit comments

Comments
 (0)