Skip to content
This repository has been archived by the owner on Nov 7, 2022. It is now read-only.

Commit

Permalink
Add grpc server configs to OC receiver (#560)
Browse files Browse the repository at this point in the history
* Add grpc server configs to OC receiver

This will be only applied to the collector since it is assumed that unlike the agent it is typically located behind some loadbalancer or proxy where these settings are more useful.

* Updates to README and k8s file
  • Loading branch information
Paulo Janotti authored May 31, 2019
1 parent cd2c92d commit 65c89d8
Show file tree
Hide file tree
Showing 9 changed files with 294 additions and 18 deletions.
6 changes: 5 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,10 @@ The collector also serves as a control plane for agents/clients by supplying
them updated configuration (e.g. trace sampling policies), and reporting
agent/client health information/inventory metadata to downstream exporters.

### <a name="receivers-configuration"></a> Receivers Configuration

For detailed information about configuring receivers for the collector refer to the [receivers README.md](receiver/README.md).

### <a name="global-attributes"></a> Global Attributes

The collector also takes some global configurations that modify its behavior for all receivers / exporters.
Expand Down Expand Up @@ -398,7 +402,7 @@ Sample configuration file:
log-level: DEBUG
receivers:
opencensus: {} # Runs OpenCensus receiver with default configuration (default behavior)
opencensus: {} # Runs OpenCensus receiver with default configuration (default behavior).
queued-exporters:
jaeger-sender-test: # A friendly name for the exporter
Expand Down
32 changes: 32 additions & 0 deletions cmd/occollector/app/builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"flag"
"fmt"
"strings"
"time"

"github.com/census-instrumentation/opencensus-service/internal/config"
"github.com/spf13/viper"
Expand Down Expand Up @@ -106,6 +107,37 @@ type OpenCensusReceiverCfg struct {

// TLSCredentials is a (cert_file, key_file) configuration.
TLSCredentials *config.TLSCredentials `mapstructure:"tls_credentials"`

// Keepalive anchor for all the settings related to keepalive.
Keepalive *serverParametersAndEnforcementPolicy `mapstructure:"keepalive,omitempty"`

// MaxRecvMsgSizeMiB sets the maximum size (in MiB) of messages accepted by the server.
MaxRecvMsgSizeMiB uint64 `mapstructure:"max-recv-msg-size-mib"`

// MaxConcurrentStreams sets the limit on the number of concurrent streams to each ServerTransport.
MaxConcurrentStreams uint32 `mapstructure:"max-concurrent-streams"`
}

type serverParametersAndEnforcementPolicy struct {
ServerParameters *keepaliveServerParameters `mapstructure:"server-parameters,omitempty"`
EnforcementPolicy *keepaliveEnforcementPolicy `mapstructure:"enforcement-policy,omitempty"`
}

// keepaliveServerParameters allow configuration of the keepalive.ServerParameters.
// See https://godoc.org/google.golang.org/grpc/keepalive#ServerParameters for details.
type keepaliveServerParameters struct {
MaxConnectionIdle time.Duration `mapstructure:"max-connection-idle,omitempty"`
MaxConnectionAge time.Duration `mapstructure:"max-connection-age,omitempty"`
MaxConnectionAgeGrace time.Duration `mapstructure:"max-connection-age-grace,omitempty"`
Time time.Duration `mapstructure:"time,omitempty"`
Timeout time.Duration `mapstructure:"timeout,omitempty"`
}

// keepaliveEnforcementPolicy allow configuration of the keepalive.EnforcementPolicy.
// See https://godoc.org/google.golang.org/grpc/keepalive#EnforcementPolicy for details.
type keepaliveEnforcementPolicy struct {
MinTime time.Duration `mapstructure:"min-time,omitempty"`
PermitWithoutStream bool `mapstructure:"permit-without-stream,omitempty"`
}

// OpenCensusReceiverEnabled checks if the OpenCensus receiver is enabled, via a command-line flag, environment
Expand Down
30 changes: 30 additions & 0 deletions cmd/occollector/app/builder/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,36 @@ func TestTailSamplingConfig(t *testing.T) {
}
}

func TestOpencensusReceiverKeepaliveSettings(t *testing.T) {
v, err := loadViperFromFile("./testdata/oc_keepalive_config.yaml")
if err != nil {
t.Fatalf("Failed to load viper from test file: %v", err)
}

wCfg := NewDefaultOpenCensusReceiverCfg()
wCfg.Keepalive = &serverParametersAndEnforcementPolicy{
ServerParameters: &keepaliveServerParameters{
Time: 30 * time.Second,
Timeout: 5 * time.Second,
},
EnforcementPolicy: &keepaliveEnforcementPolicy{
MinTime: 10 * time.Second,
PermitWithoutStream: true,
},
}

gCfg, err := NewDefaultOpenCensusReceiverCfg().InitFromViper(v)
if err != nil {
t.Fatalf("got '%v', want nil", err)
}
if !reflect.DeepEqual(*gCfg.Keepalive.ServerParameters, *wCfg.Keepalive.ServerParameters) {
t.Fatalf("Wanted ServerParameters %+v but got %+v", *wCfg.Keepalive.ServerParameters, *gCfg.Keepalive.ServerParameters)
}
if !reflect.DeepEqual(*gCfg.Keepalive.EnforcementPolicy, *wCfg.Keepalive.EnforcementPolicy) {
t.Fatalf("Wanted EnforcementPolicy %+v but got %+v", *wCfg.Keepalive.EnforcementPolicy, *gCfg.Keepalive.EnforcementPolicy)
}
}

func loadViperFromFile(file string) (*viper.Viper, error) {
v := viper.New()
v.SetConfigFile(file)
Expand Down
9 changes: 9 additions & 0 deletions cmd/occollector/app/builder/testdata/oc_keepalive_config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
receivers:
opencensus:
keepalive:
server-parameters:
time: 30s
timeout: 5s
enforcement-policy:
min-time: 10s
permit-without-stream: true
7 changes: 6 additions & 1 deletion example/k8s.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,12 @@ metadata:
data:
oc-collector-config: |
receivers:
opencensus: {}
opencensus:
# keepalive settings can help load balancing, see receiver/README.md for more info.
keepalive:
server-parameters:
max-connection-age: 120s
max-connection-age-grace: 30s
jaeger: {}
zipkin: {}
# Can only use one exporter
Expand Down
1 change: 0 additions & 1 deletion internal/collector/opencensus/.nocover

This file was deleted.

80 changes: 65 additions & 15 deletions internal/collector/opencensus/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (

"github.com/spf13/viper"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"

"github.com/census-instrumentation/opencensus-service/cmd/occollector/app/builder"
"github.com/census-instrumentation/opencensus-service/consumer"
Expand All @@ -32,18 +34,12 @@ import (

// Start starts the OpenCensus receiver endpoint.
func Start(logger *zap.Logger, v *viper.Viper, traceConsumer consumer.TraceConsumer, asyncErrorChan chan<- error) (receiver.TraceReceiver, error) {
rOpts, err := builder.NewDefaultOpenCensusReceiverCfg().InitFromViper(v)
addr, opts, zapFields, err := receiverOptions(v)
if err != nil {
return nil, err
}

tlsCredsOption, hasTLSCreds, err := rOpts.TLSCredentials.ToOpenCensusReceiverServerOption()
if err != nil {
return nil, fmt.Errorf("OpenCensus receiver TLS Credentials: %v", err)
}

addr := ":" + strconv.FormatInt(int64(rOpts.Port), 10)
ocr, err := opencensusreceiver.New(addr, traceConsumer, nil, tlsCredsOption)
ocr, err := opencensusreceiver.New(addr, traceConsumer, nil, opts...)
if err != nil {
return nil, fmt.Errorf("Failed to create the OpenCensus trace receiver: %v", err)
}
Expand All @@ -52,15 +48,69 @@ func Start(logger *zap.Logger, v *viper.Viper, traceConsumer consumer.TraceConsu
return nil, fmt.Errorf("Cannot bind Opencensus receiver to address %q: %v", addr, err)
}

logger.Info("OpenCensus receiver is running.", zapFields...)

return ocr, nil
}

func receiverOptions(v *viper.Viper) (addr string, opts []opencensusreceiver.Option, zapFields []zap.Field, err error) {
rOpts, err := builder.NewDefaultOpenCensusReceiverCfg().InitFromViper(v)
if err != nil {
return addr, opts, zapFields, err
}

tlsCredsOption, hasTLSCreds, err := rOpts.TLSCredentials.ToOpenCensusReceiverServerOption()
if err != nil {
return addr, opts, zapFields, fmt.Errorf("OpenCensus receiver TLS Credentials: %v", err)
}
if hasTLSCreds {
opts = append(opts, tlsCredsOption)
tlsCreds := rOpts.TLSCredentials
logger.Info("OpenCensus receiver is running.",
zap.Int("port", rOpts.Port),
zap.String("cert_file", tlsCreds.CertFile),
zap.String("key_file", tlsCreds.KeyFile))
} else {
logger.Info("OpenCensus receiver is running.", zap.Int("port", rOpts.Port))
zapFields = append(zapFields, zap.String("cert_file", tlsCreds.CertFile), zap.String("key_file", tlsCreds.KeyFile))
}

return ocr, nil
grpcServerOptions, zapFields := grpcServerOptions(rOpts, zapFields)
if len(grpcServerOptions) > 0 {
opts = append(opts, opencensusreceiver.WithGRPCServerOptions(grpcServerOptions...))
}

addr = ":" + strconv.FormatInt(int64(rOpts.Port), 10)
zapFields = append(zapFields, zap.Int("port", rOpts.Port))

return addr, opts, zapFields, err
}

func grpcServerOptions(rOpts *builder.OpenCensusReceiverCfg, zapFields []zap.Field) ([]grpc.ServerOption, []zap.Field) {
var grpcServerOptions []grpc.ServerOption
if rOpts.MaxRecvMsgSizeMiB > 0 {
grpcServerOptions = append(grpcServerOptions, grpc.MaxRecvMsgSize(int(rOpts.MaxRecvMsgSizeMiB*1024*1024)))
zapFields = append(zapFields, zap.Uint64("max-recv-msg-size-mib", rOpts.MaxRecvMsgSizeMiB))
}
if rOpts.MaxConcurrentStreams > 0 {
grpcServerOptions = append(grpcServerOptions, grpc.MaxConcurrentStreams(rOpts.MaxConcurrentStreams))
zapFields = append(zapFields, zap.Uint32("max-concurrent-streams", rOpts.MaxConcurrentStreams))
}
if rOpts.Keepalive != nil {
if rOpts.Keepalive.ServerParameters != nil {
svrParams := rOpts.Keepalive.ServerParameters
grpcServerOptions = append(grpcServerOptions, grpc.KeepaliveParams(keepalive.ServerParameters{
MaxConnectionIdle: svrParams.MaxConnectionIdle,
MaxConnectionAge: svrParams.MaxConnectionAge,
MaxConnectionAgeGrace: svrParams.MaxConnectionAgeGrace,
Time: svrParams.Time,
Timeout: svrParams.Timeout,
}))
zapFields = append(zapFields, zap.Any("keepalive.server-parameters", rOpts.Keepalive.ServerParameters))
}
if rOpts.Keepalive.EnforcementPolicy != nil {
enfPol := rOpts.Keepalive.EnforcementPolicy
grpcServerOptions = append(grpcServerOptions, grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
MinTime: enfPol.MinTime,
PermitWithoutStream: enfPol.PermitWithoutStream,
}))
zapFields = append(zapFields, zap.Any("keepalive.enforcement-policy", rOpts.Keepalive.EnforcementPolicy))
}
}

return grpcServerOptions, zapFields
}
101 changes: 101 additions & 0 deletions internal/collector/opencensus/receiver_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// Copyright 2019, OpenCensus Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package ocreceiver wraps the functionality to start the end-point that
// receives data directly in the OpenCensus format.
package ocreceiver

import (
"testing"
"time"

"github.com/spf13/viper"
"go.uber.org/zap"

"github.com/census-instrumentation/opencensus-service/cmd/occollector/app/builder"
"github.com/census-instrumentation/opencensus-service/processor/processortest"
"github.com/census-instrumentation/opencensus-service/receiver/opencensusreceiver"
)

func TestStart(t *testing.T) {
tests := []struct {
name string
viperFn func() *viper.Viper
wantErr bool
}{
{
name: "default_config",
viperFn: func() *viper.Viper {
v := viper.New()
v.Set("receivers.opencensus.{}", nil)
return v
},
},
{
name: "invalid_port",
viperFn: func() *viper.Viper {
v := viper.New()
v.Set("receivers.opencensus.port", -1)
return v
},
wantErr: true,
},
{
name: "missing_tls_files",
viperFn: func() *viper.Viper {
v := viper.New()
v.Set("receivers.opencensus.tls_credentials.cert_file", "foo")
return v
},
wantErr: true,
},
{
name: "grpc_settings",
viperFn: func() *viper.Viper {
v := viper.New()
v.Set("receivers.opencensus.port", 55678)
v.Set("receivers.opencensus.max-recv-msg-size-mib", 32)
v.Set("receivers.opencensus.max-concurrent-streams", 64)
v.Set("receivers.opencensus.keepalive.server-parameters.max-connection-age", 180*time.Second)
v.Set("receivers.opencensus.keepalive.server-parameters.max-connection-age-grace", 10*time.Second)
v.Set("receivers.opencensus.keepalive.enforcement-policy.min-time", 60*time.Second)
v.Set("receivers.opencensus.keepalive.enforcement-policy.permit-without-stream", true)
return v
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Enforce that all configurations are actually recognized.
v := tt.viperFn()
rOpts := builder.OpenCensusReceiverCfg{}
if err := v.Sub("receivers.opencensus").UnmarshalExact(&rOpts); err != nil {
t.Errorf("UnmarshalExact error: %v", err)
return
}
nopProcessor := processortest.NewNopTraceProcessor(nil)
asyncErrChan := make(chan error, 1)
got, err := Start(zap.NewNop(), v, nopProcessor, asyncErrChan)
if (err != nil) != tt.wantErr {
t.Errorf("Start() error = %v, wantErr %v", err, tt.wantErr)
return
}
if got != nil {
// TODO: (@pjanotti) current StopTraceReception, stop the whole receiver.
// See https://github.com/census-instrumentation/opencensus-service/issues/559
got.(*opencensusreceiver.Receiver).Stop()
}
})
}
}
46 changes: 46 additions & 0 deletions receiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,52 @@ using `--receive-oc-trace=false`. On the Collector only the port can be configur
receivers:
opencensus:
port: 55678
# Settings below are only available on collector.
# Changes the maximum msg size that can be received (default is 4MiB).
# See https://godoc.org/google.golang.org/grpc#MaxRecvMsgSize for more information.
max-recv-msg-size-mib: 32
# Limits the maximum number of concurrent streams for each receiver transport (default is 100).
# See https://godoc.org/google.golang.org/grpc#MaxConcurrentStreams for more information.
max-concurrent-streams: 20
# Controls the keepalive settings, typically used to help scenarios in which the senders have
# load-balancers or proxies between them and the collectors.
keepalive:
# This section controls the https://godoc.org/google.golang.org/grpc/keepalive#ServerParameters.
# These are typically used to help load balancers by periodically terminating connections, or keeping
# connections alive (preventing RSTs by proxies) when needed for bursts of data following periods of
# inactivity.
server-parameters:
# max-connection-idle is the amount of time after which an idle connection would be closed,
# the default is infinity.
max-connection-idle: 90s
# max-connection-age is the maximum amount of time a connection may exist before it is closed,
# the default is infinity.
max-connection-age: 180s
# max-connection-age-grace is an additive period after max-connection-age for which the connection
# will be forcibly closed. The default is infinity.
max-connection-age-grace: 10s
# time is a duration for which, if the server doesn't see any activity it pings the client to see
# if the transport is still alive. The default is 2 hours.
time: 30s
# timeout is the wait time after a ping that the server waits for the response before closing the
# connection. The default is 20 seconds.
timeout: 5s
# This section controls the https://godoc.org/google.golang.org/grpc/keepalive#EnforcementPolicy.
# It is used to set keepalive enforcement policy on the server-side. Server will close connection
# with a client that violates this policy.
enforcement-policy:
# min-time is the minimum amount of time a client should wait before sending a keepalive ping.
# The default value is 5 minutes.
min-time: 10s
# permit-without-stream if true, server allows keepalive pings even when there are no active
# streams(RPCs). The default is false.
permit-without-stream: true
```

## Jaeger
Expand Down

0 comments on commit 65c89d8

Please sign in to comment.