Skip to content

Commit 20fec10

Browse files
committed
cloud: add support for listing after a key
This patch adds an AfterKey option in ExternalStorage.List. If it is specified, only names that are lexicographically greater than AfterKey are returned. Epic: CRDB-57536 Informs: #159647 Release note: None
1 parent 093e6eb commit 20fec10

File tree

11 files changed

+149
-22
lines changed

11 files changed

+149
-22
lines changed

pkg/cloud/amazon/s3_storage.go

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -950,6 +950,7 @@ func (s *s3Storage) List(
950950

951951
dest := cloud.JoinPathPreservingTrailingSlash(s.prefix, prefix)
952952
sp.SetTag("path", attribute.StringValue(dest))
953+
afterKey := opts.CanonicalAfterKey(s.prefix)
953954

954955
client, err := s.getClient(ctx)
955956
if err != nil {
@@ -962,9 +963,18 @@ func (s *s3Storage) List(
962963
// s3 clones which return s3://<prefix>/ as the first result of listing
963964
// s3://<prefix> to exclude that result.
964965
if envutil.EnvOrDefaultBool("COCKROACH_S3_LIST_WITH_PREFIX_SLASH_MARKER", false) {
965-
s3Input = &s3.ListObjectsV2Input{Bucket: s.bucket, Prefix: aws.String(dest), Delimiter: nilIfEmpty(opts.Delimiter), StartAfter: aws.String(dest + "/")}
966+
s3Input = &s3.ListObjectsV2Input{
967+
Bucket: s.bucket,
968+
Prefix: aws.String(dest),
969+
Delimiter: nilIfEmpty(opts.Delimiter),
970+
StartAfter: aws.String(dest + "/"),
971+
}
966972
} else {
967-
s3Input = &s3.ListObjectsV2Input{Bucket: s.bucket, Prefix: aws.String(dest), Delimiter: nilIfEmpty(opts.Delimiter)}
973+
s3Input = &s3.ListObjectsV2Input{
974+
Bucket: s.bucket,
975+
Prefix: aws.String(dest),
976+
Delimiter: nilIfEmpty(opts.Delimiter),
977+
}
968978
}
969979

970980
paginator := s3.NewListObjectsV2Paginator(client, s3Input)
@@ -982,12 +992,18 @@ func (s *s3Storage) List(
982992
}
983993

984994
for _, x := range page.CommonPrefixes {
995+
if *x.Prefix <= afterKey {
996+
continue
997+
}
985998
if err := fn(strings.TrimPrefix(*x.Prefix, dest)); err != nil {
986999
return err
9871000
}
9881001
}
9891002

9901003
for _, fileObject := range page.Contents {
1004+
if *fileObject.Key <= afterKey {
1005+
continue
1006+
}
9911007
if err := fn(strings.TrimPrefix(*fileObject.Key, dest)); err != nil {
9921008
return err
9931009
}

pkg/cloud/azure/azure_storage.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -472,6 +472,7 @@ func (s *azureStorage) List(
472472

473473
dest := cloud.JoinPathPreservingTrailingSlash(s.prefix, prefix)
474474
sp.SetTag("path", attribute.StringValue(dest))
475+
afterKey := opts.CanonicalAfterKey(s.prefix)
475476

476477
pager := s.container.NewListBlobsHierarchyPager(opts.Delimiter, &container.ListBlobsHierarchyOptions{Prefix: &dest})
477478
for pager.More() {
@@ -481,11 +482,17 @@ func (s *azureStorage) List(
481482
return errors.Wrap(err, "unable to list files for specified blob")
482483
}
483484
for _, blob := range response.Segment.BlobPrefixes {
485+
if *blob.Name <= afterKey {
486+
continue
487+
}
484488
if err := fn(strings.TrimPrefix(*blob.Name, dest)); err != nil {
485489
return err
486490
}
487491
}
488492
for _, blob := range response.Segment.BlobItems {
493+
if *blob.Name <= afterKey {
494+
continue
495+
}
489496
if err := fn(strings.TrimPrefix(*blob.Name, dest)); err != nil {
490497
return err
491498
}

pkg/cloud/cloudtestutils/cloud_test_helpers.go

Lines changed: 67 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -399,81 +399,133 @@ func CheckListFilesCanonical(t *testing.T, info StoreInfo, canonical string) {
399399
}
400400

401401
t.Run("List", func(t *testing.T) {
402+
// NB: When using AfterKey in tests, pick keys such that they do not exist
403+
// in the expected output to avoid tests flaking due to different cloud
404+
// provider behaviors around AfterKey's inclusiveness.
402405
for _, tc := range []struct {
403-
name string
404-
uri string
405-
prefix string
406-
delimiter string
407-
expected []string
406+
name string
407+
uri string
408+
prefix string
409+
opts cloud.ListOptions
410+
expected []string
408411
}{
409412
{
410413
"root",
411414
info.URI,
412415
"",
413-
"",
416+
cloud.ListOptions{},
414417
foreach(fileNames, func(s string) string { return "/" + s }),
415418
},
416419
{
417420
"file-slash-numbers-slash",
418421
info.URI,
419422
"file/numbers/",
420-
"",
423+
cloud.ListOptions{},
421424
[]string{"data1.csv", "data2.csv", "data3.csv"},
422425
},
423426
{
424427
"root-slash",
425428
info.URI,
426429
"/",
427-
"",
430+
cloud.ListOptions{},
428431
foreach(fileNames, func(s string) string { return s }),
429432
},
430433
{
431434
"file",
432435
info.URI,
433436
"file",
434-
"",
437+
cloud.ListOptions{},
435438
foreach(fileNames, func(s string) string { return strings.TrimPrefix(s, "file") }),
436439
},
437440
{
438441
"file-slash",
439442
info.URI,
440443
"file/",
441-
"",
444+
cloud.ListOptions{},
442445
foreach(fileNames, func(s string) string { return strings.TrimPrefix(s, "file/") }),
443446
},
444447
{
445448
"slash-f",
446449
info.URI,
447450
"/f",
448-
"",
451+
cloud.ListOptions{},
449452
foreach(fileNames, func(s string) string { return strings.TrimPrefix(s, "f") }),
450453
},
451454
{
452455
"nothing",
453456
info.URI,
454457
"nothing",
455-
"",
458+
cloud.ListOptions{},
456459
nil,
457460
},
458461
{
459462
"delim-slash-file-slash",
460463
info.URI,
461464
"file/",
462-
"/",
465+
cloud.ListOptions{Delimiter: "/"},
463466
[]string{"abc/", "letters/", "numbers/"},
464467
},
465468
{
466469
"delim-data",
467470
info.URI,
468471
"",
469-
"data",
472+
cloud.ListOptions{Delimiter: "data"},
470473
[]string{"/file/abc/A.csv", "/file/abc/B.csv", "/file/abc/C.csv", "/file/letters/data", "/file/numbers/data"},
471474
},
475+
{
476+
"afterkey-no-prefix",
477+
info.URI,
478+
"",
479+
cloud.ListOptions{AfterKey: "file/letters/dataB"},
480+
[]string{"/file/letters/dataB.csv", "/file/letters/dataC.csv", "/file/numbers/data1.csv", "/file/numbers/data2.csv", "/file/numbers/data3.csv"},
481+
},
482+
{
483+
"afterkey-with-prefix",
484+
info.URI,
485+
"file/letters/",
486+
cloud.ListOptions{AfterKey: "file/letters/dataB"},
487+
[]string{"dataB.csv", "dataC.csv"},
488+
},
489+
{
490+
"afterkey-before-prefix",
491+
info.URI,
492+
"file/numbers/",
493+
cloud.ListOptions{AfterKey: "file/abc/D"},
494+
[]string{"data1.csv", "data2.csv", "data3.csv"},
495+
},
496+
{
497+
"afterkey-after-prefix",
498+
info.URI,
499+
"file/abc/",
500+
cloud.ListOptions{AfterKey: "file/z"},
501+
nil,
502+
},
503+
{
504+
"afterkey-excluded-from-results",
505+
info.URI,
506+
"file/abc/",
507+
cloud.ListOptions{AfterKey: "file/abc/B.csv"},
508+
[]string{"C.csv"},
509+
},
510+
{
511+
"afterkey-with-delim",
512+
info.URI,
513+
"file/",
514+
cloud.ListOptions{Delimiter: "/", AfterKey: "file/bar"},
515+
[]string{"letters/", "numbers/"},
516+
},
517+
{
518+
"afterkey-applied-after-delim-grouping",
519+
info.URI,
520+
"file/",
521+
cloud.ListOptions{Delimiter: "/", AfterKey: "file/abc/B"},
522+
[]string{"letters/", "numbers/"},
523+
},
472524
} {
473525
t.Run(tc.name, func(t *testing.T) {
474526
s := storeFromURI(ctx, t, tc.uri, clientFactory, info.User, info.DB, testSettings)
475527
var actual []string
476-
require.NoError(t, s.List(ctx, tc.prefix, cloud.ListOptions{Delimiter: tc.delimiter}, func(f string) error {
528+
require.NoError(t, s.List(ctx, tc.prefix, tc.opts, func(f string) error {
477529
actual = append(actual, f)
478530
return nil
479531
}))

pkg/cloud/external_storage.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,34 @@ type ListOptions struct {
9393
// If a Delimiter is set, names which have the same prefix, prior to the
9494
// Delimiter, are grouped into a single result which is that prefix.
9595
Delimiter string
96+
97+
// When AfterKey is set, listing will only return results whose names are
98+
// strictly lexicographically greater than AfterKey. AfterKey must be a full
99+
// key relative to the external storage's base prefix. If a Delimiter is set,
100+
// AfterKey filtering is applied after grouping.
101+
//
102+
// NB: Due to the fact that Azure does not support AfterKey semantics in
103+
// its list API, we decided to handle all AfterKey filtering client-side
104+
// across all external storage implementations. This gives us relatively
105+
// comparable performance behaviors across all cloud providers, reducing our
106+
// blind spots when it comes to testing and benchmarking.
107+
//
108+
// NB: If at some point Azure does add support for AfterKey, note that Cloud
109+
// providers have different semantics for how AfterKey is applied
110+
// (inclusive/exclusive, before/after delimiter grouping). This should be
111+
// standardized across all implementations at that time.
112+
AfterKey string
113+
}
114+
115+
// CanonicalAfterKey returns the canonicalized AfterKey, given an external
116+
// storage base prefix. All cloud providers expect a full key to be provided as
117+
// an AfterKey, so we join the base prefix of the store and AfterKey here. If no
118+
// AfterKey is set, an empty string is returned.
119+
func (o ListOptions) CanonicalAfterKey(prefix string) string {
120+
if o.AfterKey == "" {
121+
return ""
122+
}
123+
return JoinPathPreservingTrailingSlash(prefix, o.AfterKey)
96124
}
97125

98126
type ReadOptions struct {

pkg/cloud/gcp/gcs_storage.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -343,11 +343,18 @@ func (g *gcsStorage) List(
343343
ctx context.Context, prefix string, opts cloud.ListOptions, fn cloud.ListingFn,
344344
) error {
345345
dest := cloud.JoinPathPreservingTrailingSlash(g.prefix, prefix)
346+
afterKey := opts.CanonicalAfterKey(g.prefix)
346347
ctx, sp := tracing.ChildSpan(ctx, "gcs.List")
347348
defer sp.Finish()
348349
sp.SetTag("path", attribute.StringValue(dest))
349350

350-
it := g.bucket.Objects(ctx, &gcs.Query{Prefix: dest, Delimiter: opts.Delimiter})
351+
it := g.bucket.Objects(
352+
ctx,
353+
&gcs.Query{
354+
Prefix: dest,
355+
Delimiter: opts.Delimiter,
356+
},
357+
)
351358
for {
352359
attrs, err := it.Next()
353360
if errors.Is(err, iterator.Done) {
@@ -360,6 +367,9 @@ func (g *gcsStorage) List(
360367
if name == "" {
361368
name = attrs.Prefix
362369
}
370+
if name <= afterKey {
371+
continue
372+
}
363373
if err := fn(strings.TrimPrefix(name, dest)); err != nil {
364374
return err
365375
}

pkg/cloud/nodelocal/nodelocal_storage.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,7 @@ func (l *localFileStorage) List(
179179
ctx context.Context, prefix string, opts cloud.ListOptions, fn cloud.ListingFn,
180180
) error {
181181
dest := cloud.JoinPathPreservingTrailingSlash(l.base, prefix)
182+
afterKey := opts.CanonicalAfterKey(l.base)
182183

183184
res, err := l.blobClient.List(ctx, dest)
184185
if err != nil {
@@ -199,6 +200,12 @@ func (l *localFileStorage) List(
199200
}
200201
prevPrefix = f
201202
}
203+
204+
// afterKey is a full key so we must compare against the full file key.
205+
if dest+f <= afterKey {
206+
continue
207+
}
208+
202209
if err := fn(f); err != nil {
203210
return err
204211
}

pkg/cloud/userfile/file_table_storage.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,7 @@ func (f *fileTableStorage) List(
250250
ctx context.Context, prefix string, opts cloud.ListOptions, fn cloud.ListingFn,
251251
) error {
252252
dest := cloud.JoinPathPreservingTrailingSlash(f.prefix, prefix)
253+
afterKey := opts.CanonicalAfterKey(f.prefix)
253254

254255
res, err := f.fs.ListFiles(ctx, dest)
255256
if err != nil {
@@ -269,6 +270,12 @@ func (f *fileTableStorage) List(
269270
}
270271
prevPrefix = f
271272
}
273+
274+
// afterKey is a full key so we must compare against the full file key.
275+
if dest+f <= afterKey {
276+
continue
277+
}
278+
272279
if err := fn(f); err != nil {
273280
return err
274281
}

pkg/cmd/roachtest/tests/cdc_helper.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ func getRandomIndex(sizeOfSlice int) int {
9393
func listFilesOfTargetTable(
9494
cs cloud.ExternalStorage, selectedTargetTable string,
9595
) (csFileNames []string, _ error) {
96-
err := cs.List(context.Background(), "", "", func(str string) error {
96+
err := cs.List(context.Background(), "", cloud.ListOptions{}, func(str string) error {
9797
targetTableName, err := extractTableNameFromFileName(str)
9898
if err != nil {
9999
return err

pkg/sql/bulkutil/bulk_job_cleaner_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ func (m *mockStorageWithTracking) Delete(ctx context.Context, basename string) e
4646
}
4747

4848
func (m *mockStorageWithTracking) List(
49-
ctx context.Context, prefix, delimiter string, fn cloud.ListingFn,
49+
ctx context.Context, prefix string, opts cloud.ListOptions, fn cloud.ListingFn,
5050
) error {
5151
m.listed = append(m.listed, prefix)
5252
if m.listError != nil {

pkg/sql/bulkutil/external_storage_mux.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ func (c *ExternalStorageMux) ListFiles(
103103
if err != nil {
104104
return err
105105
}
106-
return store.List(ctx, prefix, "" /* delimiter */, fn)
106+
return store.List(ctx, prefix, cloud.ListOptions{}, fn)
107107
}
108108

109109
// splitURI splits a URI into its prefix (scheme + host) and path components.

0 commit comments

Comments
 (0)