17 changes: 15 additions & 2 deletions internal/beater/monitoringtest/opentelemetry.go
@@ -42,6 +42,18 @@ func ExpectContainOtelMetrics(
assertOtelMetrics(t, reader, expectedMetrics, false, false)
}

// ExpectContainAndNotContainOtelMetrics collects metrics from reader and
// asserts that they contain expectedMetrics and that none of the keys in
// notExpectedMetricKeys are present.
func ExpectContainAndNotContainOtelMetrics(
t *testing.T,
reader sdkmetric.Reader,
expectedMetrics map[string]any,
notExpectedMetricKeys []string,
) {
foundMetricKeys := assertOtelMetrics(t, reader, expectedMetrics, false, false)
for _, key := range notExpectedMetricKeys {
assert.NotContains(t, foundMetricKeys, key)
}
}
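
For illustration, a test in another package could use the new helper roughly as below. This is a minimal sketch, not part of the change: the meter wiring, the test name, and the metric names ("apm-server.sampling.tail.events.sampled" / ".dropped") are assumptions chosen for the example.

	// Hypothetical usage sketch of ExpectContainAndNotContainOtelMetrics.
	package monitoringtest_test

	import (
		"context"
		"testing"

		sdkmetric "go.opentelemetry.io/otel/sdk/metric"

		"github.com/elastic/apm-server/internal/beater/monitoringtest"
	)

	func TestTailSamplingMetricsSketch(t *testing.T) {
		reader := sdkmetric.NewManualReader()
		mp := sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader))
		counter, err := mp.Meter("test").Int64Counter("apm-server.sampling.tail.events.sampled")
		if err != nil {
			t.Fatal(err)
		}
		counter.Add(context.Background(), 2)

		// The sampled counter must be present with the expected value, and the
		// (hypothetical) dropped counter must never have been recorded.
		monitoringtest.ExpectContainAndNotContainOtelMetrics(t, reader,
			map[string]any{"apm-server.sampling.tail.events.sampled": 2},
			[]string{"apm-server.sampling.tail.events.dropped"},
		)
	}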

func ExpectContainOtelMetricsKeys(t assert.TestingT, reader sdkmetric.Reader, expectedMetricsKeys []string) {
expectedMetrics := make(map[string]any)
for _, metricKey := range expectedMetricsKeys {
@@ -62,7 +74,7 @@ func assertOtelMetrics(
reader sdkmetric.Reader,
expectedMetrics map[string]any,
fullMatch, skipValAssert bool,
) {
) []string {
var rm metricdata.ResourceMetrics
assert.NoError(t, reader.Collect(context.Background(), &rm))

@@ -118,8 +130,9 @@
expectedMetricsKeys = append(expectedMetricsKeys, k)
}
if fullMatch {
assert.ElementsMatch(t, expectedMetricsKeys, foundMetrics)
assert.ElementsMatch(t, foundMetrics, expectedMetricsKeys)
} else {
assert.Subset(t, foundMetrics, expectedMetricsKeys)
}
return foundMetrics
}
160 changes: 121 additions & 39 deletions x-pack/apm-server/sampling/processor.go
@@ -17,10 +17,11 @@ import (
"golang.org/x/sync/errgroup"

"github.com/elastic/apm-data/model/modelpb"
"github.com/elastic/elastic-agent-libs/logp"

"github.com/elastic/apm-server/internal/logs"
"github.com/elastic/apm-server/x-pack/apm-server/sampling/eventstorage"
"github.com/elastic/apm-server/x-pack/apm-server/sampling/pubsub"
"github.com/elastic/elastic-agent-libs/logp"
)

const (
@@ -43,6 +44,8 @@ type Processor struct {
eventStore eventstorage.RW
eventMetrics eventMetrics

ongoingTraces sync.Map

stopMu sync.Mutex
stopping chan struct{}
stopped chan struct{}
@@ -188,6 +191,9 @@ func (p *Processor) processTransaction(event *modelpb.APMEvent) (report, stored
}

if event.GetParentId() != "" {
// Mark the trace as ongoing since there's an incoming child transaction.
p.addOngoingTrace(event.Trace.Id)
defer p.removeOngoingTrace(event.Trace.Id)
// Non-root transaction: write to local storage while we wait
// for a sampling decision.
return false, true, p.eventStore.WriteTraceEvent(
@@ -233,6 +239,9 @@ func (p *Processor) processSpan(event *modelpb.APMEvent) (report, stored bool, _
traceSampled, err := p.eventStore.IsTraceSampled(event.Trace.Id)
if err != nil {
if err == eventstorage.ErrNotFound {
// Mark the trace as ongoing since there's an incoming span.
p.addOngoingTrace(event.Trace.Id)
defer p.removeOngoingTrace(event.Trace.Id)
// Tail-sampling decision has not yet been made, write event to local storage.
return false, true, p.eventStore.WriteTraceEvent(event.Trace.Id, event.Span.Id, event)
}
@@ -245,6 +254,39 @@ func (p *Processor) processSpan(event *modelpb.APMEvent) (report, stored bool, _
return traceSampled, false, nil
}

func (p *Processor) addOngoingTrace(traceID string) {
p.ongoingTraces.Store(traceID, struct{}{})
}

func (p *Processor) removeOngoingTrace(traceID string) {
p.ongoingTraces.Delete(traceID)
}

func (p *Processor) hasOngoingTrace(traceID string) bool {
_, ok := p.ongoingTraces.Load(traceID)
return ok
}

// waitForOngoingTrace waits until one of three conditions is satisfied before returning:
// 1. The trace is no longer ongoing (checked on each tick).
// 2. The context is canceled.
// 3. The configured number of ticks has elapsed without the trace completing.
func (p *Processor) waitForOngoingTrace(ctx context.Context, traceID string, loops int, tick time.Duration) error {
ticker := time.NewTicker(tick)
defer ticker.Stop()
for range loops {
select {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
if !p.hasOngoingTrace(traceID) {
return nil
}
}
}
return nil
}
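
To make the polling behaviour concrete, here is a small self-contained sketch of the same pattern — a sync.Map of in-flight keys polled on a ticker — using the 10 loops of 500ms that Run hard-codes further down. It is illustrative only, not part of the diff:

	// Standalone illustration of the wait-for-ongoing-trace pattern.
	package main

	import (
		"context"
		"fmt"
		"sync"
		"time"
	)

	// waitForKey mirrors waitForOngoingTrace: return early once the key is gone,
	// on context cancellation, or after at most `loops` ticks.
	func waitForKey(ctx context.Context, m *sync.Map, key string, loops int, tick time.Duration) error {
		ticker := time.NewTicker(tick)
		defer ticker.Stop()
		for range loops {
			select {
			case <-ctx.Done():
				return ctx.Err()
			case <-ticker.C:
				if _, ok := m.Load(key); !ok {
					return nil
				}
			}
		}
		return nil // give up after `loops` ticks; the caller proceeds regardless
	}

	func main() {
		var ongoing sync.Map
		ongoing.Store("trace-1", struct{}{})
		go func() {
			// Simulate an in-flight child transaction/span finishing shortly after.
			time.Sleep(200 * time.Millisecond)
			ongoing.Delete("trace-1")
		}()
		err := waitForKey(context.Background(), &ongoing, "trace-1", 10, 500*time.Millisecond)
		fmt.Println("finished waiting:", err) // returns well before the 5s upper bound
	}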

// Stop stops the processor.
// Note that the underlying StorageManager must be closed independently
// to ensure writes are synced to disk.
@@ -318,6 +360,7 @@ func (p *Processor) Run() error {
remoteSampledTraceIDs := make(chan string)
localSampledTraceIDs := make(chan string)
publishSampledTraceIDs := make(chan string)
ongoingSampledTraceIDs := make(chan string)
gracefulContext, cancelGracefulContext := context.WithCancel(context.Background())
defer cancelGracefulContext()
var g errgroup.Group
@@ -410,7 +453,6 @@ func (p *Processor) Run() error {
}
})
g.Go(func() error {
var events modelpb.Batch
// TODO(axw) pace the publishing over the flush interval?
// Alternatively we can rely on backpressure from the reporter,
// removing the artificial one second timeout from publisher code
@@ -449,47 +491,41 @@
)
}

events = events[:0]
if err := p.eventStore.ReadTraceEvents(traceID, &events); err != nil {
p.rateLimitedLogger.Warnf(
"received error reading trace events: %s", err,
)
// The trace still has child transactions or spans being processed; send the
// trace ID to ongoingSampledTraceIDs so that another goroutine can wait and
// report it, without blocking the current goroutine.
//
// This only happens for locally sampled trace IDs, due to a potential race
// between receiving the transaction / span and making the sampling decision.
if !remoteDecision && p.hasOngoingTrace(traceID) {
if err := sendTraceIDs(gracefulContext, ongoingSampledTraceIDs, []string{traceID}); err != nil {
return err
}
continue
}
if n := len(events); n > 0 {
p.logger.Debugf("reporting %d events", n)
if remoteDecision {
// Remote decisions may be received multiple times,
// e.g. if this server restarts and resubscribes to
// remote sampling decisions before they have been
// deleted. We delete events from local storage so
// we don't publish duplicates; delivery is therefore
// at-most-once, not guaranteed.
//
// TODO(carsonip): pebble supports range deletes and may be better than
// deleting events separately, but as we do not use transactions, it is
// possible to race and delete something that is not read.
for _, event := range events {
switch event.Type() {
case modelpb.TransactionEventType:
if err := p.eventStore.DeleteTraceEvent(event.Trace.Id, event.Transaction.Id); err != nil {
p.logger.With(logp.Error(err)).Warn("failed to delete transaction from local storage")
}
case modelpb.SpanEventType:
if err := p.eventStore.DeleteTraceEvent(event.Trace.Id, event.Span.Id); err != nil {
p.logger.With(logp.Error(err)).Warn("failed to delete span from local storage")
}
}
}
}
p.eventMetrics.sampled.Add(gracefulContext, int64(len(events)))
if err := p.config.BatchProcessor.ProcessBatch(gracefulContext, &events); err != nil {
p.logger.With(logp.Error(err)).Warn("failed to report events")
}

for i := range events {
events[i] = nil // not required but ensure that there is no ref to the freed event
// The decision either came from a remote server or the trace has no child
// transactions or spans still being processed, so simply report the events.
p.readAndReportEvents(gracefulContext, traceID, remoteDecision)
}
})
g.Go(func() error {
ongoingSampledTraceIDs := ongoingSampledTraceIDs
for {
select {
case <-gracefulContext.Done():
return gracefulContext.Err()
case traceID, ok := <-ongoingSampledTraceIDs:
if !ok {
return nil
}
// TODO(eric): Update this to be configurable.
if err := p.waitForOngoingTrace(gracefulContext, traceID, 10, 500*time.Millisecond); err != nil {
return err
}
// Only locally sampled trace IDs are sent on this channel, due to the potential
// race between receiving the transaction / span and making the sampling
// decision, so report the events with remoteDecision=false.
p.readAndReportEvents(gracefulContext, traceID, false)
}
}
})
@@ -499,6 +535,52 @@ func (p *Processor) Run() error {
return nil
}

// readAndReportEvents reads all locally stored events for traceID and reports
// them via the configured batch processor. For remote sampling decisions the
// events are also deleted from local storage to avoid publishing duplicates.
func (p *Processor) readAndReportEvents(ctx context.Context, traceID string, remoteDecision bool) {
var events modelpb.Batch
if err := p.eventStore.ReadTraceEvents(traceID, &events); err != nil {
p.rateLimitedLogger.Warnf(
"received error reading trace events: %s", err,
)
return
}

if n := len(events); n > 0 {
p.logger.Debugf("reporting %d events", n)
if remoteDecision {
// Remote decisions may be received multiple times,
// e.g. if this server restarts and resubscribes to
// remote sampling decisions before they have been
// deleted. We delete events from local storage so
// we don't publish duplicates; delivery is therefore
// at-most-once, not guaranteed.
//
// TODO(carsonip): pebble supports range deletes and may be better than
// deleting events separately, but as we do not use transactions, it is
// possible to race and delete something that is not read.
for _, event := range events {
switch event.Type() {
case modelpb.TransactionEventType:
if err := p.eventStore.DeleteTraceEvent(event.Trace.Id, event.Transaction.Id); err != nil {
p.logger.With(logp.Error(err)).Warn("failed to delete transaction from local storage")
}
case modelpb.SpanEventType:
if err := p.eventStore.DeleteTraceEvent(event.Trace.Id, event.Span.Id); err != nil {
p.logger.With(logp.Error(err)).Warn("failed to delete span from local storage")
}
}
}
}
p.eventMetrics.sampled.Add(ctx, int64(len(events)))
if err := p.config.BatchProcessor.ProcessBatch(ctx, &events); err != nil {
p.logger.With(logp.Error(err)).Warn("failed to report events")
}

for i := range events {
events[i] = nil // not required, but ensures there is no reference to the freed event
}
}
}

func readSubscriberPosition(logger *logp.Logger, s *eventstorage.StorageManager) pubsub.SubscriberPosition {
var pos pubsub.SubscriberPosition
data, err := s.ReadSubscriberPosition()