-
Notifications
You must be signed in to change notification settings - Fork 771
Expand file tree
/
Copy pathbucket_factory.go
More file actions
120 lines (103 loc) · 3.42 KB
/
bucket_factory.go
File metadata and controls
120 lines (103 loc) · 3.42 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
// SPDX-License-Identifier: AGPL-3.0-only
package encoding
import (
"context"
"encoding/binary"
"fmt"
"hash/crc32"
"sync"
"github.com/thanos-io/objstore"
)
// BucketDecbufFactory builds Decbuf instances that read sections of a single
// index file stored in object storage, using bucket-backed readers.
type BucketDecbufFactory struct {
	// ctx is the context used for the factory's bucket requests.
	ctx context.Context
	// bkt is the bucket the index object is read from.
	bkt objstore.BucketReader
	// objectPath is the location of the index file within bkt.
	objectPath string

	// sectionLenCache maps a section's start offset to the content length read
	// from that section's length prefix. Access is guarded by mu.
	sectionLenCache map[int]int
	mu              sync.Mutex
}
// NewBucketDecbufFactory returns a BucketDecbufFactory that reads the index
// object stored at objectPath in bkt, issuing its bucket requests with ctx.
func NewBucketDecbufFactory(ctx context.Context, bkt objstore.BucketReader, objectPath string) *BucketDecbufFactory {
	// Capacity hint: the cache is expected to hold entries for the Symbols
	// and Postings Offsets tables.
	const expectedSections = 2

	f := &BucketDecbufFactory{}
	f.ctx = ctx
	f.bkt = bkt
	f.objectPath = objectPath
	f.sectionLenCache = make(map[int]int, expectedSections)
	return f
}
// NewDecbufAtChecked returns a Decbuf positioned at the start of the content of
// the length-prefixed section that begins at offset in the index object.
//
// The section is read as a big-endian uint32 content length (numLenBytes
// bytes), followed by the content and a trailing CRC32 (crc32.Size bytes).
// When table is non-nil, the CRC32 is verified over the content before the
// Decbuf is returned. When table is nil, no checksum verification is done.
// Failures are reported through the returned Decbuf's E field.
//
// The content length is read from object storage the first time an offset is
// seen and cached per offset afterwards.
func (bf *BucketDecbufFactory) NewDecbufAtChecked(offset int, table *crc32.Table) Decbuf {
	if offset < 0 {
		return Decbuf{E: fmt.Errorf("negative offset %d for %s", offset, bf.objectPath)}
	}

	attrs, err := bf.bkt.Attributes(bf.ctx, bf.objectPath)
	if err != nil {
		return Decbuf{E: fmt.Errorf("get size from %s: %w", bf.objectPath, err)}
	}
	size := int(attrs.Size)
	if offset > size {
		return Decbuf{E: fmt.Errorf("offset greater than object size of %s", bf.objectPath)}
	}

	// Like Prometheus' encoding.NewDecbufAt, require that the length prefix,
	// and below the whole section, lie inside the object before reading it.
	remaining := size - offset
	if remaining < numLenBytes {
		return Decbuf{E: fmt.Errorf("insufficient bytes for size at offset %d of %s (have %d, wanted %d): %w", offset, bf.objectPath, remaining, numLenBytes, ErrInvalidSize)}
	}

	contentLength, err := bf.sectionContentLength(offset, remaining)
	if err != nil {
		return Decbuf{E: err}
	}

	bufferLength := numLenBytes + contentLength + crc32.Size
	if bufferLength > remaining {
		return Decbuf{E: fmt.Errorf("section at offset %d of %s needs %d bytes but only %d remain: %w", offset, bf.objectPath, bufferLength, remaining, ErrInvalidSize)}
	}

	bufReader := NewBucketBufReader(bf.ctx, bf.bkt, bf.objectPath, offset, bufferLength)
	// Consume the length prefix so the reader sits at the start of the content.
	if err := bufReader.Skip(numLenBytes); err != nil {
		_ = bufReader.Close() // best-effort cleanup; the Skip error is the one reported
		return Decbuf{E: err}
	}

	d := Decbuf{r: bufReader}
	if table != nil {
		if d.CheckCrc32(table); d.Err() != nil {
			return d
		}
		// Reset to the beginning of the content after reading it all for the CRC.
		d.ResetAt(numLenBytes)
	}
	return d
}

// sectionContentLength returns the content length stored in the length prefix
// of the section at offset. remaining is the number of object bytes from offset
// to the end of the object. On a cache miss the prefix is read from object
// storage and the result is cached for later calls.
func (bf *BucketDecbufFactory) sectionContentLength(offset, remaining int) (int, error) {
	bf.mu.Lock()
	cached, ok := bf.sectionLenCache[offset]
	bf.mu.Unlock()
	if ok {
		return cached, nil
	}

	// The object-storage read is done without holding mu, so a slow request
	// does not block other callers, including ones that would hit the cache.
	// Concurrent misses for the same offset may each read the prefix; the last
	// write to the cache wins.
	// NOTE(review): metaReader is not closed here (nor was it before); confirm
	// NewBucketReader holds no resources that need releasing.
	metaReader := NewBucketReader(bf.ctx, bf.bkt, bf.objectPath, offset, remaining)
	lengthBytes := make([]byte, numLenBytes)
	for n := 0; n < numLenBytes; {
		m, err := metaReader.Read(lengthBytes[n:])
		n += m
		if n == numLenBytes {
			// A reader may return the final bytes together with an error such
			// as io.EOF; the bytes are still valid.
			break
		}
		if err != nil {
			return 0, err
		}
		if m == 0 {
			// No progress and no error: report a truncated prefix rather than spin.
			return 0, fmt.Errorf("insufficient bytes read for size (got %d, wanted %d): %w", n, numLenBytes, ErrInvalidSize)
		}
	}
	contentLength := int(binary.BigEndian.Uint32(lengthBytes))

	bf.mu.Lock()
	bf.sectionLenCache[offset] = contentLength
	bf.mu.Unlock()
	return contentLength, nil
}
// NewDecbufAtUnchecked opens the length-prefixed section at offset exactly as
// NewDecbufAtChecked does, but skips CRC32 verification of its content.
func (bf *BucketDecbufFactory) NewDecbufAtUnchecked(offset int) Decbuf {
	var noChecksum *crc32.Table // a nil table disables checksum verification
	return bf.NewDecbufAtChecked(offset, noChecksum)
}
// NewRawDecbuf returns a Decbuf spanning the entire index object, from byte 0
// through its last byte, without interpreting any length prefix or checksum.
// A failure to fetch the object's size is reported through the Decbuf's E field.
func (bf *BucketDecbufFactory) NewRawDecbuf() Decbuf {
	attrs, err := bf.bkt.Attributes(bf.ctx, bf.objectPath)
	if err != nil {
		return Decbuf{E: fmt.Errorf("get size from %s: %w", bf.objectPath, err)}
	}

	objectSize := int(attrs.Size)
	return Decbuf{r: NewBucketBufReader(bf.ctx, bf.bkt, bf.objectPath, 0, objectSize)}
}
// Close releases resources held by the BucketDecbufFactory. The factory holds
// none of its own; the bucket client's lifecycle belongs to the parent
// components that supplied it. Close therefore always returns nil.
func (bf *BucketDecbufFactory) Close() error {
	return nil
}