From 0b5f8de369279e5b52935233a19aa6d279b7f05b Mon Sep 17 00:00:00 2001
From: sylvain senechal
Date: Tue, 23 Sep 2025 17:06:26 +0200
Subject: [PATCH 1/2] migrate to AWS SDK v3 for bucketVersionsStats

Issue: S3UTILS-200
---
 bucketVersionsStats.js | 199 +++++++++++++++++++++--------------------
 1 file changed, 101 insertions(+), 98 deletions(-)

diff --git a/bucketVersionsStats.js b/bucketVersionsStats.js
index ccf2e6e9..44d49eff 100644
--- a/bucketVersionsStats.js
+++ b/bucketVersionsStats.js
@@ -1,13 +1,13 @@
 const fs = require('fs');
 const { http, https } = require('httpagent');
 
-const AWS = require('aws-sdk');
-const { doWhilst } = require('async');
+const { S3Client, ListObjectVersionsCommand } = require('@aws-sdk/client-s3');
+const { NodeHttpHandler } = require('@aws-sdk/node-http-handler');
+const { ConfiguredRetryStrategy } = require('@smithy/util-retry');
 
 const { Logger } = require('werelogs');
 
 const parseOlderThan = require('./utils/parseOlderThan');
-const { safeListObjectVersions } = require('./utils/safeList');
 
 const log = new Logger('s3utils::bucketVersionsStats');
 const { ENDPOINT } = process.env;
@@ -97,38 +97,32 @@ if (s3EndpointIsHttps) {
     agent = new http.Agent({ keepAlive: true });
 }
 
-const options = {
-    accessKeyId: ACCESS_KEY,
-    secretAccessKey: SECRET_KEY,
+const s3 = new S3Client({
+    credentials: {
+        accessKeyId: ACCESS_KEY,
+        secretAccessKey: SECRET_KEY,
+    },
     endpoint: ENDPOINT,
     region: 'us-east-1',
-    sslEnabled: s3EndpointIsHttps,
-    s3ForcePathStyle: true,
-    apiVersions: { s3: '2006-03-01' },
-    signatureVersion: 'v4',
-    signatureCache: false,
-    httpOptions: {
-        timeout: 0,
-        agent,
-    },
-};
-/**
- * Options specific to s3 requests
- * `maxRetries` & `customBackoff` are set only to s3 requests
- * default aws sdk retry count is 3 with an exponential delay of 2^n * 30 ms
- */
-const s3Options = {
-    maxRetries: AWS_SDK_REQUEST_RETRIES,
-    customBackoff: (retryCount, error) => {
-        log.error('aws sdk request error', { error, retryCount });
-        // retry with exponential backoff delay capped at 1mn max
-        // between retries, and a little added jitter
-        return Math.min(AWS_SDK_REQUEST_INITIAL_DELAY_MS
-            * 2 ** retryCount, 60000)
-            * (0.9 + Math.random() * 0.2);
-    },
-};
-const s3 = new AWS.S3(Object.assign(options, s3Options));
+    forcePathStyle: true,
+    tls: s3EndpointIsHttps,
+    requestHandler: new NodeHttpHandler({
+        httpAgent: agent,
+        httpsAgent: agent,
+        requestTimeout: 60000,
+    }),
+    retryStrategy: new ConfiguredRetryStrategy(
+        AWS_SDK_REQUEST_RETRIES + 1, // maxAttempts counts the initial attempt, unlike v2 maxRetries
+        // eslint-disable-next-line arrow-body-style
+        attempt => {
+            // Custom backoff with exponential delay capped at 1 min max
+            // between retries, and a little added jitter
+            return Math.min(AWS_SDK_REQUEST_INITIAL_DELAY_MS
+                * 2 ** attempt, 60000)
+                * (0.9 + Math.random() * 0.2);
+        }
+    ),
+});
 
 const stats = {
     current: {
@@ -147,10 +141,17 @@ let VersionIdMarker;
 function _logProgress(message) {
     const loggedStats = {
         total: {
-            count: BigInt(stats.current.count + stats.noncurrent.count),
-            size: BigInt(stats.current.size + stats.noncurrent.size),
+            count: String(stats.current.count + stats.noncurrent.count),
+            size: String(stats.current.size + stats.noncurrent.size),
+        },
+        current: {
+            count: String(stats.current.count),
+            size: String(stats.current.size),
+        },
+        noncurrent: {
+            count: String(stats.noncurrent.count),
+            size: String(stats.noncurrent.size),
         },
-        ...stats,
     };
     log.info(message, {
        bucket: BUCKET,
@@ -166,67 +167,65 @@ const logProgressInterval = setInterval(
     LOG_PROGRESS_INTERVAL_MS,
 );
 
-function _listObjectVersions(bucket, KeyMarker, VersionIdMarker, cb) {
-    return safeListObjectVersions(s3, {
-        Bucket: bucket,
-        MaxKeys: LISTING_LIMIT,
-        Prefix: TARGET_PREFIX,
-        KeyMarker,
-        VersionIdMarker,
-    }, cb);
-}
-
-
-function listBucket(bucket, cb) {
+async function listBucket(bucket) {
     let NextKeyMarker = KEY_MARKER;
     let NextVersionIdMarker = VERSION_ID_MARKER;
-    return doWhilst(
-        done => {
-            KeyMarker = NextKeyMarker;
-            VersionIdMarker = NextVersionIdMarker;
-            _listObjectVersions(bucket, KeyMarker, VersionIdMarker, (err, data) => {
-                if (err) {
-                    log.error('error listing object versions', {
-                        error: err,
-                    });
-                    return done(err);
-                }
-                for (const version of data.Versions) {
-                    if (_OLDER_THAN_TIMESTAMP) {
-                        const parsed = new Date(version.LastModified);
-                        if (Number.isNaN(parsed.getTime()) || parsed > _OLDER_THAN_TIMESTAMP) {
-                            continue;
-                        }
-                    }
-                    const statObj = version.IsLatest ? stats.current : stats.noncurrent;
-                    statObj.count += 1n;
-                    statObj.size += version.Size || 0n;
-                    if (VERBOSE) {
-                        log.info('version info', {
-                            bucket: BUCKET,
-                            key: version.Key,
-                            versionId: version.VersionId,
-                            isLatest: version.IsLatest,
-                            lastModified: version.LastModified,
-                            size: version.Size,
-                        });
+
+    while (true) {
+        KeyMarker = NextKeyMarker;
+        VersionIdMarker = NextVersionIdMarker;
+
+        const command = new ListObjectVersionsCommand({
+            Bucket: bucket,
+            MaxKeys: LISTING_LIMIT,
+            Prefix: TARGET_PREFIX,
+            KeyMarker,
+            VersionIdMarker,
+        });
+
+        try {
+            const data = await s3.send(command);
+            const versions = data.Versions || [];
+            for (const version of versions) {
+                if (_OLDER_THAN_TIMESTAMP) {
+                    const parsed = new Date(version.LastModified);
+                    if (Number.isNaN(parsed.getTime()) || parsed > _OLDER_THAN_TIMESTAMP) {
+                        continue;
                     }
                 }
-                NextKeyMarker = data.NextKeyMarker;
-                NextVersionIdMarker = data.NextVersionIdMarker;
-                return done();
-            });
-        },
-        () => {
-            if (NextKeyMarker || NextVersionIdMarker) {
-                return true;
+                const statObj = version.IsLatest ? stats.current : stats.noncurrent;
+                statObj.count += 1n;
+                statObj.size += BigInt(version.Size || 0);
+                if (VERBOSE) {
+                    log.info('version info', {
+                        bucket: BUCKET,
+                        key: version.Key,
+                        versionId: version.VersionId,
+                        isLatest: version.IsLatest,
+                        lastModified: version.LastModified,
+                        size: version.Size,
+                    });
+                }
             }
-            KeyMarker = undefined;
-            VersionIdMarker = undefined;
-            return false;
-        },
-        cb,
-    );
+
+            NextKeyMarker = data.NextKeyMarker;
+            NextVersionIdMarker = data.NextVersionIdMarker;
+
+            if (!NextKeyMarker && !NextVersionIdMarker) {
+                break;
+            }
+        } catch (error) {
+            log.error('error listing object versions', {
+                bucket,
+                keyMarker: KeyMarker,
+                versionIdMarker: VersionIdMarker,
+                error,
+                errorName: error.name,
+                errorMessage: error.message,
+            });
+            throw error;
+        }
+    }
 }
 
 function shutdown(exitCode) {
@@ -235,20 +234,24 @@ function shutdown(exitCode) {
     process.exit(exitCode);
 }
 
-listBucket(BUCKET, err => {
-    if (err) {
+async function main() {
+    try {
+        await listBucket(BUCKET);
+        _logProgress('final summary');
+        shutdown(0);
+    } catch (error) {
         log.error('error during execution', {
             bucket: BUCKET,
             KeyMarker,
             VersionIdMarker,
+            error,
         });
         _logProgress('summary after error');
         shutdown(1);
-    } else {
-        _logProgress('final summary');
-        shutdown(0);
     }
-});
+}
+
+main();
 
 function stop() {
     log.warn('stopping execution');

From 03e1a411ed28e88f783b0fa8aa8d1e8e663ad0bd Mon Sep 17 00:00:00 2001
From: sylvain senechal
Date: Tue, 23 Sep 2025 17:06:55 +0200
Subject: [PATCH 2/2] migrate to AWS SDK v3 for cleanupNoncurrentVersions

Issue: S3UTILS-200
---
 cleanupNoncurrentVersions.js | 124 +++++++++++++++++++++++++----------
 1 file changed, 88 insertions(+), 36 deletions(-)

diff --git a/cleanupNoncurrentVersions.js b/cleanupNoncurrentVersions.js
index c16d2173..1549e440 100644
--- a/cleanupNoncurrentVersions.js
+++ b/cleanupNoncurrentVersions.js
@@ -1,15 +1,17 @@
 const fs = require('fs');
+const crypto = require('crypto');
 const { http, https } = require('httpagent');
 
 const { ObjectMD } = require('arsenal').models;
-const AWS = require('aws-sdk');
+const { S3Client, ListObjectVersionsCommand, DeleteObjectsCommand } = require('@aws-sdk/client-s3');
+const { NodeHttpHandler } = require('@aws-sdk/node-http-handler');
+const { ConfiguredRetryStrategy } = require('@smithy/util-retry');
 const { doWhilst, eachSeries, filterLimit } = require('async');
 
 const { Logger } = require('werelogs');
 
 const BackbeatClient = require('./BackbeatClient');
 const parseOlderThan = require('./utils/parseOlderThan');
-const { safeListObjectVersions } = require('./utils/safeList');
 
 const log = new Logger('s3utils::cleanupNoncurrentVersions');
 
@@ -173,6 +175,33 @@ if (s3EndpointIsHttps) {
     agent = new http.Agent({ keepAlive: true });
 }
 
+const s3 = new S3Client({
+    credentials: {
+        accessKeyId: ACCESS_KEY,
+        secretAccessKey: SECRET_KEY,
+    },
+    endpoint: S3_ENDPOINT,
+    region: 'us-east-1',
+    forcePathStyle: true,
+    tls: s3EndpointIsHttps,
+    requestHandler: new NodeHttpHandler({
+        httpAgent: agent,
+        httpsAgent: agent,
+        requestTimeout: 60000,
+    }),
+    retryStrategy: new ConfiguredRetryStrategy(
+        AWS_SDK_REQUEST_RETRIES + 1, // maxAttempts counts the initial attempt, unlike v2 maxRetries
+        // eslint-disable-next-line arrow-body-style
+        attempt => {
+            // Custom backoff with exponential delay capped at 1 min max
+            // between retries, and a little added jitter
+            return Math.min(AWS_SDK_REQUEST_INITIAL_DELAY_MS
+                * 2 ** attempt, 60000)
+                * (0.9 + Math.random() * 0.2);
+        }
+    ),
+});
+
 const options = {
     accessKeyId: ACCESS_KEY,
     secretAccessKey: SECRET_KEY,
@@ -207,7 +236,6 @@ const s3Options = {
 const opt = Object.assign(options, s3Options);
 
-const s3 = new AWS.S3(opt);
 const bb = new BackbeatClient(opt);
 
 let nListed = 0;
@@ -244,13 +272,17 @@ const logProgressInterval = setInterval(
 );
 
 function _listObjectVersions(bucket, VersionIdMarker, KeyMarker, cb) {
-    return safeListObjectVersions(s3, {
+    const command = new ListObjectVersionsCommand({
         Bucket: bucket,
         MaxKeys: LISTING_LIMIT,
         Prefix: TARGET_PREFIX,
         KeyMarker,
         VersionIdMarker,
-    }, cb);
+    });
+
+    s3.send(command)
+        .then(data => cb(null, data))
+        .catch(cb);
 }
 
 function _getMetadata(bucket, key, versionId, cb) {
@@ -297,13 +329,37 @@ function _doBatchDelete(bucket) {
     batchDeleteInProgress = true;
     // multi object delete can delete max 1000 objects
     const batchDeleteObjects = deleteQueue.splice(0, 1000);
-    const params = {
+    const command = new DeleteObjectsCommand({
         Bucket: bucket,
         Delete: { Objects: batchDeleteObjects },
-    };
-    s3.deleteObjects(params, err => {
-        if (err) {
-            log.error('batch delete error', { error: err });
+    });
+
+    command.middlewareStack.add(
+        next => async args => {
+            if (args.request.body) {
+                const bodyContent = Buffer.from(args.request.body);
+                const md5Hash = crypto.createHash('md5').update(bodyContent).digest('base64');
+                // eslint-disable-next-line no-param-reassign
+                args.request.headers['Content-MD5'] = md5Hash;
+            }
+            return next(args);
+        },
+        {
+            step: 'build',
+        }
+    );
+
+    s3.send(command)
+        .then(() => {
+            nDeleted += batchDeleteObjects.length;
+            batchDeleteObjects.forEach(v => log.info('object deleted', {
+                bucket,
+                key: v.Key,
+                versionId: v.VersionId,
+            }));
+        })
+        .catch(err => {
+            log.error('batch delete error', { error: err });
             nErrors += 1;
             batchDeleteObjects.forEach(
                 v => log.error('object may not be deleted', {
@@ -312,29 +368,23 @@ function _doBatchDelete(bucket) {
                     versionId: v.VersionId,
                 }),
             );
-        } else {
-            nDeleted += batchDeleteObjects.length;
-            batchDeleteObjects.forEach(v => log.info('object deleted', {
-                bucket,
-                key: v.Key,
-                versionId: v.VersionId,
-            }));
-        }
-        if (batchDeleteOnDrain && deleteQueue.length <= 1000) {
-            process.nextTick(batchDeleteOnDrain);
-            batchDeleteOnDrain = null;
-        }
-        if (batchDeleteOnFullDrain && deleteQueue.length === 0) {
-            process.nextTick(batchDeleteOnFullDrain);
-            batchDeleteOnFullDrain = null;
-        }
-        if (deleteQueue.length > 0) {
-            // there are more objects to delete, keep going
-            _doBatchDelete(bucket);
-        } else {
-            batchDeleteInProgress = false;
-        }
-    });
+        })
+        .finally(() => {
+            if (batchDeleteOnDrain && deleteQueue.length <= 1000) {
+                process.nextTick(batchDeleteOnDrain);
+                batchDeleteOnDrain = null;
+            }
+            if (batchDeleteOnFullDrain && deleteQueue.length === 0) {
+                process.nextTick(batchDeleteOnFullDrain);
+                batchDeleteOnFullDrain = null;
+            }
+            if (deleteQueue.length > 0) {
+                // there are more objects to delete, keep going
+                _doBatchDelete(bucket);
+            } else {
+                batchDeleteInProgress = false;
+            }
+        });
 }
 
 function _triggerDeletes(bucket, versionsToDelete, cb) {
@@ -565,11 +615,13 @@ function triggerDeletesOnBucket(bucketName, cb) {
                 });
                 return done(err);
             }
-            nListed += data.Versions.length + data.DeleteMarkers.length;
+            const versions = data.Versions || [];
+            const deleteMarkers = data.DeleteMarkers || [];
+            nListed += versions.length + deleteMarkers.length;
             const ret = _triggerDeletesOnEligibleObjects(
                 bucket,
-                data.Versions,
-                data.DeleteMarkers,
+                versions,
+                deleteMarkers,
                 !data.IsTruncated,
                 err => {
                     if (err) {