Skip to content

Commit c2fe571

Browse files
committed
Batch delete files from S3
1 parent a6e8444 commit c2fe571

File tree

2 files changed

+63
-38
lines changed

2 files changed

+63
-38
lines changed

archives/archives.go

Lines changed: 23 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -847,9 +847,9 @@ func DeleteArchivedOrgRecords(ctx context.Context, rt *runtime.Runtime, now time
847847
}
848848

849849
const sqlSelectRolledUpDailyArchives = `
850-
SELECT id, location
850+
SELECT uuid, id, org_id, start_date::timestamp with time zone AS start_date, period, archive_type, hash, location, size, record_count, needs_deletion, rollup_id
851851
FROM archives_archive
852-
WHERE org_id = $1 AND archive_type = $2 AND period = $3 AND rollup_id IS NOT NULL AND deleted_on IS NOT NULL`
852+
WHERE org_id = $1 AND archive_type = $2 AND period = 'D' AND rollup_id IS NOT NULL AND deleted_on IS NOT NULL`
853853

854854
// DeleteRolledUpDailyArchives deletes daily archives that have been rolled up into monthlies and had their records deleted
855855
func DeleteRolledUpDailyArchives(ctx context.Context, rt *runtime.Runtime, org Org, archiveType ArchiveType) (int, error) {
@@ -858,39 +858,34 @@ func DeleteRolledUpDailyArchives(ctx context.Context, rt *runtime.Runtime, org O
858858

859859
log := slog.With("org_id", org.ID, "org_name", org.Name, "archive_type", archiveType)
860860

861-
// first, get the archives to delete so we can clean up their S3 files
862-
type archiveToDelete struct {
863-
ID int `db:"id"`
864-
Location null.String `db:"location"`
865-
}
866-
var archivesToDelete []archiveToDelete
867-
err := rt.DB.SelectContext(ctx, &archivesToDelete, sqlSelectRolledUpDailyArchives, org.ID, archiveType, DayPeriod)
868-
if err != nil {
861+
var toDelete []*Archive
862+
if err := rt.DB.SelectContext(ctx, &toDelete, sqlSelectRolledUpDailyArchives, org.ID, archiveType); err != nil {
869863
return 0, fmt.Errorf("error selecting rolled up daily archives: %w", err)
870864
}
871865

872-
if len(archivesToDelete) == 0 {
866+
if len(toDelete) == 0 {
873867
return 0, nil
874868
}
875869

876-
// collect IDs and delete S3 files for archives that were uploaded
877-
ids := make([]int, 0, len(archivesToDelete))
870+
// collect IDs and S3 keys grouped by bucket
871+
ids := make([]int, 0, len(toDelete))
872+
keysByBucket := make(map[string][]string)
873+
for _, a := range toDelete {
874+
ids = append(ids, a.ID)
875+
if a.isUploaded() {
876+
bucket, key := a.location()
877+
keysByBucket[bucket] = append(keysByBucket[bucket], key)
878+
}
879+
}
880+
881+
// delete S3 files
878882
s3DeletedCount := 0
879-
for _, archive := range archivesToDelete {
880-
ids = append(ids, archive.ID)
881-
882-
if archive.Location != "" {
883-
// parse bucket:key from location (format is "bucket:key")
884-
parts := strings.SplitN(string(archive.Location), ":", 2)
885-
if len(parts) == 2 {
886-
bucket, key := parts[0], parts[1]
887-
if err := DeleteS3File(ctx, rt.S3, bucket, key); err != nil {
888-
log.Error("error deleting S3 file for rolled up daily archive", "archive_id", archive.ID, "bucket", bucket, "key", key, "error", err)
889-
// continue to try deleting other files and the database records
890-
} else {
891-
s3DeletedCount++
892-
}
893-
}
883+
for bucket, keys := range keysByBucket {
884+
deleted, err := DeleteS3Files(ctx, rt.S3, bucket, keys)
885+
s3DeletedCount += deleted
886+
if err != nil {
887+
log.Error("error deleting S3 files for rolled up daily archives", "bucket", bucket, "error", err)
888+
// continue to try deleting database records
894889
}
895890
}
896891

archives/s3.go

Lines changed: 40 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"io"
99
"log/slog"
1010
"os"
11+
"slices"
1112
"strings"
1213

1314
"github.com/aws/aws-sdk-go-v2/aws"
@@ -139,16 +140,45 @@ func GetS3File(ctx context.Context, s3Client *s3x.Service, bucket, key string) (
139140
return output.Body, nil
140141
}
141142

142-
// DeleteS3File deletes a file from S3
143-
func DeleteS3File(ctx context.Context, s3Client *s3x.Service, bucket, key string) error {
144-
_, err := s3Client.Client.DeleteObject(ctx, &s3.DeleteObjectInput{
145-
Bucket: aws.String(bucket),
146-
Key: aws.String(key),
147-
})
148-
if err != nil {
149-
return fmt.Errorf("error deleting S3 object bucket=%s key=%s: %w", bucket, key, err)
143+
// DeleteS3Files deletes multiple files from S3, automatically batching into requests of 1000 keys
144+
func DeleteS3Files(ctx context.Context, s3Client *s3x.Service, bucket string, keys []string) (int, error) {
145+
if len(keys) == 0 {
146+
return 0, nil
150147
}
151148

152-
slog.Debug("deleted S3 file", "bucket", bucket, "key", key)
153-
return nil
149+
totalDeleted := 0
150+
var lastErr error
151+
152+
for batch := range slices.Chunk(keys, 1000) {
153+
objects := make([]types.ObjectIdentifier, len(batch))
154+
for i, key := range batch {
155+
objects[i] = types.ObjectIdentifier{Key: aws.String(key)}
156+
}
157+
158+
output, err := s3Client.Client.DeleteObjects(ctx, &s3.DeleteObjectsInput{
159+
Bucket: aws.String(bucket),
160+
Delete: &types.Delete{Objects: objects, Quiet: aws.Bool(true)},
161+
})
162+
if err != nil {
163+
lastErr = fmt.Errorf("error batch deleting S3 objects bucket=%s: %w", bucket, err)
164+
continue
165+
}
166+
167+
// check for individual errors
168+
if len(output.Errors) > 0 {
169+
for _, e := range output.Errors {
170+
slog.Error("error deleting S3 object in batch", "bucket", bucket, "key", *e.Key, "error", *e.Message)
171+
}
172+
totalDeleted += len(batch) - len(output.Errors)
173+
lastErr = fmt.Errorf("%d errors deleting S3 objects", len(output.Errors))
174+
} else {
175+
totalDeleted += len(batch)
176+
}
177+
}
178+
179+
if totalDeleted > 0 {
180+
slog.Debug("deleted S3 files", "bucket", bucket, "count", totalDeleted)
181+
}
182+
183+
return totalDeleted, lastErr
154184
}

0 commit comments

Comments
 (0)