Skip to content

Commit b80d1e9

Browse files
authored
[beatreceiver] Do not require specifiying otelconsumer output (#47693)
* do not require explicit config
1 parent 183016c commit b80d1e9

File tree

12 files changed

+47
-117
lines changed

12 files changed

+47
-117
lines changed

x-pack/filebeat/fbreceiver/receiver_leak_test.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,9 +50,6 @@ func TestLeak(t *testing.T) {
5050
},
5151
},
5252
},
53-
"output": map[string]any{
54-
"otelconsumer": map[string]any{},
55-
},
5653
"logging": map[string]any{
5754
"level": "debug",
5855
"selectors": []string{

x-pack/filebeat/fbreceiver/receiver_test.go

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -64,9 +64,6 @@ func TestNewReceiver(t *testing.T) {
6464
},
6565
},
6666
},
67-
"output": map[string]any{
68-
"otelconsumer": map[string]any{},
69-
},
7067
"logging": map[string]any{
7168
"level": "debug",
7269
"selectors": []string{
@@ -139,9 +136,6 @@ func benchmarkFactoryWithLogLevel(b *testing.B, level zapcore.Level) {
139136
},
140137
},
141138
},
142-
"output": map[string]any{
143-
"otelconsumer": map[string]any{},
144-
},
145139
"logging": map[string]any{
146140
"level": level.String(),
147141
"selectors": []string{
@@ -203,9 +197,6 @@ func TestMultipleReceivers(t *testing.T) {
203197
},
204198
},
205199
},
206-
"output": map[string]any{
207-
"otelconsumer": map[string]any{},
208-
},
209200
"logging": map[string]any{
210201
"level": "info",
211202
"selectors": []string{
@@ -370,9 +361,6 @@ func TestReceiverDegraded(t *testing.T) {
370361
},
371362
},
372363
},
373-
"output": map[string]any{
374-
"otelconsumer": map[string]any{},
375-
},
376364
"logging": map[string]any{
377365
"level": "debug",
378366
"selectors": []string{
@@ -569,9 +557,6 @@ func TestConsumeContract(t *testing.T) {
569557
},
570558
},
571559
},
572-
"output": map[string]any{
573-
"otelconsumer": map[string]any{},
574-
},
575560
"logging": map[string]any{
576561
"level": "debug",
577562
"selectors": []string{
@@ -607,9 +592,6 @@ func TestReceiverHook(t *testing.T) {
607592
},
608593
},
609594
},
610-
"output": map[string]any{
611-
"otelconsumer": map[string]any{},
612-
},
613595
"management.otel.enabled": true,
614596
"path.home": t.TempDir(),
615597
},

x-pack/filebeat/tests/integration/otel_lsexporter_test.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -103,8 +103,6 @@ processors:
103103
- %s
104104
prospector.scanner.fingerprint.enabled: false
105105
file_identity.native: ~
106-
output:
107-
otelconsumer:
108106
processors:
109107
- add_host_metadata: ~
110108
- add_fields:
@@ -209,8 +207,6 @@ func TestLogstashExporterProxyURL(t *testing.T) {
209207
- %s
210208
prospector.scanner.fingerprint.enabled: false
211209
file_identity.native: ~
212-
output:
213-
otelconsumer:
214210
processors:
215211
- add_host_metadata: ~
216212
- add_fields:

x-pack/filebeat/tests/integration/otel_test.go

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -65,8 +65,6 @@ func TestFilebeatOTelE2E(t *testing.T) {
6565
- %s
6666
prospector.scanner.fingerprint.enabled: false
6767
file_identity.native: ~
68-
output:
69-
otelconsumer:
7068
processors:
7169
- add_host_metadata: ~
7270
- add_cloud_metadata: ~
@@ -249,8 +247,6 @@ processors:
249247
- type: httpjson
250248
id: httpjson-e2e-otel
251249
request.url: http://localhost:8090/test
252-
output:
253-
otelconsumer:
254250
processors:
255251
- add_host_metadata: ~
256252
- add_cloud_metadata: ~
@@ -449,8 +445,6 @@ func TestFilebeatOTelReceiverE2E(t *testing.T) {
449445
- {{.InputFile}}
450446
prospector.scanner.fingerprint.enabled: false
451447
file_identity.native: ~
452-
output:
453-
otelconsumer:
454448
logging:
455449
level: info
456450
selectors:
@@ -638,8 +632,6 @@ func TestFilebeatOTelMultipleReceiversE2E(t *testing.T) {
638632
- {{$receiver.InputFile}}
639633
prospector.scanner.fingerprint.enabled: false
640634
file_identity.native: ~
641-
output:
642-
otelconsumer:
643635
logging:
644636
level: info
645637
selectors:
@@ -987,8 +979,6 @@ func TestFilebeatOTelDocumentLevelRetries(t *testing.T) {
987979
- {{.InputFile}}
988980
prospector.scanner.fingerprint.enabled: false
989981
file_identity.native: ~
990-
output:
991-
otelconsumer:
992982
logging:
993983
level: debug
994984
queue.mem.flush.timeout: 0s
@@ -1170,8 +1160,6 @@ func TestFileBeatKerberos(t *testing.T) {
11701160
- {{.InputFile}}
11711161
prospector.scanner.fingerprint.enabled: false
11721162
file_identity.native: ~
1173-
output:
1174-
otelconsumer:
11751163
queue.mem.flush.timeout: 0s
11761164
management.otel.enabled: true
11771165
path.home: {{.PathHome}}

x-pack/libbeat/cmd/instance/beat.go

Lines changed: 46 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,24 @@ func NewBeatForReceiver(settings instance.Settings, receiverConfig map[string]an
6565
ucfg.VarExp,
6666
}
6767

68+
err = setLogger(b, receiverConfig, core)
69+
if err != nil {
70+
return nil, fmt.Errorf("error configuring beats logger: %w", err)
71+
}
72+
73+
// extracting it here for ease of use
74+
logger := b.Info.Logger
75+
76+
// if output is set and if output is not otelconsumer, inform users
77+
if receiverConfig["output"] != nil && receiverConfig["output"].(map[string]any)["otelconsumer"] == nil { //nolint: errcheck // output will always be of map type
78+
logger.Debugf("configured output does not work with beatreceiver, please use appropriate exporter instead")
79+
}
80+
81+
// all beatreceivers will use otelconsumer output by default
82+
receiverConfig["output"] = map[string]any{
83+
"otelconsumer": map[string]any{},
84+
}
85+
6886
tmp, err := ucfg.NewFrom(receiverConfig, cfOpts...)
6987
if err != nil {
7088
return nil, fmt.Errorf("error converting receiver config to ucfg: %w", err)
@@ -129,36 +147,12 @@ func NewBeatForReceiver(settings instance.Settings, receiverConfig map[string]an
129147
return nil, fmt.Errorf("error unpacking config data: %w", err)
130148
}
131149

132-
logpConfig := logp.Config{}
133-
logpConfig.AddCaller = true
134-
logpConfig.Beat = b.Info.Beat
135-
logpConfig.Files.MaxSize = 1
136-
137-
if b.Config.Logging == nil {
138-
b.Config.Logging = config.NewConfig()
139-
}
140-
141-
if err := b.Config.Logging.Unpack(&logpConfig); err != nil {
142-
return nil, fmt.Errorf("error unpacking beats logging config: %w\n%v", err, b.Config.Logging)
143-
}
144-
145-
b.Info.Logger, err = logp.ConfigureWithCoreLocal(logpConfig, core)
146-
if err != nil {
147-
return nil, fmt.Errorf("error configuring beats logp: %w", err)
148-
}
149-
// extracting it here for ease of use
150-
logger := b.Info.Logger
151-
152150
instrumentation, err := instrumentation.New(cfg, b.Info.Beat, b.Info.Version, logger)
153151
if err != nil {
154152
return nil, fmt.Errorf("error setting up instrumentation: %w", err)
155153
}
156154
b.Instrumentation = instrumentation
157155

158-
if err := instance.PromoteOutputQueueSettings(b); err != nil {
159-
return nil, fmt.Errorf("could not promote output queue settings: %w", err)
160-
}
161-
162156
if err := features.UpdateFromConfig(b.RawConfig); err != nil {
163157
return nil, fmt.Errorf("could not parse features: %w", err)
164158
}
@@ -261,17 +255,6 @@ func NewBeatForReceiver(settings instance.Settings, receiverConfig map[string]an
261255
}
262256
b.SetProcessors(processors)
263257

264-
// This should be replaced with static config for otel consumer
265-
// but need to figure out if we want the Queue settings from here.
266-
outputEnabled := b.Config.Output.IsSet() && b.Config.Output.Config().Enabled()
267-
if !outputEnabled {
268-
if b.Manager.Enabled() {
269-
logger.Info("Output is configured through Central Management")
270-
} else {
271-
return nil, fmt.Errorf("no outputs are defined, please define one under the output section")
272-
}
273-
}
274-
275258
reg := b.Monitoring.StatsRegistry().GetOrCreateRegistry("libbeat")
276259

277260
monitors := pipeline.Monitors{
@@ -298,3 +281,31 @@ func NewBeatForReceiver(settings instance.Settings, receiverConfig map[string]an
298281

299282
return b, nil
300283
}
284+
285+
// setLogger configures a logp logger and sets it on b.Info.Logger
286+
func setLogger(b *instance.Beat, receiverConfig map[string]any, core zapcore.Core) error {
287+
288+
var err error
289+
logpConfig := logp.Config{}
290+
logpConfig.AddCaller = true
291+
logpConfig.Beat = b.Info.Beat
292+
logpConfig.Files.MaxSize = 1
293+
294+
var logCfg *config.C
295+
if _, ok := receiverConfig["logging"]; !ok {
296+
logCfg = config.NewConfig()
297+
} else {
298+
logCfg = config.MustNewConfigFrom(receiverConfig["logging"])
299+
}
300+
301+
if err := logCfg.Unpack(&logpConfig); err != nil {
302+
return fmt.Errorf("error unpacking beats logging config: %w\n%v", err, b.Config.Logging)
303+
}
304+
305+
b.Info.Logger, err = logp.ConfigureWithCoreLocal(logpConfig, core)
306+
if err != nil {
307+
return fmt.Errorf("error configuring beats logp: %w", err)
308+
}
309+
310+
return nil
311+
}

x-pack/libbeat/cmd/instance/beat_test.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,6 @@ func TestManager(t *testing.T) {
3333
},
3434
},
3535
},
36-
"output": map[string]any{
37-
"otelconsumer": map[string]any{},
38-
},
3936
"path.home": tmpDir,
4037
}
4138
t.Run("otel management disabled - key missing", func(t *testing.T) {

x-pack/libbeat/common/otelbeat/oteltestcol/collector_test.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,6 @@ func TestNew(t *testing.T) {
2121
message: "test message"
2222
count: 1
2323
processors: ~
24-
output:
25-
otelconsumer:
2624
logging:
2725
level: debug
2826
queue.mem.flush.timeout: 0s

x-pack/metricbeat/mbreceiver/receiver_leak_test.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,6 @@ func TestLeak(t *testing.T) {
4545
},
4646
},
4747
},
48-
"output": map[string]any{
49-
"otelconsumer": map[string]any{},
50-
},
5148
"logging": map[string]any{
5249
"level": "debug",
5350
"selectors": []string{

x-pack/metricbeat/mbreceiver/receiver_test.go

Lines changed: 1 addition & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -56,9 +56,6 @@ func TestNewReceiver(t *testing.T) {
5656
},
5757
},
5858
},
59-
"output": map[string]any{
60-
"otelconsumer": map[string]any{},
61-
},
6259
"logging": map[string]any{
6360
"level": "debug",
6461
"selectors": []string{
@@ -139,9 +136,6 @@ func TestMultipleReceivers(t *testing.T) {
139136
},
140137
},
141138
},
142-
"output": map[string]any{
143-
"otelconsumer": map[string]any{},
144-
},
145139
"logging": map[string]any{
146140
"level": "debug",
147141
"selectors": []string{
@@ -167,9 +161,6 @@ func TestMultipleReceivers(t *testing.T) {
167161
},
168162
},
169163
},
170-
"output": map[string]any{
171-
"otelconsumer": map[string]any{},
172-
},
173164
"logging": map[string]any{
174165
"level": "debug",
175166
"selectors": []string{
@@ -336,9 +327,6 @@ func BenchmarkFactory(b *testing.B) {
336327
},
337328
},
338329
},
339-
"output": map[string]any{
340-
"otelconsumer": map[string]any{},
341-
},
342330
"logging": map[string]any{
343331
"level": "info",
344332
"selectors": []string{
@@ -407,9 +395,6 @@ func TestReceiverDegraded(t *testing.T) {
407395
},
408396
},
409397
},
410-
"output": map[string]any{
411-
"otelconsumer": map[string]any{},
412-
},
413398
"path.home": t.TempDir(),
414399
},
415400
}
@@ -444,10 +429,7 @@ func TestReceiverHook(t *testing.T) {
444429
},
445430
},
446431
"management.otel.enabled": true,
447-
"output": map[string]any{
448-
"otelconsumer": map[string]any{},
449-
},
450-
"path.home": t.TempDir(),
432+
"path.home": t.TempDir(),
451433
},
452434
}
453435
receiverSettings := receiver.Settings{

x-pack/metricbeat/tests/integration/otel_test.go

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -69,8 +69,6 @@ func TestMetricbeatOTelE2E(t *testing.T) {
6969
- '.*'
7070
metricsets:
7171
- cpu
72-
output:
73-
otelconsumer:
7472
processors:
7573
- add_host_metadata: ~
7674
- add_cloud_metadata: ~
@@ -265,8 +263,6 @@ func TestMetricbeatOTelReceiverE2E(t *testing.T) {
265263
- '.*'
266264
metricsets:
267265
- cpu
268-
output:
269-
otelconsumer:
270266
processors:
271267
- add_host_metadata: ~
272268
- add_cloud_metadata: ~
@@ -461,8 +457,6 @@ func TestMetricbeatOTelMultipleReceiversE2E(t *testing.T) {
461457
target: ''
462458
fields:
463459
receiverid: "{{$i}}"
464-
output:
465-
otelconsumer:
466460
logging:
467461
level: info
468462
selectors:

0 commit comments

Comments
 (0)