generated from digitalbazaar/bedrock-module-template-http
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathConfigStorage.js
328 lines (288 loc) · 10.3 KB
/
ConfigStorage.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
/*!
* Copyright (c) 2021-2025 Digital Bazaar, Inc. All rights reserved.
*/
import * as bedrock from '@bedrock/core';
import * as brZCapStorage from '@bedrock/zcap-storage';
import * as database from '@bedrock/mongodb';
import assert from 'assert-plus';
import {LruCache} from '@digitalbazaar/lru-memoize';
import {verifyRequestIp} from '../helpers.js';
const {util: {BedrockError}} = bedrock;
const USAGE_COUNTER_MAX_CONCURRENCY = 100;
export class ConfigStorage {
/**
* Creates a new ConfigStorage API for a particular service type.
*
* @param {object} options - The options to use.
* @param {string} options.serviceType - The type of service.
* @param {object} options.storageCost - The storage cost config.
* @param {object} options.cacheConfig - The cache config.
*
* @returns {Promise<ConfigStorage>} A `ConfigStorage` instance.
*/
constructor({serviceType, storageCost, cacheConfig} = {}) {
assert.string(serviceType, 'serviceType');
assert.object(storageCost, 'storageCost');
assert.optionalObject(cacheConfig, 'cacheConfig');
if(!cacheConfig) {
cacheConfig = bedrock.config['service-core'].cacheDefaults;
}
const collectionName = `service-core-config-${serviceType}`;
this.collectionName = collectionName;
this.storageCost = storageCost;
this.cache = new LruCache(cacheConfig);
_createStorageInitializer({collectionName});
}
/**
* Establishes a new service object by inserting its configuration
* into storage.
*
* @param {object} options - The options to use.
* @param {object} options.config - The configuration.
*
* @returns {Promise<object>} The database record.
*/
async insert({config} = {}) {
assert.object(config, 'config');
assert.string(config.id, 'config.id');
assert.string(config.controller, 'config.controller');
assert.string(config.meterId, 'config.meterId');
// require starting sequence to be 0
if(config.sequence !== 0) {
throw new BedrockError(
'Configuration sequence must be "0".',
'DataError', {
public: true,
httpStatusCode: 400
});
}
// insert the configuration and return the updated record
const now = Date.now();
const meta = {created: now, updated: now};
const record = {meta, config};
try {
const collection = this._getCollection();
await collection.insertOne(record);
return record;
} catch(e) {
if(!database.isDuplicateError(e)) {
throw e;
}
throw new BedrockError(
'Duplicate configuration.',
'DuplicateError', {
public: true,
httpStatusCode: 409
}, e);
}
}
/**
* Retrieves all service object configs matching the given query.
*
* @param {object} options - The options to use.
* @param {string} options.controller - The controller for the configs to
* retrieve.
* @param {object} options.req - A request to check against an IP allow list.
* @param {object} [options.query={}] - The optional query to use.
* @param {object} [options.options={}] - Query options (eg: 'sort', 'limit').
* @param {boolean} [options.explain=false] - An optional explain boolean.
*
* @returns {Promise<Array> | ExplainObject} The records that matched the
* query or an ExplainObject if `explain=true`.
*/
async find({
controller, req, query = {}, options = {}, explain = false
} = {}) {
// force controller ID
query['config.controller'] = controller;
const collection = this._getCollection();
const cursor = await collection.find(query, options);
if(explain) {
return cursor.explain('executionStats');
}
const records = await cursor.toArray();
if(!req) {
return records;
}
// since `req` is given, omit any records for which the IP is not allowed
return records.filter(({config}) => {
const {verified} = verifyRequestIp({config, req});
return verified;
});
}
/**
* Updates a service object config if its sequence number is next.
*
* @param {object} options - The options to use.
* @param {object} options.config - The configuration.
* @param {boolean} [options.explain=false] - An optional explain boolean.
*
* @returns {Promise<object> | ExplainObject} The database record or an
* ExplainObject if `explain=true`.
*/
async update({config, explain = false} = {}) {
assert.object(config, 'config');
assert.string(config.id, 'config.id');
assert.number(config.sequence, config.sequence);
assert.string(config.controller, 'config.controller');
// insert the configuration and get the updated record
const now = Date.now();
const collection = this._getCollection();
const query = {
'config.id': config.id,
'config.sequence': config.sequence - 1
};
if(explain) {
// 'find().limit(1)' is used here because 'updateOne()' doesn't return a
// cursor which allows the use of the explain function
const cursor = await collection.find(query).limit(1);
return cursor.explain('executionStats');
}
const result = await collection.updateOne(
query, {$set: {config, 'meta.updated': now}});
if(result.result.n === 0) {
// no records changed...
throw new BedrockError(
'Could not update configuration. ' +
'Record sequence does not match or configuration does not exist.',
'InvalidStateError', {httpStatusCode: 409, public: true});
}
// clear cache
this.cache.delete(config.id);
return true;
}
/**
* Gets a service object configuration. If a request (`req`) is passed,
* an IP allow list check will be performed.
*
* @param {object} options - The options to use.
* @param {string} options.id - The ID of the service object.
* @param {object} options.req - A request to check against an IP allow list.
* @param {boolean} [options.explain=false] - An optional explain boolean.
*
* @returns {Promise<object> | ExplainObject} The data base record or an
* ExplainObject if `explain=true`.
*/
async get({id, req, explain = false} = {}) {
assert.string(id, 'id');
// skip cache if `explain=true`
if(explain) {
return this._getUncachedRecord({id, explain});
}
const fn = () => this._getUncachedRecord({id});
const record = await this.cache.memoize({key: id, fn});
if(req) {
// verify that request is from an IP that is allowed to access the config
const {verified} = verifyRequestIp({config: record.config, req});
if(!verified) {
throw new BedrockError(
'Permission denied. Source IP is not allowed.', 'NotAllowedError', {
httpStatusCode: 403,
public: true,
});
}
}
return record;
}
/**
* Gets storage statistics for the given meter. This includes the total
* number of service objects associated with a meter ID, represented as
* storage units according to the service's `storageCost` configuration.
*
* @param {object} options - The options to use.
* @param {string} options.meterId - The ID of the meter to get.
* @param {AbortSignal} [options.signal] - An abort signal to check.
*
* @returns {Promise<object>} The storage usage for the meter.
*/
async getUsage({meterId, signal} = {}) {
// find all configs with the given meter ID
const cursor = await this._getCollection().find(
{'config.meterId': meterId},
{projection: {_id: 0, config: 1}});
const {storageCost} = this;
const usage = {storage: 0};
let counters = [];
while(await cursor.hasNext()) {
// get next service object config
const {config} = await cursor.next();
// add storage units for service object config
usage.storage += storageCost.config;
// add zcap revocation storage
counters.push(_addRevocationUsage({id: config.id, storageCost, usage}));
// await counters if max concurrency reached
if(counters.length === USAGE_COUNTER_MAX_CONCURRENCY) {
await Promise.all(counters);
counters = [];
}
if(signal && signal.abort) {
throw new BedrockError(
'Computing metered storage aborted.',
'AbortError',
{meterId, httpStatusCode: 503, public: true});
}
}
// await any counters that didn't complete
await Promise.all(counters);
return usage;
}
_getCollection() {
return database.collections[this.collectionName];
}
async _getUncachedRecord({id, explain = false}) {
const collection = this._getCollection();
const query = {'config.id': id};
const projection = {_id: 0, config: 1, meta: 1};
if(explain) {
// 'find().limit(1)' is used here because 'updateOne()' doesn't return a
// cursor which allows the use of the explain function
const cursor = await collection.find(query, {projection}).limit(1);
return cursor.explain('executionStats');
}
const record = await collection.findOne(query, {projection});
if(!record) {
throw new BedrockError(
'Configuration not found.',
'NotFoundError',
{configId: id, httpStatusCode: 404, public: true});
}
return record;
}
}
async function _addRevocationUsage({id, storageCost, usage}) {
// add storage units for revocations associated with the config
const {count} = await brZCapStorage.revocations.count(
{rootTarget: id});
usage.storage += count * storageCost.revocation;
}
function _createStorageInitializer({collectionName} = {}) {
bedrock.events.on('bedrock-mongodb.ready', async () => {
await database.openCollections([collectionName]);
await database.createIndexes([{
// cover queries config by ID
collection: collectionName,
fields: {'config.id': 1},
options: {unique: true, background: false}
}, {
// cover config queries by controller
collection: collectionName,
fields: {'config.controller': 1},
options: {unique: false, background: false}
}, {
// cover counting configs in use by meter ID, if present
collection: collectionName,
fields: {'config.meterId': 1},
options: {
partialFilterExpression: {
'config.meterId': {$exists: true}
},
unique: false, background: false
}
}]);
});
}
/**
* An object containing information on the query plan.
*
* @typedef {object} ExplainObject
*/