199 changes: 101 additions & 98 deletions bucketVersionsStats.js
@@ -1,13 +1,13 @@
const fs = require('fs');
const { http, https } = require('httpagent');

const AWS = require('aws-sdk');
const { doWhilst } = require('async');
const { S3Client, ListObjectVersionsCommand } = require('@aws-sdk/client-s3');
const { NodeHttpHandler } = require('@aws-sdk/node-http-handler');
const { ConfiguredRetryStrategy } = require('@smithy/util-retry');

const { Logger } = require('werelogs');

const parseOlderThan = require('./utils/parseOlderThan');
const { safeListObjectVersions } = require('./utils/safeList');

const log = new Logger('s3utils::bucketVersionsStats');
const { ENDPOINT } = process.env;
@@ -97,38 +97,32 @@
agent = new http.Agent({ keepAlive: true });
}

const options = {
accessKeyId: ACCESS_KEY,
secretAccessKey: SECRET_KEY,
const s3 = new S3Client({
credentials: {
accessKeyId: ACCESS_KEY,
secretAccessKey: SECRET_KEY,
},
endpoint: ENDPOINT,
region: 'us-east-1',
sslEnabled: s3EndpointIsHttps,
s3ForcePathStyle: true,
apiVersions: { s3: '2006-03-01' },
signatureVersion: 'v4',
signatureCache: false,
httpOptions: {
timeout: 0,
agent,
},
};
/**
* Options specific to s3 requests
* `maxRetries` & `customBackoff` are set only to s3 requests
* default aws sdk retry count is 3 with an exponential delay of 2^n * 30 ms
*/
const s3Options = {
maxRetries: AWS_SDK_REQUEST_RETRIES,
customBackoff: (retryCount, error) => {
log.error('aws sdk request error', { error, retryCount });
// retry with exponential backoff delay capped at 1mn max
// between retries, and a little added jitter
return Math.min(AWS_SDK_REQUEST_INITIAL_DELAY_MS
* 2 ** retryCount, 60000)
* (0.9 + Math.random() * 0.2);
},
};
const s3 = new AWS.S3(Object.assign(options, s3Options));
forcePathStyle: true,
tls: s3EndpointIsHttps,
requestHandler: new NodeHttpHandler({
httpAgent: agent,
httpsAgent: agent,
requestTimeout: 60000,
}),
retryStrategy: new ConfiguredRetryStrategy(
AWS_SDK_REQUEST_RETRIES,
// eslint-disable-next-line arrow-body-style
attempt => {
// Custom backoff with exponential delay capped at 1mn max
// between retries, and a little added jitter
return Math.min(AWS_SDK_REQUEST_INITIAL_DELAY_MS
* 2 ** attempt, 60000)
* (0.9 + Math.random() * 0.2);
}
),
});

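For illustration only (not part of the diff): a minimal sketch of the retry schedule the ConfiguredRetryStrategy above produces, assuming AWS_SDK_REQUEST_INITIAL_DELAY_MS is 30 ms (the real value comes from the environment elsewhere in the script):

// Illustrative sketch, not PR code: backoff delays for the retry strategy configured above.
// AWS_SDK_REQUEST_INITIAL_DELAY_MS is assumed to be 30 ms for this example.
const AWS_SDK_REQUEST_INITIAL_DELAY_MS = 30;

const backoffMs = attempt => Math.min(AWS_SDK_REQUEST_INITIAL_DELAY_MS * 2 ** attempt, 60000)
    * (0.9 + Math.random() * 0.2);

// Delays grow exponentially, are capped at 60 seconds, and carry roughly +/-10% jitter:
// attempt 0 -> ~30 ms, attempt 5 -> ~960 ms, attempt 11 and beyond -> ~60 s (capped).
for (let attempt = 0; attempt < 12; attempt += 1) {
    console.log(`attempt ${attempt}: ~${Math.round(backoffMs(attempt))} ms`);
}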
const stats = {
current: {
@@ -147,10 +141,17 @@
function _logProgress(message) {
const loggedStats = {
total: {
count: BigInt(stats.current.count + stats.noncurrent.count),
size: BigInt(stats.current.size + stats.noncurrent.size),
count: String(stats.current.count + stats.noncurrent.count),
Contributor Author: there was a weird error with the logger when using BigInt

Contributor: nit: not sure if the String() function should be used (as you did), or if the toString() method should be preferred. I did not find any mention of BigInt support with the String() function, though running in a Node.js shell it seems to work just as well... 🤷‍♂️

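A quick Node.js check (illustrative, not part of the PR) shows both conversions handle BigInt the same way, while JSON serialization, which loggers commonly rely on, throws on a raw BigInt, which may explain the logger error mentioned above:

// Illustrative snippet: String() and .toString() both support BigInt.
const total = 1234n;
console.log(String(total));                       // '1234'
console.log(total.toString());                    // '1234'
console.log(String(total) === total.toString());  // true

// JSON.stringify, often used under the hood by loggers, rejects raw BigInt values:
try {
    JSON.stringify({ count: total });
} catch (err) {
    console.log(err.name, err.message); // TypeError: Do not know how to serialize a BigInt
}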
size: String(stats.current.size + stats.noncurrent.size),
},
current: {
count: String(stats.current.count),
size: String(stats.current.size),
},
noncurrent: {
count: String(stats.noncurrent.count),
size: String(stats.noncurrent.size),
},
...stats,
};
log.info(message, {
bucket: BUCKET,
@@ -166,67 +167,65 @@
LOG_PROGRESS_INTERVAL_MS,
);

function _listObjectVersions(bucket, KeyMarker, VersionIdMarker, cb) {
return safeListObjectVersions(s3, {
Bucket: bucket,
MaxKeys: LISTING_LIMIT,
Prefix: TARGET_PREFIX,
KeyMarker,
VersionIdMarker,
}, cb);
}


function listBucket(bucket, cb) {
async function listBucket(bucket) {
let NextKeyMarker = KEY_MARKER;
let NextVersionIdMarker = VERSION_ID_MARKER;
return doWhilst(
done => {
KeyMarker = NextKeyMarker;
VersionIdMarker = NextVersionIdMarker;
_listObjectVersions(bucket, KeyMarker, VersionIdMarker, (err, data) => {
if (err) {
log.error('error listing object versions', {
error: err,
});
return done(err);
}
for (const version of data.Versions) {
if (_OLDER_THAN_TIMESTAMP) {
const parsed = new Date(version.LastModified);
if (Number.isNaN(parsed.getTime()) || parsed > _OLDER_THAN_TIMESTAMP) {
continue;
}
}
const statObj = version.IsLatest ? stats.current : stats.noncurrent;
statObj.count += 1n;
statObj.size += version.Size || 0n;
if (VERBOSE) {
log.info('version info', {
bucket: BUCKET,
key: version.Key,
versionId: version.VersionId,
isLatest: version.IsLatest,
lastModified: version.LastModified,
size: version.Size,
});

while (true) {
KeyMarker = NextKeyMarker;
VersionIdMarker = NextVersionIdMarker;

const command = new ListObjectVersionsCommand({
Bucket: bucket,
MaxKeys: LISTING_LIMIT,
Prefix: TARGET_PREFIX,
KeyMarker,
VersionIdMarker,
});

try {
const data = await s3.send(command);
const versions = data.Versions || [];
for (const version of versions) {
if (_OLDER_THAN_TIMESTAMP) {
const parsed = new Date(version.LastModified);
if (Number.isNaN(parsed.getTime()) || parsed > _OLDER_THAN_TIMESTAMP) {
continue;
}
}
NextKeyMarker = data.NextKeyMarker;
NextVersionIdMarker = data.NextVersionIdMarker;
return done();
});
},
() => {
if (NextKeyMarker || NextVersionIdMarker) {
return true;
const statObj = version.IsLatest ? stats.current : stats.noncurrent;
statObj.count += 1n;
statObj.size += BigInt(version.Size || 0);
if (VERBOSE) {
log.info('version info', {
bucket: BUCKET,
key: version.Key,
versionId: version.VersionId,
isLatest: version.IsLatest,
lastModified: version.LastModified,
size: version.Size,
});
}
}
KeyMarker = undefined;
VersionIdMarker = undefined;
return false;
},
cb,
);

NextKeyMarker = data.NextKeyMarker;
NextVersionIdMarker = data.NextVersionIdMarker;

if (!NextKeyMarker && !NextVersionIdMarker) {
break;
}
} catch (error) {
log.error('error listing object versions', {
bucket: bucket,
Check failure on line 219 in bucketVersionsStats.js (GitHub Actions / tests): Expected property shorthand
keyMarker: KeyMarker,
versionIdMarker: VersionIdMarker,
error: error,
Check failure on line 222 in bucketVersionsStats.js (GitHub Actions / tests): Expected property shorthand
errorName: error.name,
errorMessage: error.message,
});
throw error;
}
}
}

function shutdown(exitCode) {
@@ -235,20 +234,24 @@
process.exit(exitCode);
}

listBucket(BUCKET, err => {
if (err) {
async function main() {
try {
await listBucket(BUCKET);
_logProgress('final summary');
shutdown(0);
} catch (error) {
log.error('error during execution', {
bucket: BUCKET,
KeyMarker,
VersionIdMarker,
error,
});
_logProgress('summary after error');
shutdown(1);
} else {
_logProgress('final summary');
shutdown(0);
}
});
}

main();

function stop() {
log.warn('stopping execution');