From 215746930e6ef0272573970aaf8f503c67e7ebab Mon Sep 17 00:00:00 2001 From: Paulo Janotti Date: Fri, 24 May 2019 09:49:56 -0700 Subject: [PATCH] OC exporter: new configuration settings and use first available worker (#555) * Add new OC exporter configuration settings Adding the new configuration settings now exposed by the oc-agent exporter: - keepalive - reconnection delay Plus an option, secure, to use system cert pool (useful for https with well-known CAs). Improve error messages Using first available oc exporter instead of round robin Adding test covering stop Fix stop issue * Add comments to keepalive settings example * Add errAlreadyStopped if data reaches exporter after stop --- README.md | 1 + demos/trace/oc-agent-config.yaml | 1 + exporter/README.md | 11 ++ exporter/opencensusexporter/opencensus.go | 156 ++++++++++++++---- .../opencensusexporter_test.go | 147 ++++++++++++++++- 5 files changed, 278 insertions(+), 38 deletions(-) diff --git a/README.md b/README.md index e552ecb5..5058107a 100644 --- a/README.md +++ b/README.md @@ -156,6 +156,7 @@ exporters: compression: "gzip" cert-pem-file: "server_ca_public.pem" # optional to enable TLS endpoint: "127.0.0.1:55678" + reconnection-delay: 2s jaeger: collector_endpoint: "http://127.0.0.1:14268/api/traces" diff --git a/demos/trace/oc-agent-config.yaml b/demos/trace/oc-agent-config.yaml index cd6d4371..6288f22f 100644 --- a/demos/trace/oc-agent-config.yaml +++ b/demos/trace/oc-agent-config.yaml @@ -1,6 +1,7 @@ receivers: opencensus: address: ":55678" + reconnection-delay: 2s jaeger: collector_http_port: 14268 diff --git a/exporter/README.md b/exporter/README.md index 920116d6..fa967559 100644 --- a/exporter/README.md +++ b/exporter/README.md @@ -85,7 +85,18 @@ queued-exporters: exporters: opencensus: endpoint: "127.0.0.1:55566" + cert-pem-file: "server_ca_public.pem" # PEM file used to enable TLS. For trusted CAs from + # system pool use "secure:" setting, see below. 
compression: "gzip" + reconnection-delay: 2s # Delay (+70% jitter) before reconnection attempt in case of error. + secure: true # Used to export to destinations trusted by certificates from the system pool. + # "cert-pem-file:" takes precedence over this setting. + keepalive: + # Keepalive settings for gRPC clients, see https://godoc.org/google.golang.org/grpc/keepalive#ClientParameters. + # Recommended to be set for cases that need to support bursts of data and periods of inactivity. + time: 30s # After being inactive for this amount of time the client will send a ping to the server. + timeout: 5s # Amount of time that client waits for a keepalive ping response before closing the connection. + permit-without-stream: true # Permits the keepalive ping even if no RPCs are in use, if false no ping is sent. my-org-jaeger: # A second processor with its own configuration options num-workers: 2 queue-size: 100 diff --git a/exporter/opencensusexporter/opencensus.go b/exporter/opencensusexporter/opencensus.go index dd0ae582..64067f60 100644 --- a/exporter/opencensusexporter/opencensus.go +++ b/exporter/opencensusexporter/opencensus.go @@ -16,48 +16,75 @@ package opencensusexporter import ( "context" - "errors" + "crypto/x509" "fmt" - "sync/atomic" + "sync" + "time" "contrib.go.opencensus.io/exporter/ocagent" agenttracepb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1" "github.com/spf13/viper" + "google.golang.org/grpc" "google.golang.org/grpc/credentials" + "google.golang.org/grpc/keepalive" "github.com/census-instrumentation/opencensus-service/consumer" "github.com/census-instrumentation/opencensus-service/data" "github.com/census-instrumentation/opencensus-service/exporter/exporterhelper" + "github.com/census-instrumentation/opencensus-service/internal" "github.com/census-instrumentation/opencensus-service/internal/compression" - "github.com/census-instrumentation/opencensus-service/internal/compression/grpc" + compressiongrpc 
"github.com/census-instrumentation/opencensus-service/internal/compression/grpc" ) -type opencensusConfig struct { - Endpoint string `mapstructure:"endpoint,omitempty"` - Compression string `mapstructure:"compression,omitempty"` - Headers map[string]string `mapstructure:"headers,omitempty"` - NumWorkers int `mapstructure:"num-workers,omitempty"` - CertPemFile string `mapstructure:"cert-pem-file,omitempty"` +// keepaliveConfig exposes the keepalive.ClientParameters to be used by the exporter. +// Refer to the original data-structure for the meaning of each parameter. +type keepaliveConfig struct { + Time time.Duration `mapstructure:"time,omitempty"` + Timeout time.Duration `mapstructure:"timeout,omitempty"` + PermitWithoutStream bool `mapstructure:"permit-without-stream,omitempty"` +} - // TODO: add insecure, service name options. +type opencensusConfig struct { + Endpoint string `mapstructure:"endpoint,omitempty"` + Compression string `mapstructure:"compression,omitempty"` + Headers map[string]string `mapstructure:"headers,omitempty"` + NumWorkers int `mapstructure:"num-workers,omitempty"` + CertPemFile string `mapstructure:"cert-pem-file,omitempty"` + UseSecure bool `mapstructure:"secure,omitempty"` + ReconnectionDelay time.Duration `mapstructure:"reconnection-delay,omitempty"` + KeepaliveParameters *keepaliveConfig `mapstructure:"keepalive,omitempty"` + // TODO: service name options. } type ocagentExporter struct { counter uint32 - exporters []*ocagent.Exporter + exporters chan *ocagent.Exporter +} + +type ocTraceExporterErrorCode int +type ocTraceExporterError struct { + code ocTraceExporterErrorCode + msg string +} + +var _ error = (*ocTraceExporterError)(nil) + +func (e *ocTraceExporterError) Error() string { + return e.msg } const ( defaultNumWorkers int = 2 -) -var ( - // ErrEndpointRequired indicates that this exporter was not provided with an endpoint in its config. 
- ErrEndpointRequired = errors.New("OpenCensus exporter config requires an Endpoint") - // ErrUnsupportedCompressionType indicates that this exporter was provided with a compression protocol it does not support. - ErrUnsupportedCompressionType = errors.New("OpenCensus exporter unsupported compression type") - // ErrUnableToGetTLSCreds indicates that this exporter could not read the provided TLS credentials. - ErrUnableToGetTLSCreds = errors.New("OpenCensus exporter unable to read TLS credentials") + _ ocTraceExporterErrorCode = iota // skip 0 + // errEndpointRequired indicates that this exporter was not provided with an endpoint in its config. + errEndpointRequired + // errUnsupportedCompressionType indicates that this exporter was provided with a compression protocol it does not support. + errUnsupportedCompressionType + // errUnableToGetTLSCreds indicates that this exporter could not read the provided TLS credentials. + errUnableToGetTLSCreds + // errAlreadyStopped indicates that the exporter was already stopped. 
+ errAlreadyStopped ) // OpenCensusTraceExportersFromViper unmarshals the viper and returns an consumer.TraceConsumer targeting @@ -75,22 +102,42 @@ func OpenCensusTraceExportersFromViper(v *viper.Viper) (tps []consumer.TraceCons } if ocac.Endpoint == "" { - return nil, nil, nil, ErrEndpointRequired + return nil, nil, nil, &ocTraceExporterError{ + code: errEndpointRequired, + msg: "OpenCensus exporter config requires an Endpoint", + } } opts := []ocagent.ExporterOption{ocagent.WithAddress(ocac.Endpoint)} if ocac.Compression != "" { - if compressionKey := grpc.GetGRPCCompressionKey(ocac.Compression); compressionKey != compression.Unsupported { + if compressionKey := compressiongrpc.GetGRPCCompressionKey(ocac.Compression); compressionKey != compression.Unsupported { opts = append(opts, ocagent.UseCompressor(compressionKey)) } else { - return nil, nil, nil, ErrUnsupportedCompressionType + return nil, nil, nil, &ocTraceExporterError{ + code: errUnsupportedCompressionType, + msg: fmt.Sprintf("OpenCensus exporter unsupported compression type %q", ocac.Compression), + } } } if ocac.CertPemFile != "" { creds, err := credentials.NewClientTLSFromFile(ocac.CertPemFile, "") if err != nil { - return nil, nil, nil, ErrUnableToGetTLSCreds + return nil, nil, nil, &ocTraceExporterError{ + code: errUnableToGetTLSCreds, + msg: fmt.Sprintf("OpenCensus exporter unable to read TLS credentials from pem file %q: %v", ocac.CertPemFile, err), + } + } + opts = append(opts, ocagent.WithTLSCredentials(creds)) + } else if ocac.UseSecure { + certPool, err := x509.SystemCertPool() + if err != nil { + return nil, nil, nil, &ocTraceExporterError{ + code: errUnableToGetTLSCreds, + msg: fmt.Sprintf( + "OpenCensus exporter unable to read certificates from system pool: %v", err), + } } + creds := credentials.NewClientTLSFromCert(certPool, "") opts = append(opts, ocagent.WithTLSCredentials(creds)) } else { opts = append(opts, ocagent.WithInsecure()) @@ -98,26 +145,32 @@ func 
OpenCensusTraceExportersFromViper(v *viper.Viper) (tps []consumer.TraceCons if len(ocac.Headers) > 0 { opts = append(opts, ocagent.WithHeaders(ocac.Headers)) } + if ocac.ReconnectionDelay > 0 { + opts = append(opts, ocagent.WithReconnectionPeriod(ocac.ReconnectionDelay)) + } + if ocac.KeepaliveParameters != nil { + opts = append(opts, ocagent.WithGRPCDialOption(grpc.WithKeepaliveParams(keepalive.ClientParameters{ + Time: ocac.KeepaliveParameters.Time, + Timeout: ocac.KeepaliveParameters.Timeout, + PermitWithoutStream: ocac.KeepaliveParameters.PermitWithoutStream, + }))) + } numWorkers := defaultNumWorkers if ocac.NumWorkers > 0 { numWorkers = ocac.NumWorkers } - exporters := make([]*ocagent.Exporter, 0, numWorkers) + exportersChan := make(chan *ocagent.Exporter, numWorkers) for exporterIndex := 0; exporterIndex < numWorkers; exporterIndex++ { exporter, serr := ocagent.NewExporter(opts...) if serr != nil { return nil, nil, nil, fmt.Errorf("cannot configure OpenCensus Trace exporter: %v", serr) } - exporters = append(exporters, exporter) - doneFns = append(doneFns, func() error { - exporter.Flush() - return nil - }) + exportersChan <- exporter } - oce := &ocagentExporter{exporters: exporters} + oce := &ocagentExporter{exporters: exportersChan} oexp, err := exporterhelper.NewTraceExporter( "oc_trace", oce.PushTraceData, @@ -129,15 +182,53 @@ func OpenCensusTraceExportersFromViper(v *viper.Viper) (tps []consumer.TraceCons } tps = append(tps, oexp) + doneFns = append(doneFns, oce.stop) // TODO: (@odeke-em, @songya23) implement ExportMetrics for OpenCensus. 
// mps = append(mps, oexp) return } +func (oce *ocagentExporter) stop() error { + wg := &sync.WaitGroup{} + var errors []error + var errorsMu sync.Mutex + visitedCnt := 0 + for currExporter := range oce.exporters { + wg.Add(1) + go func(exporter *ocagent.Exporter) { + defer wg.Done() + err := exporter.Stop() + if err != nil { + errorsMu.Lock() + errors = append(errors, err) + errorsMu.Unlock() + } + }(currExporter) + visitedCnt++ + if visitedCnt == cap(oce.exporters) { + // Visited and started Stop on all exporters, just wait for the stop to finish. + break + } + } + + wg.Wait() + close(oce.exporters) + + return internal.CombineErrors(errors) +} + func (oce *ocagentExporter) PushTraceData(ctx context.Context, td data.TraceData) (int, error) { - // Get an exporter worker round-robin - exporter := oce.exporters[atomic.AddUint32(&oce.counter, 1)%uint32(len(oce.exporters))] + // Get first available exporter. + exporter, ok := <-oce.exporters + if !ok { + err := &ocTraceExporterError{ + code: errAlreadyStopped, + msg: fmt.Sprintf("OpenCensus exporter was already stopped."), + } + return len(td.Spans), err + } + err := exporter.ExportTraceServiceRequest( &agenttracepb.ExportTraceServiceRequest{ Spans: td.Spans, @@ -145,6 +236,7 @@ func (oce *ocagentExporter) PushTraceData(ctx context.Context, td data.TraceData Node: td.Node, }, ) + oce.exporters <- exporter if err != nil { return len(td.Spans), err } diff --git a/exporter/opencensusexporter/opencensusexporter_test.go b/exporter/opencensusexporter/opencensusexporter_test.go index 73183a3e..c5b21b24 100644 --- a/exporter/opencensusexporter/opencensusexporter_test.go +++ b/exporter/opencensusexporter/opencensusexporter_test.go @@ -15,8 +15,12 @@ package opencensusexporter import ( + "context" "testing" + "time" + "github.com/census-instrumentation/opencensus-service/data" + "github.com/google/go-cmp/cmp" "github.com/spf13/viper" ) @@ -26,8 +30,8 @@ func TestOpenCensusTraceExportersFromViper(t *testing.T) { 
v.Set("opencensus.endpoint", "") _, _, _, err := OpenCensusTraceExportersFromViper(v) - if err != ErrEndpointRequired { - t.Fatalf("Expected to get ErrEndpointRequired. Got %v", err) + if errorCode(err) != errEndpointRequired { + t.Fatalf("Expected to get errEndpointRequired. Got %v", err) } v.Set("opencensus.endpoint", "127.0.0.1:55678") @@ -47,8 +51,8 @@ func TestOpenCensusTraceExportersFromViper_TLS(t *testing.T) { v.Set("opencensus.cert-pem-file", "dummy_file.pem") _, _, _, err := OpenCensusTraceExportersFromViper(v) - if err != ErrUnableToGetTLSCreds { - t.Fatalf("Expected to get ErrUnableToGetTLSCreds but did not") + if errorCode(err) != errUnableToGetTLSCreds { + t.Fatalf("Expected to get errUnableToGetTLSCreds but got %v", err) } v.Set("opencensus.cert-pem-file", "testdata/test_cert.pem") @@ -66,8 +70,8 @@ func TestOpenCensusTraceExportersFromViper_Compression(t *testing.T) { v.Set("opencensus.endpoint", "127.0.0.1:55678") v.Set("opencensus.compression", "random-compression") _, _, _, err := OpenCensusTraceExportersFromViper(v) - if err != ErrUnsupportedCompressionType { - t.Fatalf("Expected to get ErrUnsupportedCompressionType but did not") + if errorCode(err) != errUnsupportedCompressionType { + t.Fatalf("Expected to get errUnsupportedCompressionType but got %v", err) } v.Set("opencensus.compression", "gzip") @@ -79,3 +83,134 @@ func TestOpenCensusTraceExportersFromViper_Compression(t *testing.T) { t.Fatalf("Should get 1 exporter but got %d", len(exporters)) } } + +func TestOpenCensusTraceExporters_StopError(t *testing.T) { + v := viper.New() + v.Set("opencensus.endpoint", "127.0.0.1:55678") + tps, _, doneFns, err := OpenCensusTraceExportersFromViper(v) + doneFnCalled := false + defer func() { + if doneFnCalled { + return + } + for _, doneFn := range doneFns { + doneFn() + } + }() + if err != nil { + t.Fatalf("got = %v, want = nil", err) + } + if len(tps) != 1 { + t.Fatalf("got %d trace exporters, want 1", len(tps)) + } + if len(doneFns) != 1 { + 
t.Fatalf("got %d close functions, want 1", len(doneFns)) + } + + err = doneFns[0]() + doneFnCalled = true + if err != nil { + t.Fatalf("doneFn[0]() got = %v, want = nil", err) + } + err = tps[0].ConsumeTraceData(context.Background(), data.TraceData{}) + if errorCode(err) != errAlreadyStopped { + t.Fatalf("Expected to get errAlreadyStopped but got %v", err) + } +} + +// TestOpenCensusTraceExporterConfigsViaViper validates that specific parts of the configuration +// are correctly read via Viper. It ensures that instances can be created with that config, however, +// it doesn't validate that the settings were actually applied (that requires changes to better expose +// internal data from the exporter or use of reflection). +func TestOpenCensusTraceExporterConfigsViaViper(t *testing.T) { + const defaultTestEndPoint = "127.0.0.1:55678" + tests := []struct { + name string + configMap map[string]string + want opencensusConfig + }{ + { + name: "UseSecure", + configMap: map[string]string{ + "opencensus.endpoint": defaultTestEndPoint, + "opencensus.secure": "true", + }, + want: opencensusConfig{ + Endpoint: defaultTestEndPoint, + UseSecure: true, + }, + }, + { + name: "ReconnectionDelay", + configMap: map[string]string{ + "opencensus.endpoint": defaultTestEndPoint, + "opencensus.reconnection-delay": "5s", + }, + want: opencensusConfig{ + Endpoint: defaultTestEndPoint, + ReconnectionDelay: 5 * time.Second, + }, + }, + { + name: "KeepaliveParameters", + configMap: map[string]string{ + "opencensus.endpoint": defaultTestEndPoint, + "opencensus.keepalive.time": "30s", + "opencensus.keepalive.timeout": "25s", + "opencensus.keepalive.permit-without-stream": "true", + }, + want: opencensusConfig{ + Endpoint: defaultTestEndPoint, + KeepaliveParameters: &keepaliveConfig{ + Time: 30 * time.Second, + Timeout: 25 * time.Second, + PermitWithoutStream: true, + }, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + v := viper.New() + for key, value := range 
tt.configMap { + v.Set(key, value) + } + + // Ensure that the settings are being read, via UnmarshalExact. + var got opencensusConfig + if err := v.Sub("opencensus").UnmarshalExact(&got); err != nil { + t.Fatalf("UnmarshalExact() error: %v", err) + } + + if diff := cmp.Diff(got, tt.want); diff != "" { + t.Errorf("Mismatched configs\n-Got +Want:\n\t%s", diff) + } + + // Ensure creation happens as expected. + tps, _, doneFns, err := OpenCensusTraceExportersFromViper(v) + defer func() { + for _, doneFn := range doneFns { + doneFn() + } + }() + if err != nil { + t.Fatalf("got = %v, want = nil", err) + } + if len(tps) != 1 { + t.Fatalf("got %d trace exporters, want 1", len(tps)) + } + if len(doneFns) != 1 { + t.Fatalf("got %d close functions, want 1", len(doneFns)) + } + }) + } +} + +func errorCode(err error) ocTraceExporterErrorCode { + ocErr, ok := err.(*ocTraceExporterError) + if !ok { + return ocTraceExporterErrorCode(0) + } + return ocErr.code +}