Skip to content
This repository has been archived by the owner on Nov 7, 2022. It is now read-only.

Commit

Permalink
OC exporter: new configuration settings and use first available worker (
Browse files Browse the repository at this point in the history
#555)

* Add new OC exporter configuration settings

Adding the new configuration settings now exposed by the oc-agent exporter:

- keepalive
- reconnection delay

Plus an option, secure, to use system cert pool (useful for https with well-known CAs).

Improve error messages

Using first available oc exporter instead of round robin

Adding test covering stop

Fix stop issue

* Add comments to keepalive settings example

* Add errAlreadyStopped if data reaches exporter after stop
  • Loading branch information
Paulo Janotti authored May 24, 2019
1 parent 686dcf3 commit 2157469
Show file tree
Hide file tree
Showing 5 changed files with 278 additions and 38 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ exporters:
compression: "gzip"
cert-pem-file: "server_ca_public.pem" # optional to enable TLS
endpoint: "127.0.0.1:55678"
reconnection-delay: 2s

jaeger:
collector_endpoint: "http://127.0.0.1:14268/api/traces"
Expand Down
1 change: 1 addition & 0 deletions demos/trace/oc-agent-config.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
receivers:
opencensus:
address: ":55678"
reconnection-delay: 2s
jaeger:
collector_http_port: 14268

Expand Down
11 changes: 11 additions & 0 deletions exporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,18 @@ queued-exporters:
exporters:
opencensus:
endpoint: "127.0.0.1:55566"
cert-pem-file: "server_ca_public.pem" # PEM file used to enable TLS. For trusted CAs from
# system pool use "secure:" setting, see below.
compression: "gzip"
reconnection-dealy: 2s # Delay (+70% jitter) before reconnection attempt in case of error.
secure: true # Used to export to destinations trusted by certificates from the system pool.
# "cert-pem-file:" takes precedence over this setting.
keepalive:
# Keepalive settings for gRPC clients, see https://godoc.org/google.golang.org/grpc/keepalive#ClientParameters.
# Recommended to be set for cases that need to support bursts of data and periods of inactivity.
time: 30s # After inactive for this amount to time client will send a ping to the server.
timeout: 5s # Amount of time that client waits for a keepalive ping response before closing the connection.
permit-without-stream: true # Permits the keepalive ping even if no RPCs are in use, if false no ping is sent.
my-org-jaeger: # A second processor with its own configuration options
num-workers: 2
queue-size: 100
Expand Down
156 changes: 124 additions & 32 deletions exporter/opencensusexporter/opencensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,48 +16,75 @@ package opencensusexporter

import (
"context"
"errors"
"crypto/x509"
"fmt"
"sync/atomic"
"sync"
"time"

"contrib.go.opencensus.io/exporter/ocagent"
agenttracepb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1"
"github.com/spf13/viper"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"

"github.com/census-instrumentation/opencensus-service/consumer"
"github.com/census-instrumentation/opencensus-service/data"
"github.com/census-instrumentation/opencensus-service/exporter/exporterhelper"
"github.com/census-instrumentation/opencensus-service/internal"
"github.com/census-instrumentation/opencensus-service/internal/compression"
"github.com/census-instrumentation/opencensus-service/internal/compression/grpc"
compressiongrpc "github.com/census-instrumentation/opencensus-service/internal/compression/grpc"
)

type opencensusConfig struct {
Endpoint string `mapstructure:"endpoint,omitempty"`
Compression string `mapstructure:"compression,omitempty"`
Headers map[string]string `mapstructure:"headers,omitempty"`
NumWorkers int `mapstructure:"num-workers,omitempty"`
CertPemFile string `mapstructure:"cert-pem-file,omitempty"`
// keepaliveConfig exposes the keepalive.ClientParameters to be used by the exporter.
// Refer to the original data-structure for the meaning of each parameter.
type keepaliveConfig struct {
Time time.Duration `mapstructure:"time,omitempty"`
Timeout time.Duration `mapstructure:"timeout,omitempty"`
PermitWithoutStream bool `mapstructure:"permit-without-stream,omitempty"`
}

// TODO: add insecure, service name options.
type opencensusConfig struct {
Endpoint string `mapstructure:"endpoint,omitempty"`
Compression string `mapstructure:"compression,omitempty"`
Headers map[string]string `mapstructure:"headers,omitempty"`
NumWorkers int `mapstructure:"num-workers,omitempty"`
CertPemFile string `mapstructure:"cert-pem-file,omitempty"`
UseSecure bool `mapstructure:"secure,omitempty"`
ReconnectionDelay time.Duration `mapstructure:"reconnection-delay,omitempty"`
KeepaliveParameters *keepaliveConfig `mapstructure:"keepalive,omitempty"`
// TODO: service name options.
}

type ocagentExporter struct {
counter uint32
exporters []*ocagent.Exporter
exporters chan *ocagent.Exporter
}

type ocTraceExporterErrorCode int
type ocTraceExporterError struct {
code ocTraceExporterErrorCode
msg string
}

var _ error = (*ocTraceExporterError)(nil)

func (e *ocTraceExporterError) Error() string {
return e.msg
}

const (
defaultNumWorkers int = 2
)

var (
// ErrEndpointRequired indicates that this exporter was not provided with an endpoint in its config.
ErrEndpointRequired = errors.New("OpenCensus exporter config requires an Endpoint")
// ErrUnsupportedCompressionType indicates that this exporter was provided with a compression protocol it does not support.
ErrUnsupportedCompressionType = errors.New("OpenCensus exporter unsupported compression type")
// ErrUnableToGetTLSCreds indicates that this exporter could not read the provided TLS credentials.
ErrUnableToGetTLSCreds = errors.New("OpenCensus exporter unable to read TLS credentials")
_ ocTraceExporterErrorCode = iota // skip 0
// errEndpointRequired indicates that this exporter was not provided with an endpoint in its config.
errEndpointRequired
// errUnsupportedCompressionType indicates that this exporter was provided with a compression protocol it does not support.
errUnsupportedCompressionType
// errUnableToGetTLSCreds indicates that this exporter could not read the provided TLS credentials.
errUnableToGetTLSCreds
// errAlreadyStopped indicates that the exporter was already stopped.
errAlreadyStopped
)

// OpenCensusTraceExportersFromViper unmarshals the viper and returns an consumer.TraceConsumer targeting
Expand All @@ -75,49 +102,75 @@ func OpenCensusTraceExportersFromViper(v *viper.Viper) (tps []consumer.TraceCons
}

if ocac.Endpoint == "" {
return nil, nil, nil, ErrEndpointRequired
return nil, nil, nil, &ocTraceExporterError{
code: errEndpointRequired,
msg: "OpenCensus exporter config requires an Endpoint",
}
}

opts := []ocagent.ExporterOption{ocagent.WithAddress(ocac.Endpoint)}
if ocac.Compression != "" {
if compressionKey := grpc.GetGRPCCompressionKey(ocac.Compression); compressionKey != compression.Unsupported {
if compressionKey := compressiongrpc.GetGRPCCompressionKey(ocac.Compression); compressionKey != compression.Unsupported {
opts = append(opts, ocagent.UseCompressor(compressionKey))
} else {
return nil, nil, nil, ErrUnsupportedCompressionType
return nil, nil, nil, &ocTraceExporterError{
code: errUnsupportedCompressionType,
msg: fmt.Sprintf("OpenCensus exporter unsupported compression type %q", ocac.Compression),
}
}
}
if ocac.CertPemFile != "" {
creds, err := credentials.NewClientTLSFromFile(ocac.CertPemFile, "")
if err != nil {
return nil, nil, nil, ErrUnableToGetTLSCreds
return nil, nil, nil, &ocTraceExporterError{
code: errUnableToGetTLSCreds,
msg: fmt.Sprintf("OpenCensus exporter unable to read TLS credentials from pem file %q: %v", ocac.CertPemFile, err),
}
}
opts = append(opts, ocagent.WithTLSCredentials(creds))
} else if ocac.UseSecure {
certPool, err := x509.SystemCertPool()
if err != nil {
return nil, nil, nil, &ocTraceExporterError{
code: errUnableToGetTLSCreds,
msg: fmt.Sprintf(
"OpenCensus exporter unable to read certificates from system pool: %v", err),
}
}
creds := credentials.NewClientTLSFromCert(certPool, "")
opts = append(opts, ocagent.WithTLSCredentials(creds))
} else {
opts = append(opts, ocagent.WithInsecure())
}
if len(ocac.Headers) > 0 {
opts = append(opts, ocagent.WithHeaders(ocac.Headers))
}
if ocac.ReconnectionDelay > 0 {
opts = append(opts, ocagent.WithReconnectionPeriod(ocac.ReconnectionDelay))
}
if ocac.KeepaliveParameters != nil {
opts = append(opts, ocagent.WithGRPCDialOption(grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: ocac.KeepaliveParameters.Time,
Timeout: ocac.KeepaliveParameters.Timeout,
PermitWithoutStream: ocac.KeepaliveParameters.PermitWithoutStream,
})))
}

numWorkers := defaultNumWorkers
if ocac.NumWorkers > 0 {
numWorkers = ocac.NumWorkers
}

exporters := make([]*ocagent.Exporter, 0, numWorkers)
exportersChan := make(chan *ocagent.Exporter, numWorkers)
for exporterIndex := 0; exporterIndex < numWorkers; exporterIndex++ {
exporter, serr := ocagent.NewExporter(opts...)
if serr != nil {
return nil, nil, nil, fmt.Errorf("cannot configure OpenCensus Trace exporter: %v", serr)
}
exporters = append(exporters, exporter)
doneFns = append(doneFns, func() error {
exporter.Flush()
return nil
})
exportersChan <- exporter
}

oce := &ocagentExporter{exporters: exporters}
oce := &ocagentExporter{exporters: exportersChan}
oexp, err := exporterhelper.NewTraceExporter(
"oc_trace",
oce.PushTraceData,
Expand All @@ -129,22 +182,61 @@ func OpenCensusTraceExportersFromViper(v *viper.Viper) (tps []consumer.TraceCons
}

tps = append(tps, oexp)
doneFns = append(doneFns, oce.stop)

// TODO: (@odeke-em, @songya23) implement ExportMetrics for OpenCensus.
// mps = append(mps, oexp)
return
}

func (oce *ocagentExporter) stop() error {
wg := &sync.WaitGroup{}
var errors []error
var errorsMu sync.Mutex
visitedCnt := 0
for currExporter := range oce.exporters {
wg.Add(1)
go func(exporter *ocagent.Exporter) {
defer wg.Done()
err := exporter.Stop()
if err != nil {
errorsMu.Lock()
errors = append(errors, err)
errorsMu.Unlock()
}
}(currExporter)
visitedCnt++
if visitedCnt == cap(oce.exporters) {
// Visited and started Stop on all exporters, just wait for the stop to finish.
break
}
}

wg.Wait()
close(oce.exporters)

return internal.CombineErrors(errors)
}

func (oce *ocagentExporter) PushTraceData(ctx context.Context, td data.TraceData) (int, error) {
// Get an exporter worker round-robin
exporter := oce.exporters[atomic.AddUint32(&oce.counter, 1)%uint32(len(oce.exporters))]
// Get first available exporter.
exporter, ok := <-oce.exporters
if !ok {
err := &ocTraceExporterError{
code: errAlreadyStopped,
msg: fmt.Sprintf("OpenCensus exporter was already stopped."),
}
return len(td.Spans), err
}

err := exporter.ExportTraceServiceRequest(
&agenttracepb.ExportTraceServiceRequest{
Spans: td.Spans,
Resource: td.Resource,
Node: td.Node,
},
)
oce.exporters <- exporter
if err != nil {
return len(td.Spans), err
}
Expand Down
Loading

0 comments on commit 2157469

Please sign in to comment.