Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 66 additions & 1 deletion archives/archives.go
Original file line number Diff line number Diff line change
Expand Up @@ -846,6 +846,67 @@ func DeleteArchivedOrgRecords(ctx context.Context, rt *runtime.Runtime, now time
return deleted, nil
}

// sqlSelectDeletableArchives selects an org's daily archives that have been rolled up into a
// monthly archive (rollup_id IS NOT NULL) and whose records no longer need deletion
// (NOT needs_deletion) — i.e. dailies that are fully superseded and safe to remove.
const sqlSelectDeletableArchives = `
SELECT uuid, id, org_id, start_date::timestamp with time zone AS start_date, period, archive_type, hash, location, size, record_count, needs_deletion, rollup_id
FROM archives_archive
WHERE org_id = $1 AND archive_type = $2 AND period = 'D' AND rollup_id IS NOT NULL AND NOT needs_deletion`

// DeleteRolledUpDailyArchives deletes daily archives that have been rolled up into monthlies and had their records deleted
func DeleteRolledUpDailyArchives(ctx context.Context, rt *runtime.Runtime, org Org, archiveType ArchiveType) (int, error) {
	// bound the whole operation (select + S3 deletes + DB delete) to five minutes
	ctx, cancel := context.WithTimeout(ctx, time.Minute*5)
	defer cancel()

	log := slog.With("org_id", org.ID, "org_name", org.Name, "archive_type", archiveType)

	var archives []*Archive
	if err := rt.DB.SelectContext(ctx, &archives, sqlSelectDeletableArchives, org.ID, archiveType); err != nil {
		return 0, fmt.Errorf("error selecting rolled up daily archives: %w", err)
	}
	if len(archives) == 0 {
		return 0, nil
	}

	// gather the database IDs, and the S3 keys grouped by the bucket each file lives in
	archiveIDs := make([]int, len(archives))
	bucketKeys := make(map[string][]string)
	for i, archive := range archives {
		archiveIDs[i] = archive.ID
		if archive.isUploaded() {
			bucket, key := archive.location()
			bucketKeys[bucket] = append(bucketKeys[bucket], key)
		}
	}

	// remove the files from S3 first; failures are logged but don't block the database cleanup
	filesDeleted := 0
	for bucket, keys := range bucketKeys {
		n, err := DeleteS3Files(ctx, rt.S3, bucket, keys)
		filesDeleted += n
		if err != nil {
			log.Error("error deleting S3 files for rolled up daily archives", "bucket", bucket, "error", err)
		}
	}

	// then remove the archive rows themselves
	res, err := rt.DB.ExecContext(ctx, `DELETE FROM archives_archive WHERE id = ANY($1)`, pq.Array(archiveIDs))
	if err != nil {
		return 0, fmt.Errorf("error deleting rolled up daily archives: %w", err)
	}

	rowsDeleted, err := res.RowsAffected()
	if err != nil {
		return 0, fmt.Errorf("error getting deleted rows count: %w", err)
	}

	if rowsDeleted > 0 {
		log.Info("deleted rolled up daily archives", "count", rowsDeleted, "s3_files_deleted", filesDeleted)
	}

	return int(rowsDeleted), nil
}

// ArchiveOrg looks for any missing archives for the passed in org, creating and uploading them as necessary, returning the created archives
func ArchiveOrg(ctx context.Context, rt *runtime.Runtime, now time.Time, org Org, archiveType ArchiveType) ([]*Archive, []*Archive, []*Archive, []*Archive, []*Archive, error) {
log := slog.With("org_id", org.ID, "org_name", org.Name)
Expand Down Expand Up @@ -877,7 +938,11 @@ func ArchiveOrg(ctx context.Context, rt *runtime.Runtime, now time.Time, org Org
return dailiesCreated, dailiesFailed, monthliesCreated, monthliesFailed, nil, fmt.Errorf("error deleting archived records: %w", err)
}

// TODO get rid of any dailies which have been deleted and rolled up
// delete daily archives that have been rolled up into monthlies and had their records deleted
_, err = DeleteRolledUpDailyArchives(ctx, rt, org, archiveType)
if err != nil {
return dailiesCreated, dailiesFailed, monthliesCreated, monthliesFailed, deleted, fmt.Errorf("error deleting rolled up daily archives: %w", err)
}

return dailiesCreated, dailiesFailed, monthliesCreated, monthliesFailed, deleted, nil
}
Expand Down
58 changes: 58 additions & 0 deletions archives/archives_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -546,3 +546,61 @@ func TestArchiveActiveOrgs(t *testing.T) {
assert.NoError(t, err)

}

func TestDeleteRolledUpDailyArchives(t *testing.T) {
	ctx, rt := setup(t)

	orgs, err := GetActiveOrgs(ctx, rt)
	assert.NoError(t, err)
	now := time.Date(2018, 1, 8, 12, 30, 0, 0, time.UTC)

	org := orgs[1]

	// Step 1: Create daily archives (monthlies are created via RollupOrgArchives, not CreateOrgArchives unless it's a backfill)
	dailies, _, _, _, err := CreateOrgArchives(ctx, rt, now, org, MessageType)
	assert.NoError(t, err)
	assert.Greater(t, len(dailies), 0, "should have created daily archives")

	// Step 2: Roll up daily archives into monthlies
	rollups, _, err := RollupOrgArchives(ctx, rt, now, org, MessageType)
	assert.NoError(t, err)
	assert.Greater(t, len(rollups), 0, "should have created rollup archives")

	// Step 3: Delete archived records (this sets deleted_on on archives)
	deletedArchives, err := DeleteArchivedOrgRecords(ctx, rt, now, org, MessageType)
	assert.NoError(t, err)
	assert.Greater(t, len(deletedArchives), 0, "should have deleted some archived records")

	// Count total archives before deletion
	var totalBefore int
	err = rt.DB.Get(&totalBefore, "SELECT count(*) FROM archives_archive WHERE org_id = $1 AND archive_type = $2", org.ID, MessageType)
	assert.NoError(t, err)

	// Count daily archives that have been rolled up and are ready for cleanup (needs_deletion = false means records have been deleted or archive was empty)
	var rolledUp int
	err = rt.DB.Get(&rolledUp, "SELECT count(*) FROM archives_archive WHERE org_id = $1 AND archive_type = $2 AND period = $3 AND rollup_id IS NOT NULL AND needs_deletion = FALSE", org.ID, MessageType, DayPeriod)
	assert.NoError(t, err)
	assert.Greater(t, rolledUp, 0, "should have some daily archives that were rolled up and ready for cleanup")

	// Now call the function explicitly to delete rolled up dailies
	numDeleted, err := DeleteRolledUpDailyArchives(ctx, rt, org, MessageType)
	assert.NoError(t, err)
	assert.Equal(t, rolledUp, numDeleted, "should delete all rolled up daily archives")

	// After deletion, count should be reduced
	var totalAfter int
	err = rt.DB.Get(&totalAfter, "SELECT count(*) FROM archives_archive WHERE org_id = $1 AND archive_type = $2", org.ID, MessageType)
	assert.NoError(t, err)
	assert.Equal(t, totalBefore-rolledUp, totalAfter, "archive count should be reduced by deleted count")

	// No more rolled up daily archives ready for cleanup should exist
	err = rt.DB.Get(&rolledUp, "SELECT count(*) FROM archives_archive WHERE org_id = $1 AND archive_type = $2 AND period = $3 AND rollup_id IS NOT NULL AND needs_deletion = FALSE", org.ID, MessageType, DayPeriod)
	assert.NoError(t, err)
	assert.Equal(t, 0, rolledUp, "no rolled up daily archives should remain")

	// Monthly archives should still exist
	var monthlies int
	err = rt.DB.Get(&monthlies, "SELECT count(*) FROM archives_archive WHERE org_id = $1 AND archive_type = $2 AND period = $3", org.ID, MessageType, MonthPeriod)
	assert.NoError(t, err)
	assert.Greater(t, monthlies, 0, "monthly archives should still exist")
}
44 changes: 44 additions & 0 deletions archives/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"io"
"log/slog"
"os"
"slices"
"strings"

"github.com/aws/aws-sdk-go-v2/aws"
Expand Down Expand Up @@ -138,3 +139,46 @@ func GetS3File(ctx context.Context, s3Client *s3x.Service, bucket, key string) (

return output.Body, nil
}

// DeleteS3Files deletes multiple files from S3, automatically batching into requests of 1000 keys
func DeleteS3Files(ctx context.Context, s3Client *s3x.Service, bucket string, keys []string) (int, error) {
if len(keys) == 0 {
return 0, nil
}

totalDeleted := 0
var lastErr error

for batch := range slices.Chunk(keys, 1000) {
objects := make([]types.ObjectIdentifier, len(batch))
for i, key := range batch {
objects[i] = types.ObjectIdentifier{Key: aws.String(key)}
}

output, err := s3Client.Client.DeleteObjects(ctx, &s3.DeleteObjectsInput{
Bucket: aws.String(bucket),
Delete: &types.Delete{Objects: objects, Quiet: aws.Bool(true)},
})
if err != nil {
lastErr = fmt.Errorf("error batch deleting S3 objects bucket=%s: %w", bucket, err)
continue
}

// check for individual errors
if len(output.Errors) > 0 {
for _, e := range output.Errors {
slog.Error("error deleting S3 object in batch", "bucket", bucket, "key", *e.Key, "error", *e.Message)
Copy link

Copilot AI Dec 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Potential nil pointer dereference. The code assumes that e.Key and e.Message are non-nil, but the AWS SDK types may return nil pointers for these fields in error cases. Consider adding nil checks before dereferencing these pointers to avoid panics.

Suggested change
slog.Error("error deleting S3 object in batch", "bucket", bucket, "key", *e.Key, "error", *e.Message)
key := "<nil>"

Copilot uses AI. Check for mistakes.
}
totalDeleted += len(batch) - len(output.Errors)
lastErr = fmt.Errorf("%d errors deleting S3 objects", len(output.Errors))
Copy link

Copilot AI Dec 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The error message only reflects the count of errors from the last batch with failures, not the cumulative count across all batches. If multiple batches have errors, only the last batch's error count is reported in the returned error. Consider tracking and reporting the total number of errors across all batches, or clarifying in the message that this is the last batch's error count.

Copilot uses AI. Check for mistakes.
} else {
totalDeleted += len(batch)
}
}

if totalDeleted > 0 {
slog.Debug("deleted S3 files", "bucket", bucket, "count", totalDeleted)
}

return totalDeleted, lastErr
}
2 changes: 1 addition & 1 deletion testdb.sql
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ INSERT INTO archives_archive(id, uuid, org_id, archive_type, created_on, start_d
(NEXTVAL('archives_archive_id_seq'), '019ae060-bfdf-723c-b2d1-d5234266bf03', 3, 'message', '2017-08-10 00:00:00.000000+00', '2017-08-10 00:00:00.000000+00', 'D', 35, 14, '2a80be2a47bfbb270ffe7ab5542351eb', 'temba-archives:3/message_D20170810_2a80be2a47bfbb270ffe7ab5542351eb.jsonl.gz', TRUE, 0),
(NEXTVAL('archives_archive_id_seq'), '019ae060-bfdf-76a4-84d1-9305a7340401', 3, 'message', '2017-09-10 00:00:00.000000+00', '2017-09-10 00:00:00.000000+00', 'D', 544, 38, '4a1664b669fb496596113623a22e677f', 'temba-archives:3/message_D20170910_4a1664b669fb496596113623a22e677f.jsonl.gz', TRUE, 0),
(NEXTVAL('archives_archive_id_seq'), '019ae060-bfdf-7b5a-8180-b510d7380fd8', 3, 'message', '2017-09-02 00:00:00.000000+00', '2017-09-01 00:00:00.000000+00', 'M', 34, 23, 'eba9aadcd2334b21ad6d58e5d82707a0', 'temba-archives:3/message_M20170902_eba9aadcd2334b21ad6d58e5d82707a0.jsonl.gz', TRUE, 0),
(NEXTVAL('archives_archive_id_seq'), '019ae060-bfdf-7376-a03c-3469201c709e', 2, 'message', '2017-10-08 00:00:00.000000+00', '2017-10-08 00:00:00.000000+00', 'D', 46, 65, '76869d6fe752447c83d1e5cc884b7cd8', 'temba-archives:2/message_D20171008_76869d6fe752447c83d1e5cc884b7cd8.jsonl.gz', TRUE, 0);
(NEXTVAL('archives_archive_id_seq'), '019ae060-bfdf-7376-a03c-3469201c709e', 2, 'message', '2017-10-08 00:00:00.000000+00', '2017-10-08 00:00:00.000000+00', 'D', 46, 65, '76869d6fe752447c83d1e5cc884b7cd8', 'temba-archives:2/message_D20171008_76869d6fe752447c83d1e5cc884b7cd8.jsonl.gz', FALSE, 0);

INSERT INTO contacts_contact(id, uuid, org_id, is_active, created_by_id, created_on, modified_by_id, modified_on, name, language) VALUES
(1, 'c7a2dd87-a80e-420b-8431-ca48d422e924', 1, TRUE, -1, '2017-11-10 21:11:59.890662+00', -1, '2017-11-10 21:11:59.890662+00', NULL, 'eng'),
Expand Down