diff --git a/input/elasticapm/processor.go b/input/elasticapm/processor.go index 59d903b5..84613dff 100644 --- a/input/elasticapm/processor.go +++ b/input/elasticapm/processor.go @@ -179,7 +179,7 @@ func (p *Processor) readBatch( reader *streamReader, result *Result, ) (int, error) { - _, sp := p.tracer.Start(ctx, "readBatch") + _, sp := p.tracer.Start(ctx, "readBatch", trace.WithSpanKind(trace.SpanKindInternal)) defer sp.End() // input events are decoded and appended to the batch @@ -250,7 +250,7 @@ func (p *Processor) HandleStream( processor modelpb.BatchProcessor, result *Result, ) error { - ctx, sp := p.tracer.Start(ctx, "Stream") + ctx, sp := p.tracer.Start(ctx, "Stream", trace.WithSpanKind(trace.SpanKindInternal)) defer sp.End() // Limit the number of concurrent batch decodes. // @@ -336,7 +336,7 @@ func (p *Processor) getStreamReader(r io.Reader) *streamReader { } func (p *Processor) semAcquire(ctx context.Context) error { - ctx, sp := p.tracer.Start(ctx, "Semaphore.Acquire") + ctx, sp := p.tracer.Start(ctx, "Semaphore.Acquire", trace.WithSpanKind(trace.SpanKindInternal)) defer sp.End() return p.sem.Acquire(ctx, 1) diff --git a/input/otlp/trace_semaphore.go b/input/otlp/trace_semaphore.go index 6c8458ea..15427bc6 100644 --- a/input/otlp/trace_semaphore.go +++ b/input/otlp/trace_semaphore.go @@ -26,7 +26,7 @@ import ( ) func semAcquire(ctx context.Context, tracer trace.Tracer, sem input.Semaphore, i int64) error { - ctx, sp := tracer.Start(ctx, "Semaphore.Acquire") + ctx, sp := tracer.Start(ctx, "Semaphore.Acquire", trace.WithSpanKind(trace.SpanKindInternal)) defer sp.End() return sem.Acquire(ctx, i) diff --git a/model/modelprocessor/tracer.go b/model/modelprocessor/tracer.go index d90274aa..a99e03bd 100644 --- a/model/modelprocessor/tracer.go +++ b/model/modelprocessor/tracer.go @@ -50,7 +50,7 @@ func NewTracer(n string, p modelpb.BatchProcessor, opts ...ConfigOption) *Tracer // ProcessBatch runs for each batch run, and emits telemetry func (c *Tracer) ProcessBatch(ctx context.Context, b *modelpb.Batch) error { - ctx, span := c.tracer.Start(ctx, c.spanName) + ctx, span := c.tracer.Start(ctx, c.spanName, trace.WithSpanKind(trace.SpanKindInternal)) defer span.End() err := c.processor.ProcessBatch(ctx, b)