fix(store-gateway): run all index header bucket reads through GetRange interface#15024
Conversation
| g.Go(func() error { | ||
| return ensureIndexHeaderOnDisk(ctx, id, bkt, localDir, logger) | ||
| }) |
There was a problem hiding this comment.
This was incorrectly introduced in the big refactor - we always want to download the index-header on lazy reader init, like on startup so we don't have to do it later when the StreamBinaryReader is initialized and put that latency into the query path.
There was a problem hiding this comment.
Huh, will this particular fix need a backport?
There was a problem hiding this comment.
It didn't actually cause any noticeable problems when running, and it's an experimental feature so I figured no.
|
Borked the git history on the previous PR, so moving here @chencs |
3adb520 to
04c303a
Compare
There was a problem hiding this comment.
Cursor Bugbot has reviewed your changes and found 2 potential issues.
Autofix Details
Bugbot Autofix prepared fixes for both issues found in the latest run.
- ✅ Fixed: Mutex not unlocked on Read error causes deadlock
- Added the missing unlock on the metadata read error path so failed reads no longer leave the factory mutex locked.
- ✅ Fixed: Missing cleanup leaks pooled buffer on Skip error
- Added deferred reader cleanup in `NewDecbufAtChecked` so the pooled bucket buffer is always returned when `Skip` fails before Decbuf ownership is transferred.
Or push these changes by commenting:
@cursor push 3fab84b92c
Preview (3fab84b92c)
diff --git a/pkg/storage/indexheader/encoding/bucket_factory.go b/pkg/storage/indexheader/encoding/bucket_factory.go
--- a/pkg/storage/indexheader/encoding/bucket_factory.go
+++ b/pkg/storage/indexheader/encoding/bucket_factory.go
@@ -57,6 +57,7 @@
lengthBytes := make([]byte, numLenBytes)
n, err := metaReader.Read(lengthBytes)
if err != nil {
+ bf.mu.Unlock()
return Decbuf{E: err}
}
if n != numLenBytes {
@@ -71,12 +72,19 @@
bufferLength := numLenBytes + contentLength + crc32.Size
bufReader := NewBucketBufReader(bf.ctx, bf.bkt, bf.objectPath, offset, bufferLength)
+ closeReader := true
+ defer func() {
+ if closeReader {
+ _ = bufReader.Close()
+ }
+ }()
// bufReader is expected start at base offset + 4 after consuming length bytes
err = bufReader.Skip(numLenBytes)
if err != nil {
return Decbuf{E: err}
}
d := Decbuf{r: bufReader}
+ closeReader = false
if table != nil {
if d.CheckCrc32(table); d.Err() != nil {
diff --git a/pkg/storage/indexheader/encoding/factory_test.go b/pkg/storage/indexheader/encoding/factory_test.go
--- a/pkg/storage/indexheader/encoding/factory_test.go
+++ b/pkg/storage/indexheader/encoding/factory_test.go
@@ -3,13 +3,19 @@
package encoding
import (
+ "bufio"
+ "bytes"
"context"
"encoding/binary"
"fmt"
"hash/crc32"
+ "io"
"os"
"path"
+ "runtime"
+ "sync"
"testing"
+ "time"
"github.com/prometheus/client_golang/prometheus"
promencoding "github.com/prometheus/prometheus/tsdb/encoding"
@@ -160,6 +166,64 @@
})
}
+func TestBucketDecbufFactory_NewDecbufAtChecked_ReadErrorDoesNotDeadlock(t *testing.T) {
+ t.Parallel()
+
+ readErr := fmt.Errorf("read failed")
+ factory := NewBucketDecbufFactory(context.Background(), &errorOnGetRangeBucket{err: readErr}, "obj")
+
+ d := factory.NewDecbufAtChecked(0, nil)
+ require.ErrorIs(t, d.Err(), readErr)
+
+ done := make(chan struct{})
+ go func() {
+ defer close(done)
+ d = factory.NewDecbufAtChecked(0, nil)
+ require.ErrorIs(t, d.Err(), readErr)
+ }()
+
+ select {
+ case <-done:
+ case <-time.After(time.Second):
+ t.Fatal("second call timed out waiting for lock")
+ }
+}
+
+func TestBucketDecbufFactory_NewDecbufAtChecked_SkipErrorReturnsReaderToPool(t *testing.T) {
+ t.Parallel()
+
+ prevPool := bucketBufPool
+ defer func() {
+ bucketBufPool = prevPool
+ }()
+
+ prevProcs := runtime.GOMAXPROCS(1)
+ defer runtime.GOMAXPROCS(prevProcs)
+
+ pooledReader := bufio.NewReaderSize(nil, 16)
+ bucketBufPool = sync.Pool{
+ New: func() any {
+ t.Fatal("unexpected bucket buffer allocation")
+ return bufio.NewReader(nil)
+ },
+ }
+ bucketBufPool.Put(pooledReader)
+
+ contentLength := make([]byte, numLenBytes)
+ binary.BigEndian.PutUint32(contentLength, 8)
+ factory := NewBucketDecbufFactory(context.Background(), &failAfterFirstGetRangeBucket{
+ metaBytes: contentLength,
+ err: fmt.Errorf("skip read failed"),
+ }, "obj")
+
+ d := factory.NewDecbufAtChecked(0, nil)
+ require.ErrorContains(t, d.Err(), "skip read failed")
+
+ // A second call reuses the same pooled buffer, proving the first call closed the reader on error.
+ d = factory.NewDecbufAtChecked(0, nil)
+ require.ErrorContains(t, d.Err(), "skip read failed")
+}
+
func TestDecbufFactory_NewDecbufAtUnchecked_HappyPath(t *testing.T) {
enc := createTestEncoder(testContentSize)
enc.PutHash(crc32.New(table))
@@ -271,3 +335,87 @@
return diskFactory, bucketFactory
}
+
+type errorOnGetRangeBucket struct {
+ err error
+}
+
+func (b *errorOnGetRangeBucket) GetRange(_ context.Context, _ string, _, _ int64) (io.ReadCloser, error) {
+ return nil, b.err
+}
+
+func (b *errorOnGetRangeBucket) Get(_ context.Context, _ string) (io.ReadCloser, error) {
+ return nil, b.err
+}
+
+func (b *errorOnGetRangeBucket) Iter(_ context.Context, _ string, _ func(string) error, _ ...objstore.IterOption) error {
+ return b.err
+}
+
+func (b *errorOnGetRangeBucket) IterWithAttributes(_ context.Context, _ string, _ func(objstore.IterObjectAttributes) error, _ ...objstore.IterOption) error {
+ return b.err
+}
+
+func (b *errorOnGetRangeBucket) SupportedIterOptions() []objstore.IterOptionType { return nil }
+
+func (b *errorOnGetRangeBucket) Exists(_ context.Context, _ string) (bool, error) {
+ return false, b.err
+}
+
+func (b *errorOnGetRangeBucket) Attributes(_ context.Context, _ string) (objstore.ObjectAttributes, error) {
+ return objstore.ObjectAttributes{Size: 64}, nil
+}
+
+func (b *errorOnGetRangeBucket) IsObjNotFoundErr(_ error) bool { return false }
+func (b *errorOnGetRangeBucket) IsAccessDeniedErr(_ error) bool { return false }
+func (b *errorOnGetRangeBucket) Name() string { return "error-on-get-range" }
+func (b *errorOnGetRangeBucket) Close() error { return nil }
+
+func (b *errorOnGetRangeBucket) ReaderWithExpectedErrs(_ objstore.IsOpFailureExpectedFunc) objstore.BucketReader {
+ return b
+}
+
+type failAfterFirstGetRangeBucket struct {
+ metaBytes []byte
+ err error
+ calls int
+}
+
+func (b *failAfterFirstGetRangeBucket) GetRange(_ context.Context, _ string, _, _ int64) (io.ReadCloser, error) {
+ b.calls++
+ if b.calls == 1 {
+ return io.NopCloser(bytes.NewReader(b.metaBytes)), nil
+ }
+ return nil, b.err
+}
+
+func (b *failAfterFirstGetRangeBucket) Get(_ context.Context, _ string) (io.ReadCloser, error) {
+ return nil, b.err
+}
+
+func (b *failAfterFirstGetRangeBucket) Iter(_ context.Context, _ string, _ func(string) error, _ ...objstore.IterOption) error {
+ return b.err
+}
+
+func (b *failAfterFirstGetRangeBucket) IterWithAttributes(_ context.Context, _ string, _ func(objstore.IterObjectAttributes) error, _ ...objstore.IterOption) error {
+ return b.err
+}
+
+func (b *failAfterFirstGetRangeBucket) SupportedIterOptions() []objstore.IterOptionType { return nil }
+
+func (b *failAfterFirstGetRangeBucket) Exists(_ context.Context, _ string) (bool, error) {
+ return false, b.err
+}
+
+func (b *failAfterFirstGetRangeBucket) Attributes(_ context.Context, _ string) (objstore.ObjectAttributes, error) {
+ return objstore.ObjectAttributes{Size: 64}, nil
+}
+
+func (b *failAfterFirstGetRangeBucket) IsObjNotFoundErr(_ error) bool { return false }
+func (b *failAfterFirstGetRangeBucket) IsAccessDeniedErr(_ error) bool { return false }
+func (b *failAfterFirstGetRangeBucket) Name() string { return "fail-after-first-get-range" }
+func (b *failAfterFirstGetRangeBucket) Close() error { return nil }
+
+func (b *failAfterFirstGetRangeBucket) ReaderWithExpectedErrs(_ objstore.IsOpFailureExpectedFunc) objstore.BucketReader {
+ return b
+}
You can send follow-ups to the cloud agent here.
| return bbr.ResetAt(0) | ||
| } | ||
|
|
||
| func (bbr *BucketBufReader) ResetAt(off int) error { |
There was a problem hiding this comment.
We had logic to avoid unnecessary buffer discard in the original streamReader here
if dist := off - r.pos; dist > 0 && dist < r.Buffered() {
// skip ahead by discarding the distance bytes
return r.Skip(dist)
}
Shouldn't we do something similar here or are you explicitly resetting every time for simplicity?
There was a problem hiding this comment.
We actually do not need it because the BufReader.ResetAt is only called by the Decbuf.ResetAt which handles it already (double-check my work):
// ResetAt sets the pointer of the underlying BufReader to the absolute
// offset and discards any buffered data. If E is non-nil, this method has
// no effect. ResetAt-ing beyond the end of the underlying BufReader will set
// E to an error and not advance the pointer of BufReader.
func (d *Decbuf) ResetAt(off int) {
if d.E != nil {
return
}
// If we are trying to reset at an offset which is already buffered,
// we can avoid resetting the BufReader and just discard some of the buffer instead.
if dist := off - d.Offset(); dist >= 0 && dist < d.r.Buffered() {
d.E = d.r.Skip(dist)
return
}
d.E = d.r.ResetAt(off)
}
I also added a warning to the Reader interface about this method:
// CAUTION: This operation may be very expensive and result in the discard of buffered data.
// Use Skip to move forward to avoid unnecessary buffer discard.
// ResetAt should only be used to move backwards.
Which mirrors the Decbuf docstring warning. The Decbuf docstring is actually a bit overly-cautious since it does try to skip although it says it discards any buffered data.
There was a problem hiding this comment.
Yeah, I got here by reading that CAUTION comment, because I think the phrasing of that comment could be more specific. "Expensive" isn't entirely clear (why is it expensive?), and "to avoid unnecessary buffer discard" is inconsistently true -- other BufReader implementations do avoid unnecessary buffer discard where possible.
Since this is a public method, I wouldn't assume that all possible ResetAt callers of this will do that check ahead of time, even if that's the current state. It does feel weird that the different implementations of BufReader.ResetAt could reasonably have consistent behavior, but don't.
There was a problem hiding this comment.
I am fine with putting it back in the PR. Maybe we can revisit once changes are made to how it's called.
I lean towards the philosophy that the caller should use the interface correctly and not rely on several layers of defensive programming, but it's not going to keep me up at night.
There was a problem hiding this comment.
^ This has been re-instated
|
|
||
| resetReader := func(off int) error { | ||
| r := NewBucketReader(ctx, bkt, name, base, length) | ||
| _, err := r.Seek(int64(off), io.SeekStart) |
There was a problem hiding this comment.
Does this need to be
| _, err := r.Seek(int64(off), io.SeekStart) | |
| _, err := r.Seek(int64(base+off), io.SeekStart) |
?
There was a problem hiding this comment.
I will double-check with a test and report back, but I am pretty sure no - the BucketReader created seeks relative to its own base offset, which is already equal to base.
There was a problem hiding this comment.
Going back and refreshing how/why I did this, I followed implementations for the Go Seek interface, but only supporting io.SeekStart, which does everything relative to base offset, so the operation is just to set the reader offset equal to the off arg.
Example of the Go implementation for bytes.Reader - it's the same for io.SeekStart case
// Seek implements the [io.Seeker] interface.
func (r *Reader) Seek(offset int64, whence int) (int64, error) {
r.prevRune = -1
var abs int64
switch whence {
case io.SeekStart:
abs = offset
case io.SeekCurrent:
abs = r.i + offset
case io.SeekEnd:
abs = int64(len(r.s)) + offset
default:
return 0, errors.New("bytes.Reader.Seek: invalid whence")
}
if abs < 0 {
return 0, errors.New("bytes.Reader.Seek: negative position")
}
r.i = abs
return abs, nil
}
There was a problem hiding this comment.
Cursor Bugbot has reviewed your changes and found 2 potential issues.
Autofix Details
Bugbot Autofix prepared fixes for both issues found in the latest run.
- ✅ Fixed: Test resetReader diverges from production resetReader behavior
- Updated the test helper reset path to recreate the reader at the original base and apply Seek(off) so remaining-length behavior matches production.
- ✅ Fixed: Unnecessary Attributes call when section length is cached
- Moved the Attributes lookup and object-size validation into the cache-miss branch so cached section-length reads avoid extra metadata round-trips.
Or push these changes by commenting:
@cursor push 2fe8a13b88
Preview (2fe8a13b88)
diff --git a/pkg/storage/indexheader/encoding/bucket_factory.go b/pkg/storage/indexheader/encoding/bucket_factory.go
--- a/pkg/storage/indexheader/encoding/bucket_factory.go
+++ b/pkg/storage/indexheader/encoding/bucket_factory.go
@@ -34,20 +34,22 @@
}
func (bf *BucketDecbufFactory) NewDecbufAtChecked(offset int, table *crc32.Table) Decbuf {
- attrs, err := bf.bkt.Attributes(bf.ctx, bf.objectPath)
- if err != nil {
- return Decbuf{E: fmt.Errorf("get size from %s: %w", bf.objectPath, err)}
- }
- if offset > int(attrs.Size) {
- return Decbuf{E: fmt.Errorf("offset greater than object size of %s", bf.objectPath)}
- }
-
var contentLength int
bf.mu.Lock()
if cachedContentLength, ok := bf.sectionLenCache[offset]; ok {
// Section length is cached
contentLength = cachedContentLength
} else {
+ attrs, err := bf.bkt.Attributes(bf.ctx, bf.objectPath)
+ if err != nil {
+ bf.mu.Unlock()
+ return Decbuf{E: fmt.Errorf("get size from %s: %w", bf.objectPath, err)}
+ }
+ if offset > int(attrs.Size) {
+ bf.mu.Unlock()
+ return Decbuf{E: fmt.Errorf("offset greater than object size of %s", bf.objectPath)}
+ }
+
// We do not know section length yet;
// use the lower-level BucketReader to scan the length data
metaReader := NewBucketReader(
@@ -73,8 +75,7 @@
bufReader := NewBucketBufReader(bf.ctx, bf.bkt, bf.objectPath, offset, bufferLength)
// bufReader is expected start at base offset + 4 after consuming length bytes
- err = bufReader.Skip(numLenBytes)
- if err != nil {
+ if err := bufReader.Skip(numLenBytes); err != nil {
_ = bufReader.Close()
return Decbuf{E: err}
}
diff --git a/pkg/storage/indexheader/encoding/bucket_reader_test.go b/pkg/storage/indexheader/encoding/bucket_reader_test.go
--- a/pkg/storage/indexheader/encoding/bucket_reader_test.go
+++ b/pkg/storage/indexheader/encoding/bucket_reader_test.go
@@ -54,7 +54,11 @@
}
resetReader := func(off int) error {
- r := NewBucketReader(ctx, bkt, testBucketObjectName, base+off, length)
+ r := NewBucketReader(ctx, bkt, testBucketObjectName, base, length)
+ _, err := r.Seek(int64(off), io.SeekStart)
+ if err != nil {
+ return err
+ }
bufReader.r = r
return nil
}
You can send follow-ups to the cloud agent here.
| } | ||
|
|
||
| closeReader := true | ||
| defer func() { |
There was a problem hiding this comment.
Unnecessary Attributes call when section length is cached
Medium Severity
NewDecbufAtChecked calls bf.bkt.Attributes() on every invocation, even when the section length is already cached in sectionLenCache. On cache hit, attrs is only used for offset validation — the metaReader creation that actually needs attrs.Size is skipped. This adds an unnecessary bucket metadata round-trip per cached read, undermining the PR's goal of reducing bucket operations.
Reviewed by Cursor Bugbot for commit 29346f6. Configure here.
There was a problem hiding this comment.
Going to ignore this for now:
- We have still reduced total calls in this PR
- These calls are cached by the metadata cache
- It will be a cleaner solution once we can use TOC to initialize decbufs
…rod and test impl
There was a problem hiding this comment.
Cursor Bugbot has reviewed your changes and found 2 potential issues.
There are 3 total unresolved issues (including 1 from previous review).
Bugbot Autofix prepared fixes for both issues found in the latest run.
- ✅ Fixed: Close returns buffer to wrong pool
- BucketBufReader now stores the pool passed to newBucketBufReader and Close returns the buffer to that same pool instead of the global one.
- ✅ Fixed: Shared bucket closed multiple times in benchmark cleanup
- The inner-loop benchmark cleanup that re-closed the shared bucket was removed so the bucket is only closed by the existing cleanup from labelValuesTestCases.
Or push these changes by commenting:
@cursor push d26602a878
Preview (d26602a878)
diff --git a/pkg/storage/indexheader/encoding/bucket_reader.go b/pkg/storage/indexheader/encoding/bucket_reader.go
--- a/pkg/storage/indexheader/encoding/bucket_reader.go
+++ b/pkg/storage/indexheader/encoding/bucket_reader.go
@@ -80,6 +80,7 @@
type BucketBufReader struct {
ctx context.Context
+ bufioPool *sync.Pool
bkt objstore.BucketReader
name string
base int
@@ -116,13 +117,14 @@
bufioReader.Reset(reader)
bufReader := &BucketBufReader{
- ctx: ctx,
- bkt: bkt,
- name: name,
- base: base,
- length: length,
- r: reader,
- buf: bufioReader,
+ ctx: ctx,
+ bufioPool: bufioPool,
+ bkt: bkt,
+ name: name,
+ base: base,
+ length: length,
+ r: reader,
+ buf: bufioReader,
}
bufReader.resetReader = resetReaderFunc(bufReader)
@@ -226,7 +228,7 @@
func (bbr *BucketBufReader) Close() error {
// Note that we don't do anything to clean up the buffer before returning it to the pool here:
// we reset the buffer when we retrieve it from the pool instead.
- bucketBufPool.Put(bbr.buf)
+ bbr.bufioPool.Put(bbr.buf)
// The BucketReader does not need closed -
// it closes the reader generated from bkt.GetRange on each Read call.
return nil
diff --git a/pkg/storage/indexheader/encoding/bucket_reader_test.go b/pkg/storage/indexheader/encoding/bucket_reader_test.go
--- a/pkg/storage/indexheader/encoding/bucket_reader_test.go
+++ b/pkg/storage/indexheader/encoding/bucket_reader_test.go
@@ -50,13 +50,14 @@
bufioReader := testBucketBufPool.Get().(*bufio.Reader)
bufioReader.Reset(reader)
return &BucketBufReader{
- ctx: ctx,
- bkt: bkt,
- name: "obj",
- base: 0,
- length: 10,
- r: reader,
- buf: bufioReader,
+ ctx: ctx,
+ bufioPool: &testBucketBufPool,
+ bkt: bkt,
+ name: "obj",
+ base: 0,
+ length: 10,
+ r: reader,
+ buf: bufioReader,
}
}
diff --git a/pkg/storage/indexheader/reader_benchmarks_test.go b/pkg/storage/indexheader/reader_benchmarks_test.go
--- a/pkg/storage/indexheader/reader_benchmarks_test.go
+++ b/pkg/storage/indexheader/reader_benchmarks_test.go
@@ -272,9 +272,6 @@
for _, tc := range sortedTest.tcs {
bucketReg := prometheus.NewPedanticRegistry()
instrBkt := objstore.WrapWithMetrics(objstore.WithNoopInstr(bkt), prometheus.WrapRegistererWithPrefix("thanos_", bucketReg), "")
- b.Cleanup(func() {
- require.NoError(b, bkt.Close())
- })
// Initialize the first index-header reader,
// configured to read all index-header sections from the on-disk index-header.You can send follow-ups to the cloud agent here.
Reviewed by Cursor Bugbot for commit 73ed6f5. Configure here.
|
I believe all feedback is addressed |
chencs
left a comment
There was a problem hiding this comment.
LGTM! Thanks for addressing all the feedback 🙂



What this PR does
Previous Bucket-based decbuf implementation did not send all of its reads through the thanos objstore
`GetRange` interface - we fix this for two reasons:
- The `GetRange` interface can be counted on to track bucket operation metrics correctly
- The `GetRange` interface is required to use our bucket reads caching

This PR also now caches the Postings Offsets section length in the `BucketDecBufFactory`, so after the first read it will make one less `GetRange` call each time it is initialized.

Other minor changes:
- `factory.go` had the interface definition and the file-backed factory, while the bucket-backed factory was separate. I followed the pattern in the package for the readers, where the interface gets its own file and then each impl gets its own file.
- Use the `numLenBytes` constant instead of writing `4` everywhere
Fixes #
Checklist
- `CHANGELOG.md` updated - the order of entries should be `[CHANGE]`, `[FEATURE]`, `[ENHANCEMENT]`, `[BUGFIX]`. If changelog entry is not needed, please add the `changelog-not-needed` label to the PR.
- `about-versioning.md` updated with experimental features.

Note
Medium Risk
Touches low-level index-header reading/rewind behavior and request patterns against object storage; regressions could surface as incorrect reads, CRC failures, or increased bucket calls under certain access patterns.
Overview
Refactors bucket-backed index-header decoding to consistently use
`objstore.BucketReader.GetRange` by replacing the bespoke `streamReader` logic with new `BucketReader`/`BucketBufReader` implementations, enabling accurate bucket metrics and compatibility with bucket-cache behavior. `BucketDecbufFactory` now reads section length via a small `GetRange`-based reader and caches section lengths by offset to avoid repeated length-prefix fetches; the file-backed factory is moved into `file_factory.go` and both paths use a shared `numLenBytes` constant. Also removes the conditional that skipped `ensureIndexHeaderOnDisk` when bucket reading was enabled, and updates benchmarks to run deterministically by sorting test cases; adds comprehensive unit tests for the new bucket readers.