Changes from 1 commit
103 changes: 47 additions & 56 deletions pkg/dataobj/index/calculate.go
@@ -7,12 +7,9 @@ import (
 	"io"
 	"runtime"
 	"sync"
-	"time"
 
-	"github.com/bits-and-blooms/bloom/v3"
 	"github.com/go-kit/log"
 	"github.com/go-kit/log/level"
-	"github.com/prometheus/prometheus/model/labels"
 	"golang.org/x/sync/errgroup"
 
 	"github.com/grafana/loki/v3/pkg/dataobj"
@@ -22,15 +19,38 @@ import (
 	"github.com/grafana/loki/v3/pkg/dataobj/sections/streams"
 )
 
+type logsIndexCalculation interface {
+	Prepare(ctx context.Context, section *dataobj.Section, stats logs.Stats) error
+	ProcessBatch(ctx context.Context, context *logsCalculationContext, batch []logs.Record) error
+	Flush(ctx context.Context, context *logsCalculationContext) error
+}
+
+type logsCalculationContext struct {
+	tenantID       string
+	objectPath     string
+	sectionIdx     int64
+	streamIDLookup map[int64]int64
+	builderMtx     *sync.Mutex
+	builder        *indexobj.Builder
+}
+
+// These steps are applied to all logs and are unique to a section
+func getLogsCalculationSteps() []logsIndexCalculation {
+	return []logsIndexCalculation{
+		&streamStatisticsCalculation{},
+		&columnValuesCalculation{},
+	}
+}
+
 // Calculator is used to calculate the indexes for a logs object and write them to the builder.
 // It reads data from the logs object in order to build bloom filters and per-section stream metadata.
 type Calculator struct {
 	indexobjBuilder *indexobj.Builder
-	builderMtx      sync.Mutex
+	builderMtx      *sync.Mutex
 }
 
 func NewCalculator(indexobjBuilder *indexobj.Builder) *Calculator {
-	return &Calculator{indexobjBuilder: indexobjBuilder}
+	return &Calculator{indexobjBuilder: indexobjBuilder, builderMtx: &sync.Mutex{}}
 }
 
 func (c *Calculator) Reset() {
@@ -138,14 +158,6 @@ func (c *Calculator) processStreamsSection(ctx context.Context, section *dataobj
 // processLogsSection reads information from the logs section in order to build index information in the c.indexobjBuilder.
 func (c *Calculator) processLogsSection(ctx context.Context, sectionLogger log.Logger, objectPath string, section *dataobj.Section, sectionIdx int64, streamIDLookup map[int64]int64) error {
 	logsBuf := make([]logs.Record, 8192)
-	type logInfo struct {
-		objectPath string
-		sectionIdx int64
-		streamID   int64
-		timestamp  time.Time
-		length     int64
-	}
-	logsInfo := make([]logInfo, len(logsBuf))
 
 	logsSection, err := logs.Open(ctx, section)
 	if err != nil {
@@ -160,22 +172,25 @@ func (c *Calculator) processLogsSection(ctx context.Context, sectionLogger log.L
 		return fmt.Errorf("failed to read log section stats: %w", err)
 	}
 
-	columnBloomBuilders := make(map[string]*bloom.BloomFilter)
-	columnIndexes := make(map[string]int64)
-	for _, column := range stats.Columns {
-		logsType, _ := logs.ParseColumnType(column.Type)
-		if logsType != logs.ColumnTypeMetadata {
-			continue
-		}
-		columnBloomBuilders[column.Name] = bloom.NewWithEstimates(uint(column.Cardinality), 1.0/128.0)
-		columnIndexes[column.Name] = column.ColumnIndex
-	}
+	calculationContext := &logsCalculationContext{
+		tenantID:       tenantID,
+		objectPath:     objectPath,
+		sectionIdx:     sectionIdx,
+		streamIDLookup: streamIDLookup,
+		builderMtx:     c.builderMtx,
Review comment (Member):

nit: sharing mutex pointers can be a source of bugs because the spread of where logic is defined gets wider, and it can be harder to make sure the mutex is being used properly in all places.

I'd recommend considering changing the semantics such that:

  • The mutex is locked whenever we pass the logsCalculationContext to ProcessBatch/Flush
  • We clarify in the interface API that implementations must not retain the builder after the call returns

Reply (Contributor, author):

Good idea, I've made the changes you suggested and locked the mutex before calling the step implementations.
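A minimal sketch of the locking discipline the reviewer describes, assuming the driver (not the step implementations) owns the lock; runStep is a hypothetical helper, not code from this PR:

	// Hypothetical driver-side helper: the caller holds the lock across each
	// step call, so implementations never touch the mutex and must not retain
	// calcCtx.builder after the call returns.
	func runStep(ctx context.Context, step logsIndexCalculation, calcCtx *logsCalculationContext, batch []logs.Record) error {
		calcCtx.builderMtx.Lock()
		defer calcCtx.builderMtx.Unlock()
		return step.ProcessBatch(ctx, calcCtx, batch)
	}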

+		builder:        c.indexobjBuilder,
+	}
+
+	calculationSteps := getLogsCalculationSteps()
+
+	for _, calculation := range calculationSteps {
+		if err := calculation.Prepare(ctx, section, stats); err != nil {
+			return fmt.Errorf("failed to prepare calculation: %w", err)
+		}
+	}
 
-	// Read the whole logs section to extract all the column values.
-	cnt := 0
 	// TODO(benclive): Switch to a columnar reader instead of row based
 	// This is also likely to be more performant, especially if we don't need to read the whole log line.
 	// Note: the source object would need a new column storing just the length to avoid reading the log line itself.
+	cnt := 0
 	rowReader := logs.NewRowReader(logsSection)
 	for {
 		n, err := rowReader.Read(ctx, logsBuf)
@@ -186,41 +201,17 @@ func (c *Calculator) processLogsSection(ctx context.Context, sectionLogger log.L
 			break
 		}
 
-		for i, log := range logsBuf[:n] {
-			cnt++
-			log.Metadata.Range(func(md labels.Label) {
-				columnBloomBuilders[md.Name].Add([]byte(md.Value))
-			})
-			logsInfo[i].objectPath = objectPath
-			logsInfo[i].sectionIdx = sectionIdx
-			logsInfo[i].streamID = log.StreamID
-			logsInfo[i].timestamp = log.Timestamp
-			logsInfo[i].length = int64(len(log.Line))
-		}
-
-		// Lock the mutex once per read for perf reasons.
-		c.builderMtx.Lock()
-		for _, log := range logsInfo[:n] {
-			err = c.indexobjBuilder.ObserveLogLine(tenantID, log.objectPath, log.sectionIdx, log.streamID, streamIDLookup[log.streamID], log.timestamp, log.length)
-			if err != nil {
-				c.builderMtx.Unlock()
-				return fmt.Errorf("failed to observe log line: %w", err)
-			}
-		}
-		c.builderMtx.Unlock()
+		cnt += n
+		for _, calculation := range calculationSteps {
+			if err := calculation.ProcessBatch(ctx, calculationContext, logsBuf[:n]); err != nil {
+				return fmt.Errorf("failed to process batch: %w", err)
+			}
+		}
 	}
 
-	// Write the indexes (bloom filters) to the new index object.
-	for columnName, bloom := range columnBloomBuilders {
-		bloomBytes, err := bloom.MarshalBinary()
-		if err != nil {
-			return fmt.Errorf("failed to marshal bloom filter: %w", err)
-		}
-		c.builderMtx.Lock()
-		err = c.indexobjBuilder.AppendColumnIndex(tenantID, objectPath, sectionIdx, columnName, columnIndexes[columnName], bloomBytes)
-		c.builderMtx.Unlock()
-		if err != nil {
-			return fmt.Errorf("failed to append column index: %w", err)
-		}
-	}
+	for _, calculation := range calculationSteps {
+		if err := calculation.Flush(ctx, calculationContext); err != nil {
+			return fmt.Errorf("failed to flush calculation results: %w", err)
+		}
+	}
 
56 changes: 56 additions & 0 deletions pkg/dataobj/index/column_values.go
@@ -0,0 +1,56 @@
package index

import (
	"context"
	"fmt"

	"github.com/bits-and-blooms/bloom/v3"
	"github.com/grafana/loki/v3/pkg/dataobj"
	"github.com/grafana/loki/v3/pkg/dataobj/sections/logs"
	"github.com/prometheus/prometheus/model/labels"
)

type columnValuesCalculation struct {
	columnBloomBuilders map[string]*bloom.BloomFilter
	columnIndexes       map[string]int64
}

func (c *columnValuesCalculation) Prepare(ctx context.Context, _ *dataobj.Section, stats logs.Stats) error {
	c.columnBloomBuilders = make(map[string]*bloom.BloomFilter)
	c.columnIndexes = make(map[string]int64)

	for _, column := range stats.Columns {
		logsType, _ := logs.ParseColumnType(column.Type)
		if logsType != logs.ColumnTypeMetadata {
			continue
		}
		c.columnBloomBuilders[column.Name] = bloom.NewWithEstimates(uint(column.Cardinality), 1.0/128.0)
		c.columnIndexes[column.Name] = column.ColumnIndex
	}
	return nil
}

func (c *columnValuesCalculation) ProcessBatch(ctx context.Context, _ *logsCalculationContext, batch []logs.Record) error {
	for _, log := range batch {
		log.Metadata.Range(func(md labels.Label) {
			c.columnBloomBuilders[md.Name].Add([]byte(md.Value))
		})
	}
	return nil
}

func (c *columnValuesCalculation) Flush(ctx context.Context, context *logsCalculationContext) error {
	context.builderMtx.Lock()
	defer context.builderMtx.Unlock()
	for columnName, bloom := range c.columnBloomBuilders {
		bloomBytes, err := bloom.MarshalBinary()
		if err != nil {
			return fmt.Errorf("failed to marshal bloom filter: %w", err)
		}
		err = context.builder.AppendColumnIndex(context.tenantID, context.objectPath, context.sectionIdx, columnName, c.columnIndexes[columnName], bloomBytes)
		if err != nil {
			return fmt.Errorf("failed to append column index: %w", err)
		}
	}
	return nil
}
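For context on the estimates used in Prepare above: bloom.NewWithEstimates sizes the filter from an expected item count and a target false-positive rate, so 1.0/128.0 asks for roughly a 0.8% false-positive rate per metadata column. A self-contained sketch of the marshal/unmarshal round trip that Flush relies on (hypothetical values, standalone program, not code from this PR):

	package main

	import (
		"fmt"

		"github.com/bits-and-blooms/bloom/v3"
	)

	func main() {
		// Size for ~10k distinct values at a 1/128 false-positive rate,
		// mirroring the estimates in columnValuesCalculation.Prepare.
		filter := bloom.NewWithEstimates(10000, 1.0/128.0)
		filter.Add([]byte("pod-abc123"))

		// MarshalBinary produces the bytes that Flush hands to AppendColumnIndex.
		data, err := filter.MarshalBinary()
		if err != nil {
			panic(err)
		}

		// A reader reverses the process before testing membership.
		restored := &bloom.BloomFilter{}
		if err := restored.UnmarshalBinary(data); err != nil {
			panic(err)
		}
		fmt.Println(restored.Test([]byte("pod-abc123"))) // true
		fmt.Println(restored.Test([]byte("absent")))     // false, up to the ~0.8% false-positive rate
	}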
31 changes: 31 additions & 0 deletions pkg/dataobj/index/stream_statistics.go
@@ -0,0 +1,31 @@
package index

import (
	"context"
	"fmt"

	"github.com/grafana/loki/v3/pkg/dataobj"
	"github.com/grafana/loki/v3/pkg/dataobj/sections/logs"
)

type streamStatisticsCalculation struct{}

func (c *streamStatisticsCalculation) Prepare(ctx context.Context, section *dataobj.Section, stats logs.Stats) error {
	return nil
}

func (c *streamStatisticsCalculation) ProcessBatch(ctx context.Context, context *logsCalculationContext, batch []logs.Record) error {
	context.builderMtx.Lock()
	defer context.builderMtx.Unlock()
	for _, log := range batch {
		err := context.builder.ObserveLogLine(context.tenantID, context.objectPath, context.sectionIdx, log.StreamID, context.streamIDLookup[log.StreamID], log.Timestamp, int64(len(log.Line)))
		if err != nil {
			return fmt.Errorf("failed to observe log line: %w", err)
		}
	}
	return nil
}

func (c *streamStatisticsCalculation) Flush(ctx context.Context, context *logsCalculationContext) error {
	return nil
}
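To illustrate how the step pattern extends, here is a hypothetical extra calculation (not part of this PR): any type satisfying the three-method logsIndexCalculation interface can be wired in by appending it to the slice returned from getLogsCalculationSteps.

	// streamCountCalculation is a hypothetical example step that counts
	// records per stream. It assumes the same package and imports as the
	// files above; it is not part of this change.
	type streamCountCalculation struct {
		counts map[int64]int64
	}

	func (c *streamCountCalculation) Prepare(_ context.Context, _ *dataobj.Section, _ logs.Stats) error {
		c.counts = make(map[int64]int64)
		return nil
	}

	func (c *streamCountCalculation) ProcessBatch(_ context.Context, _ *logsCalculationContext, batch []logs.Record) error {
		for _, record := range batch {
			c.counts[record.StreamID]++
		}
		return nil
	}

	func (c *streamCountCalculation) Flush(_ context.Context, _ *logsCalculationContext) error {
		// A real step would lock builderMtx and write c.counts through the
		// builder here, mirroring columnValuesCalculation.Flush.
		return nil
	}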