17 changes: 15 additions & 2 deletions internal/beater/monitoringtest/opentelemetry.go
@@ -42,6 +42,18 @@ func ExpectContainOtelMetrics(
assertOtelMetrics(t, reader, expectedMetrics, false, false)
}

func ExpectContainAndNotContainOtelMetrics(
t *testing.T,
reader sdkmetric.Reader,
expectedMetrics map[string]any,
notExpectedMetricKeys []string,
) {
foundMetricKeys := assertOtelMetrics(t, reader, expectedMetrics, false, false)
for _, key := range notExpectedMetricKeys {
assert.NotContains(t, foundMetricKeys, key)
}
}

func ExpectContainOtelMetricsKeys(t assert.TestingT, reader sdkmetric.Reader, expectedMetricsKeys []string) {
expectedMetrics := make(map[string]any)
for _, metricKey := range expectedMetricsKeys {
@@ -62,7 +74,7 @@ func assertOtelMetrics(
reader sdkmetric.Reader,
expectedMetrics map[string]any,
fullMatch, skipValAssert bool,
) {
) []string {
var rm metricdata.ResourceMetrics
assert.NoError(t, reader.Collect(context.Background(), &rm))

@@ -118,8 +130,9 @@
expectedMetricsKeys = append(expectedMetricsKeys, k)
}
if fullMatch {
assert.ElementsMatch(t, expectedMetricsKeys, foundMetrics)
assert.ElementsMatch(t, foundMetrics, expectedMetricsKeys)
} else {
assert.Subset(t, foundMetrics, expectedMetricsKeys)
}
return foundMetrics
}
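
For orientation, here is a minimal usage sketch of the new ExpectContainAndNotContainOtelMetrics helper. The meter setup and metric names below are illustrative assumptions for this sketch only; the real call sites added by this PR are in processor_test.go further down.

package monitoringtest_test

import (
	"context"
	"testing"

	sdkmetric "go.opentelemetry.io/otel/sdk/metric"

	"github.com/elastic/apm-server/internal/beater/monitoringtest"
)

// TestContainAndNotContainSketch records one placeholder counter and then
// asserts that it is reported while an unrelated metric key is absent.
func TestContainAndNotContainSketch(t *testing.T) {
	reader := sdkmetric.NewManualReader()
	meter := sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader)).Meter("example")

	processed, err := meter.Int64Counter("example.events.processed")
	if err != nil {
		t.Fatal(err)
	}
	processed.Add(context.Background(), 1)

	monitoringtest.ExpectContainAndNotContainOtelMetrics(t, reader,
		map[string]any{
			// Expected to be present with this value.
			"example.events.processed": 1,
		},
		[]string{
			// Expected to be absent because it was never recorded.
			"example.events.dropped",
		},
	)
}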
92 changes: 92 additions & 0 deletions x-pack/apm-server/sampling/eventstorage/rw.go
@@ -7,7 +7,9 @@ package eventstorage
import (
"errors"
"fmt"
"sync"

"github.com/cespare/xxhash/v2"
"github.com/elastic/apm-data/model/modelpb"
)

@@ -139,3 +141,93 @@ func (s StorageLimitReadWriter) DeleteTraceEvent(traceID, id string) error {
// Technically DeleteTraceEvent writes, but it should have a net effect of reducing disk usage
return s.nextRW.DeleteTraceEvent(traceID, id)
}

type ShardLockReadWriter struct {
rws []*lockedReadWriter
}

func NewShardLockReadWriter(numShards int, nextRW RW) *ShardLockReadWriter {
if numShards <= 0 {
panic("ShardLockReadWriter numShards must be greater than zero")
}
rws := make([]*lockedReadWriter, numShards)
for i := 0; i < numShards; i++ {
rws[i] = newLockedReadWriter(nextRW)
}
return &ShardLockReadWriter{rws: rws}
}

// ReadTraceEvents calls ReadTraceEvents, using a sharded, locked RW.
func (s *ShardLockReadWriter) ReadTraceEvents(traceID string, out *modelpb.Batch) error {
return s.getReadWriter(traceID).ReadTraceEvents(traceID, out)
}

// WriteTraceEvent calls WriteTraceEvent, using a sharded, locked RW.
func (s *ShardLockReadWriter) WriteTraceEvent(traceID, id string, event *modelpb.APMEvent) error {
return s.getReadWriter(traceID).WriteTraceEvent(traceID, id, event)
}

// WriteTraceSampled calls WriteTraceSampled, using a sharded, locked RW.
func (s *ShardLockReadWriter) WriteTraceSampled(traceID string, sampled bool) error {
return s.getReadWriter(traceID).WriteTraceSampled(traceID, sampled)
}

// IsTraceSampled calls IsTraceSampled, using a sharded, locked RW.
func (s *ShardLockReadWriter) IsTraceSampled(traceID string) (bool, error) {
return s.getReadWriter(traceID).IsTraceSampled(traceID)
}

// DeleteTraceEvent calls DeleteTraceEvent, using a sharded, locked RW.
func (s *ShardLockReadWriter) DeleteTraceEvent(traceID, id string) error {
return s.getReadWriter(traceID).DeleteTraceEvent(traceID, id)
}

// getReadWriter returns a lockedReadWriter for the given trace ID.
//
// This method is idempotent, which is necessary to avoid transaction
// conflicts and ensure all events are reported once a sampling decision
// has been recorded.
func (s *ShardLockReadWriter) getReadWriter(traceID string) *lockedReadWriter {
var h xxhash.Digest
_, _ = h.WriteString(traceID)
return s.rws[h.Sum64()%uint64(len(s.rws))]
}

type lockedReadWriter struct {
mu sync.Mutex
rw RW
}

func newLockedReadWriter(rw RW) *lockedReadWriter {
return &lockedReadWriter{rw: rw}
}

func (rw *lockedReadWriter) ReadTraceEvents(traceID string, out *modelpb.Batch) error {
rw.mu.Lock()
defer rw.mu.Unlock()
return rw.rw.ReadTraceEvents(traceID, out)
}

func (rw *lockedReadWriter) WriteTraceEvent(traceID, id string, event *modelpb.APMEvent) error {
rw.mu.Lock()
defer rw.mu.Unlock()
return rw.rw.WriteTraceEvent(traceID, id, event)
}

func (rw *lockedReadWriter) WriteTraceSampled(traceID string, sampled bool) error {
rw.mu.Lock()
defer rw.mu.Unlock()
return rw.rw.WriteTraceSampled(traceID, sampled)
}

func (rw *lockedReadWriter) IsTraceSampled(traceID string) (bool, error) {
rw.mu.Lock()
defer rw.mu.Unlock()
return rw.rw.IsTraceSampled(traceID)
}
Comment on lines +223 to +227 (Member):

I acknowledge 8.x doesn't have the optimization to use a RWMutex, but given that the ingest hot path is IsTraceSampled->WriteTraceEvent, I wonder whether swapping it to an RWMutex would further reduce lock contention. However, it may or may not show up in the benchmark, depending on GOMAXPROCS as well as the trace-sampled hit rate (which will be 0 because the benchmark always generates new trace IDs).


func (rw *lockedReadWriter) DeleteTraceEvent(traceID, id string) error {
rw.mu.Lock()
defer rw.mu.Unlock()
return rw.rw.DeleteTraceEvent(traceID, id)
}
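
Following up on the review comment above, here is a minimal sketch, assuming it lives in package eventstorage alongside lockedReadWriter, of what an RWMutex-based variant could look like. It takes the shared lock on the IsTraceSampled hot path and the exclusive lock for writes; whether this actually reduces contention depends on the wrapped RW tolerating concurrent reads, which this PR does not establish. It is an illustrative variant, not part of the change.

// rwLockedReadWriter is a hypothetical RWMutex variant of lockedReadWriter.
// Only the two hot-path methods are shown; the remaining methods would follow
// the same pattern as lockedReadWriter, taking the exclusive lock.
type rwLockedReadWriter struct {
	mu sync.RWMutex
	rw RW
}

// IsTraceSampled takes the shared lock, allowing concurrent sampling-decision
// lookups on the same shard. This assumes the wrapped RW is safe for
// concurrent readers.
func (rw *rwLockedReadWriter) IsTraceSampled(traceID string) (bool, error) {
	rw.mu.RLock()
	defer rw.mu.RUnlock()
	return rw.rw.IsTraceSampled(traceID)
}

// WriteTraceEvent takes the exclusive lock, serializing writes within a
// shard exactly as the sync.Mutex version does.
func (rw *rwLockedReadWriter) WriteTraceEvent(traceID, id string, event *modelpb.APMEvent) error {
	rw.mu.Lock()
	defer rw.mu.Unlock()
	return rw.rw.WriteTraceEvent(traceID, id, event)
}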
6 changes: 4 additions & 2 deletions x-pack/apm-server/sampling/processor.go
@@ -10,17 +10,19 @@ import (
"errors"
"fmt"
"os"
"runtime"
"sync"
"time"

"go.opentelemetry.io/otel/metric"
"golang.org/x/sync/errgroup"

"github.com/elastic/apm-data/model/modelpb"
"github.com/elastic/elastic-agent-libs/logp"

"github.com/elastic/apm-server/internal/logs"
"github.com/elastic/apm-server/x-pack/apm-server/sampling/eventstorage"
"github.com/elastic/apm-server/x-pack/apm-server/sampling/pubsub"
"github.com/elastic/elastic-agent-libs/logp"
)

const (
@@ -71,7 +73,7 @@ func NewProcessor(config Config, logger *logp.Logger) (*Processor, error) {
logger: logger,
rateLimitedLogger: logger.WithOptions(logs.WithRateLimit(loggerRateLimit)),
groups: newTraceGroups(meter, config.Policies, config.MaxDynamicServices, config.IngestRateDecayFactor),
eventStore: config.Storage,
eventStore: eventstorage.NewShardLockReadWriter(runtime.GOMAXPROCS(0), config.Storage),
stopping: make(chan struct{}),
stopped: make(chan struct{}),
}
154 changes: 152 additions & 2 deletions x-pack/apm-server/sampling/processor_test.go
@@ -11,6 +11,7 @@ import (
"math/rand"
"os"
"path/filepath"
"sync"
"testing"
"time"

@@ -23,12 +24,13 @@
"google.golang.org/protobuf/testing/protocmp"

"github.com/elastic/apm-data/model/modelpb"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/logp/logptest"

"github.com/elastic/apm-server/internal/beater/monitoringtest"
"github.com/elastic/apm-server/x-pack/apm-server/sampling"
"github.com/elastic/apm-server/x-pack/apm-server/sampling/eventstorage"
"github.com/elastic/apm-server/x-pack/apm-server/sampling/pubsub/pubsubtest"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/logp/logptest"
)

func TestProcessUnsampled(t *testing.T) {
@@ -830,6 +832,154 @@ func TestGracefulShutdown(t *testing.T) {
assert.Equal(t, int(sampleRate*float64(totalTraces)), count)
}

type blockingRW struct {
ctx context.Context
unblockWriteEvent chan struct{}
unblockWriteSampled chan struct{}
next eventstorage.RW
}

func (m blockingRW) ReadTraceEvents(traceID string, out *modelpb.Batch) error {
return m.next.ReadTraceEvents(traceID, out)
}

func (m blockingRW) WriteTraceEvent(traceID, id string, event *modelpb.APMEvent) error {
select {
case <-m.ctx.Done():
return m.ctx.Err()
case <-m.unblockWriteEvent:
return m.next.WriteTraceEvent(traceID, id, event)
}
}

func (m blockingRW) WriteTraceSampled(traceID string, sampled bool) error {
select {
case <-m.ctx.Done():
return m.ctx.Err()
case <-m.unblockWriteSampled:
return m.next.WriteTraceSampled(traceID, sampled)
}
}

func (m blockingRW) IsTraceSampled(traceID string) (bool, error) {
return m.next.IsTraceSampled(traceID)
}

func (m blockingRW) DeleteTraceEvent(traceID, id string) error {
return m.next.DeleteTraceEvent(traceID, id)
}

func TestPotentialRaceCondition(t *testing.T) {
unblockWriteEvent := make(chan struct{}, 1)
unblockWriteSampled := make(chan struct{}, 1)

flushInterval := time.Second
tempdirConfig := newTempdirConfig(t)
timeoutCtx, cancel := context.WithTimeout(context.Background(), 10*flushInterval)
defer cancel()

tempdirConfig.Config.FlushInterval = flushInterval
// Wrap existing storage with blockingRW so we can control the flow.
tempdirConfig.Config.Storage = &blockingRW{
ctx: timeoutCtx,
unblockWriteEvent: unblockWriteEvent,
unblockWriteSampled: unblockWriteSampled,
next: tempdirConfig.Config.Storage,
}
// Collect reported transactions.
reported := make(chan modelpb.Batch)
tempdirConfig.Config.BatchProcessor = modelpb.ProcessBatchFunc(func(ctx context.Context, batch *modelpb.Batch) error {
select {
case <-ctx.Done():
return ctx.Err()
case reported <- batch.Clone():
return nil
}
})

processor, err := sampling.NewProcessor(tempdirConfig.Config, logptest.NewTestingLogger(t, ""))
require.NoError(t, err)
go processor.Run()
defer processor.Stop(context.Background())

// Send root transaction, which will be written to storage since write event is unblocked.
unblockWriteEvent <- struct{}{}
batch1 := modelpb.Batch{{
Trace: &modelpb.Trace{Id: "trace1"},
Transaction: &modelpb.Transaction{
Type: "type",
Id: "transaction1",
Sampled: true,
},
}}
err = processor.ProcessBatch(context.Background(), &batch1)
require.NoError(t, err)
assert.Len(t, batch1, 0)
monitoringtest.ExpectContainAndNotContainOtelMetrics(t, tempdirConfig.metricReader,
map[string]any{
"apm-server.sampling.tail.events.processed": 1,
"apm-server.sampling.tail.events.stored": 1,
},
[]string{
"apm-server.sampling.tail.events.sampled",
},
)

// Send child transaction, which will be blocked from writing to storage for now.
var wg sync.WaitGroup
wg.Go(func() {
batch2 := modelpb.Batch{{
Trace: &modelpb.Trace{Id: "trace1"},
Transaction: &modelpb.Transaction{
Type: "type",
Id: "transaction2",
Sampled: true,
},
ParentId: "transaction1",
}}
err = processor.ProcessBatch(context.Background(), &batch2)
require.NoError(t, err)
assert.Len(t, batch2, 0)
})

// Unblock write sampled so that the first transaction is published.
unblockWriteSampled <- struct{}{}
time.Sleep(2 * flushInterval)
// Unblock write event so that the second transaction is written to storage.
unblockWriteEvent <- struct{}{}
wg.Wait()
monitoringtest.ExpectContainAndNotContainOtelMetrics(t, tempdirConfig.metricReader,
map[string]any{
// This comes from the second transaction being stored.
"apm-server.sampling.tail.events.stored": 1,
"apm-server.sampling.tail.events.processed": 1,
},
[]string{
"apm-server.sampling.tail.events.sampled",
},
)

select {
case batch := <-reported:
if len(batch) != 2 {
t.Fatal("expected two events in publish")
}
assert.Equal(t, batch[0].Transaction.Id, "transaction1")
assert.Equal(t, batch[1].Transaction.Id, "transaction2")
case <-timeoutCtx.Done():
t.Fatal("test timed out waiting for publish")
}
monitoringtest.ExpectContainAndNotContainOtelMetrics(t, tempdirConfig.metricReader,
map[string]any{
"apm-server.sampling.tail.events.sampled": 2,
},
[]string{
"apm-server.sampling.tail.events.processed",
"apm-server.sampling.tail.events.stored",
},
)
}

type testConfig struct {
sampling.Config
tempDir string