This repository has been archived by the owner on Nov 7, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 60
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Build exporters based on new configuration (#569)
- Introduced a new unisvc binary and make target. Unisvc is planned to be the implementation of the unified agent and collector that uses the new configuration format. - Refactored existing Application.execute() code to make it more readable and also reusable. The functionality is not changed. Reviews: please check this carefully! - Introduced new Application.executeUnified() function that will call all new agent/collector execution logic that is different from old logic. Application.executeUnified() is currently partial implementation, which only builds the exporters. I plan to add building of processor pipelines and receivers in future PRs. - Introduced ExportersBuilder which builds runtime exporters based on provided configuration. - Added ability for App telemetry shutdown. This is required to be able to run multiple tests that involve App start and shutdown. Previously there was only one test - TestApplication_Start - and it was not correctly cleaning up telemetry resources at the end, making impossible to add more tests against App.
- Loading branch information
1 parent
00fa8ee
commit 0747a83
Showing
20 changed files
with
1,016 additions
and
36 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,241 @@ | ||
// Copyright 2019, OpenCensus Authors | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package builder | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
|
||
"go.uber.org/zap" | ||
|
||
"github.com/census-instrumentation/opencensus-service/consumer" | ||
"github.com/census-instrumentation/opencensus-service/data" | ||
"github.com/census-instrumentation/opencensus-service/exporter" | ||
"github.com/census-instrumentation/opencensus-service/internal" | ||
"github.com/census-instrumentation/opencensus-service/internal/configmodels" | ||
"github.com/census-instrumentation/opencensus-service/internal/factories" | ||
) | ||
|
||
// exporterImpl is a running exporter that is built based on a config. It can have | ||
// a trace and/or a metrics consumer and have a stop function. | ||
type exporterImpl struct { | ||
tc consumer.TraceConsumer | ||
mc consumer.MetricsConsumer | ||
stop func() error | ||
} | ||
|
||
// Check that exporterImpl implements Exporter interface. | ||
var _ exporter.Exporter = (*exporterImpl)(nil) | ||
|
||
// ConsumeTraceData receives data.TraceData for processing by the TraceConsumer. | ||
func (exp *exporterImpl) ConsumeTraceData(ctx context.Context, td data.TraceData) error { | ||
return exp.tc.ConsumeTraceData(ctx, td) | ||
} | ||
|
||
// ConsumeMetricsData receives data.MetricsData for processing by the MetricsConsumer. | ||
func (exp *exporterImpl) ConsumeMetricsData(ctx context.Context, md data.MetricsData) error { | ||
return exp.mc.ConsumeMetricsData(ctx, md) | ||
} | ||
|
||
// TraceExportFormat is unneeded, we need to get rid of it after we cleanup | ||
// exporter.TraceExporter interface. | ||
func (exp *exporterImpl) TraceExportFormat() string { | ||
return "" | ||
} | ||
|
||
// MetricsExportFormat is unneeded, we need to get rid of it after we cleanup | ||
// exporter.MetricsExporter interface. | ||
func (exp *exporterImpl) MetricsExportFormat() string { | ||
return "" | ||
} | ||
|
||
// Stop the exporter. | ||
func (exp *exporterImpl) Stop() error { | ||
return exp.stop() | ||
} | ||
|
||
// Exporters is a map of exporters created from exporter configs. | ||
type Exporters map[configmodels.Exporter]*exporterImpl | ||
|
||
// StopAll stops all exporters. | ||
func (exps Exporters) StopAll() { | ||
for _, exp := range exps { | ||
exp.Stop() | ||
} | ||
} | ||
|
||
type dataTypeRequirement struct { | ||
// Pipeline that requires the data type. | ||
requiredBy *configmodels.Pipeline | ||
} | ||
|
||
// Map of data type requirements. | ||
type dataTypeRequirements map[configmodels.DataType]dataTypeRequirement | ||
|
||
// Data type requirements for all exporters. | ||
type exportersRequiredDataTypes map[configmodels.Exporter]dataTypeRequirements | ||
|
||
// ExportersBuilder builds exporters from config. | ||
type ExportersBuilder struct { | ||
logger *zap.Logger | ||
config *configmodels.ConfigV2 | ||
} | ||
|
||
// NewExportersBuilder creates a new ExportersBuilder. Call Build() on the returned value. | ||
func NewExportersBuilder(logger *zap.Logger, config *configmodels.ConfigV2) *ExportersBuilder { | ||
return &ExportersBuilder{logger, config} | ||
} | ||
|
||
// Build exporters from config. | ||
func (eb *ExportersBuilder) Build() (Exporters, error) { | ||
exporters := make(Exporters) | ||
|
||
// We need to calculate required input data types for each exporter so that we know | ||
// which data type must be started for each exporter. | ||
exporterInputDataTypes := eb.calcExportersRequiredDataTypes() | ||
|
||
// Build exporters based on configuration and required input data types. | ||
for _, cfg := range eb.config.Exporters { | ||
exp, err := eb.buildExporter(cfg, exporterInputDataTypes) | ||
if err != nil { | ||
return nil, err | ||
} | ||
exporters[cfg] = exp | ||
} | ||
|
||
return exporters, nil | ||
} | ||
|
||
func (eb *ExportersBuilder) calcExportersRequiredDataTypes() exportersRequiredDataTypes { | ||
|
||
// Go over all pipelines. The data type of the pipeline defines what data type | ||
// each exporter is expected to receive. Collect all required types for each | ||
// exporter. | ||
// | ||
// We also remember the last pipeline that requested the particular data type. | ||
// This is only needed for logging purposes in error cases when we need to | ||
// print that a particular exporter does not support the data type required for | ||
// a particular pipeline. | ||
|
||
result := make(exportersRequiredDataTypes) | ||
|
||
// Iterate over pipelines. | ||
for _, pipeline := range eb.config.Pipelines { | ||
// Iterate over all exporters for this pipeline. | ||
for _, expName := range pipeline.Exporters { | ||
// Find the exporter config by name. | ||
exporter := eb.config.Exporters[expName] | ||
|
||
// Create the data type requirement for the exporter if it does not exist. | ||
if result[exporter] == nil { | ||
result[exporter] = make(dataTypeRequirements) | ||
} | ||
|
||
// Remember that this data type is required for the exporter and also which | ||
// pipeline the requirement is coming from. | ||
result[exporter][pipeline.InputType] = dataTypeRequirement{pipeline} | ||
} | ||
} | ||
return result | ||
} | ||
|
||
// combineStopFunc combines 2 functions and returns one function | ||
// that can be called and which in turn will call both functions | ||
// and then combine any errors that the 2 functions return. | ||
// Safe to use if any of the 2 functions are nil. If both functions | ||
// are nil then returns nil. | ||
func combineStopFunc(f1, f2 factories.StopFunc) factories.StopFunc { | ||
if f1 == nil { | ||
return f2 | ||
} | ||
if f2 == nil { | ||
return f1 | ||
} | ||
|
||
return func() error { | ||
var errs []error | ||
if err := f1(); err != nil { | ||
errs = append(errs, err) | ||
} | ||
if err := f2(); err != nil { | ||
errs = append(errs, err) | ||
} | ||
return internal.CombineErrors(errs) | ||
} | ||
} | ||
|
||
func (eb *ExportersBuilder) buildExporter( | ||
config configmodels.Exporter, | ||
exportersInputDataTypes exportersRequiredDataTypes, | ||
) (*exporterImpl, error) { | ||
|
||
factory := factories.GetExporterFactory(config.Type()) | ||
|
||
exporter := &exporterImpl{} | ||
|
||
inputDataTypes := exportersInputDataTypes[config] | ||
if inputDataTypes == nil { | ||
// No data types where requested for this exporter. This can only happen | ||
// if there are no pipelines associated with the exporter. | ||
eb.logger.Warn("Exporter " + config.Name() + | ||
" is not associated with any pipeline and will not export data.") | ||
return exporter, nil | ||
} | ||
|
||
if requirement, ok := inputDataTypes[configmodels.TracesDataType]; ok { | ||
// Traces data type is required. Create a trace exporter based on config. | ||
tc, stopFunc, err := factory.CreateTraceExporter(config) | ||
if err != nil { | ||
if err == factories.ErrDataTypeIsNotSupported { | ||
// Could not create because this exporter does not support this data type. | ||
return nil, typeMismatchErr(config, requirement.requiredBy, configmodels.TracesDataType) | ||
} | ||
return nil, fmt.Errorf("error creating %q exporter: %v", config.Name(), err) | ||
} | ||
|
||
exporter.tc = tc | ||
exporter.stop = stopFunc | ||
} | ||
|
||
if requirement, ok := inputDataTypes[configmodels.MetricsDataType]; ok { | ||
// Metrics data type is required. Create a trace exporter based on config. | ||
mc, stopFunc, err := factory.CreateMetricsExporter(config) | ||
if err != nil { | ||
if err == factories.ErrDataTypeIsNotSupported { | ||
// Could not create because this exporter does not support this data type. | ||
return nil, typeMismatchErr(config, requirement.requiredBy, configmodels.MetricsDataType) | ||
} | ||
return nil, fmt.Errorf("error creating %q exporter: %v", config.Name(), err) | ||
} | ||
|
||
exporter.mc = mc | ||
exporter.stop = combineStopFunc(exporter.stop, stopFunc) | ||
} | ||
|
||
return exporter, nil | ||
} | ||
|
||
func typeMismatchErr( | ||
config configmodels.Exporter, | ||
requiredByPipeline *configmodels.Pipeline, | ||
dataType configmodels.DataType, | ||
) error { | ||
return fmt.Errorf( | ||
"pipeline %q produces %q to exporter %s which does not support %q "+ | ||
"telemetry data. exporter will be detached from pipeline", | ||
requiredByPipeline.Name, dataType.GetDataTypeStr(), | ||
config.Name(), dataType.GetDataTypeStr(), | ||
) | ||
} |
Oops, something went wrong.