Skip to content

Commit c1d581a

Browse files
Add test for replaying multiple messages after closeSSEStream
Addresses PR feedback from paoloricciuti requesting test coverage for the scenario where multiple messages are sent while the SSE client is disconnected. Uses a batch of tool calls to generate multiple responses that get stored and replayed on reconnection.
1 parent 464b1f8 commit c1d581a

File tree

1 file changed

+132
-0
lines changed

1 file changed

+132
-0
lines changed

src/server/streamableHttp.test.ts

Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1833,6 +1833,138 @@ describe('StreamableHTTPServerTransport POST SSE priming events', () => {
18331833
expect(replayText).toContain('Final result');
18341834
expect(replayText).toContain('"id":100');
18351835
});
1836+
1837+
it('should replay multiple messages sent after closeSSEStream', async () => {
1838+
const result = await createTestServer({
1839+
sessionIdGenerator: () => randomUUID(),
1840+
eventStore: createEventStore(),
1841+
retryInterval: 1000
1842+
});
1843+
server = result.server;
1844+
transport = result.transport;
1845+
baseUrl = result.baseUrl;
1846+
mcpServer = result.mcpServer;
1847+
1848+
// Track tool execution state - we'll use multiple tools
1849+
let tool1Resolve: () => void;
1850+
let tool2Resolve: () => void;
1851+
let tool3Resolve: () => void;
1852+
const tool1Promise = new Promise<void>(resolve => {
1853+
tool1Resolve = resolve;
1854+
});
1855+
const tool2Promise = new Promise<void>(resolve => {
1856+
tool2Resolve = resolve;
1857+
});
1858+
const tool3Promise = new Promise<void>(resolve => {
1859+
tool3Resolve = resolve;
1860+
});
1861+
1862+
// Register multiple tools that wait for test signals
1863+
mcpServer.tool('tool-1', 'First tool', {}, async () => {
1864+
await tool1Promise;
1865+
return { content: [{ type: 'text', text: 'Result from tool 1' }] };
1866+
});
1867+
mcpServer.tool('tool-2', 'Second tool', {}, async () => {
1868+
await tool2Promise;
1869+
return { content: [{ type: 'text', text: 'Result from tool 2' }] };
1870+
});
1871+
mcpServer.tool('tool-3', 'Third tool', {}, async () => {
1872+
await tool3Promise;
1873+
return { content: [{ type: 'text', text: 'Result from tool 3' }] };
1874+
});
1875+
1876+
// Initialize to get session ID
1877+
const initResponse = await sendPostRequest(baseUrl, TEST_MESSAGES.initialize);
1878+
sessionId = initResponse.headers.get('mcp-session-id') as string;
1879+
expect(sessionId).toBeDefined();
1880+
1881+
// Send a BATCH of tool calls in one POST request
1882+
// All responses will go to the same SSE stream
1883+
const batchRequest: JSONRPCMessage[] = [
1884+
{ jsonrpc: '2.0', id: 201, method: 'tools/call', params: { name: 'tool-1', arguments: {} } },
1885+
{ jsonrpc: '2.0', id: 202, method: 'tools/call', params: { name: 'tool-2', arguments: {} } },
1886+
{ jsonrpc: '2.0', id: 203, method: 'tools/call', params: { name: 'tool-3', arguments: {} } }
1887+
];
1888+
1889+
const postResponse = await fetch(baseUrl, {
1890+
method: 'POST',
1891+
headers: {
1892+
'Content-Type': 'application/json',
1893+
Accept: 'text/event-stream, application/json',
1894+
'mcp-session-id': sessionId,
1895+
'mcp-protocol-version': '2025-03-26'
1896+
},
1897+
body: JSON.stringify(batchRequest)
1898+
});
1899+
1900+
expect(postResponse.status).toBe(200);
1901+
expect(postResponse.headers.get('content-type')).toBe('text/event-stream');
1902+
1903+
const reader = postResponse.body?.getReader();
1904+
1905+
// Read the priming event and extract event ID
1906+
const { value: primingValue } = await reader!.read();
1907+
const primingText = new TextDecoder().decode(primingValue);
1908+
expect(primingText).toContain('id: ');
1909+
expect(primingText).toContain('retry: 1000');
1910+
1911+
// Extract the priming event ID
1912+
const primingIdMatch = primingText.match(/id: ([^\n]+)/);
1913+
expect(primingIdMatch).toBeTruthy();
1914+
const primingEventId = primingIdMatch![1];
1915+
1916+
// Server closes the stream to trigger polling - use first request ID
1917+
transport.closeSSEStream(201);
1918+
1919+
// Verify stream is closed
1920+
const { done } = await reader!.read();
1921+
expect(done).toBe(true);
1922+
1923+
// Complete all tools while the client is disconnected
1924+
// Each completion will store a response in the event store
1925+
tool1Resolve!();
1926+
await new Promise(resolve => setTimeout(resolve, 10));
1927+
tool2Resolve!();
1928+
await new Promise(resolve => setTimeout(resolve, 10));
1929+
tool3Resolve!();
1930+
1931+
// Give all tools time to complete and store results
1932+
await new Promise(resolve => setTimeout(resolve, 50));
1933+
1934+
// Client reconnects with Last-Event-ID to get all missed events
1935+
const reconnectResponse = await fetch(baseUrl, {
1936+
method: 'GET',
1937+
headers: {
1938+
Accept: 'text/event-stream',
1939+
'mcp-session-id': sessionId,
1940+
'mcp-protocol-version': '2025-03-26',
1941+
'last-event-id': primingEventId
1942+
}
1943+
});
1944+
1945+
expect(reconnectResponse.status).toBe(200);
1946+
expect(reconnectResponse.headers.get('content-type')).toBe('text/event-stream');
1947+
1948+
// Read the replayed events
1949+
const reconnectReader = reconnectResponse.body?.getReader();
1950+
const { value: replayValue } = await reconnectReader!.read();
1951+
const replayText = new TextDecoder().decode(replayValue);
1952+
1953+
// Should receive all three tool results that were stored after stream was closed
1954+
expect(replayText).toContain('Result from tool 1');
1955+
expect(replayText).toContain('Result from tool 2');
1956+
expect(replayText).toContain('Result from tool 3');
1957+
1958+
// Verify all request IDs are present
1959+
expect(replayText).toContain('"id":201');
1960+
expect(replayText).toContain('"id":202');
1961+
expect(replayText).toContain('"id":203');
1962+
1963+
// Verify multiple event IDs are present (at least 3 messages)
1964+
const eventIds = replayText.match(/id: [^\n]+/g);
1965+
expect(eventIds).toBeTruthy();
1966+
expect(eventIds!.length).toBeGreaterThanOrEqual(3);
1967+
});
18361968
});
18371969

18381970
// Test onsessionclosed callback

0 commit comments

Comments
 (0)