Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 73 additions & 1 deletion archives/archives.go
Original file line number Diff line number Diff line change
Expand Up @@ -846,6 +846,74 @@ func DeleteArchivedOrgRecords(ctx context.Context, rt *runtime.Runtime, now time
return deleted, nil
}

// sqlSelectRolledUpDailyArchives selects the id and location of every archive for the given org ($1), archive
// type ($2) and period ($3) that has both a rollup_id and a deleted_on value set.
const sqlSelectRolledUpDailyArchives = `
SELECT id, location
FROM archives_archive
WHERE org_id = $1 AND archive_type = $2 AND period = $3 AND rollup_id IS NOT NULL AND deleted_on IS NOT NULL`

// sqlDeleteRolledUpDailyArchives deletes the archive rows matched by the same predicate as
// sqlSelectRolledUpDailyArchives: org ($1), archive type ($2), period ($3), rollup_id and deleted_on both set.
const sqlDeleteRolledUpDailyArchives = `
DELETE FROM archives_archive
WHERE org_id = $1 AND archive_type = $2 AND period = $3 AND rollup_id IS NOT NULL AND deleted_on IS NOT NULL`

// DeleteRolledUpDailyArchives deletes daily archives that have been rolled up into monthlies and had their records
// deleted.
//
// It first selects the matching archive rows for the org and archive type, makes a best-effort attempt to delete the
// S3 object referenced by each row's location (expected format "bucket:key"), and then deletes the rows from the
// database. S3 failures and unparseable locations are logged but do not stop the database delete. The whole
// operation runs under a five minute timeout derived from ctx.
//
// It returns the number of database rows deleted, or zero and an error if the select, the delete or the affected
// row count fails.
func DeleteRolledUpDailyArchives(ctx context.Context, rt *runtime.Runtime, org Org, archiveType ArchiveType) (int, error) {
	ctx, cancel := context.WithTimeout(ctx, time.Minute*5)
	defer cancel()

	log := slog.With("org_id", org.ID, "org_name", org.Name, "archive_type", archiveType)

	// first, get the archives to delete so we can clean up their S3 files
	type archiveToDelete struct {
		ID       int         `db:"id"`
		Location null.String `db:"location"`
	}
	var archivesToDelete []archiveToDelete
	if err := rt.DB.SelectContext(ctx, &archivesToDelete, sqlSelectRolledUpDailyArchives, org.ID, archiveType, DayPeriod); err != nil {
		return 0, fmt.Errorf("error selecting rolled up daily archives: %w", err)
	}

	if len(archivesToDelete) == 0 {
		return 0, nil
	}

	// delete S3 files for archives that were uploaded
	s3DeletedCount := 0
	for _, archive := range archivesToDelete {
		// an empty location means there is no S3 file to clean up for this archive
		if archive.Location == "" {
			continue
		}

		// parse bucket:key from location (format is "bucket:key")
		bucket, key, ok := strings.Cut(string(archive.Location), ":")
		if !ok {
			// don't skip silently: the database row is still removed below, so this is the only trace that an
			// S3 object may have been left behind
			log.Warn("skipping S3 delete for rolled up daily archive with malformed location", "archive_id", archive.ID, "location", string(archive.Location))
			continue
		}

		if err := DeleteS3File(ctx, rt.S3, bucket, key); err != nil {
			log.Error("error deleting S3 file for rolled up daily archive", "archive_id", archive.ID, "bucket", bucket, "key", key, "error", err)
			// continue to try deleting other files and the database records
			continue
		}
		s3DeletedCount++
	}

	// delete all daily archives from database
	// NOTE(review): this re-evaluates the same predicate rather than targeting the ids selected above, so a row
	// that starts matching between the two statements would be removed without its S3 file being cleaned up -
	// confirm nothing can roll up or mark dailies as deleted concurrently for the same org.
	result, err := rt.DB.ExecContext(ctx, sqlDeleteRolledUpDailyArchives, org.ID, archiveType, DayPeriod)
	if err != nil {
		return 0, fmt.Errorf("error deleting rolled up daily archives: %w", err)
	}

	deletedCount, err := result.RowsAffected()
	if err != nil {
		return 0, fmt.Errorf("error getting deleted rows count: %w", err)
	}

	if deletedCount > 0 {
		log.Info("deleted rolled up daily archives", "count", deletedCount, "s3_files_deleted", s3DeletedCount)
	}

	return int(deletedCount), nil
}

// ArchiveOrg looks for any missing archives for the passed in org, creating and uploading them as necessary, returning the created archives
func ArchiveOrg(ctx context.Context, rt *runtime.Runtime, now time.Time, org Org, archiveType ArchiveType) ([]*Archive, []*Archive, []*Archive, []*Archive, []*Archive, error) {
log := slog.With("org_id", org.ID, "org_name", org.Name)
Expand Down Expand Up @@ -877,7 +945,11 @@ func ArchiveOrg(ctx context.Context, rt *runtime.Runtime, now time.Time, org Org
return dailiesCreated, dailiesFailed, monthliesCreated, monthliesFailed, nil, fmt.Errorf("error deleting archived records: %w", err)
}

// TODO get rid of any dailies which have been deleted and rolled up
// delete daily archives that have been rolled up into monthlies and had their records deleted
_, err = DeleteRolledUpDailyArchives(ctx, rt, org, archiveType)
if err != nil {
return dailiesCreated, dailiesFailed, monthliesCreated, monthliesFailed, deleted, fmt.Errorf("error deleting rolled up daily archives: %w", err)
}

return dailiesCreated, dailiesFailed, monthliesCreated, monthliesFailed, deleted, nil
}
Expand Down
52 changes: 52 additions & 0 deletions archives/archives_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -546,3 +546,55 @@ func TestArchiveActiveOrgs(t *testing.T) {
assert.NoError(t, err)

}

// TestDeleteRolledUpDailyArchives archives an org end to end and then checks that DeleteRolledUpDailyArchives removes
// exactly the daily archives which are both rolled up and deleted, while leaving monthly archives in place.
func TestDeleteRolledUpDailyArchives(t *testing.T) {
	const (
		countAllSQL      = "SELECT count(*) FROM archives_archive WHERE org_id = $1 AND archive_type = $2"
		countByPeriodSQL = "SELECT count(*) FROM archives_archive WHERE org_id = $1 AND archive_type = $2 AND period = $3"
		countRolledUpSQL = "SELECT count(*) FROM archives_archive WHERE org_id = $1 AND archive_type = $2 AND period = $3 AND rollup_id IS NOT NULL AND deleted_on IS NOT NULL"
	)

	ctx, rt := setup(t)

	orgs, err := GetActiveOrgs(ctx, rt)
	assert.NoError(t, err)
	now := time.Date(2018, 1, 8, 12, 30, 0, 0, time.UTC)

	// the second active org is the one exercised throughout this test
	org := orgs[1]

	// run the complete archiving pass so that dailies and monthlies get built
	dailiesCreated, _, monthliesCreated, _, deleted, err := ArchiveOrg(ctx, rt, now, org, MessageType)
	assert.NoError(t, err)

	// dailies, monthlies and record deletions are all expected from that pass
	assert.Greater(t, len(dailiesCreated), 0)
	assert.Greater(t, len(monthliesCreated), 0)
	assert.Greater(t, len(deleted), 0)

	// snapshot of how many archives of this type the org has before the explicit delete
	var countBefore int
	err = rt.DB.Get(&countBefore, countAllSQL, org.ID, MessageType)
	assert.NoError(t, err)

	// how many dailies are currently rolled up and deleted
	// NOTE(review): ArchiveOrg appears to call DeleteRolledUpDailyArchives itself in this change, so this count
	// may already be zero at this point - confirm the precondition below can actually hold.
	var countRolledUpDeleted int
	err = rt.DB.Get(&countRolledUpDeleted, countRolledUpSQL, org.ID, MessageType, DayPeriod)
	assert.NoError(t, err)
	assert.Greater(t, countRolledUpDeleted, 0, "should have some daily archives that were rolled up and deleted")

	// explicit invocation of the function under test
	deletedCount, err := DeleteRolledUpDailyArchives(ctx, rt, org, MessageType)
	assert.NoError(t, err)
	assert.Equal(t, countRolledUpDeleted, deletedCount, "should delete all rolled up daily archives")

	// the overall total must have dropped by exactly the number of rolled up dailies
	var countAfter int
	err = rt.DB.Get(&countAfter, countAllSQL, org.ID, MessageType)
	assert.NoError(t, err)
	assert.Equal(t, countBefore-countRolledUpDeleted, countAfter, "archive count should be reduced by deleted count")

	// nothing matching the rolled up and deleted predicate may be left
	var remainingRolledUp int
	err = rt.DB.Get(&remainingRolledUp, countRolledUpSQL, org.ID, MessageType, DayPeriod)
	assert.NoError(t, err)
	assert.Equal(t, 0, remainingRolledUp, "no rolled up daily archives should remain")

	// monthlies are untouched by the cleanup
	var countMonthly int
	err = rt.DB.Get(&countMonthly, countByPeriodSQL, org.ID, MessageType, MonthPeriod)
	assert.NoError(t, err)
	assert.Greater(t, countMonthly, 0, "monthly archives should still exist")
}
14 changes: 14 additions & 0 deletions archives/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,3 +138,17 @@ func GetS3File(ctx context.Context, s3Client *s3x.Service, bucket, key string) (

return output.Body, nil
}

// DeleteS3File issues a DeleteObject request for the given bucket and key using the passed S3 client. A failure is
// returned wrapped with the bucket and key; on success a debug message is logged and nil is returned.
func DeleteS3File(ctx context.Context, s3Client *s3x.Service, bucket, key string) error {
	input := &s3.DeleteObjectInput{
		Bucket: aws.String(bucket),
		Key:    aws.String(key),
	}

	if _, err := s3Client.Client.DeleteObject(ctx, input); err != nil {
		return fmt.Errorf("error deleting S3 object bucket=%s key=%s: %w", bucket, key, err)
	}

	slog.Debug("deleted S3 file", "bucket", bucket, "key", key)
	return nil
}
Loading