-
Notifications
You must be signed in to change notification settings - Fork 156
/
Copy pathfetch.ts
154 lines (134 loc) · 5.79 KB
/
fetch.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
import { EventSourceMessage, getBytes, getLines, getMessages } from './parse';
/**
 * MIME type of a server-sent-events stream. In this file it is used as the
 * default `accept` request header and as the prefix the default onopen
 * validation expects the response content-type to start with.
 */
export const EventStreamContentType = 'text/event-stream';
/** Initial delay, in milliseconds, before a failed request is retried. */
const DefaultRetryInterval = 1000;
/** Name of the request header under which the most recent event id is stored for the next retry. */
const LastEventId = 'last-event-id';
/**
 * Options accepted by fetchEventSource: the standard RequestInit fields plus
 * the lifecycle callbacks and flags declared below.
 */
export interface FetchEventSourceInit extends RequestInit {
    /**
     * Headers to send with the request. Only the plain
     * Record<string, string> form is supported by FetchEventSource.
     */
    headers?: Record<string, string>;

    /**
     * Invoked once a response has arrived. Use it to check that the response
     * is what you expect, and throw if it is not. When omitted, a basic
     * check that the content-type is text/event-stream is applied instead.
     */
    onopen?: (response: Response) => void | Promise<void>;

    /**
     * Invoked for each received message. NOTE: in contrast to the browser's
     * EventSource.onmessage, this is invoked for _all_ events, including
     * those that carry a custom `event` field.
     */
    onmessage?: (ev: EventSourceMessage) => void | Promise<void>;

    /**
     * Invoked when the response finishes. If the server is not expected to
     * end the connection, throw from here and let onerror drive a retry.
     */
    onclose?: () => void | Promise<void>;

    /**
     * Invoked on any error while making the request, processing messages,
     * running callbacks, etc. It decides the retry strategy:
     *  - rethrow from inside the callback when the error is fatal; this
     *    stops the whole operation;
     *  - otherwise return a delay in milliseconds after which the request is
     *    retried automatically (sending the last-event-id).
     * When this callback is missing or returns undefined, every error is
     * considered retriable and the request is retried after 1 second.
     */
    onerror?: (err: any) => number | null | undefined | void;

    /**
     * When true, the request stays open even while the document is hidden.
     * Otherwise (the default) the request is closed when the document is
     * hidden and reopened automatically once it is visible again.
     */
    openWhenHidden?: boolean;

    /** Fetch implementation to use; window.fetch when not supplied. */
    fetch?: typeof fetch;
}
/**
 * Opens an event stream with fetch and keeps it running until the stream
 * finishes, the caller aborts, or onerror rethrows.
 *
 * Behaviour, as implemented below:
 *  - an `accept` header of EventStreamContentType is added unless the caller
 *    supplied one;
 *  - the id reported by the message parser is stored in the `last-event-id`
 *    header so it is sent on the next attempt (and removed when the id is empty);
 *  - unless `openWhenHidden` is set, the request is aborted on every
 *    visibilitychange event and recreated when the document is visible;
 *  - on error, `onerror` chooses the retry delay; if it throws, the returned
 *    promise rejects with what it threw.
 *
 * @param input The resource to request; passed straight to fetch.
 * @param init Request options and lifecycle callbacks (see FetchEventSourceInit).
 *   Options not consumed here are forwarded to fetch unchanged.
 * @returns A promise that resolves when the stream ends and `onclose` completes
 *   without throwing, or when `signal` is (or already was) aborted; it rejects
 *   with whatever `onerror` throws.
 */
export function fetchEventSource(input: RequestInfo, {
    signal: inputSignal,
    headers: inputHeaders,
    onopen: inputOnOpen,
    onmessage,
    onclose,
    onerror,
    openWhenHidden,
    fetch: inputFetch,
    ...rest
}: FetchEventSourceInit) {
    return new Promise<void>((resolve, reject) => {
        // An already-aborted signal never dispatches another 'abort' event, so
        // without this check the request would start with no way to cancel it.
        if (inputSignal?.aborted) {
            resolve();
            return;
        }

        // make a copy of the input headers since we may modify it below:
        const headers = { ...inputHeaders };
        if (!headers.accept) {
            headers.accept = EventStreamContentType;
        }

        // Controller of the most recently created request.
        let curRequestController: AbortController;
        let retryInterval = DefaultRetryInterval;
        let retryTimer = 0;

        function onVisibilityChange() {
            // Drop any scheduled retry: otherwise it could reconnect while the
            // document is hidden, or open a second connection next to the one
            // created below when the document becomes visible.
            window.clearTimeout(retryTimer);
            curRequestController.abort(); // close existing request on every visibility change
            if (!document.hidden) {
                create(); // page is now visible again, recreate request.
            }
        }

        // if the incoming signal aborts, dispose resources and resolve:
        function onInputAbort() {
            dispose();
            resolve(); // don't waste time constructing/logging errors
        }

        // Releases everything this call registered and stops the active request.
        function dispose() {
            document.removeEventListener('visibilitychange', onVisibilityChange);
            // Detach from the caller's signal so a long-lived signal does not
            // keep this closure alive after the operation has finished.
            inputSignal?.removeEventListener('abort', onInputAbort);
            window.clearTimeout(retryTimer);
            curRequestController.abort();
        }

        if (!openWhenHidden) {
            document.addEventListener('visibilitychange', onVisibilityChange);
        }
        inputSignal?.addEventListener('abort', onInputAbort);

        const fetch = inputFetch ?? window.fetch;
        const onopen = inputOnOpen ?? defaultOnOpen;

        async function create() {
            // Keep a per-request reference: curRequestController may be replaced
            // by a newer request before this one's catch block runs.
            const controller = new AbortController();
            curRequestController = controller;
            try {
                const response = await fetch(input, {
                    ...rest,
                    headers,
                    signal: controller.signal,
                });

                await onopen(response);

                await getBytes(response.body!, getLines(getMessages(id => {
                    if (id) {
                        // store the id and send it back on the next retry:
                        headers[LastEventId] = id;
                    } else {
                        // don't send the last-event-id header anymore:
                        delete headers[LastEventId];
                    }
                }, retry => {
                    retryInterval = retry;
                }, onmessage)));

                // onclose may be async; await it so a rejection is routed to
                // the retry/onerror handling below instead of going unhandled.
                await onclose?.();
                dispose();
                resolve();
            } catch (err) {
                // Only react if *this* request was not aborted by us. Checking
                // the shared controller here would misreport a deliberately
                // aborted request as a failure once a newer request exists.
                if (!controller.signal.aborted) {
                    try {
                        // check if we need to retry:
                        const interval = (onerror?.(err) ?? retryInterval) as number;
                        window.clearTimeout(retryTimer);
                        retryTimer = window.setTimeout(create, interval);
                    } catch (innerErr) {
                        // we should not retry anymore:
                        dispose();
                        reject(innerErr);
                    }
                }
            }
        }

        create();
    });
}
/**
 * Default response validation, used when the caller supplies no onopen.
 *
 * @param response The response returned by fetch.
 * @throws Error when the content-type header is missing or does not start
 *   with EventStreamContentType (compared case-insensitively).
 */
function defaultOnOpen(response: Response) {
    const contentType = response.headers.get('content-type');
    // MIME type names are case-insensitive, so compare against a lowercased
    // copy; the original header value is still what gets reported on failure.
    if (!contentType?.toLowerCase().startsWith(EventStreamContentType)) {
        throw new Error(`Expected content-type to be ${EventStreamContentType}, Actual: ${contentType}`);
    }
}