-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathflushed.ts
35 lines (32 loc) · 1.32 KB
/
flushed.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
export { pipe as combine } from 'https://cdn.skypack.dev/[email protected]?dts';
import { ResolvablePromise } from 'https://ghuc.cc/worker-tools/resolvable-promise/index.ts'
import type { Awaitable } from "./utils/common-types.ts";
import type { Context } from "./index.ts";
/**
 * Pass-through transform stream that reports when it has been flushed.
 *
 * No `transform` hook is supplied, so chunks are forwarded unchanged; only the
 * `flush` hook is set, and it invokes the given callback.
 *
 * @param flushCallback Invoked from the stream's `flush` hook.
 */
class FlushCallbackStream<T> extends TransformStream<T, T> {
  constructor(flushCallback: () => void) {
    super({
      flush: () => {
        flushCallback();
      },
    });
  }
}
/**
 * Shape added to a context by the `flushed` middleware in this file.
 */
export interface FlushedContext {
/**
* A promise that resolves when the entire response body has been written to the wire,
* or if the stream has been closed for any other reason.
* Most likely useful when combined with streaming responses.
*
* The promise resolves with the `Response` returned by the effect that the `flushed`
* middleware registers. For a response without a body, it resolves once the
* context's `handled` promise resolves instead.
*
* NOTE(review): for responses with a body, resolution is driven by the `flush` hook of a
* `TransformStream`. Presumably that hook does not run when the stream is cancelled or
* errored, in which case this promise may never settle — confirm against the
* "closed for any other reason" wording above.
*/
flushed: Promise<Response>
}
/**
 * Middleware factory that adds a `flushed` promise to the context.
 *
 * The returned middleware awaits the incoming context and pushes an effect onto
 * `effects`. The effect rebuilds the response with the same status, status text
 * and headers:
 * - with a body, the body is piped through a `FlushCallbackStream`, and the
 *   promise resolves when that stream's flush callback fires;
 * - without a body, the promise resolves once the context's `handled` promise resolves.
 *
 * In both cases the promise resolves with the rebuilt response.
 *
 * @returns A middleware that resolves to the same context object, extended with `flushed`.
 */
export const flushed = () => async <X extends Context>(ax: Awaitable<X>): Promise<X & FlushedContext> => {
  const ctx = await ax;
  const deferred = new ResolvablePromise<Response>();
  // Expose the result of `Promise.resolve` rather than `deferred` itself
  // (presumably so consumers are not handed the `resolve` method).
  const promise = Promise.resolve(deferred);

  ctx.effects.push(response => {
    const { status, statusText, headers, body } = response;
    const init = { status, statusText, headers };

    // Assigned below, before `notify` can run: both triggers fire asynchronously.
    let wrapped: Response;
    const notify = () => deferred.resolve(wrapped);

    if (body != null) {
      wrapped = new Response(body.pipeThrough(new FlushCallbackStream(notify)), init);
    } else {
      // No body to observe, so fall back to the context's `handled` promise.
      ctx.handled.then(notify);
      wrapped = new Response(null, init);
    }
    return wrapped;
  });

  return Object.assign(ctx, { flushed: promise });
}