Skip to content
17 changes: 15 additions & 2 deletions internal/beater/monitoringtest/opentelemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,18 @@ func ExpectContainOtelMetrics(
assertOtelMetrics(t, reader, expectedMetrics, false, false)
}

func ExpectContainAndNotContainOtelMetrics(
t *testing.T,
reader sdkmetric.Reader,
expectedMetrics map[string]any,
notExpectedMetricKeys []string,
) {
foundMetricKeys := assertOtelMetrics(t, reader, expectedMetrics, false, false)
for _, key := range notExpectedMetricKeys {
assert.NotContains(t, foundMetricKeys, key)
}
}

func ExpectContainOtelMetricsKeys(t assert.TestingT, reader sdkmetric.Reader, expectedMetricsKeys []string) {
expectedMetrics := make(map[string]any)
for _, metricKey := range expectedMetricsKeys {
Expand All @@ -62,7 +74,7 @@ func assertOtelMetrics(
reader sdkmetric.Reader,
expectedMetrics map[string]any,
fullMatch, skipValAssert bool,
) {
) []string {
var rm metricdata.ResourceMetrics
assert.NoError(t, reader.Collect(context.Background(), &rm))

Expand Down Expand Up @@ -118,8 +130,9 @@ func assertOtelMetrics(
expectedMetricsKeys = append(expectedMetricsKeys, k)
}
if fullMatch {
assert.ElementsMatch(t, expectedMetricsKeys, foundMetrics)
assert.ElementsMatch(t, foundMetrics, expectedMetricsKeys)
} else {
assert.Subset(t, foundMetrics, expectedMetricsKeys)
}
return foundMetrics
}
3 changes: 2 additions & 1 deletion x-pack/apm-server/sampling/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@ import (
"golang.org/x/sync/errgroup"

"github.com/elastic/apm-data/model/modelpb"
"github.com/elastic/elastic-agent-libs/logp"

"github.com/elastic/apm-server/internal/logs"
"github.com/elastic/apm-server/x-pack/apm-server/sampling/eventstorage"
"github.com/elastic/apm-server/x-pack/apm-server/sampling/pubsub"
"github.com/elastic/elastic-agent-libs/logp"
)

const (
Expand Down
177 changes: 175 additions & 2 deletions x-pack/apm-server/sampling/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"math/rand"
"os"
"path/filepath"
"sync"
"testing"
"time"

Expand All @@ -23,12 +24,13 @@ import (
"google.golang.org/protobuf/testing/protocmp"

"github.com/elastic/apm-data/model/modelpb"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/logp/logptest"

"github.com/elastic/apm-server/internal/beater/monitoringtest"
"github.com/elastic/apm-server/x-pack/apm-server/sampling"
"github.com/elastic/apm-server/x-pack/apm-server/sampling/eventstorage"
"github.com/elastic/apm-server/x-pack/apm-server/sampling/pubsub/pubsubtest"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/logp/logptest"
)

func TestProcessUnsampled(t *testing.T) {
Expand Down Expand Up @@ -830,6 +832,177 @@ func TestGracefulShutdown(t *testing.T) {
assert.Equal(t, int(sampleRate*float64(totalTraces)), count)
}

type blockingRW struct {
ctx context.Context
unblockWriteEvent chan struct{}
unblockWriteSampled chan struct{}
next eventstorage.RW
}

func (m blockingRW) ReadTraceEvents(traceID string, out *modelpb.Batch) error {
return m.next.ReadTraceEvents(traceID, out)
}

func (m blockingRW) WriteTraceEvent(traceID, id string, event *modelpb.APMEvent) error {
select {
case <-m.ctx.Done():
return m.ctx.Err()
case <-m.unblockWriteEvent:
return m.next.WriteTraceEvent(traceID, id, event)
}
}

func (m blockingRW) WriteTraceSampled(traceID string, sampled bool) error {
select {
case <-m.ctx.Done():
return m.ctx.Err()
case <-m.unblockWriteSampled:
return m.next.WriteTraceSampled(traceID, sampled)
}
}

func (m blockingRW) IsTraceSampled(traceID string) (bool, error) {
return m.next.IsTraceSampled(traceID)
}

func (m blockingRW) DeleteTraceEvent(traceID, id string) error {
return m.next.DeleteTraceEvent(traceID, id)
}

func TestPotentialRaceCondition(t *testing.T) {
unblockWriteEvent := make(chan struct{}, 1)
unblockWriteSampled := make(chan struct{}, 1)

flushInterval := time.Second
tempdirConfig := newTempdirConfig(t)
timeoutCtx, cancel := context.WithTimeout(context.Background(), 10*flushInterval)
defer cancel()

tempdirConfig.Config.FlushInterval = flushInterval
// Wrap existing storage with blockingRW so we can control the flow.
tempdirConfig.Config.Storage = &blockingRW{
ctx: timeoutCtx,
unblockWriteEvent: unblockWriteEvent,
unblockWriteSampled: unblockWriteSampled,
next: tempdirConfig.Config.Storage,
}
// Collect reported transactions.
reported := make(chan modelpb.Batch)
tempdirConfig.Config.BatchProcessor = modelpb.ProcessBatchFunc(func(ctx context.Context, batch *modelpb.Batch) error {
select {
case <-ctx.Done():
return ctx.Err()
case reported <- batch.Clone():
return nil
}
})

processor, err := sampling.NewProcessor(tempdirConfig.Config, logptest.NewTestingLogger(t, ""))
require.NoError(t, err)
go processor.Run()
defer processor.Stop(context.Background())

// Send root transaction, which will be written to storage since write event is unblocked.
unblockWriteEvent <- struct{}{}
batch1 := modelpb.Batch{{
Trace: &modelpb.Trace{Id: "trace1"},
Transaction: &modelpb.Transaction{
Type: "type",
Id: "transaction1",
Sampled: true,
},
}}
err = processor.ProcessBatch(context.Background(), &batch1)
require.NoError(t, err)
assert.Len(t, batch1, 0)
monitoringtest.ExpectContainAndNotContainOtelMetrics(t, tempdirConfig.metricReader,
map[string]any{
"apm-server.sampling.tail.events.processed": 1,
"apm-server.sampling.tail.events.stored": 1,
},
[]string{
"apm-server.sampling.tail.events.sampled",
},
)

// Send child transaction, which will be blocked from writing to storage for now.
var wg sync.WaitGroup
wg.Go(func() {
batch2 := modelpb.Batch{{
Trace: &modelpb.Trace{Id: "trace1"},
Transaction: &modelpb.Transaction{
Type: "type",
Id: "transaction2",
Sampled: true,
},
ParentId: "transaction1",
}}
err = processor.ProcessBatch(context.Background(), &batch2)
require.NoError(t, err)
assert.Len(t, batch2, 0)
})

// Unblock write sampled so that the first transaction is published.
unblockWriteSampled <- struct{}{}
// Wait for first transaction (sampled) to be published.
select {
case batch := <-reported:
if len(batch) != 1 {
t.Fatal("expected only one event in first publish")
}
assert.Equal(t, batch[0].Transaction.Id, "transaction1")
case <-timeoutCtx.Done():
t.Fatal("test timed out")
}
monitoringtest.ExpectContainAndNotContainOtelMetrics(t, tempdirConfig.metricReader,
map[string]any{
// This comes from second transaction, since we record it before finish processing somehow.
"apm-server.sampling.tail.events.processed": 1,
// This comes from first transaction being sampled.
"apm-server.sampling.tail.events.sampled": 1,
},
[]string{
"apm-server.sampling.tail.events.stored",
},
)

// Unblock write event so that the second transaction is written to storage.
unblockWriteEvent <- struct{}{}
wg.Wait()
monitoringtest.ExpectContainAndNotContainOtelMetrics(t, tempdirConfig.metricReader,
map[string]any{
// This comes from second transaction being stored.
"apm-server.sampling.tail.events.stored": 1,
},
[]string{
"apm-server.sampling.tail.events.sampled",
"apm-server.sampling.tail.events.processed",
},
)

// Unblock write sampled to check if second transaction gets sampled.
// It should not be sampled since the trace is already be sampled previously.
unblockWriteSampled <- struct{}{}
select {
case <-reported:
t.Fatal("no transaction should be reported here")
case <-time.After(2 * flushInterval):
}

// Stop processor so we can examine DB.
assert.NoError(t, processor.Stop(context.Background()))
assert.NoError(t, tempdirConfig.Config.DB.Flush())
close(reported)
db := tempdirConfig.Config.DB
reader := newUnlimitedReadWriter(db)

var batch modelpb.Batch
assert.NoError(t, reader.ReadTraceEvents("trace1", &batch))
assert.Len(t, batch, 2)
assert.Equal(t, batch[0].Transaction.Id, "transaction1")
assert.Equal(t, batch[1].Transaction.Id, "transaction2")
}

type testConfig struct {
sampling.Config
tempDir string
Expand Down
Loading