Skip to content

Commit 8190ded

Browse files
committed
chore(metastore): stream pointers columnar reader
Use columnar reader to read the stream pointers
1 parent a6dbce5 commit 8190ded

File tree

8 files changed

+692
-21
lines changed

8 files changed

+692
-21
lines changed

pkg/dataobj/consumer/logsobj/builder.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -127,10 +127,10 @@ type BuilderConfig struct {
127127
// RegisterFlagsWithPrefix registers flags with the given prefix.
128128
func (cfg *BuilderConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
129129
// Set defaults for base builder configuration
130-
_ = cfg.BuilderBaseConfig.TargetPageSize.Set("2MB")
131-
_ = cfg.BuilderBaseConfig.TargetObjectSize.Set("1GB")
132-
_ = cfg.BuilderBaseConfig.BufferSize.Set("16MB")
133-
_ = cfg.BuilderBaseConfig.TargetSectionSize.Set("128MB")
130+
_ = cfg.TargetPageSize.Set("2MB")
131+
_ = cfg.TargetObjectSize.Set("1GB")
132+
_ = cfg.BufferSize.Set("16MB")
133+
_ = cfg.TargetSectionSize.Set("128MB")
134134
cfg.BuilderBaseConfig.RegisterFlagsWithPrefix(prefix, f)
135135

136136
f.StringVar(&cfg.DataobjSortOrder, prefix+"dataobj-sort-order", sortStreamASC, "The desired sort order of the logs section. Can either be `stream-asc` (order by streamID ascending and timestamp descending) or `timestamp-desc` (order by timestamp descending and streamID ascending).")

pkg/dataobj/index/config.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,10 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
2121

2222
func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
2323
// Set defaults for base builder configuration
24-
_ = cfg.BuilderBaseConfig.TargetPageSize.Set("128KB")
25-
_ = cfg.BuilderBaseConfig.TargetObjectSize.Set("64MB")
26-
_ = cfg.BuilderBaseConfig.BufferSize.Set("2MB")
27-
_ = cfg.BuilderBaseConfig.TargetSectionSize.Set("16MB")
24+
_ = cfg.TargetPageSize.Set("128KB")
25+
_ = cfg.TargetObjectSize.Set("64MB")
26+
_ = cfg.BufferSize.Set("2MB")
27+
_ = cfg.TargetSectionSize.Set("16MB")
2828
cfg.BuilderBaseConfig.RegisterFlagsWithPrefix(prefix, f)
2929

3030
f.IntVar(&cfg.EventsPerIndex, prefix+"events-per-index", 32, "Experimental: The number of events to batch before building an index")

pkg/dataobj/metastore/iter.go

Lines changed: 260 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,260 @@
1+
package metastore
2+
3+
import (
4+
"context"
5+
"errors"
6+
"fmt"
7+
"io"
8+
"time"
9+
10+
"github.com/apache/arrow-go/v18/arrow/array"
11+
"github.com/apache/arrow-go/v18/arrow/scalar"
12+
"github.com/grafana/dskit/user"
13+
14+
"github.com/grafana/loki/v3/pkg/dataobj"
15+
"github.com/grafana/loki/v3/pkg/dataobj/sections/pointers"
16+
)
17+
18+
// forEachStreamSectionPointer iterates over all the section pointers that point to one of the
19+
// [streamIDs] in a given [indexObj] that overlap [sStart, sEnd] (inclusive) time range and
20+
// calls [f] on every found SectionPointer.
21+
func forEachStreamSectionPointer(
22+
ctx context.Context,
23+
indexObj *dataobj.Object,
24+
sStart, sEnd *scalar.Timestamp,
25+
streamIDs []int64,
26+
f func(pointers.SectionPointer),
27+
) error {
28+
targetTenant, err := user.ExtractOrgID(ctx)
29+
if err != nil {
30+
return fmt.Errorf("extracting org ID: %w", err)
31+
}
32+
var reader pointers.Reader
33+
defer reader.Close()
34+
35+
// prepare streamIDs scalars only once, they are used in the predicate later
36+
var sStreamIDs []scalar.Scalar
37+
for _, streamID := range streamIDs {
38+
sStreamIDs = append(sStreamIDs, scalar.NewInt64Scalar(streamID))
39+
}
40+
41+
const batchSize = 128
42+
buf := make([]streamSectionPointerBuilder, batchSize)
43+
44+
// iterate over the sections and fill buf column by column
45+
// once the read operation is over invoke client's [f] on every read row (numRows not always the same as len(buf))
46+
for _, section := range indexObj.Sections().Filter(pointers.CheckSection) {
47+
if section.Tenant != targetTenant {
48+
continue
49+
}
50+
51+
sec, err := pointers.Open(ctx, section)
52+
if err != nil {
53+
return fmt.Errorf("opening section: %w", err)
54+
}
55+
56+
pointerCols, err := findPointersColumnsByTypes(
57+
sec.Columns(),
58+
pointers.ColumnTypePath,
59+
pointers.ColumnTypeSection,
60+
pointers.ColumnTypeStreamID,
61+
pointers.ColumnTypeStreamIDRef,
62+
pointers.ColumnTypeMinTimestamp,
63+
pointers.ColumnTypeMaxTimestamp,
64+
pointers.ColumnTypeRowCount,
65+
pointers.ColumnTypeUncompressedSize,
66+
)
67+
if err != nil {
68+
return fmt.Errorf("finding pointers columns: %w", err)
69+
}
70+
71+
var (
72+
colStreamID *pointers.Column
73+
colMinTimestamp *pointers.Column
74+
colMaxTimestamp *pointers.Column
75+
)
76+
77+
for _, c := range pointerCols {
78+
if c.Type == pointers.ColumnTypeStreamID {
79+
colStreamID = c
80+
}
81+
if c.Type == pointers.ColumnTypeMinTimestamp {
82+
colMinTimestamp = c
83+
}
84+
if c.Type == pointers.ColumnTypeMaxTimestamp {
85+
colMaxTimestamp = c
86+
}
87+
if colStreamID != nil && colMinTimestamp != nil && colMaxTimestamp != nil {
88+
break
89+
}
90+
}
91+
92+
if colStreamID == nil || colMinTimestamp == nil || colMaxTimestamp == nil {
93+
return fmt.Errorf(
94+
"one of mandatory columns is missing: (streamID=%t, minTimestamp=%t, maxTimestamp=%t)",
95+
colStreamID == nil, colMinTimestamp == nil, colMaxTimestamp == nil,
96+
)
97+
}
98+
99+
reader.Reset(pointers.ReaderOptions{
100+
Columns: pointerCols,
101+
Predicates: []pointers.Predicate{
102+
buildPointersTimeRangePredicate(colMinTimestamp, colMaxTimestamp, sStart, sEnd),
103+
pointers.InPredicate{
104+
Column: colStreamID,
105+
Values: sStreamIDs,
106+
},
107+
},
108+
})
109+
110+
for {
111+
rec, readErr := reader.Read(ctx, batchSize)
112+
if readErr != nil && !errors.Is(readErr, io.EOF) {
113+
return fmt.Errorf("reading recordBatch: %w", readErr)
114+
}
115+
numRows := int(rec.NumRows())
116+
if numRows == 0 && errors.Is(readErr, io.EOF) {
117+
break
118+
}
119+
120+
for colIdx := range int(rec.NumCols()) {
121+
col := rec.Column(colIdx)
122+
pointerCol := pointerCols[colIdx]
123+
124+
switch pointerCol.Type {
125+
case pointers.ColumnTypePath:
126+
for rIdx := range numRows {
127+
if col.IsNull(rIdx) {
128+
continue
129+
}
130+
buf[rIdx].path = col.(*array.String).Value(rIdx)
131+
}
132+
case pointers.ColumnTypeSection:
133+
for rIdx := range numRows {
134+
if col.IsNull(rIdx) {
135+
continue
136+
}
137+
buf[rIdx].section = col.(*array.Int64).Value(rIdx)
138+
}
139+
case pointers.ColumnTypeStreamID:
140+
for rIdx := range numRows {
141+
if col.IsNull(rIdx) {
142+
continue
143+
}
144+
buf[rIdx].streamID = col.(*array.Int64).Value(rIdx)
145+
}
146+
case pointers.ColumnTypeStreamIDRef:
147+
for rIdx := range numRows {
148+
if col.IsNull(rIdx) {
149+
continue
150+
}
151+
buf[rIdx].streamIDRef = col.(*array.Int64).Value(rIdx)
152+
}
153+
case pointers.ColumnTypeMinTimestamp:
154+
for rIdx := range numRows {
155+
if col.IsNull(rIdx) {
156+
continue
157+
}
158+
buf[rIdx].start = time.Unix(0, int64(col.(*array.Timestamp).Value(rIdx)))
159+
}
160+
case pointers.ColumnTypeMaxTimestamp:
161+
for rIdx := range numRows {
162+
if col.IsNull(rIdx) {
163+
continue
164+
}
165+
buf[rIdx].end = time.Unix(0, int64(col.(*array.Timestamp).Value(rIdx)))
166+
}
167+
case pointers.ColumnTypeRowCount:
168+
for rIdx := range numRows {
169+
if col.IsNull(rIdx) {
170+
continue
171+
}
172+
buf[rIdx].lineCount = col.(*array.Int64).Value(rIdx)
173+
}
174+
case pointers.ColumnTypeUncompressedSize:
175+
for rIdx := range numRows {
176+
if col.IsNull(rIdx) {
177+
continue
178+
}
179+
buf[rIdx].uncompressedSize = col.(*array.Int64).Value(rIdx)
180+
}
181+
default:
182+
continue
183+
}
184+
}
185+
186+
for rowIdx := range numRows {
187+
b := buf[rowIdx]
188+
f(pointers.SectionPointer{
189+
Path: b.path,
190+
Section: b.section,
191+
PointerKind: pointers.PointerKindStreamIndex,
192+
StreamID: b.streamID,
193+
StreamIDRef: b.streamIDRef,
194+
StartTs: b.start,
195+
EndTs: b.end,
196+
LineCount: b.lineCount,
197+
UncompressedSize: b.uncompressedSize,
198+
})
199+
}
200+
201+
if errors.Is(readErr, io.EOF) {
202+
break
203+
}
204+
}
205+
}
206+
207+
return nil
208+
}
209+
210+
// streamSectionPointerBuilder accumulates the per-row column values read from
// a pointers section before they are assembled into a pointers.SectionPointer.
type streamSectionPointerBuilder struct {
	path             string    // value of the path column
	section          int64     // value of the section column
	streamID         int64     // value of the streamID column
	streamIDRef      int64     // value of the streamIDRef column
	start            time.Time // value of the minTimestamp column
	end              time.Time // value of the maxTimestamp column
	lineCount        int64     // value of the rowCount column
	uncompressedSize int64     // value of the uncompressedSize column
}

// build returns a copy of b. The value receiver already yields an independent
// copy, so the previous field-by-field reconstruction was redundant.
func (b streamSectionPointerBuilder) build() streamSectionPointerBuilder {
	return b
}
233+
234+
func buildPointersTimeRangePredicate(
235+
colMinTimestamp, colMaxTimestamp *pointers.Column,
236+
sStart scalar.Scalar, sEnd scalar.Scalar,
237+
) pointers.Predicate {
238+
return pointers.AndPredicate{
239+
Left: pointers.OrPredicate{
240+
Left: pointers.EqualPredicate{
241+
Column: colMaxTimestamp,
242+
Value: sStart,
243+
},
244+
Right: pointers.GreaterThanPredicate{
245+
Column: colMaxTimestamp,
246+
Value: sStart,
247+
},
248+
},
249+
Right: pointers.OrPredicate{
250+
Left: pointers.EqualPredicate{
251+
Column: colMinTimestamp,
252+
Value: sEnd,
253+
},
254+
Right: pointers.LessThanPredicate{
255+
Column: colMinTimestamp,
256+
Value: sEnd,
257+
},
258+
},
259+
}
260+
}

pkg/dataobj/metastore/object.go

Lines changed: 26 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ import (
1414
"sync"
1515
"time"
1616

17+
"github.com/apache/arrow-go/v18/arrow"
18+
"github.com/apache/arrow-go/v18/arrow/scalar"
1719
"github.com/go-kit/log"
1820
"github.com/go-kit/log/level"
1921
"github.com/grafana/dskit/tenant"
@@ -206,11 +208,7 @@ func (m *ObjectMetastore) Sections(ctx context.Context, start, end time.Time, ma
206208

207209
// Search the stream sections of the matching objects to find matching streams
208210
streamMatchers := streamPredicateFromMatchers(start, end, matchers...)
209-
pointerPredicate := pointers.TimeRangeRowPredicate{
210-
Start: start,
211-
End: end,
212-
}
213-
streamSectionPointers, err := m.getSectionsForStreams(ctx, indexObjects, streamMatchers, pointerPredicate)
211+
streamSectionPointers, err := m.getSectionsForStreams(ctx, indexObjects, streamMatchers, start, end)
214212
if err != nil {
215213
return nil, err
216214
}
@@ -504,12 +502,15 @@ func (m *ObjectMetastore) listStreamIDsFromLogObjects(ctx context.Context, objec
504502

505503
// getSectionsForStreams reads the section data from matching streams and aggregates them into section descriptors.
506504
// This is an exact lookup and includes metadata from the streams in each section: the stream IDs, the min-max timestamps, the number of bytes & number of lines.
507-
func (m *ObjectMetastore) getSectionsForStreams(ctx context.Context, indexObjects []*dataobj.Object, streamPredicate streams.RowPredicate, timeRangePredicate pointers.TimeRangeRowPredicate) ([]*DataobjSectionDescriptor, error) {
505+
func (m *ObjectMetastore) getSectionsForStreams(ctx context.Context, indexObjects []*dataobj.Object, streamPredicate streams.RowPredicate, start, end time.Time) ([]*DataobjSectionDescriptor, error) {
508506
if streamPredicate == nil {
509507
// At least one stream matcher is required, currently.
510508
return nil, nil
511509
}
512510

511+
sStart := scalar.NewTimestampScalar(arrow.Timestamp(start.UnixNano()), arrow.FixedWidthTypes.Timestamp_ns)
512+
sEnd := scalar.NewTimestampScalar(arrow.Timestamp(end.UnixNano()), arrow.FixedWidthTypes.Timestamp_ns)
513+
513514
timer := prometheus.NewTimer(m.metrics.streamFilterTotalDuration)
514515
defer timer.ObserveDuration()
515516

@@ -540,7 +541,8 @@ func (m *ObjectMetastore) getSectionsForStreams(ctx context.Context, indexObject
540541

541542
objectSectionDescriptors := make(map[SectionKey]*DataobjSectionDescriptor)
542543
sectionPointerReadTimer := prometheus.NewTimer(m.metrics.streamFilterPointersReadDuration)
543-
err = forEachObjPointer(ctx, indexObject, timeRangePredicate, matchingStreamIDs, func(pointer pointers.SectionPointer) {
544+
545+
err = forEachStreamSectionPointer(ctx, indexObject, sStart, sEnd, matchingStreamIDs, func(pointer pointers.SectionPointer) {
544546
key.ObjectPath = pointer.Path
545547
key.SectionIdx = pointer.Section
546548

@@ -551,6 +553,7 @@ func (m *ObjectMetastore) getSectionsForStreams(ctx context.Context, indexObject
551553
}
552554
sectionDescriptor.Merge(pointer)
553555
})
556+
554557
if err != nil {
555558
return fmt.Errorf("reading section pointers from index: %w", err)
556559
}
@@ -827,3 +830,19 @@ func dedupeAndSort(objects [][]string) []string {
827830
sort.Strings(paths)
828831
return paths
829832
}
833+
834+
func findPointersColumnsByTypes(allColumns []*pointers.Column, columnTypes ...pointers.ColumnType) ([]*pointers.Column, error) {
835+
result := make([]*pointers.Column, 0, len(columnTypes))
836+
837+
for _, c := range allColumns {
838+
for _, neededType := range columnTypes {
839+
if neededType != c.Type {
840+
continue
841+
}
842+
843+
result = append(result, c)
844+
}
845+
}
846+
847+
return result, nil
848+
}

0 commit comments

Comments
 (0)