diff --git a/internal/beater/monitoringtest/opentelemetry.go b/internal/beater/monitoringtest/opentelemetry.go index 9b643f3d171..271399e4138 100644 --- a/internal/beater/monitoringtest/opentelemetry.go +++ b/internal/beater/monitoringtest/opentelemetry.go @@ -42,6 +42,18 @@ func ExpectContainOtelMetrics( assertOtelMetrics(t, reader, expectedMetrics, false, false) } +func ExpectContainAndNotContainOtelMetrics( + t *testing.T, + reader sdkmetric.Reader, + expectedMetrics map[string]any, + notExpectedMetricKeys []string, +) { + foundMetricKeys := assertOtelMetrics(t, reader, expectedMetrics, false, false) + for _, key := range notExpectedMetricKeys { + assert.NotContains(t, foundMetricKeys, key) + } +} + func ExpectContainOtelMetricsKeys(t assert.TestingT, reader sdkmetric.Reader, expectedMetricsKeys []string) { expectedMetrics := make(map[string]any) for _, metricKey := range expectedMetricsKeys { @@ -62,7 +74,7 @@ func assertOtelMetrics( reader sdkmetric.Reader, expectedMetrics map[string]any, fullMatch, skipValAssert bool, -) { +) []string { var rm metricdata.ResourceMetrics assert.NoError(t, reader.Collect(context.Background(), &rm)) @@ -118,8 +130,9 @@ func assertOtelMetrics( expectedMetricsKeys = append(expectedMetricsKeys, k) } if fullMatch { - assert.ElementsMatch(t, expectedMetricsKeys, foundMetrics) + assert.ElementsMatch(t, foundMetrics, expectedMetricsKeys) } else { assert.Subset(t, foundMetrics, expectedMetricsKeys) } + return foundMetrics } diff --git a/x-pack/apm-server/sampling/eventstorage/rw.go b/x-pack/apm-server/sampling/eventstorage/rw.go index a12b60f52d9..d341de13b43 100644 --- a/x-pack/apm-server/sampling/eventstorage/rw.go +++ b/x-pack/apm-server/sampling/eventstorage/rw.go @@ -7,7 +7,9 @@ package eventstorage import ( "errors" "fmt" + "sync" + "github.com/cespare/xxhash/v2" "github.com/elastic/apm-data/model/modelpb" ) @@ -139,3 +141,93 @@ func (s StorageLimitReadWriter) DeleteTraceEvent(traceID, id string) error { // Technically 
DeleteTraceEvent writes, but it should have a net effect of reducing disk usage return s.nextRW.DeleteTraceEvent(traceID, id) } + +type ShardLockReadWriter struct { + rws []*lockedReadWriter +} + +func NewShardLockReadWriter(numShards int, nextRW RW) *ShardLockReadWriter { + if numShards <= 0 { + panic("ShardLockReadWriter numShards must be greater than zero") + } + rws := make([]*lockedReadWriter, numShards) + for i := 0; i < numShards; i++ { + rws[i] = newLockedReadWriter(nextRW) + } + return &ShardLockReadWriter{rws: rws} +} + +// ReadTraceEvents calls ReadTraceEvents, using a sharded, locked RW. +func (s *ShardLockReadWriter) ReadTraceEvents(traceID string, out *modelpb.Batch) error { + return s.getReadWriter(traceID).ReadTraceEvents(traceID, out) +} + +// WriteTraceEvent calls WriteTraceEvent, using a sharded, locked RW. +func (s *ShardLockReadWriter) WriteTraceEvent(traceID, id string, event *modelpb.APMEvent) error { + return s.getReadWriter(traceID).WriteTraceEvent(traceID, id, event) +} + +// WriteTraceSampled calls WriteTraceSampled, using a sharded, locked RW. +func (s *ShardLockReadWriter) WriteTraceSampled(traceID string, sampled bool) error { + return s.getReadWriter(traceID).WriteTraceSampled(traceID, sampled) +} + +// IsTraceSampled calls IsTraceSampled, using a sharded, locked RW. +func (s *ShardLockReadWriter) IsTraceSampled(traceID string) (bool, error) { + return s.getReadWriter(traceID).IsTraceSampled(traceID) +} + +// DeleteTraceEvent calls DeleteTraceEvent, using a sharded, locked RW. +func (s *ShardLockReadWriter) DeleteTraceEvent(traceID, id string) error { + return s.getReadWriter(traceID).DeleteTraceEvent(traceID, id) +} + +// getReadWriter returns a lockedReadWriter for the given trace ID. +// +// This method is deterministic, which is necessary to avoid transaction +// conflicts and ensure all events are reported once a sampling decision +// has been recorded. 
+func (s *ShardLockReadWriter) getReadWriter(traceID string) *lockedReadWriter { + var h xxhash.Digest + _, _ = h.WriteString(traceID) + return s.rws[h.Sum64()%uint64(len(s.rws))] +} + +type lockedReadWriter struct { + mu sync.Mutex + rw RW +} + +func newLockedReadWriter(rw RW) *lockedReadWriter { + return &lockedReadWriter{rw: rw} +} + +func (rw *lockedReadWriter) ReadTraceEvents(traceID string, out *modelpb.Batch) error { + rw.mu.Lock() + defer rw.mu.Unlock() + return rw.rw.ReadTraceEvents(traceID, out) +} + +func (rw *lockedReadWriter) WriteTraceEvent(traceID, id string, event *modelpb.APMEvent) error { + rw.mu.Lock() + defer rw.mu.Unlock() + return rw.rw.WriteTraceEvent(traceID, id, event) +} + +func (rw *lockedReadWriter) WriteTraceSampled(traceID string, sampled bool) error { + rw.mu.Lock() + defer rw.mu.Unlock() + return rw.rw.WriteTraceSampled(traceID, sampled) +} + +func (rw *lockedReadWriter) IsTraceSampled(traceID string) (bool, error) { + rw.mu.Lock() + defer rw.mu.Unlock() + return rw.rw.IsTraceSampled(traceID) +} + +func (rw *lockedReadWriter) DeleteTraceEvent(traceID, id string) error { + rw.mu.Lock() + defer rw.mu.Unlock() + return rw.rw.DeleteTraceEvent(traceID, id) +} diff --git a/x-pack/apm-server/sampling/processor.go b/x-pack/apm-server/sampling/processor.go index c2d9816570f..e225771f3cf 100644 --- a/x-pack/apm-server/sampling/processor.go +++ b/x-pack/apm-server/sampling/processor.go @@ -10,6 +10,7 @@ import ( "errors" "fmt" "os" + "runtime" "sync" "time" @@ -17,10 +18,11 @@ import ( "golang.org/x/sync/errgroup" "github.com/elastic/apm-data/model/modelpb" + "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/apm-server/internal/logs" "github.com/elastic/apm-server/x-pack/apm-server/sampling/eventstorage" "github.com/elastic/apm-server/x-pack/apm-server/sampling/pubsub" - "github.com/elastic/elastic-agent-libs/logp" ) const ( @@ -71,7 +73,7 @@ func NewProcessor(config Config, logger *logp.Logger) (*Processor, error) { logger: 
logger, rateLimitedLogger: logger.WithOptions(logs.WithRateLimit(loggerRateLimit)), groups: newTraceGroups(meter, config.Policies, config.MaxDynamicServices, config.IngestRateDecayFactor), - eventStore: config.Storage, + eventStore: eventstorage.NewShardLockReadWriter(runtime.GOMAXPROCS(0), config.Storage), stopping: make(chan struct{}), stopped: make(chan struct{}), } diff --git a/x-pack/apm-server/sampling/processor_test.go b/x-pack/apm-server/sampling/processor_test.go index 905611c7848..8f774764952 100644 --- a/x-pack/apm-server/sampling/processor_test.go +++ b/x-pack/apm-server/sampling/processor_test.go @@ -11,6 +11,7 @@ import ( "math/rand" "os" "path/filepath" + "sync" "testing" "time" @@ -23,12 +24,13 @@ import ( "google.golang.org/protobuf/testing/protocmp" "github.com/elastic/apm-data/model/modelpb" + "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/elastic-agent-libs/logp/logptest" + "github.com/elastic/apm-server/internal/beater/monitoringtest" "github.com/elastic/apm-server/x-pack/apm-server/sampling" "github.com/elastic/apm-server/x-pack/apm-server/sampling/eventstorage" "github.com/elastic/apm-server/x-pack/apm-server/sampling/pubsub/pubsubtest" - "github.com/elastic/elastic-agent-libs/logp" - "github.com/elastic/elastic-agent-libs/logp/logptest" ) func TestProcessUnsampled(t *testing.T) { @@ -830,6 +832,154 @@ func TestGracefulShutdown(t *testing.T) { assert.Equal(t, int(sampleRate*float64(totalTraces)), count) } +type blockingRW struct { + ctx context.Context + unblockWriteEvent chan struct{} + unblockWriteSampled chan struct{} + next eventstorage.RW +} + +func (m blockingRW) ReadTraceEvents(traceID string, out *modelpb.Batch) error { + return m.next.ReadTraceEvents(traceID, out) +} + +func (m blockingRW) WriteTraceEvent(traceID, id string, event *modelpb.APMEvent) error { + select { + case <-m.ctx.Done(): + return m.ctx.Err() + case <-m.unblockWriteEvent: + return m.next.WriteTraceEvent(traceID, id, event) + } +} + +func (m 
blockingRW) WriteTraceSampled(traceID string, sampled bool) error { + select { + case <-m.ctx.Done(): + return m.ctx.Err() + case <-m.unblockWriteSampled: + return m.next.WriteTraceSampled(traceID, sampled) + } +} + +func (m blockingRW) IsTraceSampled(traceID string) (bool, error) { + return m.next.IsTraceSampled(traceID) +} + +func (m blockingRW) DeleteTraceEvent(traceID, id string) error { + return m.next.DeleteTraceEvent(traceID, id) +} + +func TestPotentialRaceCondition(t *testing.T) { + unblockWriteEvent := make(chan struct{}, 1) + unblockWriteSampled := make(chan struct{}, 1) + + flushInterval := time.Second + tempdirConfig := newTempdirConfig(t) + timeoutCtx, cancel := context.WithTimeout(context.Background(), 10*flushInterval) + defer cancel() + + tempdirConfig.Config.FlushInterval = flushInterval + // Wrap existing storage with blockingRW so we can control the flow. + tempdirConfig.Config.Storage = &blockingRW{ + ctx: timeoutCtx, + unblockWriteEvent: unblockWriteEvent, + unblockWriteSampled: unblockWriteSampled, + next: tempdirConfig.Config.Storage, + } + // Collect reported transactions. + reported := make(chan modelpb.Batch) + tempdirConfig.Config.BatchProcessor = modelpb.ProcessBatchFunc(func(ctx context.Context, batch *modelpb.Batch) error { + select { + case <-ctx.Done(): + return ctx.Err() + case reported <- batch.Clone(): + return nil + } + }) + + processor, err := sampling.NewProcessor(tempdirConfig.Config, logptest.NewTestingLogger(t, "")) + require.NoError(t, err) + go processor.Run() + defer processor.Stop(context.Background()) + + // Send root transaction, which will be written to storage since write event is unblocked. 
+ unblockWriteEvent <- struct{}{} + batch1 := modelpb.Batch{{ + Trace: &modelpb.Trace{Id: "trace1"}, + Transaction: &modelpb.Transaction{ + Type: "type", + Id: "transaction1", + Sampled: true, + }, + }} + err = processor.ProcessBatch(context.Background(), &batch1) + require.NoError(t, err) + assert.Len(t, batch1, 0) + monitoringtest.ExpectContainAndNotContainOtelMetrics(t, tempdirConfig.metricReader, + map[string]any{ + "apm-server.sampling.tail.events.processed": 1, + "apm-server.sampling.tail.events.stored": 1, + }, + []string{ + "apm-server.sampling.tail.events.sampled", + }, + ) + + // Send child transaction, which will be blocked from writing to storage for now. + var wg sync.WaitGroup + wg.Go(func() { + batch2 := modelpb.Batch{{ + Trace: &modelpb.Trace{Id: "trace1"}, + Transaction: &modelpb.Transaction{ + Type: "type", + Id: "transaction2", + Sampled: true, + }, + ParentId: "transaction1", + }} + err = processor.ProcessBatch(context.Background(), &batch2) + require.NoError(t, err) + assert.Len(t, batch2, 0) + }) + + // Unblock write sampled so that the first transaction is published. + unblockWriteSampled <- struct{}{} + time.Sleep(2 * flushInterval) + // Unblock write event so that the second transaction is written to storage. + unblockWriteEvent <- struct{}{} + wg.Wait() + monitoringtest.ExpectContainAndNotContainOtelMetrics(t, tempdirConfig.metricReader, + map[string]any{ + // This comes from second transaction being stored. 
+ "apm-server.sampling.tail.events.stored": 1, + "apm-server.sampling.tail.events.processed": 1, + }, + []string{ + "apm-server.sampling.tail.events.sampled", + }, + ) + + select { + case batch := <-reported: + if len(batch) != 2 { + t.Fatal("expected two events in publish") + } + assert.Equal(t, batch[0].Transaction.Id, "transaction1") + assert.Equal(t, batch[1].Transaction.Id, "transaction2") + case <-timeoutCtx.Done(): + t.Fatal("test timed out waiting for publish") + } + monitoringtest.ExpectContainAndNotContainOtelMetrics(t, tempdirConfig.metricReader, + map[string]any{ + "apm-server.sampling.tail.events.sampled": 2, + }, + []string{ + "apm-server.sampling.tail.events.processed", + "apm-server.sampling.tail.events.stored", + }, + ) +} + type testConfig struct { sampling.Config tempDir string