Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions x-pack/filebeat/fbreceiver/receiver_leak_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,6 @@ func TestLeak(t *testing.T) {
},
},
},
"output": map[string]any{
"otelconsumer": map[string]any{},
},
"logging": map[string]any{
"level": "debug",
"selectors": []string{
Expand Down
18 changes: 0 additions & 18 deletions x-pack/filebeat/fbreceiver/receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,6 @@ func TestNewReceiver(t *testing.T) {
},
},
},
"output": map[string]any{
"otelconsumer": map[string]any{},
},
"logging": map[string]any{
"level": "debug",
"selectors": []string{
Expand Down Expand Up @@ -139,9 +136,6 @@ func benchmarkFactoryWithLogLevel(b *testing.B, level zapcore.Level) {
},
},
},
"output": map[string]any{
"otelconsumer": map[string]any{},
},
"logging": map[string]any{
"level": level.String(),
"selectors": []string{
Expand Down Expand Up @@ -203,9 +197,6 @@ func TestMultipleReceivers(t *testing.T) {
},
},
},
"output": map[string]any{
"otelconsumer": map[string]any{},
},
"logging": map[string]any{
"level": "info",
"selectors": []string{
Expand Down Expand Up @@ -370,9 +361,6 @@ func TestReceiverDegraded(t *testing.T) {
},
},
},
"output": map[string]any{
"otelconsumer": map[string]any{},
},
"logging": map[string]any{
"level": "debug",
"selectors": []string{
Expand Down Expand Up @@ -569,9 +557,6 @@ func TestConsumeContract(t *testing.T) {
},
},
},
"output": map[string]any{
"otelconsumer": map[string]any{},
},
"logging": map[string]any{
"level": "debug",
"selectors": []string{
Expand Down Expand Up @@ -607,9 +592,6 @@ func TestReceiverHook(t *testing.T) {
},
},
},
"output": map[string]any{
"otelconsumer": map[string]any{},
},
"management.otel.enabled": true,
"path.home": t.TempDir(),
},
Expand Down
4 changes: 0 additions & 4 deletions x-pack/filebeat/tests/integration/otel_lsexporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,6 @@ processors:
- %s
prospector.scanner.fingerprint.enabled: false
file_identity.native: ~
output:
otelconsumer:
processors:
- add_host_metadata: ~
- add_fields:
Expand Down Expand Up @@ -209,8 +207,6 @@ func TestLogstashExporterProxyURL(t *testing.T) {
- %s
prospector.scanner.fingerprint.enabled: false
file_identity.native: ~
output:
otelconsumer:
processors:
- add_host_metadata: ~
- add_fields:
Expand Down
12 changes: 0 additions & 12 deletions x-pack/filebeat/tests/integration/otel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,6 @@ func TestFilebeatOTelE2E(t *testing.T) {
- %s
prospector.scanner.fingerprint.enabled: false
file_identity.native: ~
output:
otelconsumer:
processors:
- add_host_metadata: ~
- add_cloud_metadata: ~
Expand Down Expand Up @@ -249,8 +247,6 @@ processors:
- type: httpjson
id: httpjson-e2e-otel
request.url: http://localhost:8090/test
output:
otelconsumer:
processors:
- add_host_metadata: ~
- add_cloud_metadata: ~
Expand Down Expand Up @@ -449,8 +445,6 @@ func TestFilebeatOTelReceiverE2E(t *testing.T) {
- {{.InputFile}}
prospector.scanner.fingerprint.enabled: false
file_identity.native: ~
output:
otelconsumer:
logging:
level: info
selectors:
Expand Down Expand Up @@ -638,8 +632,6 @@ func TestFilebeatOTelMultipleReceiversE2E(t *testing.T) {
- {{$receiver.InputFile}}
prospector.scanner.fingerprint.enabled: false
file_identity.native: ~
output:
otelconsumer:
logging:
level: info
selectors:
Expand Down Expand Up @@ -987,8 +979,6 @@ func TestFilebeatOTelDocumentLevelRetries(t *testing.T) {
- {{.InputFile}}
prospector.scanner.fingerprint.enabled: false
file_identity.native: ~
output:
otelconsumer:
logging:
level: debug
queue.mem.flush.timeout: 0s
Expand Down Expand Up @@ -1170,8 +1160,6 @@ func TestFileBeatKerberos(t *testing.T) {
- {{.InputFile}}
prospector.scanner.fingerprint.enabled: false
file_identity.native: ~
output:
otelconsumer:
queue.mem.flush.timeout: 0s
management.otel.enabled: true
path.home: {{.PathHome}}
Expand Down
81 changes: 46 additions & 35 deletions x-pack/libbeat/cmd/instance/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,24 @@ func NewBeatForReceiver(settings instance.Settings, receiverConfig map[string]an
ucfg.VarExp,
}

err = setLogger(b, receiverConfig, core)
if err != nil {
return nil, fmt.Errorf("error configuring beats logger: %w", err)
}

// extracting it here for ease of use
logger := b.Info.Logger

// if output is set and if output is not otelconsumer, inform users
if receiverConfig["output"] != nil && receiverConfig["output"].(map[string]any)["otelconsumer"] == nil { //nolint: errcheck // output will always be of map type
logger.Debugf("configured output does not work with beatreceiver, please use appropriate exporter instead")
}

// all beatreceivers will use otelconsumer output by default
receiverConfig["output"] = map[string]any{
"otelconsumer": map[string]any{},
}

tmp, err := ucfg.NewFrom(receiverConfig, cfOpts...)
if err != nil {
return nil, fmt.Errorf("error converting receiver config to ucfg: %w", err)
Expand Down Expand Up @@ -129,36 +147,12 @@ func NewBeatForReceiver(settings instance.Settings, receiverConfig map[string]an
return nil, fmt.Errorf("error unpacking config data: %w", err)
}

logpConfig := logp.Config{}
logpConfig.AddCaller = true
logpConfig.Beat = b.Info.Beat
logpConfig.Files.MaxSize = 1

if b.Config.Logging == nil {
b.Config.Logging = config.NewConfig()
}

if err := b.Config.Logging.Unpack(&logpConfig); err != nil {
return nil, fmt.Errorf("error unpacking beats logging config: %w\n%v", err, b.Config.Logging)
}

b.Info.Logger, err = logp.ConfigureWithCoreLocal(logpConfig, core)
if err != nil {
return nil, fmt.Errorf("error configuring beats logp: %w", err)
}
// extracting it here for ease of use
logger := b.Info.Logger

instrumentation, err := instrumentation.New(cfg, b.Info.Beat, b.Info.Version, logger)
if err != nil {
return nil, fmt.Errorf("error setting up instrumentation: %w", err)
}
b.Instrumentation = instrumentation

if err := instance.PromoteOutputQueueSettings(b); err != nil {
return nil, fmt.Errorf("could not promote output queue settings: %w", err)
}

if err := features.UpdateFromConfig(b.RawConfig); err != nil {
return nil, fmt.Errorf("could not parse features: %w", err)
}
Expand Down Expand Up @@ -261,17 +255,6 @@ func NewBeatForReceiver(settings instance.Settings, receiverConfig map[string]an
}
b.SetProcessors(processors)

// This should be replaced with static config for otel consumer
// but need to figure out if we want the Queue settings from here.
outputEnabled := b.Config.Output.IsSet() && b.Config.Output.Config().Enabled()
if !outputEnabled {
if b.Manager.Enabled() {
logger.Info("Output is configured through Central Management")
} else {
return nil, fmt.Errorf("no outputs are defined, please define one under the output section")
}
}

reg := b.Monitoring.StatsRegistry().GetOrCreateRegistry("libbeat")

monitors := pipeline.Monitors{
Expand All @@ -298,3 +281,31 @@ func NewBeatForReceiver(settings instance.Settings, receiverConfig map[string]an

return b, nil
}

// setLogger configures a logp logger and sets it on b.Info.Logger
func setLogger(b *instance.Beat, receiverConfig map[string]any, core zapcore.Core) error {

var err error
logpConfig := logp.Config{}
logpConfig.AddCaller = true
logpConfig.Beat = b.Info.Beat
logpConfig.Files.MaxSize = 1

var logCfg *config.C
if _, ok := receiverConfig["logging"]; !ok {
logCfg = config.NewConfig()
} else {
logCfg = config.MustNewConfigFrom(receiverConfig["logging"])
}

if err := logCfg.Unpack(&logpConfig); err != nil {
return fmt.Errorf("error unpacking beats logging config: %w\n%v", err, b.Config.Logging)
}

b.Info.Logger, err = logp.ConfigureWithCoreLocal(logpConfig, core)
if err != nil {
return fmt.Errorf("error configuring beats logp: %w", err)
}

return nil
}
3 changes: 0 additions & 3 deletions x-pack/libbeat/cmd/instance/beat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,6 @@ func TestManager(t *testing.T) {
},
},
},
"output": map[string]any{
"otelconsumer": map[string]any{},
},
"path.home": tmpDir,
}
t.Run("otel management disabled - key missing", func(t *testing.T) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ func TestNew(t *testing.T) {
message: "test message"
count: 1
processors: ~
output:
otelconsumer:
logging:
level: debug
queue.mem.flush.timeout: 0s
Expand Down
3 changes: 0 additions & 3 deletions x-pack/metricbeat/mbreceiver/receiver_leak_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,6 @@ func TestLeak(t *testing.T) {
},
},
},
"output": map[string]any{
"otelconsumer": map[string]any{},
},
"logging": map[string]any{
"level": "debug",
"selectors": []string{
Expand Down
20 changes: 1 addition & 19 deletions x-pack/metricbeat/mbreceiver/receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,6 @@ func TestNewReceiver(t *testing.T) {
},
},
},
"output": map[string]any{
"otelconsumer": map[string]any{},
},
"logging": map[string]any{
"level": "debug",
"selectors": []string{
Expand Down Expand Up @@ -139,9 +136,6 @@ func TestMultipleReceivers(t *testing.T) {
},
},
},
"output": map[string]any{
"otelconsumer": map[string]any{},
},
"logging": map[string]any{
"level": "debug",
"selectors": []string{
Expand All @@ -167,9 +161,6 @@ func TestMultipleReceivers(t *testing.T) {
},
},
},
"output": map[string]any{
"otelconsumer": map[string]any{},
},
"logging": map[string]any{
"level": "debug",
"selectors": []string{
Expand Down Expand Up @@ -336,9 +327,6 @@ func BenchmarkFactory(b *testing.B) {
},
},
},
"output": map[string]any{
"otelconsumer": map[string]any{},
},
"logging": map[string]any{
"level": "info",
"selectors": []string{
Expand Down Expand Up @@ -407,9 +395,6 @@ func TestReceiverDegraded(t *testing.T) {
},
},
},
"output": map[string]any{
"otelconsumer": map[string]any{},
},
"path.home": t.TempDir(),
},
}
Expand Down Expand Up @@ -444,10 +429,7 @@ func TestReceiverHook(t *testing.T) {
},
},
"management.otel.enabled": true,
"output": map[string]any{
"otelconsumer": map[string]any{},
},
"path.home": t.TempDir(),
"path.home": t.TempDir(),
},
}
receiverSettings := receiver.Settings{
Expand Down
6 changes: 0 additions & 6 deletions x-pack/metricbeat/tests/integration/otel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,6 @@ func TestMetricbeatOTelE2E(t *testing.T) {
- '.*'
metricsets:
- cpu
output:
otelconsumer:
processors:
- add_host_metadata: ~
- add_cloud_metadata: ~
Expand Down Expand Up @@ -265,8 +263,6 @@ func TestMetricbeatOTelReceiverE2E(t *testing.T) {
- '.*'
metricsets:
- cpu
output:
otelconsumer:
processors:
- add_host_metadata: ~
- add_cloud_metadata: ~
Expand Down Expand Up @@ -461,8 +457,6 @@ func TestMetricbeatOTelMultipleReceiversE2E(t *testing.T) {
target: ''
fields:
receiverid: "{{$i}}"
output:
otelconsumer:
logging:
level: info
selectors:
Expand Down
Loading