
Commit 3a86303

Something iterates

1 parent f331ff6

5 files changed: +284 -24 lines


pkg/phlaredb/block_querier.go

Lines changed: 209 additions & 6 deletions
@@ -941,6 +941,204 @@ func retrieveStacktracePartition(buf [][]parquet.Value, pos int) uint64 {
 	return uint64(0)
 }
 
+type sortingIterator struct {
+}
+
+type rowGroupIndex struct {
+	min, max, idx int64
+}
+
+type profileMatchIterator struct {
+	err error
+	it  query.Iterator
+
+	file            *parquet.File
+	currentRowGroup int
+	rowGroups       []parquet.RowGroup
+	index           []rowGroupIndex
+	minTimeOrder    []int
+	lbls            map[int64]labelsInfo
+
+	offset        int
+	ranges        []rowRange
+	seriesIndexes []uint32
+}
+
+func newProfileMatchIterator(file *parquet.File, lbls map[int64]labelsInfo) query.Iterator {
+	it := &profileMatchIterator{file: file, lbls: lbls}
+	//timeNanosIndex, _ := query.GetColumnIndexByPath(file, "TimeNanos")
+	it.rowGroups = file.RowGroups()
+
+	/*
+		it.index = make([]rowGroupIndex, len(it.rowGroups))
+		it.minTimeOrder = make([]int, len(it.rowGroups))
+
+		for rgIdx, rg := range it.rowGroups {
+			it.minTimeOrder[rgIdx] = rgIdx
+			idx := rg.ColumnChunks()[timeNanosIndex].ColumnIndex()
+			for pgIdx := 0; pgIdx < idx.NumPages(); pgIdx++ {
+				min := idx.MinValue(pgIdx).Int64()
+				max := idx.MaxValue(pgIdx).Int64()
+				if it.index[rgIdx].max == 0 || it.index[rgIdx].max < max {
+					it.index[rgIdx].max = max
+				}
+				if it.index[rgIdx].min == 0 || it.index[rgIdx].min > min {
+					it.index[rgIdx].min = min
+				}
+			}
+
+			sort.Slice(it.minTimeOrder, func(i, j int) bool {
+				return it.index[i].min < it.index[j].min
+			})
+		}
+	*/
+
+	return it
+}
+
+type parquetUint32Reader interface {
+	ReadUint32s(values []uint32) (int, error)
+}
+
+func (it *profileMatchIterator) Seek(to query.RowNumberWithDefinitionLevel) bool {
+	return it.it.Seek(to)
+}
+
+func (it *profileMatchIterator) readNextRowGroup() error {
+	if it.currentRowGroup >= len(it.rowGroups) {
+		return io.EOF
+	}
+
+	rg := it.rowGroups[it.currentRowGroup]
+	it.currentRowGroup++
+
+	seriesIndexIndex, _ := query.GetColumnIndexByPath(it.file, "SeriesIndex")
+
+	// find series ids relevant
+	pages := rg.ColumnChunks()[seriesIndexIndex].Pages()
+
+	batch := make([]uint32, 10_000)
+
+	rangeIdx := -1
+	it.ranges = it.ranges[:0]
+	it.seriesIndexes = it.seriesIndexes[:0]
+
+	for {
+		page, err := pages.ReadPage()
+		if err == io.EOF {
+			break
+		} else if err != nil {
+			return err
+		}
+
+		reader, ok := page.Values().(parquetUint32Reader)
+		if !ok {
+			return fmt.Errorf("unexpected reader: %T", page.Values())
+		}
+
+		for {
+			valuesEOF := false
+
+			n, err := reader.ReadUint32s(batch)
+			if err == io.EOF {
+				valuesEOF = true
+			} else if err != nil {
+				return err
+			}
+
+			for idx := range batch {
+				seriesIdx := batch[idx]
+
+				_, selected := it.lbls[int64(seriesIdx)]
+				if !selected {
+					continue
+				}
+
+				// check rowRanges
+				if rangeIdx > 0 && seriesIdx == it.seriesIndexes[rangeIdx] {
+					it.ranges[rangeIdx].length += 1
+					continue
+				}
+
+				// if rangeIdx > 0 {
+				// 	fmt.Printf("rowRange finished: %+#v seriesIdx=%d offset=%d\n", it.ranges[rangeIdx], it.seriesIndexes[rangeIdx], it.offset)
+				// }
+
+				// needs new row range
+				rangeIdx += 1
+				it.ranges = append(it.ranges, rowRange{
+					rowNum: int64(idx + it.offset),
+					length: 1,
+				})
+				it.seriesIndexes = append(it.seriesIndexes, seriesIdx)
+			}
+
+			it.offset += n
+			if valuesEOF {
+				break
+			}
+		}
+	}
+
+	var rowNumberIter iter.Iterator[rowNumWithSomething[uint32]] = &rowRangesIter[uint32]{
+		r:   it.ranges,
+		fps: it.seriesIndexes,
+		pos: 0,
+	}
+
+	it.it = query.NewRowNumberIterator(rowNumberIter)
+
+	return nil
+
+}
+
+func (it *profileMatchIterator) Next() bool {
+	err := it.next()
+	if err == io.EOF {
+		return false
+	}
+	if err != nil {
+		it.err = err
+		return false
+	}
+
+	return true
+
+}
+
+func (it *profileMatchIterator) next() error {
+	for it.it == nil || !it.it.Next() {
+		// close old iterator if required
+		if it.it != nil {
+			if err := it.it.Close(); err != nil {
+				return err
+			}
+		}
+
+		// get new iterator
+		err := it.readNextRowGroup()
+		if err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+func (it *profileMatchIterator) Err() error {
+	return it.err
+}
+
+func (it *profileMatchIterator) Close() error {
+	// TODO
+	return nil
+}
+
+func (it *profileMatchIterator) At() *query.IteratorResult {
+	v := it.it.At()
+	return v
+}
+
 func (b *singleBlockQuerier) SelectMatchingProfiles(ctx context.Context, params *ingestv1.SelectProfilesRequest) (iter.Iterator[Profile], error) {
 	sp, ctx := opentracing.StartSpanFromContext(ctx, "SelectMatchingProfiles - Block")
 	defer sp.Finish()
@@ -992,16 +1190,18 @@ func (b *singleBlockQuerier) SelectMatchingProfiles(ctx context.Context, params
 		joinIters []query.Iterator
 	)
 
+	seriesIndexIter := newProfileMatchIterator(b.profiles.file, lblsPerRef)
+
 	if b.meta.Version >= 2 {
 		joinIters = []query.Iterator{
-			b.profiles.columnIter(ctx, "SeriesIndex", newMapPredicate(lblsPerRef), "SeriesIndex"),
+			seriesIndexIter,
 			b.profiles.columnIter(ctx, "TimeNanos", query.NewIntBetweenPredicate(model.Time(params.Start).UnixNano(), model.Time(params.End).UnixNano()), "TimeNanos"),
 			b.profiles.columnIter(ctx, "StacktracePartition", nil, "StacktracePartition"),
 		}
 		buf = make([][]parquet.Value, 3)
 	} else {
 		joinIters = []query.Iterator{
-			b.profiles.columnIter(ctx, "SeriesIndex", newMapPredicate(lblsPerRef), "SeriesIndex"),
+			seriesIndexIter,
 			b.profiles.columnIter(ctx, "TimeNanos", query.NewIntBetweenPredicate(model.Time(params.Start).UnixNano(), model.Time(params.End).UnixNano()), "TimeNanos"),
 		}
 		buf = make([][]parquet.Value, 2)
@@ -1015,8 +1215,11 @@ func (b *singleBlockQuerier) SelectMatchingProfiles(ctx context.Context, params
 	var currentSeriesSlice []Profile
 	for pIt.Next() {
 		res := pIt.At()
-		buf = res.Columns(buf, "SeriesIndex", "TimeNanos", "StacktracePartition")
-		seriesIndex := buf[0][0].Int64()
+
+		seriesIndexE := res.Entries[0].RowValue.(rowNumWithSomething[uint32])
+		seriesIndex := int64(seriesIndexE.elem)
+
+		buf = res.Columns(buf, "TimeNanos", "StacktracePartition")
 		if seriesIndex != currSeriesIndex {
 			currSeriesIndex = seriesIndex
 			if len(currentSeriesSlice) > 0 {
@@ -1028,8 +1231,8 @@ func (b *singleBlockQuerier) SelectMatchingProfiles(ctx context.Context, params
 			currentSeriesSlice = append(currentSeriesSlice, BlockProfile{
 				labels: lblsPerRef[seriesIndex].lbs,
 				fp:     lblsPerRef[seriesIndex].fp,
-				ts:     model.TimeFromUnixNano(buf[1][0].Int64()),
-				stacktracePartition: retrieveStacktracePartition(buf, 2),
+				ts:     model.TimeFromUnixNano(buf[0][0].Int64()),
+				stacktracePartition: retrieveStacktracePartition(buf, 1),
 				RowNum: res.RowNumber[0],
 			})
 		}
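
Aside (not part of the commit): the heart of readNextRowGroup above is collapsing consecutive selected rows that share a SeriesIndex into a single rowRange, so downstream iteration works on contiguous row spans instead of individual rows. Below is a minimal standalone sketch of that idea; collapseRanges, its simplified rowRange, and the sample data are hypothetical, and unlike the committed code the sketch also requires a run to be contiguous before extending it.

package main

import "fmt"

// rowRange is a simplified stand-in for the rowRange type in pkg/phlaredb:
// a run of `length` consecutive rows starting at rowNum.
type rowRange struct {
	rowNum int64
	length int64
}

// collapseRanges walks the SeriesIndex column values (one per row), keeps only
// rows whose series is in `selected`, and merges consecutive rows of the same
// series into one range.
func collapseRanges(seriesIndexes []uint32, selected map[uint32]struct{}) ([]rowRange, []uint32) {
	var (
		ranges []rowRange
		series []uint32
	)
	for row, s := range seriesIndexes {
		if _, ok := selected[s]; !ok {
			continue
		}
		// extend the current range when the same series continues on the next row
		if n := len(ranges); n > 0 && series[n-1] == s && ranges[n-1].rowNum+ranges[n-1].length == int64(row) {
			ranges[n-1].length++
			continue
		}
		ranges = append(ranges, rowRange{rowNum: int64(row), length: 1})
		series = append(series, s)
	}
	return ranges, series
}

func main() {
	// rows 0..6 carry these series indexes; only series 7 and 9 are selected
	rows := []uint32{7, 7, 3, 9, 9, 9, 7}
	selected := map[uint32]struct{}{7: {}, 9: {}}
	ranges, series := collapseRanges(rows, selected)
	for i, r := range ranges {
		fmt.Printf("series=%d rows [%d, %d)\n", series[i], r.rowNum, r.rowNum+r.length)
	}
	// prints:
	// series=7 rows [0, 2)
	// series=9 rows [3, 6)
	// series=7 rows [6, 7)
}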

pkg/phlaredb/block_querier_test.go

Lines changed: 57 additions & 0 deletions
@@ -3,6 +3,8 @@ package phlaredb
 import (
 	"context"
 	"fmt"
+	"os"
+	rpprof "runtime/pprof"
 	"strings"
 	"testing"
 	"time"
@@ -190,5 +192,60 @@ func TestBlockCompatability(t *testing.T) {
 		})
 
 	}
+}
+
+func BenchmarkSelect(t *testing.B) {
+	path := "/Users/christian/parquet-notebook/"
+	bucket, err := filesystem.NewBucket(path)
+	require.NoError(t, err)
+
+	ctx := context.Background()
+	metas, err := NewBlockQuerier(ctx, bucket).BlockMetas(ctx)
+	require.NoError(t, err)
+
+	for _, meta := range metas {
+
+		q := NewSingleBlockQuerierFromMeta(ctx, bucket, meta)
+		require.NoError(t, q.Open(ctx))
 
+		profilesTypes, err := q.index.LabelValues("__profile_type__")
+		require.NoError(t, err)
+
+		for _, profileType := range profilesTypes {
+			name := fmt.Sprintf("block-%s-%s", meta.ULID.String(), profileType)
+			t.Run(name, func(t *testing.B) {
+				profileTypeParts := strings.Split(profileType, ":")
+
+				it, err := q.SelectMatchingProfiles(ctx, &ingestv1.SelectProfilesRequest{
+					LabelSelector: `{namespace="profiles-ops-001"}`,
+					Start:         0,
+					End:           time.Now().UnixMilli(),
+					Type: &typesv1.ProfileType{
+						Name:       profileTypeParts[0],
+						SampleType: profileTypeParts[1],
+						SampleUnit: profileTypeParts[2],
+						PeriodType: profileTypeParts[3],
+						PeriodUnit: profileTypeParts[4],
+					},
+				})
+				require.NoError(t, err)
+
+				f, err := os.Create("heap-after-" + name + ".pprof")
+				require.NoError(t, err)
+
+				require.NoError(t, rpprof.WriteHeapProfile(f))
+
+				require.NoError(t, f.Close())
+
+				// TODO: It would be nice actually comparing the whole profile, but at present the result is not deterministic.
+				p, err := q.MergePprof(ctx, it)
+
+				var sampleSum int64
+				for _, s := range p.Sample {
+					sampleSum += s.Value[0]
+				}
+				t.Logf("profileType=%s sum=%d", profileType, sampleSum)
+			})
+		}
+	}
 }
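
The benchmark above reads blocks from a hard-coded local path (/Users/christian/parquet-notebook/), so it only runs where that directory (or a locally adjusted path) contains block data. Assuming that, it can be invoked with the standard Go tooling, for example:

	go test ./pkg/phlaredb/ -run '^$' -bench BenchmarkSelect -benchtime 1x

Each t.Run case also writes a heap-after-<name>.pprof heap profile into the test's working directory.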

pkg/phlaredb/head_queriers.go

Lines changed: 3 additions & 3 deletions
@@ -66,12 +66,12 @@ func (q *headOnDiskQuerier) SelectMatchingProfiles(ctx context.Context, params *
 	for pIt.Next() {
 		res := pIt.At()
 
-		v, ok := res.Entries[0].RowValue.(fingerprintWithRowNum)
+		v, ok := res.Entries[0].RowValue.(rowNumWithSomething[model.Fingerprint])
 		if !ok {
 			panic("no fingerprint information found")
 		}
 
-		lbls, ok := labelsPerFP[v.fp]
+		lbls, ok := labelsPerFP[v.elem]
 		if !ok {
 			panic("no profile series labels with matching fingerprint found")
 		}
@@ -83,7 +83,7 @@ func (q *headOnDiskQuerier) SelectMatchingProfiles(ctx context.Context, params *
 		}
 		profiles = append(profiles, BlockProfile{
 			labels: lbls,
-			fp:     v.fp,
+			fp:     v.elem,
 			ts:     model.TimeFromUnixNano(buf[0][0].Int64()),
 			stacktracePartition: retrieveStacktracePartition(buf, 1),
 			RowNum: res.RowNumber[0],
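
For context (an inference, not shown in this commit): the rename from fingerprintWithRowNum to rowNumWithSomething[...] suggests the row value carried by the iterator became generic over its payload, holding a model.Fingerprint for the head querier and a uint32 series index for the block querier. A plausible shape, guessed only from the call sites above (v.elem, At().RowNumber()), would be:

	// hypothetical sketch; the real definition lives elsewhere in pkg/phlaredb
	type rowNumWithSomething[T any] struct {
		rowNum int64
		elem   T
	}

	func (r rowNumWithSomething[T]) RowNumber() int64 { return r.rowNum }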

pkg/phlaredb/profile_test.go

Lines changed: 2 additions & 2 deletions
@@ -168,7 +168,7 @@ func Test_rowRangeIter(t *testing.T) {
 			result := []int64{}
 			for it.Next() {
 				result = append(result, it.At().RowNumber())
-				assert.Equal(t, model.Fingerprint(0xff), it.At().fp)
+				assert.Equal(t, model.Fingerprint(0xff), it.At().elem)
 			}
 			assert.Equal(t, tc.expected, result)
 		})
@@ -238,7 +238,7 @@ func Test_rowRangesIter(t *testing.T) {
 
 			for it.Next() {
 				rows = append(rows, it.At().RowNumber())
-				fingerprints = append(fingerprints, it.At().fp)
+				fingerprints = append(fingerprints, it.At().elem)
 			}
 			assert.Equal(t, tc.expRows, rows)
 			assert.Equal(t, tc.expFingerprints, fingerprints)
