Skip to content
254 changes: 51 additions & 203 deletions pkg/storage/indexheader/encoding/bucket_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,23 @@
package encoding

import (
"bufio"
"context"
"encoding/binary"
"errors"
"fmt"
"hash/crc32"
"io"
"sync"

"github.com/thanos-io/objstore"
)

// BucketDecbufFactory creates new bucket-reader-backed Decbuf instances
// for a specific index-header file in object storage
// for a specific index file in object storage
type BucketDecbufFactory struct {
ctx context.Context
bkt objstore.BucketReader
objectPath string // Path to index file in bucket
ctx context.Context
bkt objstore.BucketReader
objectPath string // Path to index file in bucket
sectionLenCache map[int]int
mu sync.Mutex
}

// NewBucketDecbufFactory creates a new BucketDecbufFactory for the given object path.
Expand All @@ -29,60 +28,65 @@ func NewBucketDecbufFactory(ctx context.Context, bkt objstore.BucketReader, obje
ctx: ctx,
bkt: bkt,
objectPath: objectPath,
// Allocate cache to hold the start offsets of Symbols and Postings Offsets tables.
sectionLenCache: make(map[int]int, 2),
}
}

func (bf *BucketDecbufFactory) NewDecbufAtChecked(offset int, table *crc32.Table) Decbuf {
// At this point we don't know the length of the section (length is -1).
rc, err := bf.bkt.GetRange(bf.ctx, bf.objectPath, int64(offset), -1)
if err != nil {
return Decbuf{E: fmt.Errorf("get range from %s at offset %d: %w", bf.objectPath, offset, err)}
}

closeReader := true
defer func() {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unnecessary Attributes call when section length is cached

Medium Severity

NewDecbufAtChecked calls bf.bkt.Attributes() on every invocation, even when the section length is already cached in sectionLenCache. On cache hit, attrs is only used for offset validation — the metaReader creation that actually needs attrs.Size is skipped. This adds an unnecessary bucket metadata round-trip per cached read, undermining the PR's goal of reducing bucket operations.

Fix in Cursor Fix in Web

Reviewed by Cursor Bugbot for commit 29346f6. Configure here.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Going to ignore this for now:

  1. We have still reduced total calls in this PR
  2. These calls are cached by the metadata cache
  3. It will be a cleaner solution once we can use TOC to initialize decbufs

if closeReader {
rc.Close()
}
}()

// Consume 4-byte length prefix of the section.
lengthBytes := make([]byte, 4)
n, err := io.ReadFull(rc, lengthBytes)
attrs, err := bf.bkt.Attributes(bf.ctx, bf.objectPath)
if err != nil {
return Decbuf{E: fmt.Errorf("read section length from %s at offset %d: %w", bf.objectPath, offset, err)}
return Decbuf{E: fmt.Errorf("get size from %s: %w", bf.objectPath, err)}
}
if n != 4 {
return Decbuf{E: fmt.Errorf(
"insufficient bytes read from %s for section length at offset %d (got %d, wanted %d): %w",
bf.objectPath, offset, n, 4, ErrInvalidSize,
)}
if offset > int(attrs.Size) {
return Decbuf{E: fmt.Errorf("offset greater than object size of %s", bf.objectPath)}
}

contentLength := int(binary.BigEndian.Uint32(lengthBytes))
bufLength := len(lengthBytes) + contentLength + crc32.Size
var contentLength int
bf.mu.Lock()
if cachedContentLength, ok := bf.sectionLenCache[offset]; ok {
// Section length is cached
contentLength = cachedContentLength
} else {
// We do not know section length yet;
// use the lower-level BucketReader to scan the length data
metaReader := NewBucketReader(
bf.ctx, bf.bkt, bf.objectPath, offset, int(attrs.Size)-offset,
)

r := newStreamReader(rc, len(lengthBytes), bufLength)
r.seekReader = func(off int) error {
rc, err := bf.bkt.GetRange(bf.ctx, bf.objectPath, int64(offset+off), -1)
lengthBytes := make([]byte, numLenBytes)
n, err := metaReader.Read(lengthBytes)
if err != nil {
return err
bf.mu.Unlock()
return Decbuf{E: err}
Comment thread
cursor[bot] marked this conversation as resolved.
}
if n != numLenBytes {
bf.mu.Unlock()
return Decbuf{E: fmt.Errorf("insufficient bytes read for size (got %d, wanted %d): %w", n, numLenBytes, ErrInvalidSize)}
}
r.rc.Close()
r.rc = rc
return nil
contentLength = int(binary.BigEndian.Uint32(lengthBytes))
bf.sectionLenCache[offset] = contentLength
}
bf.mu.Unlock()

d := Decbuf{r: r}
closeReader = false
bufferLength := numLenBytes + contentLength + crc32.Size

bufReader := NewBucketBufReader(bf.ctx, bf.bkt, bf.objectPath, offset, bufferLength)
// bufReader is expected start at base offset + 4 after consuming length bytes
err = bufReader.Skip(numLenBytes)
if err != nil {
_ = bufReader.Close()
return Decbuf{E: err}
}
Comment thread
cursor[bot] marked this conversation as resolved.
d := Decbuf{r: bufReader}

if table != nil {
if d.CheckCrc32(table); d.Err() != nil {
return d
}

// reset to the beginning of the content after reading it all for the CRC.
d.ResetAt(4)
d.ResetAt(numLenBytes)
}

return d
Expand All @@ -95,37 +99,16 @@ func (bf *BucketDecbufFactory) NewDecbufAtUnchecked(offset int) Decbuf {
func (bf *BucketDecbufFactory) NewRawDecbuf() Decbuf {
const offset = 0

rc, err := bf.bkt.GetRange(bf.ctx, bf.objectPath, offset, -1)
if err != nil {
return Decbuf{E: fmt.Errorf("get range from %s at offset %d: %w", bf.objectPath, offset, err)}
}

closeReader := true
defer func() {
if closeReader {
rc.Close()
}
}()

attrs, err := bf.bkt.Attributes(bf.ctx, bf.objectPath)
if err != nil {
return Decbuf{E: fmt.Errorf("get size from %s: %w", bf.objectPath, err)}
}

r := newStreamReader(rc, offset, int(attrs.Size))
r.seekReader = func(_ int) error {
// Create reader from full file range
rc, err := bf.bkt.GetRange(bf.ctx, bf.objectPath, offset, attrs.Size)
if err != nil {
return err
}
r.rc.Close()
r.rc = rc
return nil
}

closeReader = false
return Decbuf{r: r}
// Create reader from full file range
r := NewBucketBufReader(
bf.ctx, bf.bkt, bf.objectPath, offset, int(attrs.Size),
)
d := Decbuf{r: r}
return d
}

// Close cleans up resources associated with this BucketDecbufFactory.
Expand All @@ -135,138 +118,3 @@ func (bf *BucketDecbufFactory) Close() error {
// Nothing to do for bucket-based implementation
return nil
}

// streamReader implements BufReader with a bucket-based io.ReadCloser
type streamReader struct {
rc io.ReadCloser
buf *bufio.Reader
pos int
length int

seekReader func(off int) error
}

var netbufPool = sync.Pool{
New: func() any {
// 1MiB buffer chosen as starting point;
// we could make this configurable and benchmark.
return bufio.NewReaderSize(nil, 1<<20)
},
}

// newStreamReader creates a new streamReader that wraps the given io.ReadCloser.
func newStreamReader(rc io.ReadCloser, pos, length int) *streamReader {
r := &streamReader{
rc: rc,
buf: netbufPool.Get().(*bufio.Reader),
pos: pos,
length: length,
}
r.buf.Reset(r.rc)
return r
}

func (r *streamReader) Reset() error {
return r.ResetAt(0)
}

func (r *streamReader) ResetAt(off int) error {
if off > r.length {
return ErrInvalidSize
}

if dist := off - r.pos; dist > 0 && dist < r.Buffered() {
// skip ahead by discarding the distance bytes
return r.Skip(dist)
}

// Objstore hides the io.ReadSeekCloser, that the underlying bucket clients implement.
// So we reimplement it ourselves:
// 1. Close the r.rc
// 2. Re-read the object from new offset
// 3. Reset the r.buf and the rest of the state.
// TODO: evaluate if we need a more efficient approach
if err := r.seekReader(off); err != nil {
return err
}

r.buf.Reset(r.rc)
r.pos = off

return nil
}

func (r *streamReader) Skip(l int) error {
if l > r.Len() {
return ErrInvalidSize
}

// TODO: how to make sure we don't trash the cache when skipping
n, err := r.buf.Discard(l)
if n > 0 {
r.pos += n
}

return err
}

func (r *streamReader) Peek(n int) ([]byte, error) {
b, err := r.buf.Peek(n)
if err != nil && !errors.Is(err, io.EOF) {
return nil, err
}

if len(b) > 0 {
return b, nil
}

return nil, nil
}

func (r *streamReader) Read(n int) ([]byte, error) {
b := make([]byte, n)

err := r.ReadInto(b)
if err != nil {
return nil, err
}

return b, nil
}

func (r *streamReader) ReadInto(b []byte) error {
n, err := io.ReadFull(r.buf, b)
if n > 0 {
r.pos += n
}

if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) {
return fmt.Errorf("%w reading %d bytes: %s", ErrInvalidSize, len(b), err)
} else if err != nil {
return err
}

return nil
}

func (r *streamReader) Offset() int {
return r.pos
}

func (r *streamReader) Len() int {
return r.length - r.pos
}

func (r *streamReader) Size() int {
return r.buf.Size()
}

func (r *streamReader) Buffered() int {
return r.buf.Buffered()
}

func (r *streamReader) Close() error {
err := r.rc.Close()
netbufPool.Put(r.buf)
return err
}
Loading
Loading