
Metadata caching in compaction planning and execution #15026

Merged
andyasp merged 14 commits into main from aasp/metadata-caching-during-compaction on Apr 21, 2026
Conversation

@andyasp (Contributor) commented Apr 14, 2026

What this PR does

This introduces batchCachingMetaFetcher, which aims to be compatible with the compactor's previous usage of MetaFetcher/metaSyncer to retrieve block.Meta, except that it issues a GetMulti to a cache.Cache rather than using an in-memory or on-disk caching scheme. This change only applies when the compactor scheduler is enabled. A remote cache is advantageous for the compactor scheduler workflow because any compactor can work on any job, so locally cached metadata cannot be assumed to be reusable.

Design notes:

  • The caching content key exposure likely deserves discussion. I do not plan on merging this PR until I reach out about that.
  • I extracted garbageCollectBlocks from metaSyncer so that it can be called without needing a metaSyncer.
  • There is no "fetch metric" collection while executing compaction jobs. This is a bit awkward, but it matches the previous behavior because the other fetch metrics are tied in with the "sync" metrics used during planning.
  • For transparency: the unit tests were largely written with AI and then revised.
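The cache-with-fallback flow described above can be sketched roughly as follows. This is an illustrative, self-contained approximation, not the PR's actual code: metaCache, fetchMetas, and blockMeta are hypothetical stand-ins for cache.Cache, batchCachingMetaFetcher, and block.Meta.

```go
package main

import (
	"encoding/json"
	"fmt"
)

type blockMeta struct {
	ULID    string `json:"ulid"`
	MinTime int64  `json:"minTime"`
}

// metaCache stands in for a remote cache supporting batched lookups.
type metaCache struct{ data map[string][]byte }

// getMulti mimics a GetMulti call: it returns only the keys that were found.
func (c *metaCache) getMulti(keys []string) map[string][]byte {
	hits := map[string][]byte{}
	for _, k := range keys {
		if v, ok := c.data[k]; ok {
			hits[k] = v
		}
	}
	return hits
}

// fetchMetas resolves each block's meta.json from the cache first, falling
// back to "object storage" (here a plain map) on misses and writing results
// back so a later fetch by any compactor gets a cache hit.
func fetchMetas(ids []string, c *metaCache, storage map[string][]byte) (map[string]*blockMeta, error) {
	hits := c.getMulti(ids)
	metas := make(map[string]*blockMeta, len(ids))
	for _, id := range ids {
		raw, ok := hits[id]
		if !ok {
			raw, ok = storage[id] // fall back to object storage on a cache miss
			if !ok {
				continue // missing meta.json: skipped here for simplicity
			}
			c.data[id] = raw // populate the cache for subsequent fetches
		}
		m := &blockMeta{}
		if err := json.Unmarshal(raw, m); err != nil {
			return nil, fmt.Errorf("corrupted meta for %s: %w", id, err)
		}
		metas[id] = m
	}
	return metas, nil
}

func main() {
	storage := map[string][]byte{
		"block-1": []byte(`{"ulid":"block-1","minTime":100}`),
		"block-2": []byte(`{"ulid":"block-2","minTime":200}`),
	}
	// block-1 is already cached; block-2 must come from storage.
	c := &metaCache{data: map[string][]byte{"block-1": storage["block-1"]}}

	metas, err := fetchMetas([]string{"block-1", "block-2"}, c, storage)
	if err != nil {
		panic(err)
	}
	fmt.Println(len(metas), len(c.data)) // both metas resolved; block-2 now cached
}
```

In the real change, the miss-handling policy differs between planning (a missing meta.json can be skipped) and job execution (the job is abandoned or the fetch fails), which this sketch collapses into a single skip.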

Checklist

  • Tests updated.
  • Documentation added.
  • CHANGELOG.md updated - the order of entries should be [CHANGE], [FEATURE], [ENHANCEMENT], [BUGFIX]. If changelog entry is not needed, please add the changelog-not-needed label to the PR.
  • about-versioning.md updated with experimental features.

Note: Medium Risk

Changes compactor scheduler-mode planning and execution to rely on a shared metadata cache and new fetch paths, which could affect compaction job selection and failure behavior if cache keys/TTLs or marker filtering differ from the previous MetaFetcher/metaSyncer flow.

Overview
Adds batchCachingMetaFetcher, which batch-loads meta.json via cache.GetMulti and falls back to object storage, including lookback filtering and deletion/no-compact marker handling.

Scheduler-mode compaction is rewired to use this fetcher for both planning and execution (including populating cache on planning misses), introduces configurable compactor.scheduler-client.metadata-cache.* settings, and changes compaction-job execution to abandon jobs when block metadata is missing or corrupt.

Refactors garbage collection by extracting garbageCollectBlocks() from metaSyncer for reuse without syncing state, removes MetaFetcher.FetchRequestedMetas() (and its tests), and exposes bucketcache.ContentKey() so the new fetcher can compute cache keys directly.
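Since bucketcache.ContentKey() is only mentioned here, the shape of a content cache key can be illustrated with a hypothetical sketch; the "content:" prefix, the tenant/path layout, and the contentKey name below are assumptions, not the actual key format.

```go
package main

import (
	"fmt"
	"path"
)

// contentKey is a hypothetical stand-in for a function like
// bucketcache.ContentKey(): it derives a stable cache key from the tenant and
// the object's path in the bucket, so any compactor computes the same key for
// the same meta.json.
func contentKey(tenant, objectPath string) string {
	return fmt.Sprintf("content:%s", path.Join(tenant, objectPath))
}

func main() {
	fmt.Println(contentKey("tenant-1", "01ARZ3NDEKTSV4RRFFQ69G5FAV/meta.json"))
	// content:tenant-1/01ARZ3NDEKTSV4RRFFQ69G5FAV/meta.json
}
```

The important property for the scheduler workflow is only that the key is a pure function of the object's identity, which is what makes the cache shareable across compactors.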

Reviewed by Cursor Bugbot for commit 355c205. Bugbot is set up for automated code reviews on this repo.

@andyasp added component/compactor and changelog-not-needed (PRs that don't need a CHANGELOG.md entry) labels Apr 14, 2026
@andyasp requested a review from a team as a code owner April 14, 2026 23:20
@cursor (Bot) left a comment:

Cursor Bugbot has reviewed your changes and found 1 potential issue.

Autofix Details

Bugbot Autofix prepared a fix for the issue found in the latest run.

  • ✅ Fixed: No-compact blocks filtered before dedup changes GC behavior
    • I restored the original filter ordering by keeping no-compact blocks through metadata discovery and applying NoCompactionMarkFilter after deduplication, with a regression test proving duplicate source blocks are still identified for GC.

Push these changes by commenting @cursor push a367809fd1. Preview (a367809fd1):
diff --git a/pkg/compactor/batch_caching_meta_fetcher.go b/pkg/compactor/batch_caching_meta_fetcher.go
--- a/pkg/compactor/batch_caching_meta_fetcher.go
+++ b/pkg/compactor/batch_caching_meta_fetcher.go
@@ -192,11 +192,10 @@
 // discoverBlocks lists blocks and block markers to discover which blocks are relevant for compaction planning.
 // It sets discovery-related synced metrics on the provided synced gauge vec.
 func (f *batchCachingMetaFetcher) discoverBlocks(ctx context.Context, maxLookback time.Duration, synced block.GaugeVec) ([]ulid.ULID, error) {
-	var lookbackExcluded, deletionMarked, noCompactMarked float64
+	var lookbackExcluded, deletionMarked float64
 	defer func() {
 		synced.WithLabelValues(block.LookbackExcludedMeta).Set(lookbackExcluded)
 		synced.WithLabelValues(block.MarkedForDeletionMeta).Set(deletionMarked)
-		synced.WithLabelValues(block.MarkedForNoCompactionMeta).Set(noCompactMarked)
 	}()
 
 	var minAllowedBlockID ulid.ULID
@@ -227,14 +226,11 @@
 	})
 
 	deletionMarks := map[ulid.ULID]struct{}{}
-	noCompactMarks := map[ulid.ULID]struct{}{}
 	g.Go(func() error {
 		err := f.userBkt.Iter(gCtx, block.MarkersPathname+"/", func(name string) error {
 			base := path.Base(name)
 			if id, ok := block.IsDeletionMarkFilename(base); ok {
 				deletionMarks[id] = struct{}{}
-			} else if id, ok := block.IsNoCompactMarkFilename(base); ok {
-				noCompactMarks[id] = struct{}{}
 			}
 			return nil
 		})
@@ -253,10 +249,6 @@
 			deletionMarked++
 			continue
 		}
-		if _, marked := noCompactMarks[id]; marked {
-			noCompactMarked++
-			continue
-		}
 		unmarked = append(unmarked, id)
 	}
 

diff --git a/pkg/compactor/batch_caching_meta_fetcher_test.go b/pkg/compactor/batch_caching_meta_fetcher_test.go
--- a/pkg/compactor/batch_caching_meta_fetcher_test.go
+++ b/pkg/compactor/batch_caching_meta_fetcher_test.go
@@ -130,14 +130,15 @@
 		assert.Equal(t, float64(1), testutil.ToFloat64(metrics.Synced.WithLabelValues(block.LoadedMeta)))
 	})
 
-	t.Run("no-compaction markers filter out blocks", func(t *testing.T) {
+	t.Run("no-compaction markers are filtered by metadata filter", func(t *testing.T) {
 		fetcher, bkt, _, metrics := newTestFetcher(t, tenant)
+		instrumentedBucket := objstore.WithNoopInstr(bkt)
 
 		uploadBlockMeta(t, bkt, block1Meta)
 		uploadBlockMeta(t, bkt, block2Meta)
 		uploadNoCompactMark(t, bkt, block2Meta.ULID)
 
-		metas, err := fetcher.FetchMetasFromListing(ctx, 0, nil, metrics)
+		metas, err := fetcher.FetchMetasFromListing(ctx, 0, []block.MetadataFilter{NewNoCompactionMarkFilter(instrumentedBucket)}, metrics)
 
 		require.NoError(t, err)
 		assert.Len(t, metas, 1)
@@ -146,6 +147,30 @@
 		assert.Equal(t, float64(1), testutil.ToFloat64(metrics.Synced.WithLabelValues(block.LoadedMeta)))
 	})
 
+	t.Run("dedup runs before no-compaction filtering", func(t *testing.T) {
+		fetcher, bkt, _, metrics := newTestFetcher(t, tenant)
+		instrumentedBucket := objstore.WithNoopInstr(bkt)
+		deduplicateBlocksFilter := NewShardAwareDeduplicateFilter()
+
+		sourceMeta := createBlockMeta(t, now.Add(-2*time.Hour), now.Add(-time.Hour))
+		sourceMeta.Compaction.Sources = []ulid.ULID{sourceMeta.ULID}
+		compactedMeta := createBlockMeta(t, now.Add(-time.Hour), now)
+		compactedMeta.Compaction.Sources = []ulid.ULID{sourceMeta.ULID}
+
+		uploadBlockMeta(t, bkt, sourceMeta)
+		uploadBlockMeta(t, bkt, compactedMeta)
+		uploadNoCompactMark(t, bkt, compactedMeta.ULID)
+
+		metas, err := fetcher.FetchMetasFromListing(ctx, 0, []block.MetadataFilter{
+			deduplicateBlocksFilter,
+			NewNoCompactionMarkFilter(instrumentedBucket),
+		}, metrics)
+
+		require.NoError(t, err)
+		assert.Empty(t, metas)
+		assert.ElementsMatch(t, []ulid.ULID{sourceMeta.ULID}, deduplicateBlocksFilter.DuplicateIDs())
+	})
+
 	t.Run("corrupted cache entry triggers re-fetch", func(t *testing.T) {
 		fetcher, bkt, c, metrics := newTestFetcher(t, tenant)
 

diff --git a/pkg/compactor/executor.go b/pkg/compactor/executor.go
--- a/pkg/compactor/executor.go
+++ b/pkg/compactor/executor.go
@@ -582,14 +582,15 @@
 		maxLookback = 0
 	}
 
-	// The BatchCachingMetaFetcher handles marker filtering on its own
 	deduplicateBlocksFilter := NewShardAwareDeduplicateFilter()
+	noCompactionMarkFilter := NewNoCompactionMarkFilter(userBucket)
 	fetcher := newBatchCachingMetaFetcher(userBucket, e.metadataCache, userLogger, tenant, c.compactorCfg.MetaSyncConcurrency, e.cfg.MetadataCacheConfig.MetafileContentTTL)
 
 	level.Info(userLogger).Log("msg", "start sync of metas")
 	metas, err := fetcher.FetchMetasFromListing(ctx, maxLookback, []block.MetadataFilter{
 		NewLabelRemoverFilter(compactionIgnoredLabels),
 		deduplicateBlocksFilter,
+		noCompactionMarkFilter,
 	}, block.NewFetcherMetrics(reg, nil))
 	if err != nil {
 		return nil, fmt.Errorf("meta fetch failed: %w", err)


Comment thread pkg/compactor/batch_caching_meta_fetcher.go
@andyasp marked this pull request as draft April 14, 2026 23:29
@andyasp marked this pull request as ready for review April 15, 2026 22:11
@cursor (Bot) left a comment:

Cursor Bugbot has reviewed your changes and found 1 potential issue.

Autofix Details

Bugbot Autofix prepared a fix for the issue found in the latest run.

  • ✅ Fixed: Metadata cache leaked on scheduler client creation failure
    • newSchedulerExecutor now defers metadata cache cleanup and stops the cache when scheduler client initialization fails before the executor is returned.

Push these changes by commenting @cursor push 83cfbab652. Preview (83cfbab652):
diff --git a/pkg/compactor/executor.go b/pkg/compactor/executor.go
--- a/pkg/compactor/executor.go
+++ b/pkg/compactor/executor.go
@@ -168,6 +168,12 @@
 	if err != nil {
 		return nil, fmt.Errorf("failed to create metadata cache: %w", err)
 	}
+	metadataCacheNeedsCleanup := true
+	defer func() {
+		if metadataCacheNeedsCleanup {
+			metadataCache.Stop()
+		}
+	}()
 
 	executor := &schedulerExecutor{
 		cfg:                      cfg,
@@ -200,6 +206,7 @@
 		return nil, err
 	}
 
+	metadataCacheNeedsCleanup = false
 	return executor, nil
 }


Comment thread pkg/compactor/executor.go
@cursor (Bot) left a comment:

Cursor Bugbot has reviewed your changes and found 1 potential issue.

Autofix Details

Bugbot Autofix prepared a fix for the issue found in the latest run.

  • ✅ Fixed: Nil pointer dereference on cache cleanup in error path
    • I confirmed the constructor error path could call Stop on a nil cache and fixed it by guarding metadata cache cleanup with a nil check before stopping.

Push these changes by commenting @cursor push f3ecc32416. Preview (f3ecc32416):
diff --git a/pkg/compactor/executor.go b/pkg/compactor/executor.go
--- a/pkg/compactor/executor.go
+++ b/pkg/compactor/executor.go
@@ -197,7 +197,9 @@
 	// since grpc.Dial() creates the connection immediately and connects lazily.
 	executor.schedulerClient, executor.schedulerConn, err = executor.makeSchedulerClient()
 	if err != nil {
-		metadataCache.Stop()
+		if metadataCache != nil {
+			metadataCache.Stop()
+		}
 		return nil, err
 	}


Comment thread pkg/compactor/executor.go Outdated
@cursor (Bot) left a comment:

Cursor Bugbot has reviewed your changes and found 1 potential issue.

Autofix Details

Bugbot Autofix prepared a fix for the issue found in the latest run.

  • ✅ Fixed: Cache metrics missing standard "thanos_" prefix
    • Updated the compactor metadata cache client to wrap its registerer with the standard "thanos_" prefix so metrics align with existing cache dashboards and alerts.

Push these changes by commenting @cursor push 39e3f5dd52. Preview (39e3f5dd52):
diff --git a/pkg/compactor/executor.go b/pkg/compactor/executor.go
--- a/pkg/compactor/executor.go
+++ b/pkg/compactor/executor.go
@@ -164,7 +164,7 @@
 }
 
 func newSchedulerExecutor(cfg SchedulerClientConfig, logger log.Logger, invalidClusterValidation *prometheus.CounterVec, reg prometheus.Registerer) (*schedulerExecutor, error) {
-	metadataCache, err := cache.CreateClient("compactor-metadata-cache", cfg.MetadataCacheConfig.BackendConfig, logger, reg)
+	metadataCache, err := cache.CreateClient("compactor-metadata-cache", cfg.MetadataCacheConfig.BackendConfig, logger, prometheus.WrapRegistererWithPrefix("thanos_", reg))
 	if err != nil {
 		return nil, fmt.Errorf("failed to create metadata cache: %w", err)
 	}


Comment thread pkg/compactor/executor.go Outdated
@cursor (Bot) left a comment:

Cursor Bugbot has reviewed your changes and found 2 potential issues.

Autofix Details

Bugbot Autofix prepared a fix for 1 of the 2 issues found in the latest run.

  • ✅ Fixed: Error from innerFetchMetas not checked before continuing
    • FetchMetasFromListing now returns immediately on innerFetchMetas errors and skips TxGauge submission on failed syncs so previous synced gauge values are preserved.

Push these changes by commenting @cursor push d0815bcea2. Preview (d0815bcea2):
diff --git a/pkg/compactor/batch_caching_meta_fetcher.go b/pkg/compactor/batch_caching_meta_fetcher.go
--- a/pkg/compactor/batch_caching_meta_fetcher.go
+++ b/pkg/compactor/batch_caching_meta_fetcher.go
@@ -68,6 +68,7 @@
 		metrics.SyncDuration.Observe(time.Since(start).Seconds())
 		if err != nil {
 			metrics.SyncFailures.Inc()
+			return
 		}
 		metrics.Synced.WithLabelValues(block.LoadedMeta).Set(float64(len(metas)))
 		metrics.Submit()
@@ -79,6 +80,9 @@
 	}
 
 	metas, stats, err := f.innerFetchMetas(ctx, blockIDs, false, true, metrics.Synced, filters)
+	if err != nil {
+		return nil, err
+	}
 	stats.updateMetrics(metrics)
 
 	// Filter blocks marked as no-compact after innerFetchMetas to not get in the front of deduplication

diff --git a/pkg/compactor/batch_caching_meta_fetcher_test.go b/pkg/compactor/batch_caching_meta_fetcher_test.go
--- a/pkg/compactor/batch_caching_meta_fetcher_test.go
+++ b/pkg/compactor/batch_caching_meta_fetcher_test.go
@@ -232,22 +232,29 @@
 		assert.Equal(t, float64(1), testutil.ToFloat64(metrics.SyncFailures))
 	})
 
-	t.Run("get error increments failed sync and meta", func(t *testing.T) {
+	t.Run("get error does not submit synced gauges", func(t *testing.T) {
 		inner := objstore.NewInMemBucket()
 		uploadBlockMeta(t, inner, block1Meta)
-		errBkt := &bucket.ErrorInjectedBucketClient{
-			Bucket:   inner,
-			Injector: bucket.InjectErrorOn(bucket.OpGet, path.Join(block1Meta.ULID.String(), block.MetaFilename), errors.New("injected get error")),
-		}
-		metrics := block.NewFetcherMetrics(prometheus.NewPedanticRegistry(), nil)
+
+		reg := prometheus.NewPedanticRegistry()
+		metrics := block.NewFetcherMetrics(reg, nil)
+		errBkt := &bucket.ErrorInjectedBucketClient{Bucket: inner}
 		fetcher := newBatchCachingMetaFetcher(errBkt, nil, log.NewNopLogger(), tenant, 2, 24*time.Hour)
 
-		_, err := fetcher.FetchMetasFromListing(ctx, 0, nil, metrics)
+		metas, err := fetcher.FetchMetasFromListing(ctx, 0, nil, metrics)
+		require.NoError(t, err)
+		assert.Len(t, metas, 1)
+		assert.Equal(t, float64(1), gatherSyncedMetricValue(t, reg, block.LoadedMeta))
+		assert.Equal(t, float64(0), gatherSyncedMetricValue(t, reg, block.FailedMeta))
+
+		errBkt.Injector = bucket.InjectErrorOn(bucket.OpGet, path.Join(block1Meta.ULID.String(), block.MetaFilename), errors.New("injected get error"))
+		_, err = fetcher.FetchMetasFromListing(ctx, 0, nil, metrics)
 		require.Error(t, err)
 
-		assert.Equal(t, float64(1), testutil.ToFloat64(metrics.Syncs))
+		assert.Equal(t, float64(2), testutil.ToFloat64(metrics.Syncs))
 		assert.Equal(t, float64(1), testutil.ToFloat64(metrics.SyncFailures))
-		assert.Equal(t, float64(1), testutil.ToFloat64(metrics.Synced.WithLabelValues(block.FailedMeta)))
+		assert.Equal(t, float64(1), gatherSyncedMetricValue(t, reg, block.LoadedMeta))
+		assert.Equal(t, float64(0), gatherSyncedMetricValue(t, reg, block.FailedMeta))
 	})
 
 	t.Run("empty block listing", func(t *testing.T) {
@@ -434,3 +441,26 @@
 	delete(metas, f.id)
 	return nil
 }
+
+func gatherSyncedMetricValue(t *testing.T, gatherer prometheus.Gatherer, state string) float64 {
+	t.Helper()
+
+	metrics, err := gatherer.Gather()
+	require.NoError(t, err)
+
+	for _, metricFamily := range metrics {
+		if metricFamily.GetName() != "blocks_meta_synced" {
+			continue
+		}
+		for _, metric := range metricFamily.GetMetric() {
+			for _, label := range metric.GetLabel() {
+				if label.GetName() == "state" && label.GetValue() == state {
+					return metric.GetGauge().GetValue()
+				}
+			}
+		}
+	}
+
+	require.FailNowf(t, "metric not found", "blocks_meta_synced{state=%q} not found", state)
+	return 0
+}


Comment thread pkg/compactor/batch_caching_meta_fetcher.go Outdated
Comment thread pkg/storage/tsdb/bucketcache/caching_bucket.go
@andyasp force-pushed the aasp/metadata-caching-during-compaction branch from fd59987 to 7b23a81 on April 16, 2026 21:16
logger: logger,
tenant: tenant,
concurrency: concurrency,
contentTTL: contentTTL,
@andyasp (Contributor, Author) commented:

One thing that may be worth optimizing is dynamically setting a TTL based on how long we expect a given block's metadata to live. That may be a bit involved.

@cursor (Bot) left a comment:

Cursor Bugbot has reviewed your changes and found 1 potential issue.

Autofix Details

Bugbot Autofix prepared a fix for the issue found in the latest run.

  • ✅ Fixed: Nil registerer wrapping will panic with configured cache
    • I added a nil-check before wrapping the registerer in newSchedulerExecutor and a regression test covering nil registerer plus configured metadata cache so this path no longer panics.

Push these changes by commenting @cursor push bf721e30dd. Preview (bf721e30dd):
diff --git a/pkg/compactor/executor.go b/pkg/compactor/executor.go
--- a/pkg/compactor/executor.go
+++ b/pkg/compactor/executor.go
@@ -164,7 +164,10 @@
 }
 
 func newSchedulerExecutor(cfg SchedulerClientConfig, logger log.Logger, invalidClusterValidation *prometheus.CounterVec, reg prometheus.Registerer) (*schedulerExecutor, error) {
-	cacheReg := prometheus.WrapRegistererWithPrefix("thanos_", prometheus.WrapRegistererWith(prometheus.Labels{"component": "compactor"}, reg))
+	var cacheReg prometheus.Registerer
+	if reg != nil {
+		cacheReg = prometheus.WrapRegistererWithPrefix("thanos_", prometheus.WrapRegistererWith(prometheus.Labels{"component": "compactor"}, reg))
+	}
 	metadataCache, err := cache.CreateClient("metadata-cache", cfg.MetadataCacheConfig.BackendConfig, logger, cacheReg)
 	if err != nil {
 		return nil, fmt.Errorf("failed to create metadata cache: %w", err)

diff --git a/pkg/compactor/executor_test.go b/pkg/compactor/executor_test.go
--- a/pkg/compactor/executor_test.go
+++ b/pkg/compactor/executor_test.go
@@ -17,6 +17,7 @@
 	"time"
 
 	"github.com/go-kit/log"
+	"github.com/grafana/dskit/cache"
 	"github.com/grafana/dskit/flagext"
 	"github.com/grafana/dskit/kv/consul"
 	"github.com/grafana/dskit/ring"
@@ -157,6 +158,25 @@
 	return exec
 }
 
+func TestNewSchedulerExecutor_NilRegistererWithConfiguredMetadataCache(t *testing.T) {
+	cfg := makeTestCompactorConfig()
+	cfg.SchedulerClientConfig.MetadataCacheConfig.Backend = cache.BackendMemcached
+	cfg.SchedulerClientConfig.MetadataCacheConfig.Memcached.Addresses = flagext.StringSliceCSV{"localhost:11211"}
+
+	var (
+		exec *schedulerExecutor
+		err  error
+	)
+	require.NotPanics(t, func() {
+		exec, err = newSchedulerExecutor(cfg.SchedulerClientConfig, log.NewNopLogger(), nil, nil)
+	})
+	require.NoError(t, err)
+	require.NotNil(t, exec)
+	t.Cleanup(func() {
+		require.NoError(t, exec.stop())
+	})
+}
+
 func prepareCompactorForExecutorTest(t *testing.T, cfg Config, bkt objstore.Bucket, cfgProvider ConfigProvider) *MultitenantCompactor {
 	t.Helper()
 	c, _, _, _, _ := prepareWithConfigProvider(t, cfg, bkt, cfgProvider)


Comment thread pkg/compactor/executor.go
Comment on lines +61 to +63
// FetchMetasFromListing discovers blockIDs through an object storage listing,
// filters by ULID time and deletion markers, then batch fetches metadata from cache with fallback to object storage.
func (f *batchCachingMetaFetcher) FetchMetasFromListing(ctx context.Context, maxLookback time.Duration, filters []block.MetadataFilter, metrics *block.FetcherMetrics) (metas map[ulid.ULID]*block.Meta, err error) {
@npazosmendez (Contributor) commented Apr 20, 2026:

I would also mention it filters out blocks marked for "no compaction", it caught me off guard. I would even suggest putting it in the method's name, but FetchCompactableMetasFromListing is maybe too long?

But then again, I'm not very familiar with the MetaFetcher/Syncer interfaces.

@andyasp (Contributor, Author) replied:

It was previously done in a filter, but I moved it into the method itself so the deletion marker and no-compact marker listings could be combined into a single pass.

@cursor (Bot) left a comment:

Cursor Bugbot has reviewed your changes and found 1 potential issue.


Bugbot Autofix prepared a fix for the issue found in the latest run.

  • ✅ Fixed: Corrupted meta silently skipped changes job outcome to abandon
    • FetchMetasFromIDs now returns ErrorSyncMetaCorrupted for corrupted meta.json so compaction execution correctly reassigns instead of abandoning, with regression tests added.

Push these changes by commenting @cursor push e9b8940f81. Preview (e9b8940f81):
diff --git a/pkg/compactor/batch_caching_meta_fetcher.go b/pkg/compactor/batch_caching_meta_fetcher.go
--- a/pkg/compactor/batch_caching_meta_fetcher.go
+++ b/pkg/compactor/batch_caching_meta_fetcher.go
@@ -120,8 +120,8 @@
 
 // innerFetchMetas fetches metadata for the given block IDs from cache and/or storage
 // with fast-fail behavior and applies filters afterwards.
-// When failOnNotFound is true, a missing meta.json in storage returns an error
-// instead of being silently skipped.
+// When failOnNotFound is true, a missing or corrupted meta.json in storage
+// returns an error instead of being silently skipped.
 // When cacheContent is true, cache misses that were later successfully loaded will
 // be set in the cache.
 func (f *batchCachingMetaFetcher) innerFetchMetas(ctx context.Context, blockIDs []ulid.ULID, failOnNotFound bool, cacheContent bool, synced block.GaugeVec, filters []block.MetadataFilter) (map[ulid.ULID]*block.Meta, *fetchStats, error) {
@@ -174,7 +174,11 @@
 
 			m := &block.Meta{}
 			if err := json.Unmarshal(data, m); err != nil {
-				// Corrupted meta.json: skip the block, consistent with MetaFetcher behavior.
+				if failOnNotFound {
+					return fmt.Errorf("corrupted block metadata in bucket for %s: %w", id, block.ErrorSyncMetaCorrupted)
+				}
+
+				// Corrupted meta.json in listing-based sync: skip the block, consistent with MetaFetcher behavior.
 				level.Warn(f.logger).Log("msg", "corrupted block meta, skipping", "block", id, "err", err)
 				stats.corrupted.Inc()
 				return nil

diff --git a/pkg/compactor/batch_caching_meta_fetcher_test.go b/pkg/compactor/batch_caching_meta_fetcher_test.go
--- a/pkg/compactor/batch_caching_meta_fetcher_test.go
+++ b/pkg/compactor/batch_caching_meta_fetcher_test.go
@@ -329,6 +329,19 @@
 		assert.Contains(t, err.Error(), block2Meta.ULID.String())
 	})
 
+	t.Run("returns error when meta is corrupted in storage", func(t *testing.T) {
+		fetcher, bkt, _, _ := newTestFetcher(t, tenant)
+
+		uploadBlockMeta(t, bkt, block1Meta)
+		require.NoError(t, bkt.Upload(ctx, path.Join(block2Meta.ULID.String(), block.MetaFilename), bytes.NewReader([]byte("not valid json"))))
+
+		_, err := fetcher.FetchMetasFromIDs(ctx, []ulid.ULID{block1Meta.ULID, block2Meta.ULID}, nil)
+
+		require.Error(t, err)
+		assert.ErrorIs(t, err, block.ErrorSyncMetaCorrupted)
+		assert.Contains(t, err.Error(), block2Meta.ULID.String())
+	})
+
 	t.Run("returns cached meta without storage fetch", func(t *testing.T) {
 		fetcher, _, c, _ := newTestFetcher(t, tenant)
 

diff --git a/pkg/compactor/executor_test.go b/pkg/compactor/executor_test.go
--- a/pkg/compactor/executor_test.go
+++ b/pkg/compactor/executor_test.go
@@ -3,6 +3,7 @@
 package compactor
 
 import (
+	"bytes"
 	"cmp"
 	"context"
 	"errors"
@@ -716,6 +717,19 @@
 			expectNewBlocksCount: 0,
 			expectError:          true,
 		},
+		"reassign_when_requested_block_meta_is_corrupted": {
+			setupBucket: func(t *testing.T, bkt objstore.Bucket) setupResult {
+				require.NoError(t, bkt.Upload(context.Background(), fmt.Sprintf("test-tenant/%s/meta.json", testBlockID1.String()), bytes.NewReader([]byte("not valid json"))))
+				return setupResult{
+					blockIDsToCompact: []ulid.ULID{testBlockID1},
+					compactedBlocks:   nil,
+					uncompactedBlocks: []ulid.ULID{testBlockID1},
+				}
+			},
+			expectedStatus:       compactorschedulerpb.UPDATE_TYPE_REASSIGN,
+			expectNewBlocksCount: 0,
+			expectError:          true,
+		},
 	}
 
 	for testName, tc := range tests {


Reviewed by Cursor Bugbot for commit 684bd4c.

Comment thread pkg/compactor/batch_caching_meta_fetcher.go
@andyasp merged commit 839ded0 into main on Apr 21, 2026
115 of 116 checks passed
@andyasp deleted the aasp/metadata-caching-during-compaction branch April 21, 2026 18:59
