 import * as API from '../api.js'
 import { DigestMap } from '@web3-storage/blob-index'
 import defer from 'p-defer'
-import { MultipartByteRangeDecoder, getBoundary } from 'multipart-byte-range/decoder'
 import { NetworkError, NotFoundError } from '../lib.js'
 import { fetchBlob } from './simple.js'
 import { resolveRange } from './lib.js'
+import { withAsyncGeneratorSpan, withResultSpan } from '../tracing/tracing.js'
+import { MultipartByteRangeDecoder, getBoundary } from 'multipart-byte-range'

 /**
  * @typedef {'*'|`${number},${number}`|`${number}`} RangeKey
@@ -18,6 +19,7 @@ const MAX_BATCH_SIZE = 16
 /** @implements {API.Fetcher} */
 class BatchingFetcher {
   #locator
+  #fetch

   /** @type {DigestMap<API.MultihashDigest, RangedRequests>} */
   #pendingReqs = new DigestMap()
@@ -30,9 +32,13 @@ class BatchingFetcher {
   /** @type {Promise<void>|null} */
   #processing = null

-  /** @param {API.Locator} locator */
-  constructor (locator) {
+  /**
+   * @param {API.Locator} locator
+   * @param {typeof globalThis.fetch} [fetch]
+   */
+  constructor (locator, fetch = globalThis.fetch.bind(globalThis)) {
     this.#locator = locator
+    this.#fetch = fetch
   }

   #scheduleBatchProcessing () {
@@ -66,6 +72,14 @@ class BatchingFetcher {
     const pendingReqs = this.#pendingReqs
     this.#pendingReqs = new DigestMap()

+    // Basic algorithm:
+    // 1. assemble each HTTP request
+    // 2. fire off the request
+    // 3. once the first byte is received, begin processing the response async in the background
+    // 4. immediately go to the next HTTP request, but after the first iteration, wait so that we're never
+    //    processing the body of more than one response at a time
+    /** @type {Promise<API.Result<true, API.NotFound|API.Aborted|API.NetworkError>> | undefined} */
+    let lastResolveBlobs
     while (true) {
       const first = queue.shift()
       if (!first) break
@@ -84,16 +98,25 @@ class BatchingFetcher {
         if (locs.length >= MAX_BATCH_SIZE) break
       }

-      const res = await fetchBlobs(siteURL, locs)
-      if (res.error) break
-      for (const [i, blob] of res.ok.entries()) {
-        const rangeReqs = pendingReqs.get(blob.digest)
-        const key = rangeKey(locs[i].range)
-        const reqs = rangeReqs?.get(key)
-        reqs?.[0].resolve({ ok: blob })
-        reqs?.slice(1).forEach(r => r.resolve({ ok: blob.clone() }))
-        rangeReqs?.delete(key)
+      const fetchRes = await fetchBlobs(siteURL, locs, this.#fetch)
+      // if we have an error, stop
+      if (fetchRes.error) {
+        break
+      }
+      // if we are still processing the previous response, wait before processing this response
+      if (lastResolveBlobs !== undefined) {
+        const resolveRes = await lastResolveBlobs
+        lastResolveBlobs = undefined
+        if (resolveRes.error) {
+          break
+        }
       }
+      lastResolveBlobs = resolveRequests(fetchRes.ok, pendingReqs)
+    }
+
+    // await the last call to resolve blobs
+    if (lastResolveBlobs !== undefined) {
+      await lastResolveBlobs
     }

     // resolve `undefined` for any remaining requests
@@ -135,83 +158,143 @@ class BatchingFetcher {
 /**
  * Create a new batching blob fetcher.
  * @param {API.Locator} locator
+ * @param {typeof globalThis.fetch} [fetch]
  * @returns {API.Fetcher}
  */
-export const create = (locator) => new BatchingFetcher(locator)
+export const create = (locator, fetch = globalThis.fetch.bind(globalThis)) => new BatchingFetcher(locator, fetch)
+
+/** @typedef {{range: API.AbsoluteRange, digest: API.MultihashDigest, orig: API.Range | undefined}} ResolvedBlobs */

 /**
  * Fetch blobs from the passed locations. The locations MUST share a common
  * site to fetch from.
- *
+ */
+export const fetchBlobs = withResultSpan('fetchBlobs', _fetchBlobs)
+
+/**
  * @param {URL} url Desired URL to fetch blobs from.
  * @param {Array<{ location: API.Location, range?: API.Range }>} locations
- * @returns {Promise<API.Result<API.Blob[], API.NotFound|API.Aborted|API.NetworkError>>}
+ * @param {typeof globalThis.fetch} [fetch]
+ * @returns {Promise<API.Result<AsyncGenerator<BlobResult, API.Result<true, API.NotFound|API.Aborted|API.NetworkError>>, API.NotFound|API.Aborted|API.NetworkError>>}
  */
-export const fetchBlobs = async (url, locations) => {
+async function _fetchBlobs (url, locations, fetch = globalThis.fetch.bind(globalThis)) {
   if (locations.length === 1) {
-    const res = await fetchBlob(locations[0].location, locations[0].range)
+    const res = await fetchBlob(locations[0].location, locations[0].range, fetch)
     if (res.error) return res
-    return { ok: [res.ok] }
+    return {
+      ok: (async function * () {
+        yield { blob: res.ok, range: locations[0].range }
+        return { ok: true }
+      }())
+    }
   }

-  const ranges = []
+  // resolve ranges for blobs
+
+  /** @type {ResolvedBlobs[]} */
+  const resolvedBlobs = []
   for (const { location, range } of locations) {
     for (const s of location.site) {
       let found = false
       for (const l of s.location) {
         if (l.toString() === url.toString()) {
-          /** @type {import('multipart-byte-range').AbsoluteRange} */
+          /** @type {API.AbsoluteRange} */
           let resolvedRange = [s.range.offset, s.range.offset + s.range.length - 1]
           if (range) {
             const relRange = resolveRange(range, s.range.length)
             resolvedRange = [s.range.offset + relRange[0], s.range.offset + relRange[1]]
           }
-          ranges.push(resolvedRange)
+          resolvedBlobs.push({
+            digest: location.digest,
+            range: resolvedRange,
+            orig: range
+          })
           found = true
           break
         }
       }
       if (found) break
     }
   }
-  if (ranges.length !== locations.length) {
+  if (resolvedBlobs.length !== locations.length) {
     throw new Error('no common site')
   }

-  const headers = { Range: `bytes=${ranges.map(r => `${r[0]}-${r[1]}`).join(',')}` }
+  const headers = { Range: `bytes=${resolvedBlobs.map(r => `${r.range[0]}-${r.range[1]}`).join(',')}` }
   try {
     const res = await fetch(url, { headers })
     if (!res.ok) {
       return { error: new NetworkError(url, { cause: new Error(`unexpected HTTP status: ${res.status}`) }) }
     }
+    return { ok: consumeBatchResponse(url, resolvedBlobs, res) }
+  } catch (err) {
+    return { error: new NetworkError(url, { cause: err }) }
+  }
+}

-    if (!res.body) {
-      return { error: new NetworkError(url, { cause: new Error('missing response body') }) }
-    }
+/** @typedef {{blob: API.Blob, range: API.Range | undefined}} BlobResult */

-    const boundary = getBoundary(res.headers)
-    if (!boundary) {
-      return { error: new NetworkError(url, { cause: new Error('missing multipart boundary') }) }
-    }
+/**
+ * Consumes a batch request to create multiple blobs. Will break up
+ * a byte range going from the first byte of the first blob to the last byte of the last blob
+ * into appropriate ranges for each blob.
+ */
+const consumeBatchResponse = withAsyncGeneratorSpan('consumeBatchResponse', _consumeBatchResponse)

-    /** @type {API.Blob[]} */
-    const blobs = []
-    let i = 0
-    await res.body
-      .pipeThrough(new MultipartByteRangeDecoder(boundary))
-      .pipeTo(new WritableStream({
-        write (part) {
-          blobs.push(new Blob(locations[i].location.digest, part.content))
-          i++
-        }
-      }))
+/**
+ * @param {URL} url
+ * @param {ResolvedBlobs[]} resolvedBlobs
+ * @param {Response} res
+ * @returns {AsyncGenerator<BlobResult, API.Result<true, API.NotFound|API.Aborted|API.NetworkError>>}
+ */
+async function * _consumeBatchResponse (url, resolvedBlobs, res) {
+  if (!res.body) {
+    return { error: new NetworkError(url, { cause: new Error('missing response body') }) }
+  }
+
+  const boundary = getBoundary(res.headers)
+  if (!boundary) {
+    return { error: new NetworkError(url, { cause: new Error('missing multipart boundary') }) }
+  }
+
+  let i = 0

-    return { ok: blobs }
+  try {
+    for await (const chunk of res.body.pipeThrough(new MultipartByteRangeDecoder(boundary))) {
+      // generate a blob out of the current chunk
+      const blob = new Blob(resolvedBlobs[i].digest, chunk.content)
+      yield ({ blob, range: resolvedBlobs[i].orig })
+      i++
+    }
+    return { ok: true }
   } catch (err) {
     return { error: new NetworkError(url, { cause: err }) }
   }
 }

+/**
+ * Resolve pending requests from blobs generated out of the last fetch.
+ *
+ * @param {AsyncGenerator<BlobResult, API.Result<true, API.NotFound|API.Aborted|API.NetworkError>>} blobResults
+ * @param {DigestMap<API.MultihashDigest, RangedRequests>} pendingReqs
+ * @returns {Promise<API.Result<true, API.NotFound|API.Aborted|API.NetworkError>>}
+ */
+const resolveRequests = async (blobResults, pendingReqs) => {
+  for (;;) {
+    const { value: result, done } = await blobResults.next()
+    if (done) {
+      return result
+    }
+    const { blob, range } = result
+    const rangeReqs = pendingReqs.get(blob.digest)
+    const key = rangeKey(range)
+    const reqs = rangeReqs?.get(key)
+    reqs?.[0].resolve({ ok: blob })
+    reqs?.slice(1).forEach(r => r.resolve({ ok: blob.clone() }))
+    rangeReqs?.delete(key)
+  }
+}
+
 /** @implements {API.Blob} */
 class Blob {
   #digest
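
Note on the new optional `fetch` parameter introduced in this change: callers can now inject their own `fetch` implementation (for example, one wrapped with tracing or logging), and it defaults to `globalThis.fetch.bind(globalThis)`, so existing callers are unaffected. A minimal usage sketch follows; the import path, the `tracedFetch` wrapper, and the `locator`/`digest` values are illustrative assumptions, not part of this change:

```js
import * as BatchingFetcher from './fetcher/batching.js'

// Illustrative wrapper: logs each request before delegating to the global fetch.
const tracedFetch = async (input, init) => {
  console.log('fetching', input)
  return globalThis.fetch(input, init)
}

// `locator` is assumed to implement API.Locator; `digest` is a MultihashDigest.
const fetcher = BatchingFetcher.create(locator, tracedFetch)

const res = await fetcher.fetch(digest)
if (res.error) throw res.error
console.log('fetched blob for digest', res.ok.digest)
```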