Skip to content
This repository has been archived by the owner on Nov 7, 2022. It is now read-only.

Commit

Permalink
Add source format to TraceData and change ProcessSpan to accept conte…
Browse files Browse the repository at this point in the history
…xt. (#489)
  • Loading branch information
Bogdan Drutu authored Mar 13, 2019
1 parent c88cf9b commit 3e7d344
Show file tree
Hide file tree
Showing 16 changed files with 115 additions and 79 deletions.
6 changes: 5 additions & 1 deletion cmd/occollector/app/sender/jaeger_thrift_http_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package sender

import (
"bytes"
"context"
"fmt"
"io"
"io/ioutil"
Expand All @@ -26,6 +27,7 @@ import (
"go.uber.org/zap"

"github.com/census-instrumentation/opencensus-service/data"
"github.com/census-instrumentation/opencensus-service/internal/collector/processor"
jaegertranslator "github.com/census-instrumentation/opencensus-service/translator/trace/jaeger"
)

Expand All @@ -41,6 +43,8 @@ type JaegerThriftHTTPSender struct {
logger *zap.Logger
}

var _ processor.SpanProcessor = (*JaegerThriftHTTPSender)(nil)

// HTTPOption sets a parameter for the HttpCollector
type HTTPOption func(s *JaegerThriftHTTPSender)

Expand Down Expand Up @@ -80,7 +84,7 @@ func NewJaegerThriftHTTPSender(
}

// ProcessSpans sends the received data to the configured Jaeger Thrift end-point.
func (s *JaegerThriftHTTPSender) ProcessSpans(td data.TraceData, spanFormat string) error {
func (s *JaegerThriftHTTPSender) ProcessSpans(ctx context.Context, td data.TraceData) error {
// TODO: (@pjanotti) In case of failure the translation to Jaeger Thrift is going to be remade, cache it somehow.
tBatch, err := jaegertranslator.OCProtoToJaegerThrift(td)
if err != nil {
Expand Down
6 changes: 4 additions & 2 deletions cmd/occollector/app/sender/jaeger_thrift_tchannel_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package sender

import (
"context"

"go.uber.org/zap"

reporter "github.com/jaegertracing/jaeger/cmd/agent/app/reporter"
Expand All @@ -31,7 +33,7 @@ type JaegerThriftTChannelSender struct {
reporter reporter.Reporter
}

var _ processor.SpanProcessor = (*JaegerThriftHTTPSender)(nil)
var _ processor.SpanProcessor = (*JaegerThriftTChannelSender)(nil)

// NewJaegerThriftTChannelSender creates new TChannel-based sender.
func NewJaegerThriftTChannelSender(
Expand All @@ -45,7 +47,7 @@ func NewJaegerThriftTChannelSender(
}

// ProcessSpans sends the received data to the configured Jaeger Thrift end-point.
func (s *JaegerThriftTChannelSender) ProcessSpans(td data.TraceData, spanFormat string) error {
func (s *JaegerThriftTChannelSender) ProcessSpans(ctx context.Context, td data.TraceData) error {
// TODO: (@pjanotti) In case of failure the translation to Jaeger Thrift is going to be remade, cache it somehow.
tBatch, err := jaegertranslator.OCProtoToJaegerThrift(td)
if err != nil {
Expand Down
7 changes: 4 additions & 3 deletions data/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ type MetricsData struct {

// TraceData is a struct that groups proto spans with a unique node and a resource.
type TraceData struct {
Node *commonpb.Node
Resource *resourcepb.Resource
Spans []*tracepb.Span
Node *commonpb.Node
Resource *resourcepb.Resource
Spans []*tracepb.Span
SourceFormat string
}
4 changes: 2 additions & 2 deletions internal/collector/processor/exporter_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ func NewTraceExporterProcessor(traceExporters ...consumer.TraceConsumer) SpanPro
return &exporterSpanProcessor{tp: multiconsumer.NewTraceProcessor(traceExporters)}
}

func (sp *exporterSpanProcessor) ProcessSpans(td data.TraceData, spanFormat string) error {
err := sp.tp.ConsumeTraceData(context.Background(), td)
func (sp *exporterSpanProcessor) ProcessSpans(ctx context.Context, td data.TraceData) error {
err := sp.tp.ConsumeTraceData(ctx, td)
if err != nil {
return err
}
Expand Down
12 changes: 7 additions & 5 deletions internal/collector/processor/multi_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package processor

import (
"context"

"github.com/census-instrumentation/opencensus-service/data"
"github.com/census-instrumentation/opencensus-service/internal"

Expand All @@ -25,7 +27,7 @@ import (

// MultiProcessorOption represents options that can be applied to a MultiSpanProcessor.
type MultiProcessorOption func(*multiSpanProcessor)
type preProcessFn func(data.TraceData, string)
type preProcessFn func(context.Context, data.TraceData)

// MultiSpanProcessor enables processing on multiple processors.
// For each incoming span batch, it calls ProcessSpans method on each span
Expand Down Expand Up @@ -59,7 +61,7 @@ func WithPreProcessFn(preProcFn preProcessFn) MultiProcessorOption {
// in each ExportTraceServiceRequest.
func WithAddAttributes(attributes map[string]interface{}, overwrite bool) MultiProcessorOption {
return WithPreProcessFn(
func(td data.TraceData, spanFormat string) {
func(ctx context.Context, td data.TraceData) {
if len(attributes) == 0 {
return
}
Expand Down Expand Up @@ -107,13 +109,13 @@ func WithAddAttributes(attributes map[string]interface{}, overwrite bool) MultiP
}

// ProcessSpans implements the SpanProcessor interface
func (msp *multiSpanProcessor) ProcessSpans(td data.TraceData, spanFormat string) error {
func (msp *multiSpanProcessor) ProcessSpans(ctx context.Context, td data.TraceData) error {
for _, preProcessFn := range msp.preProcessFns {
preProcessFn(td, spanFormat)
preProcessFn(ctx, td)
}
var errors []error
for _, sp := range msp.processors {
err := sp.ProcessSpans(td, spanFormat)
err := sp.ProcessSpans(ctx, td)
if err != nil {
errors = append(errors, err)
}
Expand Down
13 changes: 7 additions & 6 deletions internal/collector/processor/multi_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package processor

import (
"context"
"fmt"
"sync/atomic"
"testing"
Expand All @@ -37,7 +38,7 @@ func TestMultiSpanProcessorMultiplexing(t *testing.T) {
var wantSpansCount = 0
for i := 0; i < 2; i++ {
wantSpansCount += len(td.Spans)
tt.ProcessSpans(td, "test")
tt.ProcessSpans(context.Background(), td)
}

for _, p := range processors {
Expand Down Expand Up @@ -66,7 +67,7 @@ func TestMultiSpanProcessorWhenOneErrors(t *testing.T) {

var wantSpansCount = 0
for i := 0; i < 2; i++ {
err := tt.ProcessSpans(td, "test")
err := tt.ProcessSpans(context.Background(), td)
if err == nil {
t.Errorf("Wanted error got nil")
return
Expand All @@ -91,7 +92,7 @@ func TestMultiSpanProcessorWithPreProcessFn(t *testing.T) {
}

calledFnCount := int32(0)
testPreProcessFn := func(data.TraceData, string) {
testPreProcessFn := func(context.Context, data.TraceData) {
atomic.AddInt32(&calledFnCount, 1)
}

Expand All @@ -104,7 +105,7 @@ func TestMultiSpanProcessorWithPreProcessFn(t *testing.T) {
batchCount := 2
for i := 0; i < batchCount; i++ {
wantSpansCount += len(batch.Spans)
tt.ProcessSpans(batch, "test")
tt.ProcessSpans(context.Background(), batch)
}

for _, p := range processors {
Expand Down Expand Up @@ -160,7 +161,7 @@ func multiSpanProcessorWithAddAttributesTestHelper(t *testing.T, overwrite bool)

spans := make([]*tracepb.Span, 0, len(td.Spans)*2)
for i := 0; i < 2; i++ {
tt.ProcessSpans(td, "test")
tt.ProcessSpans(context.Background(), td)
spans = append(spans, td.Spans...)
}

Expand Down Expand Up @@ -197,7 +198,7 @@ type mockSpanProcessor struct {

var _ SpanProcessor = &mockSpanProcessor{}

func (p *mockSpanProcessor) ProcessSpans(td data.TraceData, spanFormat string) error {
func (p *mockSpanProcessor) ProcessSpans(ctx context.Context, td data.TraceData) error {
batchSize := len(td.Spans)
p.TotalSpans += batchSize
if p.MustFail {
Expand Down
19 changes: 11 additions & 8 deletions internal/collector/processor/nodebatcher/node_batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"sync"
"time"

"github.com/census-instrumentation/opencensus-service/observability"

commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1"
resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1"
tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1"
Expand Down Expand Up @@ -102,9 +104,9 @@ func NewBatcher(name string, logger *zap.Logger, sender processor.SpanProcessor,

// ProcessSpans implements batcher as a SpanProcessor and takes the provided spans and adds them to
// batches
func (b *batcher) ProcessSpans(td data.TraceData, spanFormat string) error {
bucketID := b.genBucketID(td.Node, td.Resource, spanFormat)
bucket := b.getOrAddBucket(bucketID, td.Node, td.Resource, spanFormat)
func (b *batcher) ProcessSpans(ctx context.Context, td data.TraceData) error {
bucketID := b.genBucketID(td.Node, td.Resource, td.SourceFormat)
bucket := b.getOrAddBucket(bucketID, td.Node, td.Resource, td.SourceFormat)
bucket.add(td.Spans)
return nil
}
Expand Down Expand Up @@ -219,18 +221,19 @@ func (nb *nodeBatch) sendItems(
tdItems = append(tdItems, items...)
}
td := data.TraceData{
Node: nb.node,
Resource: nb.resource,
Spans: tdItems,
Node: nb.node,
Resource: nb.resource,
Spans: tdItems,
SourceFormat: nb.format,
}

statsTags := processor.StatsTagsForBatch(
nb.parent.name, processor.ServiceNameForNode(nb.node), nb.format,
)
_ = stats.RecordWithTags(context.Background(), statsTags, measure.M(1))

// TODO: This process should be done in an async way, perhaps with a channel + goroutine worker(s)
_ = nb.parent.sender.ProcessSpans(td, nb.format)
ctx := observability.ContextWithReceiverName(context.Background(), nb.format)
_ = nb.parent.sender.ProcessSpans(ctx, td)
}

func (nb *nodeBatch) getAndReset() ([][]*tracepb.Span, uint32) {
Expand Down
36 changes: 21 additions & 15 deletions internal/collector/processor/nodebatcher/node_batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package nodebatcher

import (
"context"
"fmt"
"testing"
"time"
Expand Down Expand Up @@ -149,9 +150,10 @@ func TestConcurrentNodeAdds(t *testing.T) {
Node: &commonpb.Node{
ServiceInfo: &commonpb.ServiceInfo{Name: fmt.Sprintf("svc-%d", requestNum)},
},
Spans: spans,
Spans: spans,
SourceFormat: "oc_trace",
}
batcher.ProcessSpans(td, "oc")
go batcher.ProcessSpans(context.Background(), td)
}

err := <-waitForCn
Expand Down Expand Up @@ -196,23 +198,24 @@ func TestBucketRemove(t *testing.T) {
Node: &commonpb.Node{
ServiceInfo: &commonpb.ServiceInfo{Name: "svc"},
},
Spans: spans,
Spans: spans,
SourceFormat: "oc_trace",
}
batcher.ProcessSpans(request, "oc")
batcher.ProcessSpans(context.Background(), request)

err := <-waitForCn
if err != nil {
t.Errorf("failed to wait for sender %s", err)
}

if batcher.getBucket(batcher.genBucketID(request.Node, nil, "oc")) == nil {
if batcher.getBucket(batcher.genBucketID(request.Node, nil, "oc_trace")) == nil {
t.Errorf("Bucket should exist but does not.")
}

// Doesn't seem to be a great way to test this without waiting
<-time.After(2 * time.Duration(removeAfterTicks) * tickTime)

if batcher.getBucket(batcher.genBucketID(request.Node, nil, "oc")) != nil {
if batcher.getBucket(batcher.genBucketID(request.Node, nil, "oc_trace")) != nil {
t.Errorf("Bucket should be deleted but is not.")
}
}
Expand Down Expand Up @@ -246,16 +249,17 @@ func TestBucketTickerStop(t *testing.T) {
Node: &commonpb.Node{
ServiceInfo: &commonpb.ServiceInfo{Name: "svc"},
},
Spans: spans,
Spans: spans,
SourceFormat: "oc_trace",
}
batcher.ProcessSpans(request, "oc")
batcher.ProcessSpans(context.Background(), request)

err := <-waitForCn
if err == nil {
t.Errorf("Unexpectedly received spans")
}

if batcher.getBucket(batcher.genBucketID(request.Node, nil, "oc")) == nil {
if batcher.getBucket(batcher.genBucketID(request.Node, nil, "oc_trace")) == nil {
t.Errorf("Bucket should not be deleted but is.")
}
}
Expand All @@ -275,9 +279,10 @@ func TestConcurrentBatchAdds(t *testing.T) {
Node: &commonpb.Node{
ServiceInfo: &commonpb.ServiceInfo{Name: "svc"},
},
Spans: spans,
Spans: spans,
SourceFormat: "oc_trace",
}
batcher.ProcessSpans(request, "oc")
go batcher.ProcessSpans(context.Background(), request)
}

err := <-waitForCn
Expand Down Expand Up @@ -310,14 +315,15 @@ func BenchmarkConcurrentBatchAdds(b *testing.B) {
Node: &commonpb.Node{
ServiceInfo: &commonpb.ServiceInfo{Name: "svc"},
},
Spans: spans,
Spans: spans,
SourceFormat: "oc_trace",
}
requests = append(requests, request)

b.Run("v1", func(b *testing.B) {
for i := 0; i < b.N; i++ {
for _, td := range requests {
_ = batcher.ProcessSpans(td, "oc")
_ = batcher.ProcessSpans(context.Background(), td)
}
}
})
Expand All @@ -335,7 +341,7 @@ func newNopSender() *nopSender {
return &nopSender{}
}

func (ts *nopSender) ProcessSpans(td data.TraceData, spanFormat string) error {
func (ts *nopSender) ProcessSpans(ctx context.Context, td data.TraceData) error {
return nil
}

Expand All @@ -353,7 +359,7 @@ func newTestSender() *testSender {
}
}

func (ts *testSender) ProcessSpans(td data.TraceData, spanFormat string) error {
func (ts *testSender) ProcessSpans(ctx context.Context, td data.TraceData) error {
ts.reqChan <- td
return nil
}
Expand Down
4 changes: 3 additions & 1 deletion internal/collector/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package processor

import (
"context"

commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1"
"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
Expand All @@ -27,7 +29,7 @@ import (
// SpanProcessor handles batches of spans converted to OpenCensus proto format.
type SpanProcessor interface {
// ProcessSpans processes spans and return with the number of spans that failed and an error.
ProcessSpans(td data.TraceData, spanFormat string) error
ProcessSpans(ctx context.Context, td data.TraceData) error
// TODO: (@pjanotti) For shutdown improvement, the interface needs a method to attempt that.
}

Expand Down
4 changes: 3 additions & 1 deletion internal/collector/processor/processor_to_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,5 +37,7 @@ func WrapWithSpanSink(format string, p SpanProcessor) consumer.TraceConsumer {
}

func (ps *protoProcessorSink) ConsumeTraceData(ctx context.Context, td data.TraceData) error {
return ps.protoProcessor.ProcessSpans(td, ps.sourceFormat)
// For the moment ensure that source format is set here before we change receivers to set this.
td.SourceFormat = ps.sourceFormat
return ps.protoProcessor.ProcessSpans(ctx, td)
}
Loading

0 comments on commit 3e7d344

Please sign in to comment.