Skip to content

Commit

Permalink
feat: sync with built-in sink from eik/core
Browse files Browse the repository at this point in the history
  • Loading branch information
wkillerud committed Jul 29, 2024
1 parent 8b62834 commit 8828705
Show file tree
Hide file tree
Showing 4 changed files with 186 additions and 59 deletions.
33 changes: 33 additions & 0 deletions lib/entry.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import crypto from 'node:crypto';

export default class Entry {
constructor({ mimeType = 'application/octet-stream', payload = [] } = {}) {
this._mimeType = mimeType;
this._payload = payload;
this._hash = '';

if (Array.isArray(payload)) {
const hash = crypto.createHash('sha512');
payload.forEach((buffer) => {
hash.update(buffer.toString());
});
this._hash = `sha512-${hash.digest('base64')}`;
}
}

get mimeType() {
return this._mimeType;
}

get payload() {
return this._payload;
}

get hash() {
return this._hash;
}

get [Symbol.toStringTag]() {
return 'Entry';
}
}
179 changes: 142 additions & 37 deletions lib/main.js
Original file line number Diff line number Diff line change
@@ -1,18 +1,41 @@
import { join } from 'node:path';
import { Readable, Writable } from 'node:stream';
import { ReadFile } from '@eik/common';
import Sink from '@eik/sink';
import MetricsClient from '@metrics/client';
import { Readable, Writable } from 'node:stream';
import Entry from './entry.js';

/** @type {Map<string, string>} */
let content = new Map();
/** @type {Map<string, string>} */
let mimetypes = new Map();
/**
* @typedef {object} SinkMemoryOptions
* @property {string} [rootPath="/"]
*/

export default class SinkMemory extends Sink {
#metrics = new MetricsClient();
_metrics = new MetricsClient();
/** @type {Map<string, Entry>}*/
_state = new Map();

/**
*
* @param {SinkMemoryOptions} options
*/
constructor(options = {}) {
super();
this._rootPath = options.rootPath || '/';
this._counter = this._metrics.counter({
name: 'eik_core_sink_mem',
description:
'Counter measuring access to the in memory storage sink',
labels: {
operation: 'n/a',
success: false,
access: false,
},
});
}

get metrics() {
return this.#metrics;
return this._metrics;
}

/**
Expand All @@ -22,34 +45,51 @@ export default class SinkMemory extends Sink {
*/
write(filePath, contentType) {
return new Promise((resolve, reject) => {
const operation = 'write';

try {
Sink.validateFilePath(filePath);
Sink.validateContentType(contentType);
} catch (error) {
this._counter.inc({ labels: { operation } });
reject(error);
return;
}

if (!content.has(filePath)) {
content.set(filePath, '');
mimetypes.set(filePath, contentType);
const pathname = join(this._rootPath, filePath);

if (pathname.indexOf(this._rootPath) !== 0) {
reject(new Error(`Directory traversal - ${filePath}`));
this._counter.inc({ labels: { operation } });
return;
}

resolve(
new Writable({
write(chunk, encoding, callback) {
try {
content.set(
filePath,
content.get(filePath) + chunk.toString(),
);
callback();
} catch (e) {
callback(e);
}
const payload = [];
const stream = new Writable({
write(chunk, encoding, cb) {
payload.push(chunk);
cb();
},
});

stream.on('finish', () => {
const entry = new Entry({
mimeType: contentType,
payload,
});

this._state.set(pathname, entry);

this._counter.inc({
labels: {
success: true,
access: true,
operation,
},
}),
);
});
});

resolve(stream);
});
}

Expand All @@ -60,30 +100,55 @@ export default class SinkMemory extends Sink {
*/
read(filePath) {
return new Promise((resolve, reject) => {
const operation = 'read';

try {
Sink.validateFilePath(filePath);
} catch (error) {
this._counter.inc({ labels: { operation } });
reject(error);
return;
}

if (!content.has(filePath)) {
const pathname = join(this._rootPath, filePath);

if (pathname.indexOf(this._rootPath) !== 0) {
this._counter.inc({ labels: { operation } });
reject(new Error(`Directory traversal - ${filePath}`));
return;
}

const entry = this._state.get(pathname);
if (!entry) {
reject(new Error(`${filePath} does not exist`));
}

let stream = new Readable({
const payload = entry.payload || [];
const obj = new ReadFile({
mimeType: entry.mimeType,
etag: entry.hash,
});

obj.stream = new Readable({
read() {
this.push(content.get(filePath));
payload.forEach((item) => {
this.push(item);
});
this.push(null);
},
});

const file = new ReadFile({
mimeType: mimetypes.get(filePath),
obj.stream.on('end', () => {
this._counter.inc({
labels: {
success: true,
access: true,
operation,
},
});
});
file.stream = stream;

resolve(file);
resolve(obj);
});
}

Expand All @@ -94,18 +159,38 @@ export default class SinkMemory extends Sink {
*/
delete(filePath) {
return new Promise((resolve, reject) => {
const operation = 'delete';

try {
Sink.validateFilePath(filePath);
} catch (error) {
this._counter.inc({ labels: { operation } });
reject(error);
return;
}

if (!content.has(filePath)) {
reject(new Error(`${filePath} does not exist`));
const pathname = join(this._rootPath, filePath);

if (pathname.indexOf(this._rootPath) !== 0) {
this._counter.inc({ labels: { operation } });
reject(new Error(`Directory traversal - ${filePath}`));
return;
}

content.delete(filePath);
// Delete recursively
Array.from(this._state.keys()).forEach((key) => {
if (key.startsWith(pathname)) {
this._state.delete(key);
}
});

this._counter.inc({
labels: {
success: true,
access: true,
operation,
},
});

resolve();
});
Expand All @@ -118,18 +203,38 @@ export default class SinkMemory extends Sink {
*/
exist(filePath) {
return new Promise((resolve, reject) => {
const operation = 'exist';

try {
Sink.validateFilePath(filePath);
} catch (error) {
this._counter.inc({ labels: { operation } });
reject(error);
return;
}

if (!content.has(filePath)) {
reject(new Error(`${filePath} does not exist`));
const pathname = join(this._rootPath, filePath);

if (pathname.indexOf(this._rootPath) !== 0) {
this._counter.inc({ labels: { operation } });
reject(new Error(`Directory traversal - ${filePath}`));
return;
}

resolve();
this._counter.inc({
labels: {
success: true,
access: true,
operation,
},
});

if (this._state.has(pathname)) {
resolve();
return;
}

reject(new Error(`${filePath} does not exist`));
});
}
}
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
"dependencies": {
"@eik/common": "3.0.1",
"@eik/sink": "1.2.5",
"@metrics/client": "2.5.2"
"@metrics/client": "2.5.3"
},
"devDependencies": {
"@semantic-release/changelog": "6.0.3",
Expand Down
31 changes: 10 additions & 21 deletions tests/main.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ tap.test('Sink() - .read() - File exists', async (t) => {
const type = 'text/plain';
const writable = await sink.write(path, type);
const readable = Readable.from(['Hello, World!']);
t.resolves(pipeline(readable, writable));
await t.resolves(pipeline(readable, writable));

const file = await sink.read(path);
t.equal(file.mimeType, type);
Expand All @@ -40,32 +40,21 @@ tap.test('Sink() - .read() - File exists', async (t) => {
t.end();
});

tap.test('Sink() - .read() - File does not exist', (t) => {
tap.test('Sink() - .read() - File does not exist', async (t) => {
const sink = new Sink();
t.rejects(sink.read('/does/not/exist.txt'));
await t.rejects(sink.read('/does/not/exist.txt'));
t.end();
});

tap.test('Sink() - .delete() - File exists', async (t) => {
tap.test('Sink() - .delete()', async (t) => {
const sink = new Sink();
const path = '/mem/foo/bar.txt';
const writable = await sink.write(path, 'text/plain');
const readable = Readable.from(['Hello, World!']);

t.resolves(pipeline(readable, writable));
t.resolves(sink.delete(path));

t.rejects(
sink.delete(path),
'Second call to delete should reject if file is successfully deleted',
);

t.end();
});
await t.resolves(pipeline(readable, writable));
await t.resolves(sink.delete(path));

tap.test('Sink() - .delete() - File does not exist', (t) => {
const sink = new Sink();
t.rejects(sink.delete('/does/not/exist.txt'));
t.end();
});

Expand All @@ -75,13 +64,13 @@ tap.test('Sink() - .exist() - File exists', async (t) => {
const writable = await sink.write(path, 'text/plain');
const readable = Readable.from(['Hello, World!']);

t.resolves(pipeline(readable, writable));
t.resolves(sink.exist(path));
await t.resolves(pipeline(readable, writable));
await t.resolves(sink.exist(path));
t.end();
});

tap.test('Sink() - .exist() - File does not exist', (t) => {
tap.test('Sink() - .exist() - File does not exist', async (t) => {
const sink = new Sink();
t.rejects(sink.exist('/does/not/exist.txt'));
await t.rejects(sink.exist('/does/not/exist.txt'));
t.end();
});

0 comments on commit 8828705

Please sign in to comment.