diff --git a/docs/sources/shared/configuration.md b/docs/sources/shared/configuration.md index bea2d1899677a..d88c723bd0b18 100644 --- a/docs/sources/shared/configuration.md +++ b/docs/sources/shared/configuration.md @@ -1155,6 +1155,14 @@ dataobj: # CLI flag: -dataobj-index-builder.events-per-index [events_per_index: | default = 32] + # Experimental: How often to check for stale partitions to flush + # CLI flag: -dataobj-index-builder.flush-interval + [flush_interval: | default = 1m] + + # Experimental: Maximum time to wait before flushing buffered events + # CLI flag: -dataobj-index-builder.max-idle-time + [max_idle_time: | default = 30m] + metastore: # Experimental: A prefix to use for storing indexes in object storage. Used # for testing only. diff --git a/pkg/dataobj/index/builder.go b/pkg/dataobj/index/builder.go index 96993a49036e4..bf471c5a04374 100644 --- a/pkg/dataobj/index/builder.go +++ b/pkg/dataobj/index/builder.go @@ -1,12 +1,8 @@ package index import ( - "bytes" "context" - "crypto/sha256" - "encoding/hex" "errors" - "flag" "fmt" "io" "sync" @@ -15,7 +11,6 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/grafana/dskit/backoff" - "github.com/grafana/dskit/multierror" "github.com/grafana/dskit/services" "github.com/prometheus/client_golang/prometheus" "github.com/thanos-io/objstore" @@ -32,18 +27,33 @@ import ( var ErrPartitionRevoked = errors.New("partition revoked") -type Config struct { - indexobj.BuilderConfig `yaml:",inline"` - EventsPerIndex int `yaml:"events_per_index" experimental:"true"` +type triggerType string + +const ( + triggerTypeAppend triggerType = "append" + triggerTypeMaxIdle triggerType = "max-idle" +) + +func (tt triggerType) String() string { + switch tt { + case triggerTypeAppend: + return "append" + case triggerTypeMaxIdle: + return "max-idle" + default: + return "unknown" + } } -func (cfg *Config) RegisterFlags(f *flag.FlagSet) { - cfg.RegisterFlagsWithPrefix("dataobj-index-builder.", f) +type bufferedEvent struct { + event metastore.ObjectWrittenEvent + record *kgo.Record } -func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { - cfg.BuilderConfig.RegisterFlagsWithPrefix(prefix, f) - f.IntVar(&cfg.EventsPerIndex, prefix+"events-per-index", 32, "Experimental: The number of events to batch before building an index") +type partitionState struct { + events []bufferedEvent + lastActivity time.Time + isProcessing bool } type downloadedObject struct { @@ -81,31 +91,21 @@ type Builder struct { client kafkaClient topic string - // Processing pipeline - downloadQueue chan metastore.ObjectWrittenEvent - downloadedObjects chan downloadedObject - calculator calculator - tocWriter *metastore.TableOfContentsWriter - - bufferedEvents map[int32][]metastore.ObjectWrittenEvent + // Indexer handles all index building + indexer indexer - // Builder initialization - builderCfg indexobj.BuilderConfig - objectBucket objstore.Bucket - indexStorageBucket objstore.Bucket // The bucket to store the indexes might not be the same one as where we read the objects from - scratchStore scratch.Store + // Partition management only + partitionStates map[int32]*partitionState + flushTicker *time.Ticker - // Metrics - metrics *indexBuilderMetrics + // Only kafka commit functionality + metrics *builderMetrics // Control and coordination - ctx context.Context - cancel context.CancelCauseFunc - wg sync.WaitGroup - logger log.Logger - activeCalculationPartition int32 - cancelActiveCalculation context.CancelCauseFunc - partitionsMutex 
sync.Mutex + wg sync.WaitGroup + logger log.Logger + partitionsMutex sync.Mutex + activeCalculations map[int32]context.CancelCauseFunc } func NewIndexBuilder( @@ -123,11 +123,17 @@ func NewIndexBuilder( "component": "index_builder", }, reg) - metrics := newIndexBuilderMetrics() - if err := metrics.register(builderReg); err != nil { + builderMetrics := newBuilderMetrics() + if err := builderMetrics.register(builderReg); err != nil { return nil, fmt.Errorf("failed to register metrics for index builder: %w", err) } + indexerMetrics := newIndexerMetrics() + if err := indexerMetrics.register(builderReg); err != nil { + return nil, fmt.Errorf("failed to register indexer metrics: %w", err) + } + + // Create index building dependencies builder, err := indexobj.NewBuilder(cfg.BuilderConfig, scratchStore) if err != nil { return nil, fmt.Errorf("failed to create index builder: %w", err) @@ -135,31 +141,31 @@ func NewIndexBuilder( calculator := NewCalculator(builder) indexStorageBucket := objstore.NewPrefixedBucket(bucket, mCfg.IndexStoragePrefix) - tocWriter := metastore.NewTableOfContentsWriter(indexStorageBucket, logger) if err := builder.RegisterMetrics(builderReg); err != nil { return nil, fmt.Errorf("failed to register metrics for index builder: %w", err) } - // Set up queues to download the next object (I/O bound) while processing the current one (CPU bound) in order to maximize throughput. - // Setting the channel buffer sizes caps the total memory usage by only keeping up to 3 objects in memory at a time: One being processed, one fully downloaded and one being downloaded from the queue. - downloadQueue := make(chan metastore.ObjectWrittenEvent, cfg.EventsPerIndex) - downloadedObjects := make(chan downloadedObject, 1) - s := &Builder{ cfg: cfg, mCfg: mCfg, logger: logger, - objectBucket: bucket, - indexStorageBucket: indexStorageBucket, - tocWriter: tocWriter, - downloadedObjects: downloadedObjects, - downloadQueue: downloadQueue, - metrics: metrics, - calculator: calculator, - bufferedEvents: make(map[int32][]metastore.ObjectWrittenEvent), + metrics: builderMetrics, + partitionStates: make(map[int32]*partitionState), + activeCalculations: make(map[int32]context.CancelCauseFunc), } + // Create self-contained indexer + s.indexer = newSerialIndexer( + calculator, + bucket, + indexStorageBucket, + builderMetrics, + indexerMetrics, + logger, + indexerConfig{QueueSize: 64}, + ) + kafkaCfg.AutoCreateTopicEnabled = true eventConsumerClient, err := client.NewReaderClient( "index_builder", @@ -180,7 +186,7 @@ func NewIndexBuilder( } s.client = eventConsumerClient - s.Service = services.NewBasicService(nil, s.run, s.stopping) + s.Service = services.NewBasicService(nil, s.running, s.stopping) return s, nil } @@ -191,7 +197,11 @@ func (p *Builder) handlePartitionsAssigned(_ context.Context, _ *kgo.Client, top for _, partitions := range topics { for _, partition := range partitions { - p.bufferedEvents[partition] = make([]metastore.ObjectWrittenEvent, 0) + p.partitionStates[partition] = &partitionState{ + events: make([]bufferedEvent, 0), + lastActivity: time.Now(), + isProcessing: false, + } } } } @@ -203,53 +213,55 @@ func (p *Builder) handlePartitionsRevoked(_ context.Context, _ *kgo.Client, topi for _, partitions := range topics { for _, partition := range partitions { - delete(p.bufferedEvents, partition) - if p.activeCalculationPartition == partition && p.cancelActiveCalculation != nil { - p.cancelActiveCalculation(ErrPartitionRevoked) + delete(p.partitionStates, partition) + + // Cancel any active 
calculations + if cancel, exists := p.activeCalculations[partition]; exists { + cancel(ErrPartitionRevoked) + delete(p.activeCalculations, partition) } } } } -func (p *Builder) run(ctx context.Context) error { - p.ctx, p.cancel = context.WithCancelCause(ctx) - - p.wg.Add(1) - go func() { - // Download worker - defer p.wg.Done() - for event := range p.downloadQueue { - objLogger := log.With(p.logger, "object_path", event.ObjectPath) - downloadStart := time.Now() - - objectReader, err := p.objectBucket.Get(p.ctx, event.ObjectPath) - if err != nil { - p.downloadedObjects <- downloadedObject{ - event: event, - err: fmt.Errorf("failed to fetch object from storage: %w", err), - } - continue - } +func (p *Builder) running(ctx context.Context) error { + // Start indexer service first + if err := p.indexer.StartAsync(ctx); err != nil { + return fmt.Errorf("failed to start indexer service: %w", err) + } + if err := p.indexer.AwaitRunning(ctx); err != nil { + return fmt.Errorf("indexer service failed to start: %w", err) + } - object, err := io.ReadAll(objectReader) - _ = objectReader.Close() - if err != nil { - p.downloadedObjects <- downloadedObject{ - event: event, - err: fmt.Errorf("failed to read object: %w", err), + // Start flush worker if configured + if p.cfg.FlushInterval > 0 { + p.flushTicker = time.NewTicker(p.cfg.FlushInterval) + p.wg.Add(1) + go func() { + defer p.wg.Done() + defer p.flushTicker.Stop() + + for { + select { + case <-p.flushTicker.C: + p.checkAndFlushStalePartitions(ctx) + case <-ctx.Done(): + return } - continue - } - level.Info(objLogger).Log("msg", "downloaded object", "duration", time.Since(downloadStart), "size_mb", float64(len(object))/1024/1024, "avg_speed_mbps", float64(len(object))/time.Since(downloadStart).Seconds()/1024/1024) - p.downloadedObjects <- downloadedObject{ - event: event, - objectBytes: &object, } - } - }() + }() + } level.Info(p.logger).Log("msg", "started index builder service") + + // Main Kafka processing loop for { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + fetches := p.client.PollRecords(ctx, -1) if err := fetches.Err0(); err != nil { if errors.Is(err, kgo.ErrClientClosed) || errors.Is(err, context.Canceled) { @@ -266,199 +278,204 @@ func (p *Builder) run(ctx context.Context) error { level.Error(p.logger).Log("msg", "failed to fetch records for topic partition", "topic", fetch.Topic, "partition", fetch.Partition, "err", err.Error()) return } - // TODO(benclive): Verify if we need to return re-poll ASAP or if sequential processing is good enough. for _, record := range fetch.Records { - p.processRecord(record) + p.processRecord(ctx, record) } }) } } func (p *Builder) stopping(failureCase error) error { - close(p.downloadQueue) - p.cancel(failureCase) + // Stop indexer service first - this handles calculation cleanup via context cancellation + p.indexer.StopAsync() + if err := p.indexer.AwaitTerminated(context.Background()); err != nil { + level.Error(p.logger).Log("msg", "failed to stop indexer service", "err", err) + } + + // Stop other components + if p.flushTicker != nil { + p.flushTicker.Stop() + } p.wg.Wait() - close(p.downloadedObjects) p.client.Close() - return nil + return failureCase } // processRecord processes a single record. It is not safe for concurrent use. 
-func (p *Builder) processRecord(record *kgo.Record) { - calculationCtx, eventsToIndex := p.appendRecord(record) - if len(eventsToIndex) < p.cfg.EventsPerIndex { +func (p *Builder) processRecord(ctx context.Context, record *kgo.Record) { + calculationCtx, eventsToIndex := p.appendRecord(ctx, record) + if len(eventsToIndex) == 0 { return } defer p.cleanupPartition(record.Partition) - // Build the index. - err := p.buildIndex(calculationCtx, eventsToIndex) + // Submit to indexer service and wait for completion + records, err := p.indexer.submitBuild(calculationCtx, eventsToIndex, record.Partition, triggerTypeAppend) if err != nil { if errors.Is(context.Cause(calculationCtx), ErrPartitionRevoked) { - level.Debug(p.logger).Log("msg", "partition revoked, aborting index build", "partition", p.activeCalculationPartition) + level.Debug(p.logger).Log("msg", "partition revoked, aborting index build", "partition", record.Partition) return } - level.Error(p.logger).Log("msg", "failed to build index", "err", err, "partition", p.activeCalculationPartition) + level.Error(p.logger).Log("msg", "failed to build index", "err", err, "partition", record.Partition) return } - // Commit back to the partition we just built. This is always the record we just received, otherwise we would not have triggered the build. - if err := p.commitRecords(calculationCtx, record); err != nil { + // Commit the records + if err := p.commitRecords(calculationCtx, records); err != nil { if errors.Is(context.Cause(calculationCtx), ErrPartitionRevoked) { - level.Debug(p.logger).Log("msg", "partition revoked, aborting index commit", "partition", p.activeCalculationPartition) + level.Debug(p.logger).Log("msg", "partition revoked, aborting index commit", "partition", record.Partition) return } - level.Error(p.logger).Log("msg", "failed to commit records", "err", err, "partition", p.activeCalculationPartition) + level.Error(p.logger).Log("msg", "failed to commit records", "err", err, "partition", record.Partition) return } } -// Appends a record and returns a slice of records to index. The slice will be empty if no indexing is required. -func (p *Builder) appendRecord(record *kgo.Record) (context.Context, []metastore.ObjectWrittenEvent) { - p.partitionsMutex.Lock() - defer p.partitionsMutex.Unlock() - +// Appends a record and returns a slice of buffered events to index. The slice will be empty if no indexing is required. +func (p *Builder) appendRecord(ctx context.Context, record *kgo.Record) (context.Context, []bufferedEvent) { event := &metastore.ObjectWrittenEvent{} if err := event.Unmarshal(record.Value); err != nil { level.Error(p.logger).Log("msg", "failed to unmarshal metastore event", "err", err) return nil, nil } - _, ok := p.bufferedEvents[record.Partition] - if !ok { - // We don't own this partition anymore as it was just revoked. Abort further processing. 
- return nil, nil - } - - p.bufferedEvents[record.Partition] = append(p.bufferedEvents[record.Partition], *event) - level.Debug(p.logger).Log("msg", "buffered new event for partition", "count", len(p.bufferedEvents[record.Partition]), "partition", record.Partition) - - if len(p.bufferedEvents[record.Partition]) < p.cfg.EventsPerIndex { - // No more work to do - return nil, nil + bufferedEvt := &bufferedEvent{ + event: *event, + record: record, } - var calculationCtx context.Context - eventsToIndex := make([]metastore.ObjectWrittenEvent, len(p.bufferedEvents[record.Partition])) - copy(eventsToIndex, p.bufferedEvents[record.Partition]) - - p.activeCalculationPartition = record.Partition - calculationCtx, p.cancelActiveCalculation = context.WithCancelCause(p.ctx) - - return calculationCtx, eventsToIndex + return p.bufferAndTryProcess(ctx, record.Partition, bufferedEvt, triggerTypeAppend) } func (p *Builder) cleanupPartition(partition int32) { p.partitionsMutex.Lock() defer p.partitionsMutex.Unlock() - p.cancelActiveCalculation(nil) + // Cancel active calculation for this partition + if cancel, exists := p.activeCalculations[partition]; exists { + cancel(nil) + delete(p.activeCalculations, partition) + } - if _, ok := p.bufferedEvents[partition]; ok { - // We still own this partition, so truncate the events for future processing. - p.bufferedEvents[partition] = p.bufferedEvents[partition][:0] + if state, ok := p.partitionStates[partition]; ok { + // Clear processed events and reset processing flag + state.events = state.events[:0] + state.isProcessing = false + state.lastActivity = time.Now() } } -func (p *Builder) buildIndex(ctx context.Context, events []metastore.ObjectWrittenEvent) error { - level.Debug(p.logger).Log("msg", "building index", "events", len(events), "partition", p.activeCalculationPartition) - start := time.Now() +func (p *Builder) checkAndFlushStalePartitions(ctx context.Context) { + p.partitionsMutex.Lock() + partitionsToFlush := make([]int32, 0) - // Observe processing delay - writeTime, err := time.Parse(time.RFC3339, events[0].WriteTime) - if err != nil { - level.Error(p.logger).Log("msg", "failed to parse write time", "err", err) - return err + for partition, state := range p.partitionStates { + if !state.isProcessing && + time.Since(state.lastActivity) >= p.cfg.MaxIdleTime { + partitionsToFlush = append(partitionsToFlush, partition) + } } - p.metrics.setProcessingDelay(writeTime) + p.partitionsMutex.Unlock() - // Trigger the downloads - for _, event := range events { - p.downloadQueue <- event + for _, partition := range partitionsToFlush { + p.flushPartition(ctx, partition) } +} - // Process the results as they are downloaded - processingErrors := multierror.New() - for i := 0; i < len(events); i++ { - obj := <-p.downloadedObjects - objLogger := log.With(p.logger, "object_path", obj.event.ObjectPath) - level.Debug(objLogger).Log("msg", "processing object") +func (p *Builder) flushPartition(ctx context.Context, partition int32) { + calculationCtx, eventsToFlush := p.bufferAndTryProcess(ctx, partition, nil, triggerTypeMaxIdle) + if len(eventsToFlush) == 0 { + return + } - if obj.err != nil { - processingErrors.Add(fmt.Errorf("failed to download object: %w", obj.err)) - continue - } + p.wg.Add(1) + go func() { + defer p.wg.Done() + defer p.cleanupPartition(partition) + + level.Info(p.logger).Log("msg", "flushing stale partition", + "partition", partition, "events", len(eventsToFlush)) - reader, err := dataobj.FromReaderAt(bytes.NewReader(*obj.objectBytes), 
int64(len(*obj.objectBytes))) + // Submit to indexer service and wait for completion + records, err := p.indexer.submitBuild(calculationCtx, eventsToFlush, partition, triggerTypeMaxIdle) if err != nil { - processingErrors.Add(fmt.Errorf("failed to read object: %w", err)) - continue + if errors.Is(context.Cause(calculationCtx), ErrPartitionRevoked) { + level.Debug(p.logger).Log("msg", "partition revoked during flush", "partition", partition) + return + } + level.Error(p.logger).Log("msg", "failed to flush partition", "partition", partition, "err", err) + return } - if err := p.calculator.Calculate(ctx, objLogger, reader, obj.event.ObjectPath); err != nil { - processingErrors.Add(fmt.Errorf("failed to calculate index: %w", err)) - continue + // Commit the records + if err := p.commitRecords(calculationCtx, records); err != nil { + if errors.Is(context.Cause(calculationCtx), ErrPartitionRevoked) { + level.Debug(p.logger).Log("msg", "partition revoked during flush commit", "partition", partition) + return + } + level.Error(p.logger).Log("msg", "failed to commit flush records", "partition", partition, "err", err) } - } + }() +} - if processingErrors.Err() != nil { - return processingErrors.Err() - } +// bufferAndTryProcess is the unified method that handles both buffering and processing decisions +func (p *Builder) bufferAndTryProcess(ctx context.Context, partition int32, newEvent *bufferedEvent, trigger triggerType) (context.Context, []bufferedEvent) { + p.partitionsMutex.Lock() + defer p.partitionsMutex.Unlock() - tenantTimeRanges := p.calculator.TimeRanges() - obj, closer, err := p.calculator.Flush() - if err != nil { - return fmt.Errorf("failed to flush builder: %w", err) + state, exists := p.partitionStates[partition] + if !exists { + return nil, nil } - defer closer.Close() - key, err := ObjectKey(ctx, obj) - if err != nil { - return fmt.Errorf("failed to generate object key: %w", err) + // Add new event to buffer if provided (normal processing case) + if newEvent != nil { + state.events = append(state.events, *newEvent) + state.lastActivity = time.Now() + level.Debug(p.logger).Log("msg", "buffered new event for partition", "count", len(state.events), "partition", partition) } - reader, err := obj.Reader(ctx) - if err != nil { - return fmt.Errorf("failed to read object: %w", err) + // Check if we can start processing + if state.isProcessing || len(state.events) == 0 { + return nil, nil } - defer reader.Close() - if err := p.indexStorageBucket.Upload(ctx, key, reader); err != nil { - return fmt.Errorf("failed to upload index: %w", err) + // Check trigger-specific requirements + switch trigger { + case triggerTypeAppend: + if len(state.events) < p.cfg.EventsPerIndex { + return nil, nil + } + case triggerTypeMaxIdle: + if time.Since(state.lastActivity) < p.cfg.MaxIdleTime { + return nil, nil + } + default: + level.Error(p.logger).Log("msg", "unknown trigger type") + return nil, nil } - metastoreTocWriter := metastore.NewTableOfContentsWriter(p.indexStorageBucket, p.logger) - if err := metastoreTocWriter.WriteEntry(p.ctx, key, tenantTimeRanges); err != nil { - return fmt.Errorf("failed to update metastore ToC file: %w", err) - } + // Atomically mark as processing and extract events + state.isProcessing = true + eventsToProcess := make([]bufferedEvent, len(state.events)) + copy(eventsToProcess, state.events) - level.Debug(p.logger).Log("msg", "finished building new index file", "partition", p.activeCalculationPartition, "events", len(events), "size", obj.Size(), "duration", time.Since(start), 
"tenants", len(tenantTimeRanges), "path", key) - return nil -} + // Set up cancellation context with proper coordination + calculationCtx, cancel := context.WithCancelCause(ctx) + p.activeCalculations[partition] = cancel -// ObjectKey determines the key in object storage to upload the object to, based on our path scheme. -func ObjectKey(ctx context.Context, object *dataobj.Object) (string, error) { - h := sha256.New224() + level.Debug(p.logger).Log("msg", "started processing partition", + "partition", partition, "events", len(eventsToProcess), "trigger", trigger) - reader, err := object.Reader(ctx) - if err != nil { - return "", err - } - defer reader.Close() + return calculationCtx, eventsToProcess +} - if _, err := io.Copy(h, reader); err != nil { - return "", err +func (p *Builder) commitRecords(ctx context.Context, records []*kgo.Record) error { + if len(records) == 0 { + return nil } - var sumBytes [sha256.Size224]byte - sum := h.Sum(sumBytes[:0]) - sumStr := hex.EncodeToString(sum[:]) - - return fmt.Sprintf("indexes/%s/%s", sumStr[:2], sumStr[2:]), nil -} - -func (p *Builder) commitRecords(ctx context.Context, record *kgo.Record) error { backoff := backoff.New(ctx, backoff.Config{ MinBackoff: 100 * time.Millisecond, MaxBackoff: 10 * time.Second, @@ -469,11 +486,11 @@ func (p *Builder) commitRecords(ctx context.Context, record *kgo.Record) error { backoff.Reset() for backoff.Ongoing() { p.metrics.incCommitsTotal() - err := p.client.CommitRecords(ctx, record) + err := p.client.CommitRecords(ctx, records...) if err == nil { return nil } - level.Error(p.logger).Log("msg", "failed to commit records", "err", err) + level.Error(p.logger).Log("msg", "failed to commit records", "err", err, "count", len(records)) p.metrics.incCommitFailures() lastErr = err backoff.Wait() diff --git a/pkg/dataobj/index/builder_test.go b/pkg/dataobj/index/builder_test.go index 8027382ffb911..eaf2296750b04 100644 --- a/pkg/dataobj/index/builder_test.go +++ b/pkg/dataobj/index/builder_test.go @@ -19,7 +19,6 @@ import ( "github.com/grafana/loki/v3/pkg/dataobj/consumer/logsobj" "github.com/grafana/loki/v3/pkg/dataobj/index/indexobj" "github.com/grafana/loki/v3/pkg/dataobj/metastore" - "github.com/grafana/loki/v3/pkg/dataobj/metastore/multitenancy" "github.com/grafana/loki/v3/pkg/dataobj/sections/pointers" "github.com/grafana/loki/v3/pkg/kafka" "github.com/grafana/loki/v3/pkg/kafka/testkafka" @@ -36,38 +35,6 @@ var testBuilderConfig = indexobj.BuilderConfig{ SectionStripeMergeLimit: 2, } -func buildLogObject(t *testing.T, app string, path string, bucket objstore.Bucket) { - candidate, err := logsobj.NewBuilder(logsobj.BuilderConfig{ - TargetPageSize: 128 * 1024, - TargetObjectSize: 4 * 1024 * 1024, - TargetSectionSize: 2 * 1024 * 1024, - - BufferSize: 4 * 1024 * 1024, - SectionStripeMergeLimit: 2, - }, nil) - require.NoError(t, err) - - for i := 0; i < 10; i++ { - stream := logproto.Stream{ - Labels: fmt.Sprintf("{app=\"%s\",stream=\"%d\"}", app, i), - Entries: []logproto.Entry{{Timestamp: time.Now(), Line: fmt.Sprintf("line %d", i)}}, - } - err = candidate.Append("tenant", stream) - require.NoError(t, err) - } - - obj, closer, err := candidate.Flush() - require.NoError(t, err) - defer closer.Close() - - reader, err := obj.Reader(t.Context()) - require.NoError(t, err) - defer reader.Close() - - err = bucket.Upload(t.Context(), path, reader) - require.NoError(t, err) -} - func TestIndexBuilder_PartitionRevocation(t *testing.T) { t.Parallel() ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) @@ 
-98,8 +65,6 @@ func TestIndexBuilder_PartitionRevocation(t *testing.T) { prometheus.NewRegistry(), ) require.NoError(t, err) - builder.calculator = &mockCalculator{} - builder.ctx = ctx builder.client.Close() builder.client = &mockKafkaClient{} @@ -125,23 +90,20 @@ func TestIndexBuilder_PartitionRevocation(t *testing.T) { if i == 2 { trigger <- struct{}{} } - builder.processRecord(&kgo.Record{ + builder.processRecord(ctx, &kgo.Record{ Value: eventBytes, Partition: int32(1), }) if i < 2 { - // After revocation is triggered, we can't guarantee that the partition will be in the buffered events map. - require.NotNil(t, builder.bufferedEvents[1]) - require.Len(t, builder.bufferedEvents[1], 0) + // After revocation is triggered, we can't guarantee that the partition will be in the partition states map. + require.NotNil(t, builder.partitionStates[1]) + require.Len(t, builder.partitionStates[1].events, 0) } } - // Verify that the first records were processed successfully. - require.GreaterOrEqual(t, builder.calculator.(*mockCalculator).count, 2) - require.NotNil(t, builder.calculator.(*mockCalculator).object) // Verify that the partition was revoked. - require.Equal(t, 2, len(builder.bufferedEvents)) - require.Nil(t, builder.bufferedEvents[1]) + require.Equal(t, 2, len(builder.partitionStates)) + require.Nil(t, builder.partitionStates[1]) } func TestIndexBuilder(t *testing.T) { @@ -193,7 +155,7 @@ func TestIndexBuilder(t *testing.T) { eventBytes, err := event.Marshal() require.NoError(t, err) - p.processRecord(&kgo.Record{ + p.processRecord(context.Background(), &kgo.Record{ Value: eventBytes, Partition: int32(0), }) @@ -251,34 +213,6 @@ func readAllSectionPointers(t *testing.T, bucket objstore.Bucket) []pointers.Sec return out } -// mockCalculator is a calculator that does nothing for use in tests -type mockCalculator struct { - count int - object *dataobj.Object -} - -func (c *mockCalculator) Calculate(_ context.Context, _ log.Logger, object *dataobj.Object, _ string) error { - c.count++ - c.object = object - return nil -} - -func (c *mockCalculator) Flush() (*dataobj.Object, io.Closer, error) { - return c.object, io.NopCloser(bytes.NewReader([]byte{})), nil -} - -func (c *mockCalculator) TimeRanges() []multitenancy.TimeRange { - return []multitenancy.TimeRange{ - { - Tenant: "test", - MinTime: time.Now(), - MaxTime: time.Now().Add(time.Hour), - }, - } -} - -func (c *mockCalculator) Reset() {} - // A mockKafkaClient implements the kafkaClient interface for tests. 
type mockKafkaClient struct{} @@ -291,3 +225,35 @@ func (m *mockKafkaClient) PollRecords(_ context.Context, _ int) kgo.Fetches { } func (m *mockKafkaClient) Close() {} + +func buildLogObject(t *testing.T, app string, path string, bucket objstore.Bucket) { + candidate, err := logsobj.NewBuilder(logsobj.BuilderConfig{ + TargetPageSize: 128 * 1024, + TargetObjectSize: 4 * 1024 * 1024, + TargetSectionSize: 2 * 1024 * 1024, + + BufferSize: 4 * 1024 * 1024, + SectionStripeMergeLimit: 2, + }, nil) + require.NoError(t, err) + + for i := 0; i < 10; i++ { + stream := logproto.Stream{ + Labels: fmt.Sprintf("{app=\"%s\",stream=\"%d\"}", app, i), + Entries: []logproto.Entry{{Timestamp: time.Now(), Line: fmt.Sprintf("line %d", i)}}, + } + err = candidate.Append("tenant", stream) + require.NoError(t, err) + } + + obj, closer, err := candidate.Flush() + require.NoError(t, err) + defer closer.Close() + + reader, err := obj.Reader(t.Context()) + require.NoError(t, err) + defer reader.Close() + + err = bucket.Upload(t.Context(), path, reader) + require.NoError(t, err) +} diff --git a/pkg/dataobj/index/config.go b/pkg/dataobj/index/config.go new file mode 100644 index 0000000000000..abf7a00f189ae --- /dev/null +++ b/pkg/dataobj/index/config.go @@ -0,0 +1,26 @@ +package index + +import ( + "flag" + "time" + + "github.com/grafana/loki/v3/pkg/dataobj/index/indexobj" +) + +type Config struct { + indexobj.BuilderConfig `yaml:",inline"` + EventsPerIndex int `yaml:"events_per_index" experimental:"true"` + FlushInterval time.Duration `yaml:"flush_interval" experimental:"true"` + MaxIdleTime time.Duration `yaml:"max_idle_time" experimental:"true"` +} + +func (cfg *Config) RegisterFlags(f *flag.FlagSet) { + cfg.RegisterFlagsWithPrefix("dataobj-index-builder.", f) +} + +func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { + cfg.BuilderConfig.RegisterFlagsWithPrefix(prefix, f) + f.IntVar(&cfg.EventsPerIndex, prefix+"events-per-index", 32, "Experimental: The number of events to batch before building an index") + f.DurationVar(&cfg.FlushInterval, prefix+"flush-interval", 1*time.Minute, "Experimental: How often to check for stale partitions to flush") + f.DurationVar(&cfg.MaxIdleTime, prefix+"max-idle-time", 30*time.Minute, "Experimental: Maximum time to wait before flushing buffered events") +} diff --git a/pkg/dataobj/index/indexer.go b/pkg/dataobj/index/indexer.go new file mode 100644 index 0000000000000..768df4e38e777 --- /dev/null +++ b/pkg/dataobj/index/indexer.go @@ -0,0 +1,452 @@ +package index + +import ( + "bytes" + "context" + "crypto/sha256" + "encoding/hex" + "fmt" + "io" + "sync" + "time" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/grafana/dskit/multierror" + "github.com/grafana/dskit/services" + "github.com/thanos-io/objstore" + "github.com/twmb/franz-go/pkg/kgo" + + "github.com/grafana/loki/v3/pkg/dataobj" + "github.com/grafana/loki/v3/pkg/dataobj/metastore" +) + +// buildRequest represents a request to build an index +type buildRequest struct { + events []bufferedEvent + partition int32 + trigger triggerType + ctx context.Context + resultChan chan buildResult +} + +// buildResult represents the result of an index build operation +type buildResult struct { + indexPath string + records []*kgo.Record // Records to commit after successful build + err error +} + +// indexerConfig contains configuration for the indexer +type indexerConfig struct { + QueueSize int // Size of the build request queue +} + +// indexer handles serialized index building operations 
+type indexer interface { + services.Service + + // submitBuild submits a build request and waits for completion + submitBuild(ctx context.Context, events []bufferedEvent, partition int32, trigger triggerType) ([]*kgo.Record, error) +} + +// serialIndexer implements Indexer with a single worker goroutine using dskit Service +type serialIndexer struct { + services.Service + + // Dependencies - no builder dependency! + calculator calculator + objectBucket objstore.Bucket + indexStorageBucket objstore.Bucket + builderMetrics *builderMetrics + indexerMetrics *indexerMetrics + logger log.Logger + + // Download pipeline + downloadQueue chan metastore.ObjectWrittenEvent + downloadedObjects chan downloadedObject + + // Worker management + buildRequestChan chan buildRequest + buildWorkerWg sync.WaitGroup + downloadWorkerWg sync.WaitGroup +} + +// newSerialIndexer creates a new self-contained SerialIndexer +func newSerialIndexer( + calculator calculator, + objectBucket objstore.Bucket, + indexStorageBucket objstore.Bucket, + builderMetrics *builderMetrics, + indexerMetrics *indexerMetrics, + logger log.Logger, + cfg indexerConfig, +) *serialIndexer { + if cfg.QueueSize == 0 { + cfg.QueueSize = 64 + } + + si := &serialIndexer{ + calculator: calculator, + objectBucket: objectBucket, + indexStorageBucket: indexStorageBucket, + builderMetrics: builderMetrics, + indexerMetrics: indexerMetrics, + logger: logger, + buildRequestChan: make(chan buildRequest, cfg.QueueSize), + downloadQueue: make(chan metastore.ObjectWrittenEvent, 32), + downloadedObjects: make(chan downloadedObject, 1), + } + + // Initialize dskit Service + si.Service = services.NewBasicService(si.starting, si.running, si.stopping) + + return si +} + +// starting is called when the service is starting +func (si *serialIndexer) starting(_ context.Context) error { + level.Info(si.logger).Log("msg", "starting serial indexer") + return nil +} + +// running is the main service loop +func (si *serialIndexer) running(ctx context.Context) error { + level.Info(si.logger).Log("msg", "serial indexer running") + + // Start download worker + si.downloadWorkerWg.Add(1) + go si.downloadWorker(ctx) + + // Start build worker + si.buildWorkerWg.Add(1) + go si.buildWorker(ctx) + + // Wait for context cancellation + <-ctx.Done() + return nil +} + +// stopping is called when the service is stopping +func (si *serialIndexer) stopping(_ error) error { + level.Info(si.logger).Log("msg", "stopping serial indexer") + + // Close channels to signal workers to stop + close(si.downloadQueue) + close(si.buildRequestChan) + + // Wait for workers to finish + si.downloadWorkerWg.Wait() + si.buildWorkerWg.Wait() + + // Close the downloaded objects channel after workers are done + close(si.downloadedObjects) + + level.Info(si.logger).Log("msg", "stopped serial indexer") + return nil +} + +// submitBuild submits a build request and waits for completion +func (si *serialIndexer) submitBuild(ctx context.Context, events []bufferedEvent, partition int32, trigger triggerType) ([]*kgo.Record, error) { + // Check if service is running + if si.State() != services.Running { + return nil, fmt.Errorf("indexer service is not running (state: %s)", si.State()) + } + + resultChan := make(chan buildResult, 1) + + req := buildRequest{ + events: events, + partition: partition, + trigger: trigger, + ctx: ctx, + resultChan: resultChan, + } + + // Submit request + select { + case si.buildRequestChan <- req: + si.indexerMetrics.incRequests() + level.Debug(si.logger).Log("msg", "submitted build 
request", + "partition", partition, "events", len(events), "trigger", trigger) + case <-ctx.Done(): + return nil, ctx.Err() + } + + // Wait for result + select { + case result := <-resultChan: + if result.err != nil { + level.Error(si.logger).Log("msg", "build request failed", + "partition", partition, "err", result.err) + } else { + level.Debug(si.logger).Log("msg", "build request completed", + "partition", partition, "index_path", result.indexPath) + } + return result.records, result.err + case <-ctx.Done(): + return nil, ctx.Err() + } +} + +// downloadWorker handles object downloads +func (si *serialIndexer) downloadWorker(ctx context.Context) { + defer si.downloadWorkerWg.Done() + + level.Debug(si.logger).Log("msg", "download worker started") + defer level.Debug(si.logger).Log("msg", "download worker stopped") + + for { + select { + case event, ok := <-si.downloadQueue: + if !ok { + // Channel closed, worker should exit + return + } + + objLogger := log.With(si.logger, "object_path", event.ObjectPath) + downloadStart := time.Now() + + objectReader, err := si.objectBucket.Get(ctx, event.ObjectPath) + if err != nil { + select { + case si.downloadedObjects <- downloadedObject{ + event: event, + err: fmt.Errorf("failed to fetch object from storage: %w", err), + }: + case <-ctx.Done(): + return + } + continue + } + + object, err := io.ReadAll(objectReader) + _ = objectReader.Close() + if err != nil { + select { + case si.downloadedObjects <- downloadedObject{ + event: event, + err: fmt.Errorf("failed to read object: %w", err), + }: + case <-ctx.Done(): + return + } + continue + } + + level.Info(objLogger).Log("msg", "downloaded object", "duration", time.Since(downloadStart), + "size_mb", float64(len(object))/1024/1024, + "avg_speed_mbps", float64(len(object))/time.Since(downloadStart).Seconds()/1024/1024) + + select { + case si.downloadedObjects <- downloadedObject{ + event: event, + objectBytes: &object, + }: + case <-ctx.Done(): + return + } + + case <-ctx.Done(): + return + } + } +} + +// buildWorker is the main worker goroutine that processes build requests +func (si *serialIndexer) buildWorker(ctx context.Context) { + defer si.buildWorkerWg.Done() + + level.Info(si.logger).Log("msg", "build worker started") + defer level.Info(si.logger).Log("msg", "build worker stopped") + + for { + select { + case req, ok := <-si.buildRequestChan: + if !ok { + // Channel closed, worker should exit + return + } + + result := si.processBuildRequest(req) + + select { + case req.resultChan <- result: + // Result delivered successfully + case <-req.ctx.Done(): + // Request was cancelled, but we already did the work + level.Debug(si.logger).Log("msg", "build request was cancelled after completion", + "partition", req.partition) + case <-ctx.Done(): + // Service is shutting down + return + } + + case <-ctx.Done(): + return + } + } +} + +// processBuildRequest processes a single build request - contains the full buildIndex logic +func (si *serialIndexer) processBuildRequest(req buildRequest) buildResult { + start := time.Now() + + level.Debug(si.logger).Log("msg", "processing build request", + "partition", req.partition, "events", len(req.events), "trigger", req.trigger) + + // Extract events for building + events := make([]metastore.ObjectWrittenEvent, len(req.events)) + for i, buffered := range req.events { + events[i] = buffered.event + } + + // Build the index using internal method + indexPath, err := si.buildIndex(req.ctx, events, req.partition) + + // Update metrics + buildTime := time.Since(start) + 
si.updateMetrics(buildTime) + + if err != nil { + level.Error(si.logger).Log("msg", "failed to build index", + "partition", req.partition, "err", err, "duration", buildTime) + return buildResult{err: err} + } + + level.Debug(si.logger).Log("msg", "successfully built index", + "partition", req.partition, "index_path", indexPath, "duration", buildTime, + "events", len(events)) + + // Extract records for committing + records := make([]*kgo.Record, len(req.events)) + for i, buffered := range req.events { + records[i] = buffered.record + } + + return buildResult{ + indexPath: indexPath, + records: records, + err: nil, + } +} + +// buildIndex is the core index building logic (moved from builder) +func (si *serialIndexer) buildIndex(ctx context.Context, events []metastore.ObjectWrittenEvent, partition int32) (string, error) { + level.Debug(si.logger).Log("msg", "building index", "events", len(events), "partition", partition) + start := time.Now() + + // Observe processing delay + writeTime, err := time.Parse(time.RFC3339, events[0].WriteTime) + if err != nil { + level.Error(si.logger).Log("msg", "failed to parse write time", "err", err) + return "", err + } + si.builderMetrics.setProcessingDelay(writeTime) + + // Trigger the downloads + for _, event := range events { + select { + case si.downloadQueue <- event: + // Successfully sent event for download + case <-ctx.Done(): + return "", ctx.Err() + } + } + + // Process the results as they are downloaded + processingErrors := multierror.New() + for range len(events) { + var obj downloadedObject + select { + case obj = <-si.downloadedObjects: + case <-ctx.Done(): + return "", ctx.Err() + } + + objLogger := log.With(si.logger, "object_path", obj.event.ObjectPath) + level.Debug(objLogger).Log("msg", "processing object") + + if obj.err != nil { + processingErrors.Add(fmt.Errorf("failed to download object: %w", obj.err)) + continue + } + + reader, err := dataobj.FromReaderAt(bytes.NewReader(*obj.objectBytes), int64(len(*obj.objectBytes))) + if err != nil { + processingErrors.Add(fmt.Errorf("failed to read object: %w", err)) + continue + } + + if err := si.calculator.Calculate(ctx, objLogger, reader, obj.event.ObjectPath); err != nil { + processingErrors.Add(fmt.Errorf("failed to calculate index: %w", err)) + continue + } + } + + if processingErrors.Err() != nil { + return "", processingErrors.Err() + } + + tenantTimeRanges := si.calculator.TimeRanges() + obj, closer, err := si.calculator.Flush() + if err != nil { + return "", fmt.Errorf("failed to flush builder: %w", err) + } + defer closer.Close() + + key, err := ObjectKey(ctx, obj) + if err != nil { + return "", fmt.Errorf("failed to generate object key: %w", err) + } + + reader, err := obj.Reader(ctx) + if err != nil { + return "", fmt.Errorf("failed to read object: %w", err) + } + defer reader.Close() + + if err := si.indexStorageBucket.Upload(ctx, key, reader); err != nil { + return "", fmt.Errorf("failed to upload index: %w", err) + } + + metastoreTocWriter := metastore.NewTableOfContentsWriter(si.indexStorageBucket, si.logger) + if err := metastoreTocWriter.WriteEntry(ctx, key, tenantTimeRanges); err != nil { + return "", fmt.Errorf("failed to update metastore ToC file: %w", err) + } + + level.Debug(si.logger).Log("msg", "finished building new index file", "partition", partition, + "events", len(events), "size", obj.Size(), "duration", time.Since(start), + "tenants", len(tenantTimeRanges), "path", key) + + return key, nil +} + +// updateMetrics updates internal build metrics +func (si 
*serialIndexer) updateMetrics(buildTime time.Duration) { + si.indexerMetrics.incBuilds() + si.indexerMetrics.setBuildTime(buildTime) + si.indexerMetrics.setQueueDepth(len(si.buildRequestChan)) +} + +// ObjectKey generates the object key for storing an index object in object storage. +// This is a public wrapper around the generateObjectKey functionality. +func ObjectKey(ctx context.Context, object *dataobj.Object) (string, error) { + h := sha256.New224() + + reader, err := object.Reader(ctx) + if err != nil { + return "", err + } + defer reader.Close() + + if _, err := io.Copy(h, reader); err != nil { + return "", err + } + + var sumBytes [sha256.Size224]byte + sum := h.Sum(sumBytes[:0]) + sumStr := hex.EncodeToString(sum[:]) + + return fmt.Sprintf("indexes/%s/%s", sumStr[:2], sumStr[2:]), nil +} diff --git a/pkg/dataobj/index/indexer_test.go b/pkg/dataobj/index/indexer_test.go new file mode 100644 index 0000000000000..a4281ec16007b --- /dev/null +++ b/pkg/dataobj/index/indexer_test.go @@ -0,0 +1,319 @@ +package index + +import ( + "bytes" + "context" + "fmt" + "io" + "os" + "testing" + "time" + + "github.com/go-kit/log" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/stretchr/testify/require" + "github.com/thanos-io/objstore" + "github.com/twmb/franz-go/pkg/kgo" + + "github.com/grafana/loki/v3/pkg/dataobj" + "github.com/grafana/loki/v3/pkg/dataobj/metastore" + "github.com/grafana/loki/v3/pkg/dataobj/metastore/multitenancy" +) + +func TestSerialIndexer_BuildIndex(t *testing.T) { + t.Parallel() + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + // Set up test data + bucket := objstore.NewInMemBucket() + buildLogObject(t, "loki", "test-path-0", bucket) + + event := metastore.ObjectWrittenEvent{ + ObjectPath: "test-path-0", + WriteTime: time.Now().Format(time.RFC3339), + } + + record := &kgo.Record{ + Value: nil, // Will be set below + Partition: int32(0), + } + eventBytes, err := event.Marshal() + require.NoError(t, err) + record.Value = eventBytes + + bufferedEvt := bufferedEvent{ + event: event, + record: record, + } + + // Create indexer with mock calculator + mockCalc := &mockCalculator{} + indexStorageBucket := objstore.NewInMemBucket() + + // Create dedicated registry for this test + reg := prometheus.NewRegistry() + + builderMetrics := newBuilderMetrics() + require.NoError(t, builderMetrics.register(reg)) + + indexerMetrics := newIndexerMetrics() + require.NoError(t, indexerMetrics.register(reg)) + + indexer := newSerialIndexer( + mockCalc, + bucket, + indexStorageBucket, + builderMetrics, + indexerMetrics, + log.NewLogfmtLogger(os.Stderr), + indexerConfig{QueueSize: 10}, + ) + + // Start indexer service + require.NoError(t, indexer.StartAsync(ctx)) + require.NoError(t, indexer.AwaitRunning(ctx)) + defer func() { + indexer.StopAsync() + require.NoError(t, indexer.AwaitTerminated(context.Background())) + }() + + // Submit build request + records, err := indexer.submitBuild(ctx, []bufferedEvent{bufferedEvt}, 0, triggerTypeAppend) + require.NoError(t, err) + require.Len(t, records, 1) + require.Equal(t, record, records[0]) + + // Verify calculator was used + require.Equal(t, 1, mockCalc.count) + require.NotNil(t, mockCalc.object) + + // Verify Prometheus metrics + require.Equal(t, float64(1), testutil.ToFloat64(indexerMetrics.totalRequests)) + require.Equal(t, float64(1), testutil.ToFloat64(indexerMetrics.totalBuilds)) +} + +func TestSerialIndexer_MultipleBuilds(t 
*testing.T) { + t.Parallel() + ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) + defer cancel() + + // Set up test data + bucket := objstore.NewInMemBucket() + buildLogObject(t, "loki", "test-path-0", bucket) + buildLogObject(t, "testing", "test-path-1", bucket) + + events := []bufferedEvent{} + for i := range 2 { + event := metastore.ObjectWrittenEvent{ + ObjectPath: fmt.Sprintf("test-path-%d", i), + WriteTime: time.Now().Format(time.RFC3339), + } + + record := &kgo.Record{ + Partition: int32(0), + } + eventBytes, err := event.Marshal() + require.NoError(t, err) + record.Value = eventBytes + + events = append(events, bufferedEvent{ + event: event, + record: record, + }) + } + + // Create indexer with mock calculator + mockCalc := &mockCalculator{} + indexStorageBucket := objstore.NewInMemBucket() + + // Create dedicated registry for this test + reg := prometheus.NewRegistry() + + builderMetrics := newBuilderMetrics() + require.NoError(t, builderMetrics.register(reg)) + + indexerMetrics := newIndexerMetrics() + require.NoError(t, indexerMetrics.register(reg)) + + indexer := newSerialIndexer( + mockCalc, + bucket, + indexStorageBucket, + builderMetrics, + indexerMetrics, + log.NewLogfmtLogger(os.Stderr), + indexerConfig{QueueSize: 10}, + ) + + // Start indexer service + require.NoError(t, indexer.StartAsync(ctx)) + require.NoError(t, indexer.AwaitRunning(ctx)) + defer func() { + indexer.StopAsync() + require.NoError(t, indexer.AwaitTerminated(context.Background())) + }() + + // Submit build request with multiple events + records, err := indexer.submitBuild(ctx, events, 0, triggerTypeAppend) + require.NoError(t, err) + require.Len(t, records, 2) + + // Verify calculator processed all events + require.Equal(t, 2, mockCalc.count) + require.NotNil(t, mockCalc.object) + + // Verify Prometheus metrics - multiple events in single request/build + require.Equal(t, float64(1), testutil.ToFloat64(indexerMetrics.totalRequests)) + require.Equal(t, float64(1), testutil.ToFloat64(indexerMetrics.totalBuilds)) +} + +func TestSerialIndexer_ServiceNotRunning(t *testing.T) { + t.Parallel() + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + // Create indexer without starting it + mockCalc := &mockCalculator{} + bucket := objstore.NewInMemBucket() + indexStorageBucket := objstore.NewInMemBucket() + builderMetrics := newBuilderMetrics() + require.NoError(t, builderMetrics.register(prometheus.NewRegistry())) + + indexerMetrics := newIndexerMetrics() + require.NoError(t, indexerMetrics.register(prometheus.NewRegistry())) + + indexer := newSerialIndexer( + mockCalc, + bucket, + indexStorageBucket, + builderMetrics, + indexerMetrics, + log.NewNopLogger(), + indexerConfig{QueueSize: 10}, + ) + + // Try to submit build without starting service + event := metastore.ObjectWrittenEvent{ + ObjectPath: "test-path-0", + WriteTime: time.Now().Format(time.RFC3339), + } + record := &kgo.Record{Partition: int32(0)} + bufferedEvt := bufferedEvent{event: event, record: record} + + _, err := indexer.submitBuild(ctx, []bufferedEvent{bufferedEvt}, 0, triggerTypeAppend) + require.Error(t, err) + require.Contains(t, err.Error(), "indexer service is not running") +} + +func TestSerialIndexer_ConcurrentBuilds(t *testing.T) { + t.Parallel() + ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) + defer cancel() + + // Set up test data + bucket := objstore.NewInMemBucket() + for i := 0; i < 5; i++ { + buildLogObject(t, fmt.Sprintf("app-%d", i), 
fmt.Sprintf("test-path-%d", i), bucket) + } + + // Create indexer with mock calculator + mockCalc := &mockCalculator{} + indexStorageBucket := objstore.NewInMemBucket() + + // Create dedicated registry for this test + reg := prometheus.NewRegistry() + + builderMetrics := newBuilderMetrics() + require.NoError(t, builderMetrics.register(reg)) + + indexerMetrics := newIndexerMetrics() + require.NoError(t, indexerMetrics.register(reg)) + + indexer := newSerialIndexer( + mockCalc, + bucket, + indexStorageBucket, + builderMetrics, + indexerMetrics, + log.NewLogfmtLogger(os.Stderr), + indexerConfig{QueueSize: 10}, + ) + + // Start indexer service + require.NoError(t, indexer.StartAsync(ctx)) + require.NoError(t, indexer.AwaitRunning(ctx)) + defer func() { + indexer.StopAsync() + require.NoError(t, indexer.AwaitTerminated(context.Background())) + }() + + // Submit multiple concurrent build requests + numRequests := 5 + results := make(chan error, numRequests) + + for i := 0; i < numRequests; i++ { + go func(idx int) { + event := metastore.ObjectWrittenEvent{ + ObjectPath: fmt.Sprintf("test-path-%d", idx), + WriteTime: time.Now().Format(time.RFC3339), + } + + record := &kgo.Record{Partition: int32(idx)} + eventBytes, err := event.Marshal() + if err != nil { + results <- err + return + } + record.Value = eventBytes + + bufferedEvt := bufferedEvent{event: event, record: record} + + _, err = indexer.submitBuild(ctx, []bufferedEvent{bufferedEvt}, int32(idx), triggerTypeAppend) + results <- err + }(i) + } + + // Wait for all requests to complete + for i := 0; i < numRequests; i++ { + require.NoError(t, <-results) + } + + // Verify all events were processed (serialized) + require.Equal(t, numRequests, mockCalc.count) + + // Verify Prometheus metrics - multiple concurrent requests + require.Equal(t, float64(numRequests), testutil.ToFloat64(indexerMetrics.totalRequests)) + require.Equal(t, float64(numRequests), testutil.ToFloat64(indexerMetrics.totalBuilds)) + require.Greater(t, testutil.ToFloat64(indexerMetrics.buildTimeSeconds), float64(0)) + require.Equal(t, float64(0), testutil.ToFloat64(indexerMetrics.queueDepth)) +} + +// mockCalculator is a calculator that does nothing for use in tests +type mockCalculator struct { + count int + object *dataobj.Object +} + +func (c *mockCalculator) Calculate(_ context.Context, _ log.Logger, object *dataobj.Object, _ string) error { + c.count++ + c.object = object + return nil +} + +func (c *mockCalculator) Flush() (*dataobj.Object, io.Closer, error) { + return c.object, io.NopCloser(bytes.NewReader([]byte{})), nil +} + +func (c *mockCalculator) TimeRanges() []multitenancy.TimeRange { + return []multitenancy.TimeRange{ + { + Tenant: "test", + MinTime: time.Now(), + MaxTime: time.Now().Add(time.Hour), + }, + } +} + +func (c *mockCalculator) Reset() {} diff --git a/pkg/dataobj/index/metrics.go b/pkg/dataobj/index/metrics.go index 21a9d4c33fe31..61385dc7f81ec 100644 --- a/pkg/dataobj/index/metrics.go +++ b/pkg/dataobj/index/metrics.go @@ -6,7 +6,7 @@ import ( "github.com/prometheus/client_golang/prometheus" ) -type indexBuilderMetrics struct { +type builderMetrics struct { // Error counters commitFailures prometheus.Counter @@ -17,8 +17,8 @@ type indexBuilderMetrics struct { processingDelay prometheus.Gauge // Latest delta between record timestamp and current time } -func newIndexBuilderMetrics() *indexBuilderMetrics { - p := &indexBuilderMetrics{ +func newBuilderMetrics() *builderMetrics { + p := &builderMetrics{ commitFailures: 
prometheus.NewCounter(prometheus.CounterOpts{ Name: "loki_index_builder_commit_failures_total", Help: "Total number of commit failures", @@ -36,7 +36,7 @@ func newIndexBuilderMetrics() *indexBuilderMetrics { return p } -func (p *indexBuilderMetrics) register(reg prometheus.Registerer) error { +func (p *builderMetrics) register(reg prometheus.Registerer) error { collectors := []prometheus.Collector{ p.commitFailures, p.commitsTotal, @@ -53,7 +53,7 @@ func (p *indexBuilderMetrics) register(reg prometheus.Registerer) error { return nil } -func (p *indexBuilderMetrics) unregister(reg prometheus.Registerer) { +func (p *builderMetrics) unregister(reg prometheus.Registerer) { collectors := []prometheus.Collector{ p.commitFailures, p.commitsTotal, @@ -65,17 +65,99 @@ func (p *indexBuilderMetrics) unregister(reg prometheus.Registerer) { } } -func (p *indexBuilderMetrics) incCommitFailures() { +func (p *builderMetrics) incCommitFailures() { p.commitFailures.Inc() } -func (p *indexBuilderMetrics) incCommitsTotal() { +func (p *builderMetrics) incCommitsTotal() { p.commitsTotal.Inc() } -func (p *indexBuilderMetrics) setProcessingDelay(recordTimestamp time.Time) { +func (p *builderMetrics) setProcessingDelay(recordTimestamp time.Time) { // Convert milliseconds to seconds and calculate delay if !recordTimestamp.IsZero() { // Only observe if timestamp is valid p.processingDelay.Set(time.Since(recordTimestamp).Seconds()) } } + +type indexerMetrics struct { + // Request counters + totalRequests prometheus.Counter + totalBuilds prometheus.Counter + + // Build time metrics + buildTimeSeconds prometheus.Gauge + + // Queue metrics + queueDepth prometheus.Gauge +} + +func newIndexerMetrics() *indexerMetrics { + m := &indexerMetrics{ + totalRequests: prometheus.NewCounter(prometheus.CounterOpts{ + Name: "loki_index_builder_requests_total", + Help: "Total number of build requests submitted to the indexer", + }), + totalBuilds: prometheus.NewCounter(prometheus.CounterOpts{ + Name: "loki_index_builder_builds_total", + Help: "Total number of index builds completed", + }), + buildTimeSeconds: prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "loki_index_builder_build_time_seconds", + Help: "Time spent on the last index build in seconds", + }), + queueDepth: prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "loki_index_builder_queue_depth", + Help: "Current depth of the build request queue", + }), + } + + return m +} + +func (m *indexerMetrics) register(reg prometheus.Registerer) error { + collectors := []prometheus.Collector{ + m.totalRequests, + m.totalBuilds, + m.buildTimeSeconds, + m.queueDepth, + } + + for _, collector := range collectors { + if err := reg.Register(collector); err != nil { + if _, ok := err.(prometheus.AlreadyRegisteredError); !ok { + return err + } + } + } + return nil +} + +func (m *indexerMetrics) unregister(reg prometheus.Registerer) { + collectors := []prometheus.Collector{ + m.totalRequests, + m.totalBuilds, + m.buildTimeSeconds, + m.queueDepth, + } + + for _, collector := range collectors { + reg.Unregister(collector) + } +} + +func (m *indexerMetrics) incRequests() { + m.totalRequests.Inc() +} + +func (m *indexerMetrics) incBuilds() { + m.totalBuilds.Inc() +} + +func (m *indexerMetrics) setBuildTime(duration time.Duration) { + m.buildTimeSeconds.Set(duration.Seconds()) +} + +func (m *indexerMetrics) setQueueDepth(depth int) { + m.queueDepth.Set(float64(depth)) +}
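
For reviewers, a minimal, self-contained sketch of how the new settings introduced in `pkg/dataobj/index/config.go` parse from CLI flags. The `Config` struct below is a simplified stand-in (it omits the embedded `indexobj.BuilderConfig`); the flag names and defaults match the diff. Note that `running()` only starts the idle-flush worker when `FlushInterval > 0`, so a non-positive value leaves periodic flushing disabled.

```go
package main

import (
	"flag"
	"fmt"
	"time"
)

// Simplified stand-in for index.Config: only the batching and idle-flush
// fields are reproduced here; the real struct also embeds indexobj.BuilderConfig.
type Config struct {
	EventsPerIndex int
	FlushInterval  time.Duration
	MaxIdleTime    time.Duration
}

// RegisterFlagsWithPrefix mirrors the flag names and defaults added in config.go.
func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
	f.IntVar(&cfg.EventsPerIndex, prefix+"events-per-index", 32, "The number of events to batch before building an index")
	f.DurationVar(&cfg.FlushInterval, prefix+"flush-interval", 1*time.Minute, "How often to check for stale partitions to flush")
	f.DurationVar(&cfg.MaxIdleTime, prefix+"max-idle-time", 30*time.Minute, "Maximum time to wait before flushing buffered events")
}

func main() {
	var cfg Config
	fs := flag.NewFlagSet("example", flag.ContinueOnError)
	cfg.RegisterFlagsWithPrefix("dataobj-index-builder.", fs)

	// Override the idle-flush check cadence; unset flags keep their defaults.
	_ = fs.Parse([]string{"-dataobj-index-builder.flush-interval=30s"})

	fmt.Println(cfg.EventsPerIndex, cfg.FlushInterval, cfg.MaxIdleTime) // 32 30s 30m0s
}
```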