Skip to content

Commit b4b7aa1

Browse files
fix: only send priming events when eventStore is configured
Priming events are only meaningful for resumability, which requires an eventStore. Without eventStore, the event ID is useless since there's nothing to resume from. - Rename _writePrimingEvent to _maybeWritePrimingEvent - Guard priming event behind eventStore check - Restore original tests that don't have eventStore - Add eventStore to priming event tests
1 parent 76c3cb9 commit b4b7aa1

File tree

2 files changed

+39
-14
lines changed

2 files changed

+39
-14
lines changed

src/server/streamableHttp.test.ts

Lines changed: 30 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -534,20 +534,14 @@ describe('StreamableHTTPServerTransport', () => {
534534
expect(sseResponse.status).toBe(200);
535535
const reader = sseResponse.body?.getReader();
536536

537-
// Read and validate the priming event
538-
const { value: primingValue } = await reader!.read();
539-
const primingText = new TextDecoder().decode(primingValue);
540-
expect(primingText).toContain('id: ');
541-
expect(primingText.match(/id: ([^\n]+)/)).toBeTruthy();
542-
543-
// Send a notification
537+
// Send multiple notifications
544538
const notification1: JSONRPCMessage = {
545539
jsonrpc: '2.0',
546540
method: 'notifications/message',
547541
params: { level: 'info', data: 'First notification' }
548542
};
549543

550-
// Send and verify it comes through - then the stream should stay open
544+
// Just send one and verify it comes through - then the stream should stay open
551545
await transport.send(notification1);
552546

553547
const { value, done } = await reader!.read();
@@ -1562,6 +1556,30 @@ describe('StreamableHTTPServerTransport SSE priming events', () => {
15621556
let baseUrl: URL;
15631557
let sessionId: string;
15641558

1559+
// Simple eventStore for priming event tests
1560+
const createEventStore = (): EventStore => {
1561+
const storedEvents = new Map<string, { eventId: string; message: JSONRPCMessage }>();
1562+
return {
1563+
async storeEvent(streamId: string, message: JSONRPCMessage): Promise<string> {
1564+
const eventId = `${streamId}_${randomUUID()}`;
1565+
storedEvents.set(eventId, { eventId, message });
1566+
return eventId;
1567+
},
1568+
async replayEventsAfter(
1569+
lastEventId: EventId,
1570+
{ send }: { send: (eventId: EventId, message: JSONRPCMessage) => Promise<void> }
1571+
): Promise<StreamId> {
1572+
const streamId = lastEventId.split('_')[0];
1573+
for (const [eventId, { message }] of storedEvents.entries()) {
1574+
if (eventId.startsWith(streamId) && eventId !== lastEventId) {
1575+
await send(eventId, message);
1576+
}
1577+
}
1578+
return streamId;
1579+
}
1580+
};
1581+
};
1582+
15651583
afterEach(async () => {
15661584
if (server && transport) {
15671585
await stopTestServer({ server, transport });
@@ -1571,6 +1589,7 @@ describe('StreamableHTTPServerTransport SSE priming events', () => {
15711589
it('should send priming event with retry field when retryInterval is configured', async () => {
15721590
const result = await createTestServer({
15731591
sessionIdGenerator: () => randomUUID(),
1592+
eventStore: createEventStore(),
15741593
retryInterval: 5000
15751594
});
15761595
server = result.server;
@@ -1608,7 +1627,8 @@ describe('StreamableHTTPServerTransport SSE priming events', () => {
16081627

16091628
it('should send priming event without retry field when retryInterval is not configured', async () => {
16101629
const result = await createTestServer({
1611-
sessionIdGenerator: () => randomUUID()
1630+
sessionIdGenerator: () => randomUUID(),
1631+
eventStore: createEventStore()
16121632
// No retryInterval
16131633
});
16141634
server = result.server;
@@ -1665,6 +1685,7 @@ describe('StreamableHTTPServerTransport SSE priming events', () => {
16651685
it('should include event ID in priming event for resumability', async () => {
16661686
const result = await createTestServer({
16671687
sessionIdGenerator: () => randomUUID(),
1688+
eventStore: createEventStore(),
16681689
retryInterval: 2000
16691690
});
16701691
server = result.server;

src/server/streamableHttp.ts

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -260,10 +260,14 @@ export class StreamableHTTPServerTransport implements Transport {
260260

261261
/**
262262
* Writes a priming event to establish resumption capability.
263-
* This primes the client's Last-Event-ID for reconnection.
263+
* Only sends if eventStore is configured (opt-in for resumability).
264264
*/
265-
private async _writePrimingEvent(res: ServerResponse, streamId: string): Promise<void> {
266-
const primingEventId = this._eventStore ? await this._eventStore.storeEvent(streamId, {} as JSONRPCMessage) : randomUUID();
265+
private async _maybeWritePrimingEvent(res: ServerResponse, streamId: string): Promise<void> {
266+
if (!this._eventStore) {
267+
return;
268+
}
269+
270+
const primingEventId = await this._eventStore.storeEvent(streamId, {} as JSONRPCMessage);
267271

268272
let primingEvent = `id: ${primingEventId}\ndata: \n\n`;
269273
if (this._retryInterval !== undefined) {
@@ -343,7 +347,7 @@ export class StreamableHTTPServerTransport implements Transport {
343347
// otherwise the client will just wait for the first message
344348
res.writeHead(200, headers).flushHeaders();
345349

346-
await this._writePrimingEvent(res, this._standaloneSseStreamId);
350+
await this._maybeWritePrimingEvent(res, this._standaloneSseStreamId);
347351

348352
// Assign the response to the standalone SSE stream
349353
this._streamMapping.set(this._standaloneSseStreamId, res);
@@ -573,7 +577,7 @@ export class StreamableHTTPServerTransport implements Transport {
573577

574578
res.writeHead(200, headers);
575579

576-
await this._writePrimingEvent(res, streamId);
580+
await this._maybeWritePrimingEvent(res, streamId);
577581
}
578582
// Store the response for this request to send messages back through this connection
579583
// We need to track by request ID to maintain the connection

0 commit comments

Comments
 (0)