Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge the query process of series index #505

Merged
merged 14 commits into from
Aug 8, 2024
Merged
125 changes: 46 additions & 79 deletions banyand/internal/storage/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package storage

import (
"context"
"maps"
"path"

"github.com/pkg/errors"
Expand All @@ -27,7 +28,6 @@ import (
"github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/pkg/index"
"github.com/apache/skywalking-banyandb/pkg/index/inverted"
"github.com/apache/skywalking-banyandb/pkg/index/posting"
"github.com/apache/skywalking-banyandb/pkg/logger"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
"github.com/apache/skywalking-banyandb/pkg/query"
Expand All @@ -38,7 +38,7 @@ func (s *segment[T, O]) IndexDB() IndexDB {
}

func (s *segment[T, O]) Lookup(ctx context.Context, series []*pbv1.Series) (pbv1.SeriesList, error) {
sl, _, err := s.index.searchPrimary(ctx, series, nil)
sl, _, err := s.index.search(ctx, series, nil, nil)
return sl, err
}

Expand Down Expand Up @@ -70,7 +70,9 @@ func (s *seriesIndex) Write(docs index.Documents) error {

var rangeOpts = index.RangeOpts{}

func (s *seriesIndex) searchPrimary(ctx context.Context, series []*pbv1.Series, projection []index.FieldKey) (sl pbv1.SeriesList, fields FieldResultList, err error) {
func (s *seriesIndex) search(ctx context.Context, series []*pbv1.Series,
projection []index.FieldKey, secondaryQuery index.Query,
) (sl pbv1.SeriesList, fields FieldResultList, err error) {
seriesMatchers := make([]index.SeriesMatcher, len(series))
for i := range series {
seriesMatchers[i], err = convertEntityValuesToSeriesMatcher(series[i])
Expand All @@ -80,8 +82,10 @@ func (s *seriesIndex) searchPrimary(ctx context.Context, series []*pbv1.Series,
}
tracer := query.GetTracer(ctx)
if tracer != nil {
span, _ := tracer.StartSpan(ctx, "seriesIndex.searchPrimary")
span, _ := tracer.StartSpan(ctx, "seriesIndex.search")
// TODO: modify trace information
span.Tagf("matchers", "%v", seriesMatchers)
span.Tagf("query", "%v", secondaryQuery)
defer func() {
span.Tagf("matched", "%d", len(sl))
if len(fields) > 0 {
Expand All @@ -93,7 +97,11 @@ func (s *seriesIndex) searchPrimary(ctx context.Context, series []*pbv1.Series,
span.Stop()
}()
}
ss, err := s.store.Search(ctx, seriesMatchers, projection)
query, err := s.store.BuildQuery(seriesMatchers, secondaryQuery)
if err != nil {
return nil, nil, err
}
ss, err := s.store.Search(ctx, projection, query)
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -191,44 +199,19 @@ func (s *seriesIndex) Search(ctx context.Context, series []*pbv1.Series, opts In
span.Stop()
}()
}
seriesList, fieldResultList, err := s.searchPrimary(ctx, series, opts.Projection)
if err != nil {
return nil, nil, err
}

pl := seriesList.ToList()
if opts.Query != nil {
var plFilter posting.List
func() {
if tracer != nil {
span, _ := tracer.StartSpan(ctx, "filter")
span.Tag("exp", opts.Query.String())
defer func() {
if err != nil {
span.Error(err)
} else {
span.Tagf("matched", "%d", plFilter.Len())
span.Tagf("total", "%d", pl.Len())
}
span.Stop()
}()
}
if plFilter, err = s.store.Execute(ctx, opts.Query); err != nil {
return
}
if plFilter == nil {
return
}
err = pl.Intersect(plFilter)
}()
if opts.Order == nil || opts.Order.Index == nil {
var seriesList pbv1.SeriesList
var fieldResultList FieldResultList
if opts.Query != nil {
seriesList, fieldResultList, err = s.search(ctx, series, opts.Projection, opts.Query)
} else {
seriesList, fieldResultList, err = s.search(ctx, series, opts.Projection, nil)
}
if err != nil {
return nil, nil, err
}
}

if opts.Order == nil || opts.Order.Index == nil {
sl, frl = filterSeriesList(seriesList, fieldResultList, pl)
return sl, frl, nil
return seriesList, fieldResultList, nil
}

fieldKey := index.FieldKey{
Expand All @@ -245,31 +228,41 @@ func (s *seriesIndex) Search(ctx context.Context, series []*pbv1.Series, opts In
span.Stop()
}()
}
seriesMatchers := make([]index.SeriesMatcher, len(series))
for i := range series {
seriesMatchers[i], err = convertEntityValuesToSeriesMatcher(series[i])
if err != nil {
return nil, nil, err
}
}
var query index.Query
query, err = s.store.BuildQuery(seriesMatchers, opts.Query)
if err != nil {
return nil, nil, err
}
iter, err := s.store.Iterator(fieldKey, rangeOpts,
opts.Order.Sort, opts.PreloadSize)
opts.Order.Sort, opts.PreloadSize, query, opts.Projection)
if err != nil {
return nil, nil, err
}
defer func() {
err = multierr.Append(err, iter.Close())
}()

var sortedSeriesList pbv1.SeriesList
var sortedFieldResultList FieldResultList
var r int
result := make([]index.SeriesDocument, 0, 10)
for iter.Next() {
r++
docID := iter.Val().DocID
if !pl.Contains(docID) {
continue
}
sortedSeriesList, sortedFieldResultList = appendSeriesList(
sortedSeriesList, seriesList,
sortedFieldResultList, fieldResultList,
common.SeriesID(docID))
if err != nil {
return nil, nil, err
}
val := iter.Val()
var doc index.SeriesDocument
doc.Fields = maps.Clone(val.Values)
doc.Key.ID = common.SeriesID(val.DocID)
doc.Key.EntityValues = val.EntityValues
result = append(result, doc)
}
sortedSeriesList, sortedFieldResultList, err := convertIndexSeriesToSeriesList(result, len(opts.Projection) > 0)
if err != nil {
return nil, nil, errors.WithMessagef(err, "failed to convert index series to series list, matchers: %v, matched: %d", seriesMatchers, len(result))
}
if span != nil {
span.Tagf("rounds", "%d", r)
Expand All @@ -278,32 +271,6 @@ func (s *seriesIndex) Search(ctx context.Context, series []*pbv1.Series, opts In
return sortedSeriesList, sortedFieldResultList, err
}

// filterSeriesList keeps only the entries of seriesList whose ID is contained
// in filter. When fieldResultList is non-nil it is filtered in lockstep so the
// i-th field result still corresponds to the i-th series. Both slices are
// filtered in place (their backing arrays are reused) and the trimmed slices
// are returned.
func filterSeriesList(seriesList pbv1.SeriesList, fieldResultList FieldResultList, filter posting.List) (pbv1.SeriesList, FieldResultList) {
	// Single pass over the input. The previous delete-in-place form
	// (append(s[:i], s[i+1:]...) per removal) shifted the whole tail on
	// every drop, making the loop accidentally O(n²).
	kept := seriesList[:0]
	var keptFRL FieldResultList
	if fieldResultList != nil {
		keptFRL = fieldResultList[:0]
	}
	for i := range seriesList {
		if !filter.Contains(uint64(seriesList[i].ID)) {
			continue
		}
		kept = append(kept, seriesList[i])
		if fieldResultList != nil {
			keptFRL = append(keptFRL, fieldResultList[i])
		}
	}
	return kept, keptFRL
}

// appendSeriesList appends to dest the first entry of src whose ID equals
// target and, when srcFRL is non-nil, appends the field result at the same
// position to destFRL. At most one entry is copied; the possibly-grown
// slices are returned.
func appendSeriesList(dest, src pbv1.SeriesList, destFRL, srcFRL FieldResultList, target common.SeriesID) (pbv1.SeriesList, FieldResultList) {
	for idx := range src {
		if src[idx].ID != target {
			continue
		}
		dest = append(dest, src[idx])
		if srcFRL != nil {
			destFRL = append(destFRL, srcFRL[idx])
		}
		// Only the first match is taken, mirroring the lookup-by-ID intent.
		return dest, destFRL
	}
	return dest, destFRL
}

// Close releases the series index by closing its underlying store.
func (s *seriesIndex) Close() error {
	return s.store.Close()
}
2 changes: 1 addition & 1 deletion banyand/internal/storage/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func TestSeriesIndex_Primary(t *testing.T) {
seriesQuery.EntityValues = tt.entityValues[i]
seriesQueries = append(seriesQueries, seriesQuery)
}
sl, _, err := si.searchPrimary(ctx, seriesQueries, nil)
sl, _, err := si.search(ctx, seriesQueries, nil, nil)
require.NoError(t, err)
require.Equal(t, len(tt.entityValues), len(sl))
assert.Equal(t, tt.subject, sl[0].Subject)
Expand Down
3 changes: 1 addition & 2 deletions banyand/internal/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import (
commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
"github.com/apache/skywalking-banyandb/pkg/fs"
"github.com/apache/skywalking-banyandb/pkg/index"
"github.com/apache/skywalking-banyandb/pkg/index/inverted"
"github.com/apache/skywalking-banyandb/pkg/logger"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
"github.com/apache/skywalking-banyandb/pkg/query/model"
Expand Down Expand Up @@ -67,7 +66,7 @@ type SupplyTSDB[T TSTable] func() T

// IndexSearchOpts is the options for searching index.
type IndexSearchOpts struct {
Query *inverted.Query
Query index.Query
Order *model.OrderBy
Projection []index.FieldKey
PreloadSize int
Expand Down
21 changes: 9 additions & 12 deletions pkg/index/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ import (
"fmt"
"io"

"github.com/blugelabs/bluge"

"github.com/apache/skywalking-banyandb/api/common"
databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
Expand Down Expand Up @@ -97,11 +95,12 @@ func (r RangeOpts) Between(value []byte) int {

// DocumentResult represents a document in an index.
type DocumentResult struct {
Values map[string][]byte
SortedValue []byte
SeriesID common.SeriesID
DocID uint64
Timestamp int64
EntityValues []byte
Values map[string][]byte
SortedValue []byte
SeriesID common.SeriesID
DocID uint64
Timestamp int64
}

// SortedField returns the value of the sorted field.
Expand Down Expand Up @@ -156,7 +155,8 @@ type Writer interface {

// FieldIterable allows building a FieldIterator.
type FieldIterable interface {
Iterator(fieldKey FieldKey, termRange RangeOpts, order modelv1.Sort, preLoadSize int) (iter FieldIterator[*DocumentResult], err error)
BuildQuery(seriesMatchers []SeriesMatcher, secondaryQuery Query) (Query, error)
Iterator(fieldKey FieldKey, termRange RangeOpts, order modelv1.Sort, preLoadSize int, query Query, fieldKeys []FieldKey) (iter FieldIterator[*DocumentResult], err error)
Sort(sids []common.SeriesID, fieldKey FieldKey, order modelv1.Sort, timeRange *timestamp.TimeRange, preLoadSize int) (FieldIterator[*DocumentResult], error)
}

Expand All @@ -171,9 +171,7 @@ type Searcher interface {

// Query is an abstract of an index query.
type Query interface {
bluge.Query
fmt.Stringer
Query() bluge.Query
}

// Store is an abstract of an index repository.
Expand Down Expand Up @@ -204,8 +202,7 @@ type SeriesDocument struct {
type SeriesStore interface {
Store
// Search returns a list of series that match the given matchers.
Search(context.Context, []SeriesMatcher, []FieldKey) ([]SeriesDocument, error)
Execute(context.Context, Query) (posting.List, error)
Search(context.Context, []FieldKey, Query) ([]SeriesDocument, error)
}

// SeriesMatcherType represents the type of series matcher.
Expand Down
46 changes: 19 additions & 27 deletions pkg/index/inverted/inverted.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,8 @@ func (s *store) Close() error {
return s.writer.Close()
}

func (s *store) Iterator(fieldKey index.FieldKey, termRange index.RangeOpts,
order modelv1.Sort, preLoadSize int,
func (s *store) Iterator(fieldKey index.FieldKey, termRange index.RangeOpts, order modelv1.Sort,
preLoadSize int, indexQuery index.Query, fieldKeys []index.FieldKey,
) (iter index.FieldIterator[*index.DocumentResult], err error) {
if termRange.Lower != nil &&
termRange.Upper != nil &&
Expand All @@ -191,7 +191,7 @@ func (s *store) Iterator(fieldKey index.FieldKey, termRange index.RangeOpts,
return nil, err
}
fk := fieldKey.Marshal()
query := bluge.NewBooleanQuery()
rangeQuery := bluge.NewBooleanQuery()
addRange := func(query *bluge.BooleanQuery, termRange index.RangeOpts) *bluge.BooleanQuery {
if termRange.Upper == nil {
termRange.Upper = defaultUpper
Expand All @@ -210,21 +210,31 @@ func (s *store) Iterator(fieldKey index.FieldKey, termRange index.RangeOpts,
}

if fieldKey.HasSeriesID() {
query = query.AddMust(bluge.NewTermQuery(string(fieldKey.SeriesID.Marshal())).
rangeQuery = rangeQuery.AddMust(bluge.NewTermQuery(string(fieldKey.SeriesID.Marshal())).
SetField(seriesIDField))
if termRange.Lower != nil || termRange.Upper != nil {
query = addRange(query, termRange)
rangeQuery = addRange(rangeQuery, termRange)
}
} else {
query = addRange(query, termRange)
rangeQuery = addRange(rangeQuery, termRange)
}

sortedKey := fk
if order == modelv1.Sort_SORT_DESC {
sortedKey = "-" + sortedKey
}
query := bluge.NewBooleanQuery().AddMust(rangeQuery)
if indexQuery != nil && indexQuery.(*Query).query != nil {
query.AddMust(indexQuery.(*Query).query)
}
fields := make([]string, 0, len(fieldKeys))
for i := range fieldKeys {
fields = append(fields, fieldKeys[i].Marshal())
}
// TODO: add trace information
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we finish the TODO in this PR?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree. We should trace the query before merging it.

result := &sortIterator{
query: query,
fields: fields,
reader: reader,
sortedKey: sortedKey,
size: preLoadSize,
Expand Down Expand Up @@ -298,7 +308,7 @@ func (s *store) Match(fieldKey index.FieldKey, matches []string) (posting.List,
}

func (s *store) Range(fieldKey index.FieldKey, opts index.RangeOpts) (list posting.List, err error) {
iter, err := s.Iterator(fieldKey, opts, modelv1.Sort_SORT_ASC, defaultRangePreloadSize)
iter, err := s.Iterator(fieldKey, opts, modelv1.Sort_SORT_ASC, defaultRangePreloadSize, nil, nil)
if err != nil {
return roaring.DummyPostingList, err
}
Expand All @@ -310,26 +320,6 @@ func (s *store) Range(fieldKey index.FieldKey, opts index.RangeOpts) (list posti
return
}

// Execute runs the given secondary-index query against the store and returns
// the posting list of matching document IDs.
//
// The result parameters are named so that the deferred iterator Close below
// can fold its error into the value actually returned to the caller; with the
// previous unnamed returns, the deferred assignment to the local err was lost
// and iter.Close() failures were silently dropped.
func (s *store) Execute(ctx context.Context, query index.Query) (list posting.List, err error) {
	reader, err := s.writer.Reader()
	if err != nil {
		return nil, err
	}
	documentMatchIterator, err := reader.Search(ctx, bluge.NewAllMatches(query.Query()))
	if err != nil {
		return nil, err
	}
	iter := newBlugeMatchIterator(documentMatchIterator, reader, nil)
	defer func() {
		err = multierr.Append(err, iter.Close())
	}()
	list = roaring.NewPostingList()
	for iter.Next() {
		list.Insert(iter.Val().DocID)
	}
	return list, err
}

func (s *store) SizeOnDisk() int64 {
_, bytes := s.writer.DirectoryStats()
return int64(bytes)
Expand Down Expand Up @@ -380,6 +370,8 @@ func (bmi *blugeMatchIterator) Next() bool {
}
err := match.VisitStoredFields(func(field string, value []byte) bool {
switch field {
case entityField:
bmi.current.EntityValues = value
case docIDField:
bmi.current.DocID = convert.BytesToUint64(value)
case seriesIDField:
Expand Down
Loading
Loading