diff --git a/src/steps/sse.ts b/src/steps/sse.ts index fc3587e..09e141c 100644 --- a/src/steps/sse.ts +++ b/src/steps/sse.ts @@ -60,6 +60,11 @@ export default async function ( ) { const stepResult: StepRunResult = { type: 'sse', + request : { + url: params.url, + headers: params.headers, + size: 0, + } } const ssw = new co2() @@ -81,18 +86,16 @@ export default async function ( }) const messages: MessageEvent[] = [] + const expectedMessages: Set | undefined = params.check?.messages + ? new Set(params.check?.messages?.map((m) => m.id)) + : undefined - const timeout = setTimeout(() => { + // Closes the `EventSource` and exits as "passed" + const end = () => { ev.close() const messagesBuffer = Buffer.from(messages.map((m) => m.data).join('\n')) - stepResult.request = { - url: params.url, - headers: params.headers, - size: 0, - } - stepResult.response = { contentType: 'text/event-stream', body: messagesBuffer, @@ -103,12 +106,35 @@ export default async function ( } resolve(true) + } + + const timeout = setTimeout(() => { + console.debug('SSE timed out') + end() }, params.timeout || 10000) ev.onerror = (error) => { clearTimeout(timeout) + + let message: string + if (ev.readyState === EventSource.CLOSED) { + // SSE stream closed gracefully + return end() + } else if (ev.readyState === EventSource.CONNECTING) { + // SSE stream closed by the server + if (expectedMessages === undefined) { + message = 'The SSE stream was closed by the server. If this is expected behavior, please use [`tests..steps.[step].sse.check.messages`](https://docs.stepci.com/reference/workflow-syntax.html#tests-test-steps-step-sse-check-messages-message).' + } else { + message = `The SSE stream was closed by the server before all expected messages were received. Missing IDs: ${JSON.stringify([...expectedMessages], null, 2)}` + } + } else { + // SSE stream is still open (`ev.readyState === EventSource.OPEN`) + // but received an "error" event from the server + message = `The SSE stream received an error event from the server: ${JSON.stringify(error, null, 2)}` + } + ev.close() - reject(error) + reject({ ...error, message }) } if (params.check) { @@ -129,6 +155,9 @@ export default async function ( if (params.check) { params.check.messages?.forEach((check, id) => { + // Don't run check if it's not intended for this message + if (check.id !== message.lastEventId) return + if (check.body) { const result = checkResult(message.data, check.body) if (result.passed && stepResult.checks?.messages) @@ -178,7 +207,7 @@ export default async function ( .map((c: CheckResult) => c.passed) .every((passed) => passed) - if (passed && stepResult.checks?.messages) + if (stepResult.checks?.messages) (stepResult.checks.messages as CheckResults)[check.id] = { expected: check.jsonpath, given: jsonpathResult, @@ -190,6 +219,15 @@ export default async function ( } }) } + + // Mark message as received + expectedMessages?.delete(message.lastEventId) + // If all expected messages have been received, close connection and return as "passed" + if (expectedMessages?.size === 0) { + // console.debug('All expected messages received, closing connection…') + clearTimeout(timeout) + end() + } } })