Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge the query process of series index #505

Merged
merged 14 commits into from
Aug 8, 2024
Merged
125 changes: 45 additions & 80 deletions banyand/internal/storage/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package storage

import (
"context"
"maps"
"path"

"github.com/pkg/errors"
Expand All @@ -27,7 +28,6 @@ import (
"github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/pkg/index"
"github.com/apache/skywalking-banyandb/pkg/index/inverted"
"github.com/apache/skywalking-banyandb/pkg/index/posting"
"github.com/apache/skywalking-banyandb/pkg/logger"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
"github.com/apache/skywalking-banyandb/pkg/query"
Expand All @@ -38,7 +38,7 @@ func (s *segment[T, O]) IndexDB() IndexDB {
}

// Lookup resolves the given series through the segment's index and returns
// the resulting series list together with any error from the index call.
// The second value returned by the index call is discarded.
func (s *segment[T, O]) Lookup(ctx context.Context, series []*pbv1.Series) (pbv1.SeriesList, error) {
sl, _, err := s.index.searchPrimary(ctx, series, nil)
sl, _, err := s.index.filter(ctx, series, nil, nil)
return sl, err
}

Expand Down Expand Up @@ -70,18 +70,24 @@ func (s *seriesIndex) Write(docs index.Documents) error {

// rangeOpts is a zero-value index.RangeOpts; Search passes it as the term
// range when it asks the store for an iterator.
var rangeOpts = index.RangeOpts{}

func (s *seriesIndex) searchPrimary(ctx context.Context, series []*pbv1.Series, projection []index.FieldKey) (sl pbv1.SeriesList, fields FieldResultList, err error) {
func (s *seriesIndex) filter(ctx context.Context, series []*pbv1.Series,
projection []index.FieldKey, secondaryQuery index.Query,
) (sl pbv1.SeriesList, fields FieldResultList, err error) {
seriesMatchers := make([]index.SeriesMatcher, len(series))
for i := range series {
seriesMatchers[i], err = convertEntityValuesToSeriesMatcher(series[i])
if err != nil {
return nil, nil, err
}
}
indexQuery, err := s.store.BuildQuery(seriesMatchers, secondaryQuery)
if err != nil {
return nil, nil, err
}
tracer := query.GetTracer(ctx)
if tracer != nil {
span, _ := tracer.StartSpan(ctx, "seriesIndex.searchPrimary")
span.Tagf("matchers", "%v", seriesMatchers)
span, _ := tracer.StartSpan(ctx, "seriesIndex.search")
span.Tagf("query", "%s", indexQuery.String())
defer func() {
span.Tagf("matched", "%d", len(sl))
if len(fields) > 0 {
Expand All @@ -93,7 +99,7 @@ func (s *seriesIndex) searchPrimary(ctx context.Context, series []*pbv1.Series,
span.Stop()
}()
}
ss, err := s.store.Search(ctx, seriesMatchers, projection)
ss, err := s.store.Search(ctx, projection, indexQuery)
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -191,44 +197,19 @@ func (s *seriesIndex) Search(ctx context.Context, series []*pbv1.Series, opts In
span.Stop()
}()
}
seriesList, fieldResultList, err := s.searchPrimary(ctx, series, opts.Projection)
if err != nil {
return nil, nil, err
}

pl := seriesList.ToList()
if opts.Query != nil {
var plFilter posting.List
func() {
if tracer != nil {
span, _ := tracer.StartSpan(ctx, "filter")
span.Tag("exp", opts.Query.String())
defer func() {
if err != nil {
span.Error(err)
} else {
span.Tagf("matched", "%d", plFilter.Len())
span.Tagf("total", "%d", pl.Len())
}
span.Stop()
}()
}
if plFilter, err = s.store.Execute(ctx, opts.Query); err != nil {
return
}
if plFilter == nil {
return
}
err = pl.Intersect(plFilter)
}()
if opts.Order == nil || opts.Order.Index == nil {
var seriesList pbv1.SeriesList
var fieldResultList FieldResultList
if opts.Query != nil {
seriesList, fieldResultList, err = s.filter(ctx, series, opts.Projection, opts.Query)
} else {
seriesList, fieldResultList, err = s.filter(ctx, series, opts.Projection, nil)
}
if err != nil {
return nil, nil, err
}
}

if opts.Order == nil || opts.Order.Index == nil {
sl, frl = filterSeriesList(seriesList, fieldResultList, pl)
return sl, frl, nil
return seriesList, fieldResultList, nil
}

fieldKey := index.FieldKey{
Expand All @@ -245,65 +226,49 @@ func (s *seriesIndex) Search(ctx context.Context, series []*pbv1.Series, opts In
span.Stop()
}()
}
seriesMatchers := make([]index.SeriesMatcher, len(series))
for i := range series {
seriesMatchers[i], err = convertEntityValuesToSeriesMatcher(series[i])
if err != nil {
return nil, nil, err
}
}
query, err := s.store.BuildQuery(seriesMatchers, opts.Query)
if err != nil {
return nil, nil, err
}
iter, err := s.store.Iterator(fieldKey, rangeOpts,
opts.Order.Sort, opts.PreloadSize)
opts.Order.Sort, opts.PreloadSize, query, opts.Projection)
if err != nil {
return nil, nil, err
}
defer func() {
err = multierr.Append(err, iter.Close())
}()

var sortedSeriesList pbv1.SeriesList
var sortedFieldResultList FieldResultList
var r int
result := make([]index.SeriesDocument, 0, 10)
for iter.Next() {
r++
docID := iter.Val().DocID
if !pl.Contains(docID) {
continue
}
sortedSeriesList, sortedFieldResultList = appendSeriesList(
sortedSeriesList, seriesList,
sortedFieldResultList, fieldResultList,
common.SeriesID(docID))
if err != nil {
return nil, nil, err
}
val := iter.Val()
var doc index.SeriesDocument
doc.Fields = maps.Clone(val.Values)
doc.Key.ID = common.SeriesID(val.DocID)
doc.Key.EntityValues = val.EntityValues
result = append(result, doc)
}
sortedSeriesList, sortedFieldResultList, err := convertIndexSeriesToSeriesList(result, len(opts.Projection) > 0)
if err != nil {
return nil, nil, errors.WithMessagef(err, "failed to convert index series to series list, matchers: %v, matched: %d", seriesMatchers, len(result))
}
if span != nil {
span.Tagf("query", "%s", iter.Query().String())
span.Tagf("rounds", "%d", r)
span.Tagf("size", "%d", len(sortedSeriesList))
}
return sortedSeriesList, sortedFieldResultList, err
}

// filterSeriesList keeps only the entries of seriesList whose ID is contained
// in filter, preserving their original order.
//
// When fieldResultList is non-nil it is treated as parallel to seriesList
// (entry i belongs to series i) and the same positions are kept; when it is
// nil, nil is returned for it.
//
// Both slices are compacted in place in a single pass, so the returned slices
// share their backing arrays with the arguments and callers must use the
// returned values rather than the originals. This replaces the previous
// delete-while-iterating loop, which shifted the tail of both slices once per
// rejected series.
func filterSeriesList(seriesList pbv1.SeriesList, fieldResultList FieldResultList, filter posting.List) (pbv1.SeriesList, FieldResultList) {
	keptSeries := seriesList[:0]
	var keptFields FieldResultList
	if fieldResultList != nil {
		keptFields = fieldResultList[:0]
	}
	for i := range seriesList {
		if !filter.Contains(uint64(seriesList[i].ID)) {
			continue
		}
		// The write position never overtakes the read position, so reusing
		// the backing arrays cannot clobber an entry that is still unread.
		keptSeries = append(keptSeries, seriesList[i])
		if fieldResultList != nil {
			keptFields = append(keptFields, fieldResultList[i])
		}
	}
	return keptSeries, keptFields
}

// appendSeriesList looks for the first entry of src whose ID equals target and
// appends it to dest. When srcFRL is non-nil, the field result stored at the
// same position is appended to destFRL as well. If no entry of src matches,
// dest and destFRL are returned unchanged.
func appendSeriesList(dest, src pbv1.SeriesList, destFRL, srcFRL FieldResultList, target common.SeriesID) (pbv1.SeriesList, FieldResultList) {
	for idx := range src {
		if src[idx].ID != target {
			continue
		}
		dest = append(dest, src[idx])
		if srcFRL != nil {
			destFRL = append(destFRL, srcFRL[idx])
		}
		// Only the first match is copied.
		return dest, destFRL
	}
	return dest, destFRL
}

// Close closes the underlying store and returns whatever error the store
// reports.
func (s *seriesIndex) Close() error {
return s.store.Close()
}
2 changes: 1 addition & 1 deletion banyand/internal/storage/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func TestSeriesIndex_Primary(t *testing.T) {
seriesQuery.EntityValues = tt.entityValues[i]
seriesQueries = append(seriesQueries, &seriesQuery)
}
sl, _, err := si.searchPrimary(ctx, seriesQueries, nil)
sl, _, err := si.filter(ctx, seriesQueries, nil, nil)
require.NoError(t, err)
require.Equal(t, len(tt.entityValues), len(sl))
assert.Equal(t, tt.subject, sl[0].Subject)
Expand Down
3 changes: 1 addition & 2 deletions banyand/internal/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import (
commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
"github.com/apache/skywalking-banyandb/pkg/fs"
"github.com/apache/skywalking-banyandb/pkg/index"
"github.com/apache/skywalking-banyandb/pkg/index/inverted"
"github.com/apache/skywalking-banyandb/pkg/logger"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
"github.com/apache/skywalking-banyandb/pkg/query/model"
Expand Down Expand Up @@ -67,7 +66,7 @@ type SupplyTSDB[T TSTable] func() T

// IndexSearchOpts holds the options for searching the index.
type IndexSearchOpts struct {
Query *inverted.Query
Query index.Query
Order *model.OrderBy
Projection []index.FieldKey
PreloadSize int
Expand Down
26 changes: 14 additions & 12 deletions pkg/index/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ import (
"fmt"
"io"

"github.com/blugelabs/bluge"

"github.com/apache/skywalking-banyandb/api/common"
databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
Expand Down Expand Up @@ -97,11 +95,12 @@ func (r RangeOpts) Between(value []byte) int {

// DocumentResult represents a document in an index.
type DocumentResult struct {
Values map[string][]byte
SortedValue []byte
SeriesID common.SeriesID
DocID uint64
Timestamp int64
EntityValues []byte
Values map[string][]byte
SortedValue []byte
SeriesID common.SeriesID
DocID uint64
Timestamp int64
}

// SortedField returns the value of the sorted field.
Expand All @@ -114,6 +113,7 @@ type FieldIterator[T sort.Comparable] interface {
Next() bool
Val() T
Close() error
Query() Query
}

// DummyFieldIterator never iterates.
Expand All @@ -133,6 +133,10 @@ func (i *dummyIterator) Close() error {
return nil
}

// Query always returns nil for the dummy iterator.
func (i *dummyIterator) Query() Query {
return nil
}

// Document represents a document in an index.
type Document struct {
Fields []Field
Expand All @@ -156,7 +160,8 @@ type Writer interface {

// FieldIterable allows building a FieldIterator.
type FieldIterable interface {
Iterator(fieldKey FieldKey, termRange RangeOpts, order modelv1.Sort, preLoadSize int) (iter FieldIterator[*DocumentResult], err error)
BuildQuery(seriesMatchers []SeriesMatcher, secondaryQuery Query) (Query, error)
Iterator(fieldKey FieldKey, termRange RangeOpts, order modelv1.Sort, preLoadSize int, query Query, fieldKeys []FieldKey) (iter FieldIterator[*DocumentResult], err error)
Sort(sids []common.SeriesID, fieldKey FieldKey, order modelv1.Sort, timeRange *timestamp.TimeRange, preLoadSize int) (FieldIterator[*DocumentResult], error)
}

Expand All @@ -171,9 +176,7 @@ type Searcher interface {

// Query is an abstraction of an index query.
type Query interface {
bluge.Query
fmt.Stringer
Query() bluge.Query
}

// Store is an abstraction of an index repository.
Expand Down Expand Up @@ -204,8 +207,7 @@ type SeriesDocument struct {
type SeriesStore interface {
Store
// Search returns a list of series that match the given matchers.
Search(context.Context, []SeriesMatcher, []FieldKey) ([]SeriesDocument, error)
Execute(context.Context, Query) (posting.List, error)
Search(context.Context, []FieldKey, Query) ([]SeriesDocument, error)
}

// SeriesMatcherType represents the type of series matcher.
Expand Down
Loading
Loading