
Commit 9ab5fba

Query profiles parquet by rowGroup
1 parent ebc3e04 commit 9ab5fba

2 files changed (+113, -37)

pkg/iter/iter.go

Lines changed: 56 additions & 0 deletions
@@ -259,3 +259,59 @@ func (it *BufferedIterator[T]) Next() bool {
 func (it *BufferedIterator[T]) At() T {
 	return it.at
 }
+
+type LazyConcatIterator[T any] struct {
+	curr         Iterator[T]
+	err          error
+	nextIterator func() (Iterator[T], error)
+}
+
+// NewLazyConcatIterator returns an iterator that concatenates additional iterators once the current one runs out.
+func NewLazyConcatIterator[T any](nextIterator func() (Iterator[T], error)) Iterator[T] {
+	// fetch the first iterator eagerly
+	it, err := nextIterator()
+	return &LazyConcatIterator[T]{
+		curr:         it,
+		err:          err,
+		nextIterator: nextIterator,
+	}
+}
+
+func (it *LazyConcatIterator[T]) Next() bool {
+	if it.curr == nil {
+		return false
+	}
+
+	if it.curr.Next() {
+		return true
+	}
+
+	it.curr, it.err = it.nextIterator()
+
+	return it.Next()
+}
+
+func (it *LazyConcatIterator[T]) At() T {
+	if it.curr == nil {
+		var zero T
+		return zero
+	}
+	return it.curr.At()
+}
+
+func (it *LazyConcatIterator[T]) Err() error {
+	if it.err != nil {
+		return it.err
+	}
+	if it.curr == nil {
+		return nil
+	}
+	return it.curr.Err()
+}
+
+func (it *LazyConcatIterator[T]) Close() error {
+	if it.curr == nil {
+		return nil
+	}
+	return it.curr.Close()
+}
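For orientation, here is a minimal usage sketch of the new type (not part of the commit): the callback hands out one sub-iterator per call and signals exhaustion by returning a nil iterator with a nil error. The batches slice and the main wrapper are invented for illustration; iter.NewSliceIterator is the existing helper used below in block_querier.go, and the module path in the import is assumed from the repository layout.

```go
package main

import (
	"fmt"

	"github.com/grafana/phlare/pkg/iter" // import path assumed from the repo layout
)

func main() {
	// Hypothetical input: three batches that should read as one stream.
	batches := [][]int{{1, 2}, {3}, {4, 5, 6}}

	it := iter.NewLazyConcatIterator(func() (iter.Iterator[int], error) {
		if len(batches) == 0 {
			return nil, nil // nil iterator: Next() starts returning false
		}
		head := batches[0]
		batches = batches[1:] // consume one batch per call
		return iter.NewSliceIterator(head), nil
	})

	for it.Next() {
		fmt.Println(it.At()) // prints 1..6; a batch is only opened once the previous one is drained
	}
	if err := it.Err(); err != nil {
		panic(err) // errors from the callback or any sub-iterator surface here
	}
}
```

Note that Next() recurses after fetching a replacement iterator, so empty sub-iterators are skipped transparently; the nil/nil return is the only stop condition.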

pkg/phlaredb/block_querier.go

Lines changed: 57 additions & 37 deletions
@@ -983,46 +983,65 @@ func (b *singleBlockQuerier) SelectMatchingProfiles(ctx context.Context, params
 			lbls = make(phlaremodel.Labels, 0, 6)
 		}
 	}
-	pIt := query.NewJoinIterator(
-		0,
-		[]query.Iterator{
-			b.profiles.columnIter(ctx, "SeriesIndex", newMapPredicate(lblsPerRef), "SeriesIndex"),
-			b.profiles.columnIter(ctx, "TimeNanos", query.NewIntBetweenPredicate(model.Time(params.Start).UnixNano(), model.Time(params.End).UnixNano()), "TimeNanos"),
-			b.profiles.columnIter(ctx, "StacktracePartition", nil, "StacktracePartition"),
-		},
-		nil,
-	)
+
+	rowGroups := b.profiles.file.RowGroups()
 	iters := make([]iter.Iterator[Profile], 0, len(lblsPerRef))
-	buf := make([][]parquet.Value, 2)
-	defer pIt.Close()
-
-	currSeriesIndex := int64(-1)
-	var currentSeriesSlice []Profile
-	for pIt.Next() {
-		res := pIt.At()
-		buf = res.Columns(buf, "SeriesIndex", "TimeNanos", "StacktracePartition")
-		seriesIndex := buf[0][0].Int64()
-		if seriesIndex != currSeriesIndex {
-			currSeriesIndex++
-			if len(currentSeriesSlice) > 0 {
-				iters = append(iters, iter.NewSliceIterator(currentSeriesSlice))
+	buf := make([][]parquet.Value, 3)
+
+	return iter.NewLazyConcatIterator(func() (iter.Iterator[Profile], error) {
+
+		// all row groups have been read
+		if len(rowGroups) == 0 {
+			return nil, nil
+		}
+
+		sp, ctx := opentracing.StartSpanFromContext(ctx, "SelectMatchingProfiles - RowGroup")
+		defer sp.Finish()
+
+		pIt := query.NewJoinIterator(
+			0,
+			[]query.Iterator{
+				b.profiles.columnIter(ctx, rowGroups[0:1], "SeriesIndex", newMapPredicate(lblsPerRef), "SeriesIndex"),
+				b.profiles.columnIter(ctx, rowGroups[0:1], "TimeNanos", query.NewIntBetweenPredicate(model.Time(params.Start).UnixNano(), model.Time(params.End).UnixNano()), "TimeNanos"),
+				b.profiles.columnIter(ctx, rowGroups[0:1], "StacktracePartition", nil, "StacktracePartition"),
+			},
+			nil,
+		)
+		defer pIt.Close()
+
+		// remove first row group
+		rowGroups = rowGroups[1:]
+
+		currSeriesIndex := int64(-1)
+		var currentSeriesSlice []Profile
+		for pIt.Next() {
+			res := pIt.At()
+			buf = res.Columns(buf, "SeriesIndex", "TimeNanos", "StacktracePartition")
+			seriesIndex := buf[0][0].Int64()
+			if seriesIndex != currSeriesIndex {
+				currSeriesIndex++
+				if len(currentSeriesSlice) > 0 {
+					iters = append(iters, iter.NewSliceIterator(currentSeriesSlice))
+				}
+				// can probably keep the ones from before
+				currentSeriesSlice = make([]Profile, 0, 100)
 			}
-			currentSeriesSlice = make([]Profile, 0, 100)
+
+			currentSeriesSlice = append(currentSeriesSlice, BlockProfile{
+				labels:              lblsPerRef[seriesIndex].lbs,
+				fp:                  lblsPerRef[seriesIndex].fp,
+				ts:                  model.TimeFromUnixNano(buf[1][0].Int64()),
+				stacktracePartition: retrieveStacktracePartition(buf, 2),
+				RowNum:              res.RowNumber[0],
+			})
+		}
+		if len(currentSeriesSlice) > 0 {
+			iters = append(iters, iter.NewSliceIterator(currentSeriesSlice))
 		}
 
-		currentSeriesSlice = append(currentSeriesSlice, BlockProfile{
-			labels:              lblsPerRef[seriesIndex].lbs,
-			fp:                  lblsPerRef[seriesIndex].fp,
-			ts:                  model.TimeFromUnixNano(buf[1][0].Int64()),
-			stacktracePartition: retrieveStacktracePartition(buf, 2),
-			RowNum:              res.RowNumber[0],
-		})
-	}
-	if len(currentSeriesSlice) > 0 {
-		iters = append(iters, iter.NewSliceIterator(currentSeriesSlice))
-	}
+		return iter.NewMergeIterator(maxBlockProfile, false, iters...), nil
 
-	return iter.NewMergeIterator(maxBlockProfile, false, iters...), nil
+	}), nil
 }
 
 func (b *singleBlockQuerier) Sort(in []Profile) []Profile {
@@ -1185,13 +1204,14 @@ func (r *parquetReader[M, P]) relPath() string {
 	return r.persister.Name() + block.ParquetSuffix
 }
 
-func (r *parquetReader[M, P]) columnIter(ctx context.Context, columnName string, predicate query.Predicate, alias string) query.Iterator {
+func (r *parquetReader[M, P]) columnIter(ctx context.Context, rowGroups []parquet.RowGroup, columnName string, predicate query.Predicate, alias string) query.Iterator {
+
 	index, _ := query.GetColumnIndexByPath(r.file, columnName)
 	if index == -1 {
 		return query.NewErrIterator(fmt.Errorf("column '%s' not found in parquet file '%s'", columnName, r.relPath()))
 	}
 	ctx = query.AddMetricsToContext(ctx, r.metrics.query)
-	return query.NewColumnIterator(ctx, r.file.RowGroups(), index, columnName, 1000, predicate, alias)
+	return query.NewColumnIterator(ctx, rowGroups, index, columnName, 1000, predicate, alias)
 }
 
 func repeatedColumnIter[T any](ctx context.Context, source Source, columnName string, rows iter.Iterator[T]) iter.Iterator[*query.RepeatedRow[T]] {
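The essence of the SelectMatchingProfiles change is the pattern "capture the row-group slice in a closure and consume one row group per callback invocation", so only a single row group's worth of matching rows is buffered at a time instead of the whole file. A reduced sketch of that pattern under stated assumptions: perRowGroup and its open parameter are invented names, and the parquet import path is assumed to be the segmentio package the file already depends on.

```go
package phlaredb

import (
	"github.com/segmentio/parquet-go" // parquet.RowGroup, as in columnIter's new signature

	"github.com/grafana/phlare/pkg/iter" // import path assumed from the repo layout
)

// perRowGroup is a hypothetical helper distilling the pattern above: the
// closure owns the remaining row groups and opens them one at a time, so
// iterating the whole file never holds more than one row group's iterator
// (and its buffers) open at once.
func perRowGroup[T any](
	rowGroups []parquet.RowGroup,
	open func(rg parquet.RowGroup) (iter.Iterator[T], error),
) iter.Iterator[T] {
	return iter.NewLazyConcatIterator(func() (iter.Iterator[T], error) {
		// all row groups have been read
		if len(rowGroups) == 0 {
			return nil, nil
		}
		it, err := open(rowGroups[0])
		rowGroups = rowGroups[1:] // same head-consuming step as rowGroups = rowGroups[1:] above
		return it, err
	})
}
```

This is also why columnIter now takes the row groups as a parameter: the caller, not the reader, decides the scope of each query, and the SelectMatchingProfiles call sites pass rowGroups[0:1] to restrict each join to the current row group.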
