Skip to content

Commit 8550aea

Browse files
committed
Batch delete files from S3
1 parent a6e8444 commit 8550aea

File tree

2 files changed

+56
-30
lines changed

2 files changed

+56
-30
lines changed

archives/archives.go

Lines changed: 16 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -858,12 +858,7 @@ func DeleteRolledUpDailyArchives(ctx context.Context, rt *runtime.Runtime, org O
858858

859859
log := slog.With("org_id", org.ID, "org_name", org.Name, "archive_type", archiveType)
860860

861-
// first, get the archives to delete so we can clean up their S3 files
862-
type archiveToDelete struct {
863-
ID int `db:"id"`
864-
Location null.String `db:"location"`
865-
}
866-
var archivesToDelete []archiveToDelete
861+
var archivesToDelete []*Archive
867862
err := rt.DB.SelectContext(ctx, &archivesToDelete, sqlSelectRolledUpDailyArchives, org.ID, archiveType, DayPeriod)
868863
if err != nil {
869864
return 0, fmt.Errorf("error selecting rolled up daily archives: %w", err)
@@ -873,24 +868,25 @@ func DeleteRolledUpDailyArchives(ctx context.Context, rt *runtime.Runtime, org O
873868
return 0, nil
874869
}
875870

876-
// collect IDs and delete S3 files for archives that were uploaded
871+
// collect IDs and S3 keys grouped by bucket
877872
ids := make([]int, 0, len(archivesToDelete))
878-
s3DeletedCount := 0
873+
keysByBucket := make(map[string][]string)
879874
for _, archive := range archivesToDelete {
880875
ids = append(ids, archive.ID)
876+
if archive.isUploaded() {
877+
bucket, key := archive.location()
878+
keysByBucket[bucket] = append(keysByBucket[bucket], key)
879+
}
880+
}
881881

882-
if archive.Location != "" {
883-
// parse bucket:key from location (format is "bucket:key")
884-
parts := strings.SplitN(string(archive.Location), ":", 2)
885-
if len(parts) == 2 {
886-
bucket, key := parts[0], parts[1]
887-
if err := DeleteS3File(ctx, rt.S3, bucket, key); err != nil {
888-
log.Error("error deleting S3 file for rolled up daily archive", "archive_id", archive.ID, "bucket", bucket, "key", key, "error", err)
889-
// continue to try deleting other files and the database records
890-
} else {
891-
s3DeletedCount++
892-
}
893-
}
882+
// delete S3 files
883+
s3DeletedCount := 0
884+
for bucket, keys := range keysByBucket {
885+
deleted, err := DeleteS3Files(ctx, rt.S3, bucket, keys)
886+
s3DeletedCount += deleted
887+
if err != nil {
888+
log.Error("error deleting S3 files for rolled up daily archives", "bucket", bucket, "error", err)
889+
// continue to try deleting database records
894890
}
895891
}
896892

archives/s3.go

Lines changed: 40 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"io"
99
"log/slog"
1010
"os"
11+
"slices"
1112
"strings"
1213

1314
"github.com/aws/aws-sdk-go-v2/aws"
@@ -139,16 +140,45 @@ func GetS3File(ctx context.Context, s3Client *s3x.Service, bucket, key string) (
139140
return output.Body, nil
140141
}
141142

142-
// DeleteS3File deletes a file from S3
143-
func DeleteS3File(ctx context.Context, s3Client *s3x.Service, bucket, key string) error {
144-
_, err := s3Client.Client.DeleteObject(ctx, &s3.DeleteObjectInput{
145-
Bucket: aws.String(bucket),
146-
Key: aws.String(key),
147-
})
148-
if err != nil {
149-
return fmt.Errorf("error deleting S3 object bucket=%s key=%s: %w", bucket, key, err)
143+
// DeleteS3Files deletes multiple files from S3, automatically batching into requests of 1000 keys
144+
func DeleteS3Files(ctx context.Context, s3Client *s3x.Service, bucket string, keys []string) (int, error) {
145+
if len(keys) == 0 {
146+
return 0, nil
150147
}
151148

152-
slog.Debug("deleted S3 file", "bucket", bucket, "key", key)
153-
return nil
149+
totalDeleted := 0
150+
var lastErr error
151+
152+
for batch := range slices.Chunk(keys, 1000) {
153+
objects := make([]types.ObjectIdentifier, len(batch))
154+
for i, key := range batch {
155+
objects[i] = types.ObjectIdentifier{Key: aws.String(key)}
156+
}
157+
158+
output, err := s3Client.Client.DeleteObjects(ctx, &s3.DeleteObjectsInput{
159+
Bucket: aws.String(bucket),
160+
Delete: &types.Delete{Objects: objects, Quiet: aws.Bool(true)},
161+
})
162+
if err != nil {
163+
lastErr = fmt.Errorf("error batch deleting S3 objects bucket=%s: %w", bucket, err)
164+
continue
165+
}
166+
167+
// check for individual errors
168+
if len(output.Errors) > 0 {
169+
for _, e := range output.Errors {
170+
slog.Error("error deleting S3 object in batch", "bucket", bucket, "key", *e.Key, "error", *e.Message)
171+
}
172+
totalDeleted += len(batch) - len(output.Errors)
173+
lastErr = fmt.Errorf("%d errors deleting S3 objects", len(output.Errors))
174+
} else {
175+
totalDeleted += len(batch)
176+
}
177+
}
178+
179+
if totalDeleted > 0 {
180+
slog.Debug("deleted S3 files", "bucket", bucket, "count", totalDeleted)
181+
}
182+
183+
return totalDeleted, lastErr
154184
}

0 commit comments

Comments
 (0)