1
1
/* eslint-disable no-console */
2
2
3
- const { http } = require ( 'httpagent' ) ;
3
+ const { http, https } = require ( 'httpagent' ) ;
4
4
const async = require ( 'async' ) ;
5
- const AWS = require ( 'aws-sdk' ) ;
5
+ const crypto = require ( 'crypto' ) ;
6
+ const {
7
+ S3Client,
8
+ GetBucketVersioningCommand,
9
+ ListObjectVersionsCommand,
10
+ DeleteObjectsCommand
11
+ } = require ( '@aws-sdk/client-s3' ) ;
12
+ const { NodeHttpHandler } = require ( '@aws-sdk/node-http-handler' ) ;
6
13
const { Logger } = require ( 'werelogs' ) ;
7
14
8
15
const log = new Logger ( 's3utils::removeDeleteMarkers' ) ;
@@ -78,22 +85,21 @@ const LOG_PROGRESS_INTERVAL = (
78
85
&& Number . parseInt ( process . env . LOG_PROGRESS_INTERVAL , 10 ) )
79
86
|| DEFAULT_LOG_PROGRESS_INTERVAL ;
80
87
81
- AWS . config . update ( {
82
- accessKeyId : ACCESS_KEY ,
83
- secretAccessKey : SECRET_KEY ,
88
+ const s3 = new S3Client ( {
89
+ credentials : {
90
+ accessKeyId : ACCESS_KEY ,
91
+ secretAccessKey : SECRET_KEY ,
92
+ } ,
84
93
endpoint : ENDPOINT ,
85
94
region : 'us-east-1' ,
86
- sslEnabled : false ,
87
- s3ForcePathStyle : true ,
88
- apiVersions : { s3 : '2006-03-01' } ,
89
- signatureVersion : 'v4' ,
90
- signatureCache : false ,
91
- } ) ;
92
-
93
- const s3 = new AWS . S3 ( {
94
- httpOptions : {
95
- agent : new http . Agent ( { keepAlive : true } ) ,
96
- } ,
95
+ forcePathStyle : true ,
96
+ requestHandler : new NodeHttpHandler ( {
97
+ httpAgent : new http . Agent ( { keepAlive : true } ) ,
98
+ httpsAgent : new https . Agent ( {
99
+ keepAlive : true ,
100
+ rejectUnauthorized : false
101
+ } ) ,
102
+ } ) ,
97
103
} ) ;
98
104
99
105
const status = {
@@ -136,36 +142,50 @@ const taskQueue = async.queue((task, done) => {
136
142
return next ( ) ;
137
143
}
138
144
logProgress ( 'start scanning bucket' ) ;
139
- return s3 . getBucketVersioning ( {
145
+ return s3 . send ( new GetBucketVersioningCommand ( {
140
146
Bucket : bucket ,
141
- } , ( err , data ) => {
142
- if ( err ) {
147
+ } ) )
148
+ . then ( data => {
149
+ if ( data . Status !== 'Suspended' ) {
150
+ log . error ( 'bucket versioning status is not "Suspended", skipping bucket' , {
151
+ bucket,
152
+ versioningStatus : data . Status ,
153
+ } ) ;
154
+ bucketDone = true ;
155
+ return next ( new Error ( 'bucket not processed' ) ) ;
156
+ }
157
+ return next ( ) ;
158
+ } )
159
+ . catch ( err => {
143
160
log . error ( 'error getting bucket versioning' , {
144
161
bucket,
145
162
error : err . message ,
146
163
} ) ;
147
164
bucketDone = true ;
148
165
return next ( err ) ;
149
- }
150
- if ( data . Status !== 'Suspended' ) {
151
- log . error ( 'bucket versioning status is not "Suspended", skipping bucket' , {
152
- bucket,
153
- versioningStatus : data . Status ,
154
- } ) ;
155
- bucketDone = true ;
156
- return next ( new Error ( 'bucket not processed' ) ) ;
157
- }
158
- return next ( ) ;
159
- } ) ;
166
+ } ) ;
160
167
} ,
161
- next => s3 . listObjectVersions ( {
168
+ next => s3 . send ( new ListObjectVersionsCommand ( {
162
169
Bucket : bucket ,
163
170
MaxKeys : LISTING_LIMIT ,
164
171
Prefix : TARGET_PREFIX ,
165
172
KeyMarker : keyMarker ,
166
173
VersionIdMarker : versionIdMarker ,
167
- } , ( err , data ) => {
168
- if ( err ) {
174
+ } ) )
175
+ . then ( data => {
176
+ status . objectsListed += ( data . Versions || [ ] ) . length + ( data . DeleteMarkers || [ ] ) . length ;
177
+ if ( data . NextKeyMarker || data . NextVersionIdMarker ) {
178
+ taskQueue . push ( {
179
+ bucket,
180
+ keyMarker : data . NextKeyMarker ,
181
+ versionIdMarker : data . NextVersionIdMarker ,
182
+ } ) ;
183
+ } else {
184
+ bucketDone = true ;
185
+ }
186
+ return next ( null , data . DeleteMarkers || [ ] ) ;
187
+ } )
188
+ . catch ( err => {
169
189
log . error ( 'error listing object versions' , {
170
190
bucket,
171
191
keyMarker,
@@ -174,33 +194,48 @@ const taskQueue = async.queue((task, done) => {
174
194
} ) ;
175
195
bucketDone = true ;
176
196
return next ( err ) ;
177
- }
178
- status . objectsListed += data . Versions . length + data . DeleteMarkers . length ;
179
- if ( data . NextKeyMarker || data . NextVersionIdMarker ) {
180
- taskQueue . push ( {
181
- bucket,
182
- keyMarker : data . NextKeyMarker ,
183
- versionIdMarker : data . NextVersionIdMarker ,
184
- } ) ;
185
- } else {
186
- bucketDone = true ;
187
- }
188
- return next ( null , data . DeleteMarkers ) ;
189
- } ) ,
197
+ } ) ,
190
198
( deleteMarkers , next ) => {
191
199
if ( deleteMarkers . length === 0 ) {
192
200
return next ( ) ;
193
201
}
194
- return s3 . deleteObjects ( {
202
+ const command = new DeleteObjectsCommand ( {
195
203
Bucket : bucket ,
196
204
Delete : {
197
205
Objects : deleteMarkers . map ( item => ( {
198
206
Key : item . Key ,
199
207
VersionId : item . VersionId ,
200
208
} ) ) ,
201
209
} ,
202
- } , ( err , data ) => {
203
- if ( err ) {
210
+ } ) ;
211
+
212
+ return s3 . send ( command )
213
+ . then ( data => {
214
+ if ( data . Deleted ) {
215
+ status . deleteMarkersDeleted += data . Deleted . length ;
216
+ data . Deleted . forEach ( entry => {
217
+ log . info ( 'delete marker deleted' , {
218
+ bucket,
219
+ objectKey : entry . Key ,
220
+ versionId : entry . VersionId ,
221
+ } ) ;
222
+ } ) ;
223
+ }
224
+ if ( data . Errors ) {
225
+ status . deleteMarkersErrors += data . Errors . length ;
226
+ data . Errors . forEach ( entry => {
227
+ log . error ( 'error deleting delete marker' , {
228
+ bucket,
229
+ objectKey : entry . Key ,
230
+ versionId : entry . VersionId ,
231
+ error : entry . Code ,
232
+ errorDesc : entry . Message ,
233
+ } ) ;
234
+ } ) ;
235
+ }
236
+ return next ( ) ;
237
+ } )
238
+ . catch ( err => {
204
239
log . error ( 'batch delete request error' , {
205
240
bucket,
206
241
keyMarker,
@@ -217,31 +252,7 @@ const taskQueue = async.queue((task, done) => {
217
252
} ) ;
218
253
} ) ;
219
254
return next ( ) ;
220
- }
221
- if ( data . Deleted ) {
222
- status . deleteMarkersDeleted += data . Deleted . length ;
223
- data . Deleted . forEach ( entry => {
224
- log . info ( 'delete marker deleted' , {
225
- bucket,
226
- objectKey : entry . Key ,
227
- versionId : entry . VersionId ,
228
- } ) ;
229
- } ) ;
230
- }
231
- if ( data . Errors ) {
232
- status . deleteMarkersErrors += data . Errors . length ;
233
- data . Errors . forEach ( entry => {
234
- log . error ( 'error deleting delete marker' , {
235
- bucket,
236
- objectKey : entry . Key ,
237
- versionId : entry . VersionId ,
238
- error : entry . Code ,
239
- errorDesc : entry . Message ,
240
- } ) ;
241
- } ) ;
242
- }
243
- return next ( ) ;
244
- } ) ;
255
+ } ) ;
245
256
} ,
246
257
] , err => {
247
258
if ( bucketDone ) {
0 commit comments