diff --git a/pkg/backup/backup_job.go b/pkg/backup/backup_job.go index bc2d14e2bbca..cbe1278c2512 100644 --- a/pkg/backup/backup_job.go +++ b/pkg/backup/backup_job.go @@ -1889,9 +1889,12 @@ func (b *backupResumer) deleteCheckpoint( defer exportStore.Close() // Delete will not delete a nonempty directory, so we have to go through // all files and delete each file one by one. - return exportStore.List(ctx, backupinfo.BackupProgressDirectory, "", func(p string) error { - return exportStore.Delete(ctx, backupinfo.BackupProgressDirectory+p) - }) + return exportStore.List( + ctx, backupinfo.BackupProgressDirectory, cloud.ListOptions{}, + func(p string) error { + return exportStore.Delete(ctx, backupinfo.BackupProgressDirectory+p) + }, + ) }(); err != nil { log.Dev.Warningf(ctx, "unable to delete checkpointed backup descriptor file in progress directory: %+v", err) } diff --git a/pkg/backup/backup_test.go b/pkg/backup/backup_test.go index d427c6acd9b1..6d6d4a6c3f1c 100644 --- a/pkg/backup/backup_test.go +++ b/pkg/backup/backup_test.go @@ -10083,16 +10083,18 @@ func TestBackupNoOverwriteCheckpoint(t *testing.T) { r.Close(ctx) var actualNumCheckpointsWritten int - require.NoError(t, store.List(ctx, latestFilePath+"/progress/", "", func(f string) error { - // Don't double count checkpoints as there will be the manifest and - // the checksum. - if !strings.HasSuffix(f, backupinfo.BackupManifestChecksumSuffix) { - if strings.HasPrefix(f, backupinfo.BackupManifestCheckpointName) { - actualNumCheckpointsWritten++ + require.NoError(t, store.List( + ctx, latestFilePath+"/progress/", cloud.ListOptions{}, + func(f string) error { + // Don't double count checkpoints as there will be the manifest and + // the checksum. + if !strings.HasSuffix(f, backupinfo.BackupManifestChecksumSuffix) { + if strings.HasPrefix(f, backupinfo.BackupManifestCheckpointName) { + actualNumCheckpointsWritten++ + } } - } - return nil - })) + return nil + })) // numCheckpointWritten only accounts for checkpoints written in the // progress loop, each time we Resume we write another checkpoint. @@ -10217,7 +10219,7 @@ func TestBackupTimestampedCheckpointsAreLexicographical(t *testing.T) { } require.NoError(t, err) var actual string - err = store.List(ctx, "/progress/", "", func(f string) error { + err = store.List(ctx, "/progress/", cloud.ListOptions{}, func(f string) error { actual = f return cloud.ErrListingDone }) @@ -10248,13 +10250,15 @@ func TestBackupNoOverwriteLatest(t *testing.T) { findNumLatestFiles := func() (int, string) { var numLatestFiles int var latestFile string - err = store.List(ctx, backupbase.LatestHistoryDirectory, "", func(p string) error { - if numLatestFiles == 0 { - latestFile = p - } - numLatestFiles++ - return nil - }) + err = store.List( + ctx, backupbase.LatestHistoryDirectory, cloud.ListOptions{}, + func(p string) error { + if numLatestFiles == 0 { + latestFile = p + } + numLatestFiles++ + return nil + }) require.NoError(t, err) return numLatestFiles, latestFile } diff --git a/pkg/backup/backupdest/backup_destination.go b/pkg/backup/backupdest/backup_destination.go index 4297d07f7241..93882bd61e4e 100644 --- a/pkg/backup/backupdest/backup_destination.go +++ b/pkg/backup/backupdest/backup_destination.go @@ -303,18 +303,21 @@ func FindLatestFile( // empty object with a trailing '/'. The latter is never created by our code // but can be created by other tools, e.g., AWS DataSync to transfer an existing backup to // another bucket. (See https://github.com/cockroachdb/cockroach/issues/106070.) 
- err := exportStore.List(ctx, backupbase.LatestHistoryDirectory, "", func(p string) error { - p = strings.TrimPrefix(p, "/") - if p == "" { - // N.B. skip the empty object with a trailing '/', created by a third-party tool. - return nil - } - latestFile = p - latestFileFound = true - // We only want the first latest file so return an error that it is - // done listing. - return cloud.ErrListingDone - }) + err := exportStore.List( + ctx, backupbase.LatestHistoryDirectory, cloud.ListOptions{}, + func(p string) error { + p = strings.TrimPrefix(p, "/") + if p == "" { + // N.B. skip the empty object with a trailing '/', created by a third-party tool. + return nil + } + latestFile = p + latestFileFound = true + // We only want the first latest file so return an error that it is + // done listing. + return cloud.ErrListingDone + }, + ) // If the list failed because the storage used does not support listing, // such as http, we can try reading the non-timestamped backup latest // file directly. This can still fail if it is a mixed cluster and the @@ -478,12 +481,15 @@ func ListFullBackupsInCollection( ctx context.Context, store cloud.ExternalStorage, ) ([]string, error) { var backupPaths []string - if err := store.List(ctx, "", backupbase.ListingDelimDataSlash, func(f string) error { - if deprecatedBackupPathRE.MatchString(f) { - backupPaths = append(backupPaths, f) - } - return nil - }); err != nil { + if err := store.List( + ctx, "", cloud.ListOptions{Delimiter: backupbase.ListingDelimDataSlash}, + func(f string) error { + if deprecatedBackupPathRE.MatchString(f) { + backupPaths = append(backupPaths, f) + } + return nil + }, + ); err != nil { // Can't happen, just required to handle the error for lint. return nil, err } diff --git a/pkg/backup/backupdest/incrementals.go b/pkg/backup/backupdest/incrementals.go index 16ce1c98d09f..1232c2805abe 100644 --- a/pkg/backup/backupdest/incrementals.go +++ b/pkg/backup/backupdest/incrementals.go @@ -131,26 +131,29 @@ func LegacyFindPriorBackups( defer sp.Finish() var prev []string - if err := store.List(ctx, "", backupbase.ListingDelimDataSlash, func(p string) error { - matchesGlob, err := path.Match(incBackupSubdirGlob+backupbase.DeprecatedBackupManifestName, p) - if err != nil { - return err - } else if !matchesGlob { - matchesGlob, err = path.Match(incBackupSubdirGlobWithSuffix+backupbase.DeprecatedBackupManifestName, p) + if err := store.List( + ctx, "", cloud.ListOptions{Delimiter: backupbase.ListingDelimDataSlash}, + func(p string) error { + matchesGlob, err := path.Match(incBackupSubdirGlob+backupbase.DeprecatedBackupManifestName, p) if err != nil { return err + } else if !matchesGlob { + matchesGlob, err = path.Match(incBackupSubdirGlobWithSuffix+backupbase.DeprecatedBackupManifestName, p) + if err != nil { + return err + } } - } - if matchesGlob { - if !includeManifest { - p = strings.TrimSuffix(p, "/"+backupbase.DeprecatedBackupManifestName) + if matchesGlob { + if !includeManifest { + p = strings.TrimSuffix(p, "/"+backupbase.DeprecatedBackupManifestName) + } + prev = append(prev, p) + return nil } - prev = append(prev, p) return nil - } - return nil - }); err != nil { + }, + ); err != nil { return nil, errors.Wrap(err, "reading previous backup layers") } sort.Strings(prev) diff --git a/pkg/backup/backupencryption/encryption.go b/pkg/backup/backupencryption/encryption.go index 9acdb657d99f..1864ac413bd0 100644 --- a/pkg/backup/backupencryption/encryption.go +++ b/pkg/backup/backupencryption/encryption.go @@ -479,15 +479,18 @@ func 
GetEncryptionInfoFiles(ctx context.Context, dest cloud.ExternalStorage) ([] var files []string // Look for all files in dest that start with "/ENCRYPTION-INFO" // and return them. - err := dest.List(ctx, "", backupbase.ListingDelimDataSlash, func(p string) error { - paths := strings.Split(p, "/") - p = paths[len(paths)-1] - if match := strings.HasPrefix(p, backupEncryptionInfoFile); match { - files = append(files, p) - } + err := dest.List( + ctx, "", cloud.ListOptions{Delimiter: backupbase.ListingDelimDataSlash}, + func(p string) error { + paths := strings.Split(p, "/") + p = paths[len(paths)-1] + if match := strings.HasPrefix(p, backupEncryptionInfoFile); match { + files = append(files, p) + } - return nil - }) + return nil + }, + ) if len(files) < 1 { return nil, errors.New("no ENCRYPTION-INFO files found") } diff --git a/pkg/backup/backupinfo/BUILD.bazel b/pkg/backup/backupinfo/BUILD.bazel index 3a15159d52c9..7c988d1bf9f3 100644 --- a/pkg/backup/backupinfo/BUILD.bazel +++ b/pkg/backup/backupinfo/BUILD.bazel @@ -41,6 +41,7 @@ go_library( "//pkg/sql/stats", "//pkg/storage", "//pkg/util", + "//pkg/util/besteffort", "//pkg/util/bulk", "//pkg/util/ctxgroup", "//pkg/util/encoding", diff --git a/pkg/backup/backupinfo/backup_index.go b/pkg/backup/backupinfo/backup_index.go index 8157d01568d6..3e524dc60b7b 100644 --- a/pkg/backup/backupinfo/backup_index.go +++ b/pkg/backup/backupinfo/backup_index.go @@ -8,6 +8,7 @@ package backupinfo import ( "bytes" "context" + "encoding/base64" "fmt" "path" "slices" @@ -24,7 +25,9 @@ import ( "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/util" + "github.com/cockroachdb/cockroach/pkg/util/besteffort" "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" + "github.com/cockroachdb/cockroach/pkg/util/encoding" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/ioctx" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -125,7 +128,7 @@ func WriteBackupIndexMetadata( // 2. The backup was taken on a v25.4+ cluster. // // The store should be rooted at the default collection URI (the one that -// contains the `index/` directory). +// contains the `metadata/` directory). // // TODO (kev-cao): v25.4+ backups will always contain an index file. In other // words, we can remove these checks in v26.2+. @@ -138,7 +141,7 @@ func IndexExists(ctx context.Context, store cloud.ExternalStorage, subdir string if err := store.List( ctx, indexDir, - "/", + cloud.ListOptions{Delimiter: "/"}, func(file string) error { indexExists = true // Because we delimit on `/` and the index subdir does not contain a @@ -154,7 +157,7 @@ func IndexExists(ctx context.Context, store cloud.ExternalStorage, subdir string // ListIndexes lists all the index files for a backup chain rooted by the full // backup indicated by the subdir. The store should be rooted at the default -// collection URI (the one that contains the `index/` directory). It returns +// collection URI (the one that contains the `metadata/` directory). It returns // the basenames of the listed index files. It assumes that the subdir is // resolved and not `LATEST`. // @@ -176,7 +179,7 @@ func ListIndexes( if err := store.List( ctx, indexDir+"/", - "", + cloud.ListOptions{}, func(file string) error { // We assert that if a file ends with .pb in the index, it should be a // parsable index file. Otherwise, we ignore it. 
This circumvents any temp @@ -190,7 +193,7 @@ func ListIndexes( i := indexTimes{ file: base, } - i.start, i.end, err = parseIndexFilename(base) + i.start, i.end, err = parseIndexBasename(base) if err != nil { return err } @@ -220,9 +223,147 @@ func ListIndexes( }), nil } +// RestorableBackup represents a row in the `SHOW BACKUPS` output +type RestorableBackup struct { + ID string + EndTime hlc.Timestamp + MVCCFilter backuppb.MVCCFilter + RevisionStartTime hlc.Timestamp +} + +// ListRestorableBackups lists all restorable backups from the backup index +// within the specified time interval (inclusive at both ends). The store should +// be rooted at the default collection URI (the one that contains the +// `metadata/` directory). +// +// Note: Backups with duplicate end times (e.g. compacted backups) are elided +// and only one is returned. In the case of revision-history backups, the +// backups will be marked as containing revision-history, despite the fact +// that the compacted backups specifically do not contain revision history. +func ListRestorableBackups( + ctx context.Context, store cloud.ExternalStorage, after, before time.Time, +) ([]RestorableBackup, error) { + idxInRange, err := listIndexesWithinRange(ctx, store, after, before) + if err != nil { + return nil, err + } + + var elidedIndexes []parsedIndex + for _, index := range idxInRange { + if len(elidedIndexes) > 0 { + last := &elidedIndexes[len(elidedIndexes)-1] + // Elide duplicate end times within a chain. Because the indexes are + // sorted with ascending start times breaking ties, keeping the last one + // ensures that we keep the non-compacted backup. + if last.end.Equal(index.end) && last.fullEnd.Equal(index.fullEnd) { + last.filePath = index.filePath + continue + } + } + elidedIndexes = append(elidedIndexes, index) + } + + backups := make([]RestorableBackup, 0, len(elidedIndexes)) + for _, index := range elidedIndexes { + reader, _, err := store.ReadFile(ctx, index.filePath, cloud.ReadOptions{}) + if err != nil { + return nil, errors.Wrapf(err, "reading index file %s", index.filePath) + } + + bytes, err := ioctx.ReadAll(ctx, reader) + besteffort.Error(ctx, "cleanup-index-reader", func(ctx context.Context) error { + return reader.Close(ctx) + }) + if err != nil { + return nil, errors.Wrapf(err, "reading index file %s", index.filePath) + } + + idxMeta := backuppb.BackupIndexMetadata{} + if err := protoutil.Unmarshal(bytes, &idxMeta); err != nil { + return nil, errors.Wrapf(err, "unmarshalling index file %s", index.filePath) + } + + backups = append(backups, RestorableBackup{ + ID: encodeBackupID(index.fullEnd, index.end), + EndTime: idxMeta.EndTime, + MVCCFilter: idxMeta.MVCCFilter, + RevisionStartTime: idxMeta.RevisionStartTime, + }) + } + return backups, nil +} + +type parsedIndex struct { + filePath string // path to the index relative to the backup collection root + fullEnd, end time.Time +} + +// listIndexesWithinRange lists all index files whose end time falls within the +// specified time interval (inclusive at both ends). The store should be rooted +// at the default collection URI (the one that contains the `metadata/` +// directory). The returned index filenames are relative to the `metadata/index` +// directory and sorted in descending order by end time, with ties broken by +// ascending start time. +func listIndexesWithinRange( + ctx context.Context, store cloud.ExternalStorage, after, before time.Time, +) ([]parsedIndex, error) { + // First, find the full backup end time prefix we begin listing from. 
Since + // full backup end times are stored in descending order in the index, we add + // ten milliseconds (the maximum granularity of the timestamp encoding) to + // ensure an inclusive start. + startTS := before.Add(10 * time.Millisecond) + startPoint, err := endTimeToIndexSubdir(startTS) + if err != nil { + return nil, err + } + + var idxInRange []parsedIndex + err = store.List( + ctx, + backupbase.BackupIndexDirectoryPath+"/", + cloud.ListOptions{AfterKey: startPoint}, + func(file string) error { + if !strings.HasSuffix(file, ".pb") { + return nil + } + full, start, end, err := parseTimesFromIndexFilepath(file) + if err != nil { + return err + } + // Once we see an *incremental* backup with an end time before `after`, we + // can stop iterating as we have found all backups within the time range. + if !start.IsZero() && end.Before(after) { + return cloud.ErrListingDone + } + if end.After(before) || end.Before(after) { + return nil + } + entry := parsedIndex{ + filePath: path.Join(backupbase.BackupIndexDirectoryPath, file), + fullEnd: full, + end: end, + } + // Maintain descending end time order. May need to swap with the last + // index added. + if len(idxInRange) > 0 && end.After(idxInRange[len(idxInRange)-1].end) { + tmp := idxInRange[len(idxInRange)-1] + idxInRange[len(idxInRange)-1] = entry + entry = tmp + } + idxInRange = append(idxInRange, entry) + return nil + }, + ) + if err != nil && !errors.Is(err, cloud.ErrListingDone) { + return nil, err + } + + return idxInRange, nil +} + // GetBackupTreeIndexMetadata concurrently retrieves the index metadata for all // backups within the specified subdir. The store should be rooted at the -// collection URI that contains the `index/` directory. Indexes are returned in +// collection URI that contains the `metadata/` directory. Indexes are returned in // ascending end time order, with ties broken by ascending start time order. func GetBackupTreeIndexMetadata( ctx context.Context, store cloud.ExternalStorage, subdir string, @@ -280,7 +421,7 @@ func GetBackupTreeIndexMetadata( // and derive it from the filename solely because backup paths are // millisecond-precise and so are the timestamps encoded in the filename. func ParseBackupFilePathFromIndexFileName(subdir, basename string) (string, error) { - start, end, err := parseIndexFilename(basename) + start, end, err := parseIndexBasename(basename) if err != nil { return "", err } @@ -296,7 +437,7 @@ func ParseBackupFilePathFromIndexFileName(subdir, basename string) (string, erro // // Note: The timestamps are only millisecond-precise and so do not represent the // exact nano-specific times in the corresponding backup manifest. -func parseIndexFilename(basename string) (start time.Time, end time.Time, err error) { +func parseIndexBasename(basename string) (start time.Time, end time.Time, err error) { invalidFmtErr := errors.Newf("invalid index filename format: %s", basename) if !strings.HasSuffix(basename, "_metadata.pb") { @@ -393,10 +534,43 @@ func getBackupIndexFileName(startTime, endTime hlc.Timestamp) string { ) } +// endTimeToIndexSubdir converts an end time to the full path to its +// corresponding index subdir. +// +// Example: +// 2025-08-13 12:00:00.00 -> metadata/index/_20250813-120000.00 +func endTimeToIndexSubdir(endTime time.Time) (string, error) { + subdir := endTime.Format(backupbase.DateBasedIntoFolderName) + return indexSubdir(subdir) +} + +// indexSubdirToEndTime extracts the end time from an index subdir. 
+// +// Example: +// _20250813-120000.00 -> 2025-08-13 12:00:00.00 +func indexSubdirToEndTime(indexSubdir string) (time.Time, error) { + parts := strings.Split(indexSubdir, "_") + if len(parts) != 2 { + return time.Time{}, errors.Newf( + "invalid index subdir format: %s", indexSubdir, + ) + } + endTime, err := time.Parse(backupbase.BackupIndexFilenameTimestampFormat, parts[1]) + if err != nil { + return time.Time{}, errors.Wrapf( + err, "index subdir %s could not be decoded", indexSubdir, + ) + } + return endTime, nil +} + // indexSubdir is a convenient helper function to get the corresponding index // path for a given full backup subdir. The path is relative to the root of the // collection URI and does not contain a trailing slash. It assumes that subdir // has been resolved and is not `LATEST`. +// +// Example: +// /2025/08/13-120000.00 -> metadata/index/_20250813-120000.00 func indexSubdir(subdir string) (string, error) { flattened, err := convertSubdirToIndexSubdir(subdir) if err != nil { @@ -420,8 +594,11 @@ func indexSubdir(subdir string) (string, error) { // |_ _20250814-120000.00/ // |_ .pb // -// Listing on `index/` and delimiting on `/` will return the subdirectories -// without listing the files in them. +// Listing on `metadata/index/` and delimiting on `/` will return the +// subdirectories without listing the files in them. +// +// Example: +// /2025/08/13-120000.00 -> _20250813-120000.00 func convertSubdirToIndexSubdir(subdir string) (string, error) { subdirTime, err := time.Parse(backupbase.DateBasedIntoFolderName, subdir) if err != nil { @@ -438,6 +615,9 @@ func convertSubdirToIndexSubdir(subdir string) (string, error) { // convertIndexSubdirToSubdir converts an index subdir back to the // original full backup subdir. +// +// Example: +// _20250813-120000.00 -> /2025/08/13-120000.00 func convertIndexSubdirToSubdir(flattened string) (string, error) { parts := strings.Split(flattened, "_") if len(parts) != 2 { @@ -466,3 +646,41 @@ func convertIndexSubdirToSubdir(flattened string) (string, error) { unflattened := descSubdirTime.Format(backupbase.DateBasedIntoFolderName) return unflattened, nil } + +// parseTimesFromIndexFilepath extracts the full end time, start time, and end +// time from the index file path. The filepath is relative to the index +// directory. +// +// Example: +// _/___metadata.pb -> +// +// full_end, start, end +func parseTimesFromIndexFilepath(filepath string) (fullEnd, start, end time.Time, err error) { + parts := strings.Split(filepath, "/") + if len(parts) != 2 { + return time.Time{}, time.Time{}, time.Time{}, errors.Newf( + "invalid index filepath format: %s", filepath, + ) + } + + fullEnd, err = indexSubdirToEndTime(parts[0]) + if err != nil { + return time.Time{}, time.Time{}, time.Time{}, err + } + + start, end, err = parseIndexBasename(path.Base(parts[1])) + if err != nil { + return time.Time{}, time.Time{}, time.Time{}, err + } + + return fullEnd, start, end, nil +} + +// encodeBackupID generates a backup ID for a backup identified by its parent +// full end time and its own end time. 
+func encodeBackupID(fullEnd time.Time, backupEnd time.Time) string { + var buf []byte + buf = encoding.EncodeUint64Ascending(buf, uint64(fullEnd.UnixMilli())) + buf = encoding.EncodeUint64Ascending(buf, uint64(backupEnd.UnixMilli())) + return base64.URLEncoding.EncodeToString(buf) +} diff --git a/pkg/backup/backupinfo/backup_index_test.go b/pkg/backup/backupinfo/backup_index_test.go index 980dd1e62342..5bdda7f42310 100644 --- a/pkg/backup/backupinfo/backup_index_test.go +++ b/pkg/backup/backupinfo/backup_index_test.go @@ -9,6 +9,7 @@ import ( "context" "fmt" "math/rand" + "net/url" "os" "path" "slices" @@ -647,78 +648,31 @@ func TestGetBackupTreeIndexMetadata(t *testing.T) { return externalStorage, nil } - writeIndex := func( - t *testing.T, subdirEnd, start, end int, - ) { - t.Helper() - subdirTS := intToTimeWithNano(subdirEnd).GoTime() - startTS := intToTimeWithNano(start) - endTS := intToTimeWithNano(end) - subdir := subdirTS.Format(backupbase.DateBasedIntoFolderName) - - details := jobspb.BackupDetails{ - Destination: jobspb.BackupDetails_Destination{ - To: []string{collectionURI}, - Subdir: subdir, - }, - StartTime: startTS, - EndTime: endTS, - CollectionURI: collectionURI, - // This test doesn't look at the URI stored in the index metadata, so it - // doesn't need to be accurate to the exact path differences between full - // and incremental backups. We can just set URI to something that looks - // valid. - URI: collectionURI + subdir, - } - require.NoError(t, WriteBackupIndexMetadata( - ctx, execCfg, username.RootUserName(), storageFactory, details, hlc.Timestamp{}, - )) + simpleChain := fakeBackupChain{{0, 2, false}, {2, 4, false}, {4, 6, false}, {6, 8, false}} + compactedChain := fakeBackupChain{ + {0, 10, false}, {10, 11, false}, {10, 12, false}, {11, 12, false}, + {12, 14, false}, {14, 16, false}, } - - randIdx := func(n int) []int { - idxs := make([]int, n) - for i := 0; i < n; i++ { - idxs[i] = i - } - rand.Shuffle(n, func(i, j int) { - // Don't shuffle the full backup as it must be written before incs. - if i == 0 || j == 0 { - return - } - idxs[i], idxs[j] = idxs[j], idxs[i] - }) - return idxs + doubleCompactedChain := fakeBackupChain{ + {0, 18, false}, {18, 20, false}, {18, 22, false}, {20, 22, false}, + {22, 24, false}, {18, 26, false}, {24, 26, false}, } + fullOnly := fakeBackupChain{{0, 28, false}} - type chain = [][2]int - simpleChain := chain{{0, 2}, {2, 4}, {4, 6}, {6, 8}} - compactedChain := chain{{0, 10}, {10, 11}, {10, 12}, {11, 12}, {12, 14}, {14, 16}} - doubleCompactedChain := chain{{0, 18}, {18, 20}, {18, 22}, {20, 22}, {22, 24}, {18, 26}, {24, 26}} - fullOnly := chain{{0, 28}} - - indexes := []chain{ + fakeBackupCollection{ simpleChain, compactedChain, doubleCompactedChain, fullOnly, - } - - for _, index := range indexes { - // Write the index files in random time order to ensure the read always - // returns in them in the correct order. - rndIdxs := randIdx(len(index)) - for _, idx := range rndIdxs { - writeIndex(t, index[0][1], index[idx][0], index[idx][1]) - } - } + }.writeIndexes(t, ctx, execCfg, storageFactory, collectionURI) testcases := []struct { name string - chain chain + chain fakeBackupChain error string // expectedIndexTimes should be sorted in ascending order by end time, with // ties broken by ascending start time. 
- expectedIndexTimes chain + expectedIndexTimes [][2]int }{ { name: "fetch all indexes from chain with no compacted backups", @@ -728,25 +682,25 @@ func TestGetBackupTreeIndexMetadata(t *testing.T) { { name: "fetch all indexes from tree with compacted backups", chain: compactedChain, - expectedIndexTimes: chain{{0, 10}, {10, 11}, {10, 12}, {11, 12}, {12, 14}, {14, 16}}, + expectedIndexTimes: [][2]int{{0, 10}, {10, 11}, {10, 12}, {11, 12}, {12, 14}, {14, 16}}, }, { name: "fetch all indexes from tree with double compacted backups", chain: doubleCompactedChain, - expectedIndexTimes: chain{ + expectedIndexTimes: [][2]int{ {0, 18}, {18, 20}, {18, 22}, {20, 22}, {22, 24}, {18, 26}, {24, 26}, }, }, { name: "index only contains a full backup", chain: fullOnly, - expectedIndexTimes: chain{{0, 28}}, + expectedIndexTimes: [][2]int{{0, 28}}, }, } for _, tc := range testcases { t.Run(tc.name, func(t *testing.T) { - subdirTS := intToTimeWithNano(tc.chain[0][1]).GoTime() + subdirTS := intToTimeWithNano(tc.chain[0].end).GoTime() subdir := subdirTS.Format(backupbase.DateBasedIntoFolderName) metadatas, err := GetBackupTreeIndexMetadata(ctx, externalStorage, subdir) @@ -774,6 +728,128 @@ func TestGetBackupTreeIndexMetadata(t *testing.T) { } } +func TestListRestorableBackups(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + st := cluster.MakeTestingClusterSettings() + execCfg := &sql.ExecutorConfig{Settings: st} + const collectionURI = "nodelocal://1/backup" + dir, dirCleanupFn := testutils.TempDir(t) + defer dirCleanupFn() + externalStorage, err := cloud.ExternalStorageFromURI( + ctx, + collectionURI, + base.ExternalIODirConfig{}, + st, + blobs.TestBlobServiceClient(dir), + username.RootUserName(), + nil, /* db */ + nil, /* limiters */ + cloud.NilMetrics, + ) + require.NoError(t, err) + makeExternalStorage := func( + _ context.Context, _ string, _ username.SQLUsername, _ ...cloud.ExternalStorageOption, + ) (cloud.ExternalStorage, error) { + return externalStorage, nil + } + + fakeBackupCollection{ + { + // Simple chain. + {0, 2, false}, {2, 4, false}, {4, 6, false}, + }, + { + // Chain with compacted backup and last backup intersects next chain. 
+ {0, 10, false}, {10, 14, false}, {14, 18, false}, {10, 22, false}, + {18, 22, false}, {22, 26, false}, + }, + { + // Chain with double compacted backups + {0, 24, false}, {24, 28, false}, {28, 32, false}, {24, 36, false}, + {32, 36, false}, {36, 40, false}, {24, 44, false}, {40, 44, false}, + }, + { + // Chain with revision history + {0, 50, true}, {50, 52, true}, {52, 54, true}, {50, 56, false}, {54, 56, true}, + }, + }.writeIndexes(t, ctx, execCfg, makeExternalStorage, collectionURI) + + type output struct { + end int + rev bool + } + testcases := []struct { + name string + after, before int + expectedOutput []output + }{ + { + "simple chain/full chain inclusive", + 1, 6, + []output{{end: 6}, {end: 4}, {end: 2}}, + }, + { + "simple chain/only incs", + 3, 6, + []output{{end: 6}, {end: 4}}, + }, + { + "simple chain/one matching backup", + 3, 5, + []output{{end: 4}}, + }, + { + "compacted chain/elided duplicates", + 15, 23, + []output{{end: 22}, {end: 18}}, + }, + { + "double compacted chain/elided duplicates", + 27, 45, + []output{{end: 44}, {end: 40}, {end: 36}, {end: 32}, {end: 28}}, + }, + { + "revision history/ignore compacted", + 51, 58, + []output{{end: 56, rev: true}, {end: 54, rev: true}, {end: 52, rev: true}}, + }, + { + "collection/all backups", + 0, 56, + []output{ + {end: 56, rev: true}, {end: 54, rev: true}, {end: 52, rev: true}, {end: 50, rev: true}, + {end: 44}, {end: 40}, {end: 36}, {end: 32}, {end: 28}, {end: 26}, {end: 24}, {end: 22}, + {end: 18}, {end: 14}, {end: 10}, {end: 6}, {end: 4}, {end: 2}, + }, + }, + { + "collection/intersecting chains", + 24, 28, + []output{{end: 28}, {end: 26}, {end: 24}}, + }, + } + + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + afterTS := hlc.Timestamp{WallTime: int64(tc.after) * 1e9}.GoTime() + beforeTS := hlc.Timestamp{WallTime: int64(tc.before) * 1e9}.GoTime() + + backups, err := ListRestorableBackups( + ctx, externalStorage, afterTS, beforeTS, + ) + require.NoError(t, err) + + actualOutput := util.Map(backups, func(b RestorableBackup) output { + return output{end: int(b.EndTime.WallTime / 1e9), rev: !b.RevisionStartTime.IsEmpty()} + }) + require.Equal(t, tc.expectedOutput, actualOutput) + }) + } +} + func TestConvertIndexSubdirToSubdir(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -839,7 +915,124 @@ func TestConvertIndexSubdirToSubdir(t *testing.T) { // stress backup's handling of nanosecond precision timestamps. We use ints to // more easily write times for test cases. func intToTimeWithNano(t int) hlc.Timestamp { + if t == 0 { + return hlc.Timestamp{} + } // Value needs to be large enough to be represented in milliseconds and be // larger than GoTime zero. return hlc.Timestamp{WallTime: int64(t)*1e9 + int64(t)} } + +// fakeBackupCollection represents a collection of backup chains. +type fakeBackupCollection []fakeBackupChain + +// fakeBackupChain represents a chain of backups. Every chain must contain a +// full backup (i.e. start == 0). Compacted backups are representing by having +// fullBackupSpecs with duplicate end times. +// +// This is used to easily create indexes that represent backup chains. +type fakeBackupChain []fakeBackupSpec + +// fakeBackupSpec represents a single backup within a backup chain. The times +// are integers that will be converted to hlc.Timestamps using +// intToTimeWithNano. +type fakeBackupSpec struct { + start, end int + revHistory bool +} + +// writeIndexes writes index metadata files for every backup in the collection. 
+func (c fakeBackupCollection) writeIndexes( + t *testing.T, + ctx context.Context, + execCfg *sql.ExecutorConfig, + storageFactory cloud.ExternalStorageFromURIFactory, + collectionURI string, +) { + t.Helper() + for _, chain := range c { + chain.writeIndexes(t, ctx, execCfg, storageFactory, collectionURI) + } +} + +// writeIndexes writes index metadata files for every backup in the chain. +func (c fakeBackupChain) writeIndexes( + t *testing.T, + ctx context.Context, + execCfg *sql.ExecutorConfig, + storageFactory cloud.ExternalStorageFromURIFactory, + collectionURI string, +) { + t.Helper() + sorted := slices.Clone(c) + slices.SortFunc(sorted, func(a, b fakeBackupSpec) int { + if a.end < b.end { + return -1 + } else if a.end > b.end { + return 1 + } + if a.start > b.start { + return 1 + } else { + return -1 + } + }) + if sorted[0].start != 0 { + t.Fatalf("backup chain does not contain a full backup") + } + subdir := intToTimeWithNano(sorted[0].end).GoTime().Format(backupbase.DateBasedIntoFolderName) + + // We write the indexes in a random order to test that the order they are + // written do not matter. But fulls must always be written first or else + // incrementals are not able to be written. + shuffledIdx := append([]int{0}, util.Map(rand.Perm(len(sorted)-1), func(i int) int { + return i + 1 + })...) + for _, idx := range shuffledIdx { + spec := sorted[idx] + startTS, endTS := intToTimeWithNano(spec.start), intToTimeWithNano(spec.end) + + uri, err := url.Parse(collectionURI) + require.NoError(t, err) + if spec.start != 0 { + uri.Path = path.Join( + uri.Path, + backupbase.DefaultIncrementalsSubdir, + subdir, + ConstructDateBasedIncrementalFolderName(startTS.GoTime(), endTS.GoTime()), + ) + } else { + uri.Path = path.Join(uri.Path, subdir) + } + + isCompacted := idx < len(c)-1 && sorted[idx].end == sorted[idx+1].end + revStartTS := hlc.Timestamp{} + if !isCompacted && spec.revHistory { + if startTS.IsEmpty() { + revStartTS = hlc.Timestamp{WallTime: endTS.WallTime / 2} + } else { + revStartTS = startTS + } + } + + details := jobspb.BackupDetails{ + Destination: jobspb.BackupDetails_Destination{ + To: []string{collectionURI}, + Subdir: subdir, + }, + StartTime: startTS, + EndTime: endTS, + Compact: isCompacted, + CollectionURI: collectionURI, + URI: uri.String(), + RevisionHistory: !revStartTS.IsEmpty(), + } + require.NoError( + t, + WriteBackupIndexMetadata( + ctx, execCfg, username.RootUserName(), + storageFactory, details, revStartTS, + ), + ) + } +} diff --git a/pkg/backup/backupinfo/manifest_handling.go b/pkg/backup/backupinfo/manifest_handling.go index d75e3d4ce422..54ab2297c066 100644 --- a/pkg/backup/backupinfo/manifest_handling.go +++ b/pkg/backup/backupinfo/manifest_handling.go @@ -1334,21 +1334,24 @@ func CheckForPreviousBackup( // Check for the presence of a BACKUP-LOCK file with a job ID different from // that of our job. 
- if err := defaultStore.List(ctx, "", backupbase.ListingDelimDataSlash, func(s string) error { - s = strings.TrimPrefix(s, "/") - if strings.HasPrefix(s, BackupLockFilePrefix) { - jobIDSuffix := strings.TrimPrefix(s, BackupLockFilePrefix) - if len(jobIDSuffix) == 0 { - return errors.AssertionFailedf("malformed BACKUP-LOCK file %s, expected a job ID suffix", s) - } - if jobIDSuffix != strconv.FormatInt(int64(jobID), 10) { - return pgerror.Newf(pgcode.FileAlreadyExists, - "%s already contains a `BACKUP-LOCK` file written by job %s", - redactedURI, jobIDSuffix) + if err := defaultStore.List( + ctx, "", cloud.ListOptions{Delimiter: backupbase.ListingDelimDataSlash}, + func(s string) error { + s = strings.TrimPrefix(s, "/") + if strings.HasPrefix(s, BackupLockFilePrefix) { + jobIDSuffix := strings.TrimPrefix(s, BackupLockFilePrefix) + if len(jobIDSuffix) == 0 { + return errors.AssertionFailedf("malformed BACKUP-LOCK file %s, expected a job ID suffix", s) + } + if jobIDSuffix != strconv.FormatInt(int64(jobID), 10) { + return pgerror.Newf(pgcode.FileAlreadyExists, + "%s already contains a `BACKUP-LOCK` file written by job %s", + redactedURI, jobIDSuffix) + } } - } - return nil - }); err != nil { + return nil + }, + ); err != nil { // HTTP external storage does not support listing, and so we skip checking // for a BACKUP-LOCK file. if !errors.Is(err, cloud.ErrListingUnsupported) { @@ -1536,7 +1539,7 @@ func readLatestCheckpointFile( // We name files such that the most recent checkpoint will always // be at the top, so just grab the first filename. - err = exportStore.List(ctx, BackupProgressDirectory, "", func(p string) error { + err = exportStore.List(ctx, BackupProgressDirectory, cloud.ListOptions{}, func(p string) error { // The first file returned by List could be either the checkpoint or // checksum file, but we are only concerned with the timestamped prefix. // We resolve if it is a checkpoint or checksum file separately below. diff --git a/pkg/ccl/changefeedccl/sink_cloudstorage_test.go b/pkg/ccl/changefeedccl/sink_cloudstorage_test.go index 136a3082e8e9..fafb1813d698 100644 --- a/pkg/ccl/changefeedccl/sink_cloudstorage_test.go +++ b/pkg/ccl/changefeedccl/sink_cloudstorage_test.go @@ -997,7 +997,9 @@ func (n *mockSinkStorage) Writer(_ context.Context, _ string) (io.WriteCloser, e return n.writer(), nil } -func (n *mockSinkStorage) List(_ context.Context, _, _ string, _ cloud.ListingFn) error { +func (n *mockSinkStorage) List( + _ context.Context, _ string, _ cloud.ListOptions, _ cloud.ListingFn, +) error { return nil } diff --git a/pkg/ccl/workloadccl/fixture.go b/pkg/ccl/workloadccl/fixture.go index ef56baa3189d..92d0bd274967 100644 --- a/pkg/ccl/workloadccl/fixture.go +++ b/pkg/ccl/workloadccl/fixture.go @@ -694,7 +694,7 @@ func listDir( if log.V(1) { log.Dev.Infof(ctx, "Listing %s", dir) } - return es.List(ctx, dir, "/", lsFn) + return es.List(ctx, dir, cloud.ListOptions{Delimiter: "/"}, lsFn) } // ListFixtures returns the object paths to all fixtures stored in a FixtureConfig. 
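For callers not shown in this diff, the migration is the same mechanical change as in the hunks above: the bare delimiter string argument becomes a field on cloud.ListOptions. A minimal sketch of the before/after call shape, assuming the usual context and pkg/cloud imports (the helper name and prefix are hypothetical):

// listTopLevel lists only the first path component under "backups/" by
// grouping on "/". Under the old signature this call was
//   store.List(ctx, "backups/", "/", fn)
// and under the new one the delimiter moves into cloud.ListOptions.
func listTopLevel(ctx context.Context, store cloud.ExternalStorage) ([]string, error) {
	var names []string
	err := store.List(ctx, "backups/", cloud.ListOptions{Delimiter: "/"}, func(p string) error {
		names = append(names, p)
		return nil
	})
	return names, err
}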
diff --git a/pkg/cli/userfile.go b/pkg/cli/userfile.go index df1e26ad0c25..a71f6961f04f 100644 --- a/pkg/cli/userfile.go +++ b/pkg/cli/userfile.go @@ -248,7 +248,7 @@ func runUserFileGet(cmd *cobra.Command, args []string) (resErr error) { defer f.Close() var files []string - if err := f.List(ctx, "", "", func(s string) error { + if err := f.List(ctx, "", cloud.ListOptions{}, func(s string) error { if pattern != "" { if ok, err := path.Match(pattern, s); err != nil || !ok { return err @@ -442,7 +442,7 @@ func listUserFile(ctx context.Context, conn clisqlclient.Conn, glob string) ([]s displayPrefix := strings.TrimPrefix(conf.Path, "/") var res []string - if err := f.List(ctx, "", "", func(s string) error { + if err := f.List(ctx, "", cloud.ListOptions{}, func(s string) error { if pattern != "" { ok, err := path.Match(pattern, s) if err != nil || !ok { @@ -519,7 +519,7 @@ func deleteUserFile(ctx context.Context, conn clisqlclient.Conn, glob string) ([ displayRoot := strings.TrimPrefix(userFileTableConf.FileTableConfig.Path, "/") var deleted []string - if err := f.List(ctx, "", "", func(s string) error { + if err := f.List(ctx, "", cloud.ListOptions{}, func(s string) error { if pattern != "" { ok, err := path.Match(pattern, s) if err != nil || !ok { diff --git a/pkg/cloud/amazon/s3_storage.go b/pkg/cloud/amazon/s3_storage.go index d0a77f6ba8da..da322833c5ed 100644 --- a/pkg/cloud/amazon/s3_storage.go +++ b/pkg/cloud/amazon/s3_storage.go @@ -942,12 +942,15 @@ func (s *s3Storage) ReadFile( cloud.ResumingReaderRetryOnErrFnForSettings(ctx, s.settings), s3ErrDelay), fileSize, nil } -func (s *s3Storage) List(ctx context.Context, prefix, delim string, fn cloud.ListingFn) error { +func (s *s3Storage) List( + ctx context.Context, prefix string, opts cloud.ListOptions, fn cloud.ListingFn, +) error { ctx, sp := tracing.ChildSpan(ctx, "s3.List") defer sp.Finish() dest := cloud.JoinPathPreservingTrailingSlash(s.prefix, prefix) sp.SetTag("path", attribute.StringValue(dest)) + afterKey := opts.CanonicalAfterKey(s.prefix) client, err := s.getClient(ctx) if err != nil { @@ -960,9 +963,18 @@ func (s *s3Storage) List(ctx context.Context, prefix, delim string, fn cloud.Lis // s3 clones which return s3:/// as the first result of listing // s3:// to exclude that result. 
if envutil.EnvOrDefaultBool("COCKROACH_S3_LIST_WITH_PREFIX_SLASH_MARKER", false) { - s3Input = &s3.ListObjectsV2Input{Bucket: s.bucket, Prefix: aws.String(dest), Delimiter: nilIfEmpty(delim), StartAfter: aws.String(dest + "/")} + s3Input = &s3.ListObjectsV2Input{ + Bucket: s.bucket, + Prefix: aws.String(dest), + Delimiter: nilIfEmpty(opts.Delimiter), + StartAfter: aws.String(dest + "/"), + } } else { - s3Input = &s3.ListObjectsV2Input{Bucket: s.bucket, Prefix: aws.String(dest), Delimiter: nilIfEmpty(delim)} + s3Input = &s3.ListObjectsV2Input{ + Bucket: s.bucket, + Prefix: aws.String(dest), + Delimiter: nilIfEmpty(opts.Delimiter), + } } paginator := s3.NewListObjectsV2Paginator(client, s3Input) @@ -980,12 +992,18 @@ func (s *s3Storage) List(ctx context.Context, prefix, delim string, fn cloud.Lis } for _, x := range page.CommonPrefixes { + if *x.Prefix <= afterKey { + continue + } if err := fn(strings.TrimPrefix(*x.Prefix, dest)); err != nil { return err } } for _, fileObject := range page.Contents { + if *fileObject.Key <= afterKey { + continue + } if err := fn(strings.TrimPrefix(*fileObject.Key, dest)); err != nil { return err } diff --git a/pkg/cloud/azure/azure_file_credentials_test.go b/pkg/cloud/azure/azure_file_credentials_test.go index f5bc020347b3..f35787612af3 100644 --- a/pkg/cloud/azure/azure_file_credentials_test.go +++ b/pkg/cloud/azure/azure_file_credentials_test.go @@ -204,7 +204,7 @@ func TestAzureFileCredential(t *testing.T) { // Despite an error being produced, reloading the file will still // yield a valid token for storage access. - require.NoError(t, s.List(ctx, "/", "", func(f string) error { + require.NoError(t, s.List(ctx, "/", cloud.ListOptions{}, func(f string) error { return nil })) }) @@ -223,7 +223,7 @@ func TestAzureFileCredential(t *testing.T) { // are loaded. require.NoError(t, writeAzureCredentialsFile(credFile, cfg.tenantID, cfg.clientID, cfg.clientSecret+"garbage")) - err := s.List(ctx, "/", "", func(f string) error { + err := s.List(ctx, "/", cloud.ListOptions{}, func(f string) error { return nil }) require.ErrorContains(t, err, "authentication failed") diff --git a/pkg/cloud/azure/azure_storage.go b/pkg/cloud/azure/azure_storage.go index 0d92b6c9465e..89f9fd7e32e7 100644 --- a/pkg/cloud/azure/azure_storage.go +++ b/pkg/cloud/azure/azure_storage.go @@ -464,14 +464,17 @@ func (s *azureStorage) ReadFile( return r, r.Size, nil } -func (s *azureStorage) List(ctx context.Context, prefix, delim string, fn cloud.ListingFn) error { +func (s *azureStorage) List( + ctx context.Context, prefix string, opts cloud.ListOptions, fn cloud.ListingFn, +) error { ctx, sp := tracing.ChildSpan(ctx, "azure.List") defer sp.Finish() dest := cloud.JoinPathPreservingTrailingSlash(s.prefix, prefix) sp.SetTag("path", attribute.StringValue(dest)) + afterKey := opts.CanonicalAfterKey(s.prefix) - pager := s.container.NewListBlobsHierarchyPager(delim, &container.ListBlobsHierarchyOptions{Prefix: &dest}) + pager := s.container.NewListBlobsHierarchyPager(opts.Delimiter, &container.ListBlobsHierarchyOptions{Prefix: &dest}) for pager.More() { response, err := pager.NextPage(ctx) @@ -479,11 +482,17 @@ func (s *azureStorage) List(ctx context.Context, prefix, delim string, fn cloud. 
return errors.Wrap(err, "unable to list files for specified blob") } for _, blob := range response.Segment.BlobPrefixes { + if *blob.Name <= afterKey { + continue + } if err := fn(strings.TrimPrefix(*blob.Name, dest)); err != nil { return err } } for _, blob := range response.Segment.BlobItems { + if *blob.Name <= afterKey { + continue + } if err := fn(strings.TrimPrefix(*blob.Name, dest)); err != nil { return err } diff --git a/pkg/cloud/cloudtestutils/cloud_nemesis.go b/pkg/cloud/cloudtestutils/cloud_nemesis.go index d99d576570cc..9eef64e7caa6 100644 --- a/pkg/cloud/cloudtestutils/cloud_nemesis.go +++ b/pkg/cloud/cloudtestutils/cloud_nemesis.go @@ -249,7 +249,7 @@ func (c *cloudNemesis) listObjects(ctx context.Context) (err error) { before := c.snapshotObjects() listedFiles := map[string]bool{} - err = c.storage.List(ctx, "", "", func(filename string) error { + err = c.storage.List(ctx, "", cloud.ListOptions{}, func(filename string) error { listedFiles[strings.TrimPrefix(filename, "/")] = true return nil }) diff --git a/pkg/cloud/cloudtestutils/cloud_test_helpers.go b/pkg/cloud/cloudtestutils/cloud_test_helpers.go index fa05261cf805..ae21abf0d8ba 100644 --- a/pkg/cloud/cloudtestutils/cloud_test_helpers.go +++ b/pkg/cloud/cloudtestutils/cloud_test_helpers.go @@ -399,81 +399,133 @@ func CheckListFilesCanonical(t *testing.T, info StoreInfo, canonical string) { } t.Run("List", func(t *testing.T) { + // NB: When using AfterKey in tests, pick keys such that they do not exist + // in the expected output to avoid tests flaking due to different cloud + // provider behaviors around AfterKey's inclusiveness. for _, tc := range []struct { - name string - uri string - prefix string - delimiter string - expected []string + name string + uri string + prefix string + opts cloud.ListOptions + expected []string }{ { "root", info.URI, "", - "", + cloud.ListOptions{}, foreach(fileNames, func(s string) string { return "/" + s }), }, { "file-slash-numbers-slash", info.URI, "file/numbers/", - "", + cloud.ListOptions{}, []string{"data1.csv", "data2.csv", "data3.csv"}, }, { "root-slash", info.URI, "/", - "", + cloud.ListOptions{}, foreach(fileNames, func(s string) string { return s }), }, { "file", info.URI, "file", - "", + cloud.ListOptions{}, foreach(fileNames, func(s string) string { return strings.TrimPrefix(s, "file") }), }, { "file-slash", info.URI, "file/", - "", + cloud.ListOptions{}, foreach(fileNames, func(s string) string { return strings.TrimPrefix(s, "file/") }), }, { "slash-f", info.URI, "/f", - "", + cloud.ListOptions{}, foreach(fileNames, func(s string) string { return strings.TrimPrefix(s, "f") }), }, { "nothing", info.URI, "nothing", - "", + cloud.ListOptions{}, nil, }, { "delim-slash-file-slash", info.URI, "file/", - "/", + cloud.ListOptions{Delimiter: "/"}, []string{"abc/", "letters/", "numbers/"}, }, { "delim-data", info.URI, "", - "data", + cloud.ListOptions{Delimiter: "data"}, []string{"/file/abc/A.csv", "/file/abc/B.csv", "/file/abc/C.csv", "/file/letters/data", "/file/numbers/data"}, }, + { + "afterkey-no-prefix", + info.URI, + "", + cloud.ListOptions{AfterKey: "file/letters/dataB"}, + []string{"/file/letters/dataB.csv", "/file/letters/dataC.csv", "/file/numbers/data1.csv", "/file/numbers/data2.csv", "/file/numbers/data3.csv"}, + }, + { + "afterkey-with-prefix", + info.URI, + "file/letters/", + cloud.ListOptions{AfterKey: "file/letters/dataB"}, + []string{"dataB.csv", "dataC.csv"}, + }, + { + "afterkey-before-prefix", + info.URI, + "file/numbers/", + cloud.ListOptions{AfterKey: 
"file/abc/D"}, + []string{"data1.csv", "data2.csv", "data3.csv"}, + }, + { + "afterkey-after-prefix", + info.URI, + "file/abc/", + cloud.ListOptions{AfterKey: "file/z"}, + nil, + }, + { + "afterkey-excluded-from-results", + info.URI, + "file/abc/", + cloud.ListOptions{AfterKey: "file/abc/B.csv"}, + []string{"C.csv"}, + }, + { + "afterkey-with-delim", + info.URI, + "file/", + cloud.ListOptions{Delimiter: "/", AfterKey: "file/bar"}, + []string{"letters/", "numbers/"}, + }, + { + "afterkey-applied-after-delim-grouping", + info.URI, + "file/", + cloud.ListOptions{Delimiter: "/", AfterKey: "file/abc/B"}, + []string{"letters/", "numbers/"}, + }, } { t.Run(tc.name, func(t *testing.T) { s := storeFromURI(ctx, t, tc.uri, clientFactory, info.User, info.DB, testSettings) var actual []string - require.NoError(t, s.List(ctx, tc.prefix, tc.delimiter, func(f string) error { + require.NoError(t, s.List(ctx, tc.prefix, tc.opts, func(f string) error { actual = append(actual, f) return nil })) @@ -584,7 +636,7 @@ func CheckNoPermission(t *testing.T, info StoreInfo) { } defer s.Close() - err = s.List(ctx, "", "", nil) + err = s.List(ctx, "", cloud.ListOptions{}, nil) if err == nil { t.Fatalf("expected error when listing %s with no permissions", info.URI) } diff --git a/pkg/cloud/external_storage.go b/pkg/cloud/external_storage.go index 17d2cdec7efd..90fcb62bd1a0 100644 --- a/pkg/cloud/external_storage.go +++ b/pkg/cloud/external_storage.go @@ -77,11 +77,9 @@ type ExternalStorage interface { // List enumerates files within the supplied prefix, calling the passed // function with the name of each file found, relative to the external storage // destination's configured prefix. If the passed function returns a non-nil - // error, iteration is stopped it is returned. If delimiter is non-empty - // names which have the same prefix, prior to the delimiter, are grouped - // into a single result which is that prefix. The order that results are + // error, iteration is stopped, and it is returned. The order that results are // passed to the callback is undefined. - List(ctx context.Context, prefix, delimiter string, fn ListingFn) error + List(ctx context.Context, prefix string, opts ListOptions, fn ListingFn) error // Delete removes the named file from the store. If the file does not exist, // Delete returns nil. @@ -91,6 +89,40 @@ type ExternalStorage interface { Size(ctx context.Context, basename string) (int64, error) } +type ListOptions struct { + // If a Delimiter is set, names which have the same prefix, prior to the + // Delimiter, are grouped into a single result which is that prefix. + Delimiter string + + // When AfterKey is set, listing will only return results whose names are + // strictly lexicographically greater than AfterKey. AfterKey must be a full + // key relative to the external storage's base prefix. If a Delimiter is set, + // AfterKey filtering is applied after grouping. + // + // NB: Due to the fact that Azure does not support AfterKey semantics in + // its list API, we decided to handle all AfterKey filtering client-side + // across all external storage implementations. This gives us relatively + // comparable performance behaviors across all cloud providers, reducing our + // blind spots when it comes to testing and benchmarking. + // + // NB: If at some point Azure does add support for AfterKey, note that Cloud + // providers have different semantics for how AfterKey is applied + // (inclusive/exclusive, before/after delimiter grouping). 
This should be + // standardized across all implementations at that time. + AfterKey string +} + +// CanonicalAfterKey returns the canonicalized AfterKey, given an external +// storage base prefix. All cloud providers expect a full key to be provided as +// an AfterKey, so we join the base prefix of the store and AfterKey here. If no +// AfterKey is set, an empty string is returned. +func (o ListOptions) CanonicalAfterKey(prefix string) string { + if o.AfterKey == "" { + return "" + } + return JoinPathPreservingTrailingSlash(prefix, o.AfterKey) +} + type ReadOptions struct { Offset int64 diff --git a/pkg/cloud/externalconn/utils/connection_utils.go b/pkg/cloud/externalconn/utils/connection_utils.go index 5a15885f202d..ea779c1e89d0 100644 --- a/pkg/cloud/externalconn/utils/connection_utils.go +++ b/pkg/cloud/externalconn/utils/connection_utils.go @@ -48,7 +48,7 @@ func CheckExternalStorageConnection( // List the sentinel file. var foundFile bool - if err := es.List(ctx, "", "", func(s string) error { + if err := es.List(ctx, "", cloud.ListOptions{}, func(s string) error { paths := strings.Split(s, "/") s = paths[len(paths)-1] if match := strings.HasPrefix(s, markerFile); match { diff --git a/pkg/cloud/gcp/gcs_storage.go b/pkg/cloud/gcp/gcs_storage.go index e49c89afab47..0a2439e04c9f 100644 --- a/pkg/cloud/gcp/gcs_storage.go +++ b/pkg/cloud/gcp/gcs_storage.go @@ -339,13 +339,22 @@ func (g *gcsStorage) ReadFile( return r, r.Reader.(*gcs.Reader).Attrs.Size, nil } -func (g *gcsStorage) List(ctx context.Context, prefix, delim string, fn cloud.ListingFn) error { +func (g *gcsStorage) List( + ctx context.Context, prefix string, opts cloud.ListOptions, fn cloud.ListingFn, +) error { dest := cloud.JoinPathPreservingTrailingSlash(g.prefix, prefix) + afterKey := opts.CanonicalAfterKey(g.prefix) ctx, sp := tracing.ChildSpan(ctx, "gcs.List") defer sp.Finish() sp.SetTag("path", attribute.StringValue(dest)) - it := g.bucket.Objects(ctx, &gcs.Query{Prefix: dest, Delimiter: delim}) + it := g.bucket.Objects( + ctx, + &gcs.Query{ + Prefix: dest, + Delimiter: opts.Delimiter, + }, + ) for { attrs, err := it.Next() if errors.Is(err, iterator.Done) { @@ -358,6 +367,9 @@ func (g *gcsStorage) List(ctx context.Context, prefix, delim string, fn cloud.Li if name == "" { name = attrs.Prefix } + if name <= afterKey { + continue + } if err := fn(strings.TrimPrefix(name, dest)); err != nil { return err } diff --git a/pkg/cloud/httpsink/http_storage.go b/pkg/cloud/httpsink/http_storage.go index f680373b6e11..8a05994195c8 100644 --- a/pkg/cloud/httpsink/http_storage.go +++ b/pkg/cloud/httpsink/http_storage.go @@ -179,7 +179,9 @@ func (h *httpStorage) Writer(ctx context.Context, basename string) (io.WriteClos }), nil } -func (h *httpStorage) List(_ context.Context, _, _ string, _ cloud.ListingFn) error { +func (h *httpStorage) List( + _ context.Context, _ string, _ cloud.ListOptions, _ cloud.ListingFn, +) error { return errors.Mark(errors.New("http storage does not support listing"), cloud.ErrListingUnsupported) } diff --git a/pkg/cloud/impl_registry.go b/pkg/cloud/impl_registry.go index 3734101b0d6f..8b4cab70dfbf 100644 --- a/pkg/cloud/impl_registry.go +++ b/pkg/cloud/impl_registry.go @@ -414,7 +414,7 @@ func (e *esWrapper) ReadFile( return e.wrapReader(ctx, r), s, nil } -func (e *esWrapper) List(ctx context.Context, prefix, delimiter string, fn ListingFn) error { +func (e *esWrapper) List(ctx context.Context, prefix string, opts ListOptions, fn ListingFn) error { if e.httpTracer != nil { ctx = 
httptrace.WithClientTrace(ctx, e.httpTracer) } @@ -427,7 +427,7 @@ func (e *esWrapper) List(ctx context.Context, prefix, delimiter string, fn Listi return fn(s) } } - return e.ExternalStorage.List(ctx, prefix, delimiter, countingFn) + return e.ExternalStorage.List(ctx, prefix, opts, countingFn) } func (e *esWrapper) Writer(ctx context.Context, basename string) (io.WriteCloser, error) { diff --git a/pkg/cloud/nodelocal/nodelocal_storage.go b/pkg/cloud/nodelocal/nodelocal_storage.go index c701030f0b2d..037e22f27b58 100644 --- a/pkg/cloud/nodelocal/nodelocal_storage.go +++ b/pkg/cloud/nodelocal/nodelocal_storage.go @@ -176,9 +176,10 @@ func (l *localFileStorage) ReadFile( } func (l *localFileStorage) List( - ctx context.Context, prefix, delim string, fn cloud.ListingFn, + ctx context.Context, prefix string, opts cloud.ListOptions, fn cloud.ListingFn, ) error { dest := cloud.JoinPathPreservingTrailingSlash(l.base, prefix) + afterKey := opts.CanonicalAfterKey(l.base) res, err := l.blobClient.List(ctx, dest) if err != nil { @@ -190,15 +191,21 @@ func (l *localFileStorage) List( var prevPrefix string for _, f := range res { f = strings.TrimPrefix(f, dest) - if delim != "" { - if i := strings.Index(f, delim); i >= 0 { - f = f[:i+len(delim)] + if opts.Delimiter != "" { + if i := strings.Index(f, opts.Delimiter); i >= 0 { + f = f[:i+len(opts.Delimiter)] } if f == prevPrefix { continue } prevPrefix = f } + + // afterKey is a full key so we must compare against the full file key. + if dest+f <= afterKey { + continue + } + if err := fn(f); err != nil { return err } diff --git a/pkg/cloud/nullsink/nullsink_storage.go b/pkg/cloud/nullsink/nullsink_storage.go index 4b3dc01e79f1..30ed3f59e82d 100644 --- a/pkg/cloud/nullsink/nullsink_storage.go +++ b/pkg/cloud/nullsink/nullsink_storage.go @@ -88,7 +88,9 @@ func (n *nullSinkStorage) Writer(_ context.Context, _ string) (io.WriteCloser, e return nullWriter{}, nil } -func (n *nullSinkStorage) List(_ context.Context, _, _ string, _ cloud.ListingFn) error { +func (n *nullSinkStorage) List( + _ context.Context, _ string, _ cloud.ListOptions, _ cloud.ListingFn, +) error { return nil } diff --git a/pkg/cloud/userfile/file_table_storage.go b/pkg/cloud/userfile/file_table_storage.go index f89c7c71c762..7928480aee5a 100644 --- a/pkg/cloud/userfile/file_table_storage.go +++ b/pkg/cloud/userfile/file_table_storage.go @@ -247,9 +247,10 @@ func (f *fileTableStorage) Writer(ctx context.Context, basename string) (io.Writ // List implements the ExternalStorage interface. func (f *fileTableStorage) List( - ctx context.Context, prefix, delim string, fn cloud.ListingFn, + ctx context.Context, prefix string, opts cloud.ListOptions, fn cloud.ListingFn, ) error { dest := cloud.JoinPathPreservingTrailingSlash(f.prefix, prefix) + afterKey := opts.CanonicalAfterKey(f.prefix) res, err := f.fs.ListFiles(ctx, dest) if err != nil { @@ -260,15 +261,21 @@ func (f *fileTableStorage) List( var prevPrefix string for _, f := range res { f = strings.TrimPrefix(f, dest) - if delim != "" { - if i := strings.Index(f, delim); i >= 0 { - f = f[:i+len(delim)] + if opts.Delimiter != "" { + if i := strings.Index(f, opts.Delimiter); i >= 0 { + f = f[:i+len(opts.Delimiter)] } if f == prevPrefix { continue } prevPrefix = f } + + // afterKey is a full key so we must compare against the full file key. 
+ if dest+f <= afterKey { + continue + } + if err := fn(f); err != nil { return err } diff --git a/pkg/cmd/roachtest/tests/cdc_helper.go b/pkg/cmd/roachtest/tests/cdc_helper.go index 2a8f67af27eb..e1f4b14421c8 100644 --- a/pkg/cmd/roachtest/tests/cdc_helper.go +++ b/pkg/cmd/roachtest/tests/cdc_helper.go @@ -93,7 +93,7 @@ func getRandomIndex(sizeOfSlice int) int { func listFilesOfTargetTable( cs cloud.ExternalStorage, selectedTargetTable string, ) (csFileNames []string, _ error) { - err := cs.List(context.Background(), "", "", func(str string) error { + err := cs.List(context.Background(), "", cloud.ListOptions{}, func(str string) error { targetTableName, err := extractTableNameFromFileName(str) if err != nil { return err diff --git a/pkg/roachprod/blobfixture/registry.go b/pkg/roachprod/blobfixture/registry.go index 46b81b463599..4b90330a6f43 100644 --- a/pkg/roachprod/blobfixture/registry.go +++ b/pkg/roachprod/blobfixture/registry.go @@ -217,23 +217,26 @@ func (r *Registry) listFixtures( } var result []FixtureMetadata - err := r.storage.List(ctx, kindPrefix /*delimiter*/, "", func(found string) error { - json, err := r.maybeReadFile(ctx, path.Join(kindPrefix, found)) - if err != nil { - return err - } - if json == nil { - return nil // Skip files that don't exist (may have been GC'd) - } + err := r.storage.List( + ctx, kindPrefix /*delimiter*/, cloud.ListOptions{}, + func(found string) error { + json, err := r.maybeReadFile(ctx, path.Join(kindPrefix, found)) + if err != nil { + return err + } + if json == nil { + return nil // Skip files that don't exist (may have been GC'd) + } - metadata := FixtureMetadata{} - if err := metadata.UnmarshalJson(json); err != nil { - return err - } + metadata := FixtureMetadata{} + if err := metadata.UnmarshalJson(json); err != nil { + return err + } - result = append(result, metadata) - return nil - }) + result = append(result, metadata) + return nil + }, + ) if err != nil { return nil, err } @@ -290,7 +293,7 @@ func (r *Registry) deleteBlobsMatchingPrefix(ctx context.Context, prefix string) // Producer goroutine g.GoCtx(func(ctx context.Context) error { defer close(paths) - err := r.storage.List(ctx, prefix, "", func(path string) error { + err := r.storage.List(ctx, prefix, cloud.ListOptions{}, func(path string) error { select { case paths <- path: return nil diff --git a/pkg/sql/bulkutil/bulk_job_cleaner_test.go b/pkg/sql/bulkutil/bulk_job_cleaner_test.go index b616b0f32e9d..35841790a4d3 100644 --- a/pkg/sql/bulkutil/bulk_job_cleaner_test.go +++ b/pkg/sql/bulkutil/bulk_job_cleaner_test.go @@ -46,7 +46,7 @@ func (m *mockStorageWithTracking) Delete(ctx context.Context, basename string) e } func (m *mockStorageWithTracking) List( - ctx context.Context, prefix, delimiter string, fn cloud.ListingFn, + ctx context.Context, prefix string, opts cloud.ListOptions, fn cloud.ListingFn, ) error { m.listed = append(m.listed, prefix) if m.listError != nil { diff --git a/pkg/sql/bulkutil/external_storage_mux.go b/pkg/sql/bulkutil/external_storage_mux.go index 9c9c790f67f8..387a1bba23bc 100644 --- a/pkg/sql/bulkutil/external_storage_mux.go +++ b/pkg/sql/bulkutil/external_storage_mux.go @@ -103,7 +103,7 @@ func (c *ExternalStorageMux) ListFiles( if err != nil { return err } - return store.List(ctx, prefix, "" /* delimiter */, fn) + return store.List(ctx, prefix, cloud.ListOptions{}, fn) } // splitURI splits a URI into its prefix (scheme + host) and path components. 
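Each provider above applies the new AfterKey filtering client-side with the same contract: canonicalize AfterKey against the store's base prefix, then drop any full key (or delimiter-grouped prefix) that does not sort strictly after it. A standalone sketch of that shared logic, assuming the pkg/cloud import (the helper name and parameters are hypothetical; base is the store's configured prefix and dest is the joined listing prefix, mirroring the nodelocal and userfile hunks):

// filterAfterKey keeps only the names whose full keys sort strictly after the
// canonicalized AfterKey, matching the "strictly greater than" semantics
// documented on cloud.ListOptions. names are already relative to dest and,
// if a Delimiter was set, already grouped.
func filterAfterKey(base, dest string, names []string, opts cloud.ListOptions) []string {
	afterKey := opts.CanonicalAfterKey(base)
	var kept []string
	for _, name := range names {
		if dest+name <= afterKey {
			continue
		}
		kept = append(kept, name)
	}
	return kept
}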
diff --git a/pkg/sql/importer/import_planning.go b/pkg/sql/importer/import_planning.go index 097a07f4e27e..0f9433670f24 100644 --- a/pkg/sql/importer/import_planning.go +++ b/pkg/sql/importer/import_planning.go @@ -396,7 +396,7 @@ func importPlanHook( return err } var expandedFiles []string - if err := s.List(ctx, "", "", func(s string) error { + if err := s.List(ctx, "", cloud.ListOptions{}, func(s string) error { ok, err := path.Match(pattern, s) if ok { uri.Path = prefix + s diff --git a/pkg/sql/importer/read_import_base_test.go b/pkg/sql/importer/read_import_base_test.go index 4c6f65bd6e6d..9cf0311a69a3 100644 --- a/pkg/sql/importer/read_import_base_test.go +++ b/pkg/sql/importer/read_import_base_test.go @@ -331,7 +331,7 @@ func (m *mockExternalStorage) Writer(ctx context.Context, basename string) (io.W return nil, errors.New("not implemented") } func (m *mockExternalStorage) List( - ctx context.Context, prefix, delimiter string, fn cloud.ListingFn, + ctx context.Context, prefix string, opts cloud.ListOptions, fn cloud.ListingFn, ) error { return errors.New("not implemented") } diff --git a/pkg/sql/importer/testutils_test.go b/pkg/sql/importer/testutils_test.go index 65bd3b328678..547f967c0a02 100644 --- a/pkg/sql/importer/testutils_test.go +++ b/pkg/sql/importer/testutils_test.go @@ -251,7 +251,7 @@ func (es *generatorExternalStorage) Writer( } func (es *generatorExternalStorage) List( - ctx context.Context, _, _ string, _ cloud.ListingFn, + ctx context.Context, _ string, _ cloud.ListOptions, _ cloud.ListingFn, ) error { return errors.New("unsupported") } diff --git a/pkg/storage/shared_storage.go b/pkg/storage/shared_storage.go index 563bb9c6cc16..0d47fdb57097 100644 --- a/pkg/storage/shared_storage.go +++ b/pkg/storage/shared_storage.go @@ -140,7 +140,7 @@ func (e *externalStorageWrapper) CreateObject(objName string) (io.WriteCloser, e // List implements the remote.Storage interface. func (e *externalStorageWrapper) List(prefix, delimiter string) ([]string, error) { var directoryList []string - err := e.es.List(e.ctx, prefix, delimiter, func(s string) error { + err := e.es.List(e.ctx, prefix, cloud.ListOptions{Delimiter: delimiter}, func(s string) error { directoryList = append(directoryList, s) return nil })
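As a usage illustration of the new option, a caller can seek into a large prefix and stop early, which is the pattern listIndexesWithinRange relies on above. A hedged sketch (package, helper name, and prefix are hypothetical; only APIs introduced or already used in this diff are called):

package example

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/cloud"
	"github.com/cockroachdb/errors"
)

// findFirstAfter returns the first result the store passes to the callback
// that sorts strictly after seek, stopping the listing with
// cloud.ErrListingDone the same way the backup code above does, and treating
// that sentinel as a non-error.
func findFirstAfter(
	ctx context.Context, store cloud.ExternalStorage, prefix, seek string,
) (string, error) {
	var found string
	err := store.List(ctx, prefix, cloud.ListOptions{AfterKey: seek}, func(p string) error {
		found = p
		return cloud.ErrListingDone
	})
	if err != nil && !errors.Is(err, cloud.ErrListingDone) {
		return "", err
	}
	return found, nil
}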