Skip to content

Commit 0dfc329

Browse files
VihasMakwanaVihasMakwana
authored andcommitted
move processor initialization in Start
1 parent f3a2ea1 commit 0dfc329

File tree

6 files changed

+69
-54
lines changed

6 files changed

+69
-54
lines changed

x-pack/filebeat/fbreceiver/factory.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ func createReceiver(ctx context.Context, set receiver.Settings, baseCfg componen
4545
settings.ElasticLicensed = true
4646
settings.Initialize = append(settings.Initialize, include.InitializeModule)
4747

48-
b, err := xpInstance.NewBeatForReceiver(settings, cfg.Beatconfig, consumer, set.ID.String(), set.Logger.Core())
48+
b, cb, err := xpInstance.NewBeatForReceiver(settings, cfg.Beatconfig, consumer, set.ID.String(), set.Logger.Core())
4949
if err != nil {
5050
return nil, fmt.Errorf("error creating %s: %w", Name, err)
5151
}
@@ -56,7 +56,7 @@ func createReceiver(ctx context.Context, set receiver.Settings, baseCfg componen
5656
return nil, fmt.Errorf("error creating %s:%w", Name, err)
5757
}
5858

59-
return &filebeatReceiver{BeatReceiver: br}, nil
59+
return &filebeatReceiver{BeatReceiver: br, cb: cb}, nil
6060
}
6161

6262
// copied from filebeat cmd.

x-pack/filebeat/fbreceiver/receiver.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,13 @@ import (
1818
type filebeatReceiver struct {
1919
xpInstance.BeatReceiver
2020
wg sync.WaitGroup
21+
cb xpInstance.Callback
2122
}
2223

2324
func (fb *filebeatReceiver) Start(ctx context.Context, host component.Host) error {
25+
if err := fb.cb(host); err != nil {
26+
return err
27+
}
2428
fb.wg.Add(1)
2529
go func() {
2630
defer fb.wg.Done()

x-pack/libbeat/cmd/instance/beat.go

Lines changed: 54 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ import (
99
"fmt"
1010
"time"
1111

12+
"go.opentelemetry.io/collector/component"
13+
"go.opentelemetry.io/collector/component/componentstatus"
1214
"go.opentelemetry.io/collector/consumer"
1315
"go.uber.org/zap/zapcore"
1416

@@ -33,28 +35,30 @@ import (
3335
"github.com/elastic/go-ucfg"
3436
)
3537

38+
type Callback func(component.Host) error
39+
3640
// This is the timeout for the beat's internal publishing pipeline to close when shutting down the receiver. Closing
3741
// requires flushing the event queue, and if this doesn't happen within the timeout, data may be lost depending on
3842
// input type.
3943
const receiverPublisherCloseTimeout = 5 * time.Second
4044

4145
// NewBeatForReceiver creates a Beat that will be used in the context of an otel receiver
42-
func NewBeatForReceiver(settings instance.Settings, receiverConfig map[string]any, consumer consumer.Logs, componentID string, core zapcore.Core) (*instance.Beat, error) {
46+
func NewBeatForReceiver(settings instance.Settings, receiverConfig map[string]any, consumer consumer.Logs, componentID string, core zapcore.Core) (*instance.Beat, Callback, error) {
4347
b, err := instance.NewBeat(settings.Name,
4448
settings.IndexPrefix,
4549
settings.Version,
4650
settings.ElasticLicensed,
4751
settings.Initialize)
4852
if err != nil {
49-
return nil, err
53+
return nil, nil, err
5054
}
5155

5256
b.Info.ComponentID = componentID
5357
b.Info.LogConsumer = consumer
5458

5559
// begin code similar to configure
5660
if err = plugin.Initialize(); err != nil {
57-
return nil, fmt.Errorf("error initializing plugins: %w", err)
61+
return nil, nil, fmt.Errorf("error initializing plugins: %w", err)
5862
}
5963

6064
b.InputQueueSize = settings.InputQueueSize
@@ -67,7 +71,7 @@ func NewBeatForReceiver(settings instance.Settings, receiverConfig map[string]an
6771

6872
tmp, err := ucfg.NewFrom(receiverConfig, cfOpts...)
6973
if err != nil {
70-
return nil, fmt.Errorf("error converting receiver config to ucfg: %w", err)
74+
return nil, nil, fmt.Errorf("error converting receiver config to ucfg: %w", err)
7175
}
7276

7377
cfg := (*config.C)(tmp)
@@ -77,16 +81,16 @@ func NewBeatForReceiver(settings instance.Settings, receiverConfig map[string]an
7781
}{}
7882

7983
if err := cfg.Unpack(&partialConfig); err != nil {
80-
return nil, fmt.Errorf("error extracting default paths: %w", err)
84+
return nil, nil, fmt.Errorf("error extracting default paths: %w", err)
8185
}
8286
p := paths.New()
8387
if err := p.InitPaths(&partialConfig.Path); err != nil {
84-
return nil, fmt.Errorf("error initializing default paths: %w", err)
88+
return nil, nil, fmt.Errorf("error initializing default paths: %w", err)
8589
}
8690
b.Paths = p
8791
} else {
8892
if err := instance.InitPaths(cfg); err != nil {
89-
return nil, fmt.Errorf("error initializing paths: %w", err)
93+
return nil, nil, fmt.Errorf("error initializing paths: %w", err)
9094
}
9195
b.Paths = paths.Paths
9296
}
@@ -95,7 +99,7 @@ func NewBeatForReceiver(settings instance.Settings, receiverConfig map[string]an
9599
// options.
96100
store, err := instance.LoadKeystore(cfg, b.Info.Beat, b.Paths)
97101
if err != nil {
98-
return nil, fmt.Errorf("could not initialize the keystore: %w", err)
102+
return nil, nil, fmt.Errorf("could not initialize the keystore: %w", err)
99103
}
100104

101105
if settings.DisableConfigResolver {
@@ -120,13 +124,13 @@ func NewBeatForReceiver(settings instance.Settings, receiverConfig map[string]an
120124
b.Beat.Keystore = store
121125
err = cloudid.OverwriteSettings(cfg)
122126
if err != nil {
123-
return nil, fmt.Errorf("error overwriting cloudid settings: %w", err)
127+
return nil, nil, fmt.Errorf("error overwriting cloudid settings: %w", err)
124128
}
125129

126130
b.RawConfig = cfg
127131
err = cfg.Unpack(&b.Config)
128132
if err != nil {
129-
return nil, fmt.Errorf("error unpacking config data: %w", err)
133+
return nil, nil, fmt.Errorf("error unpacking config data: %w", err)
130134
}
131135

132136
logpConfig := logp.Config{}
@@ -139,28 +143,28 @@ func NewBeatForReceiver(settings instance.Settings, receiverConfig map[string]an
139143
}
140144

141145
if err := b.Config.Logging.Unpack(&logpConfig); err != nil {
142-
return nil, fmt.Errorf("error unpacking beats logging config: %w\n%v", err, b.Config.Logging)
146+
return nil, nil, fmt.Errorf("error unpacking beats logging config: %w\n%v", err, b.Config.Logging)
143147
}
144148

145149
b.Info.Logger, err = logp.ConfigureWithCoreLocal(logpConfig, core)
146150
if err != nil {
147-
return nil, fmt.Errorf("error configuring beats logp: %w", err)
151+
return nil, nil, fmt.Errorf("error configuring beats logp: %w", err)
148152
}
149153
// extracting it here for ease of use
150154
logger := b.Info.Logger
151155

152156
instrumentation, err := instrumentation.New(cfg, b.Info.Beat, b.Info.Version, logger)
153157
if err != nil {
154-
return nil, fmt.Errorf("error setting up instrumentation: %w", err)
158+
return nil, nil, fmt.Errorf("error setting up instrumentation: %w", err)
155159
}
156160
b.Instrumentation = instrumentation
157161

158162
if err := instance.PromoteOutputQueueSettings(b); err != nil {
159-
return nil, fmt.Errorf("could not promote output queue settings: %w", err)
163+
return nil, nil, fmt.Errorf("could not promote output queue settings: %w", err)
160164
}
161165

162166
if err := features.UpdateFromConfig(b.RawConfig); err != nil {
163-
return nil, fmt.Errorf("could not parse features: %w", err)
167+
return nil, nil, fmt.Errorf("could not parse features: %w", err)
164168
}
165169
b.RegisterHostname(features.FQDN())
166170

@@ -171,7 +175,7 @@ func NewBeatForReceiver(settings instance.Settings, receiverConfig map[string]an
171175
}
172176

173177
if err := common.SetTimestampPrecision(b.Config.TimestampPrecision); err != nil {
174-
return nil, fmt.Errorf("error setting timestamp precision: %w", err)
178+
return nil, nil, fmt.Errorf("error setting timestamp precision: %w", err)
175179
}
176180

177181
// log paths values to help with troubleshooting
@@ -180,15 +184,15 @@ func NewBeatForReceiver(settings instance.Settings, receiverConfig map[string]an
180184
metaPath := b.Paths.Resolve(paths.Data, "meta.json")
181185
err = b.LoadMeta(metaPath)
182186
if err != nil {
183-
return nil, fmt.Errorf("error loading meta data: %w", err)
187+
return nil, nil, fmt.Errorf("error loading meta data: %w", err)
184188
}
185189

186190
logger.Infof("Beat ID: %v", b.Info.ID)
187191

188192
// Try to get the host's FQDN and set it.
189193
h, err := sysinfo.Host()
190194
if err != nil {
191-
return nil, fmt.Errorf("failed to get host information: %w", err)
195+
return nil, nil, fmt.Errorf("failed to get host information: %w", err)
192196
}
193197

194198
fqdnLookupCtx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
@@ -211,7 +215,7 @@ func NewBeatForReceiver(settings instance.Settings, receiverConfig map[string]an
211215
oCfg, _ := cfg.Child("management.otel", -1)
212216
m, err := management.NewManager(oCfg, b.Registry, logger)
213217
if err != nil {
214-
return nil, fmt.Errorf("error creating new manager: %w", err)
218+
return nil, nil, fmt.Errorf("error creating new manager: %w", err)
215219
}
216220
b.Manager = m
217221

@@ -233,12 +237,12 @@ func NewBeatForReceiver(settings instance.Settings, receiverConfig map[string]an
233237
b.GenerateUserAgent()
234238

235239
if err := b.Manager.CheckRawConfig(b.RawConfig); err != nil {
236-
return nil, fmt.Errorf("error checking raw config: %w", err)
240+
return nil, nil, fmt.Errorf("error checking raw config: %w", err)
237241
}
238242

239243
b.Beat.BeatConfig, err = b.BeatConfig()
240244
if err != nil {
241-
return nil, fmt.Errorf("error setting BeatConfig: %w", err)
245+
return nil, nil, fmt.Errorf("error setting BeatConfig: %w", err)
242246
}
243247

244248
imFactory := settings.IndexManagement
@@ -247,28 +251,17 @@ func NewBeatForReceiver(settings instance.Settings, receiverConfig map[string]an
247251
}
248252
b.IdxSupporter, err = imFactory(logger, b.Info, b.RawConfig)
249253
if err != nil {
250-
return nil, fmt.Errorf("error setting index supporter: %w", err)
251-
}
252-
253-
processingFactory := settings.Processing
254-
if processingFactory == nil {
255-
processingFactory = processing.MakeDefaultBeatSupport(true)
254+
return nil, nil, fmt.Errorf("error setting index supporter: %w", err)
256255
}
257256

258-
processors, err := processingFactory(b.Info, logger.Named("processors"), b.RawConfig)
259-
if err != nil {
260-
return nil, fmt.Errorf("error creating processors: %w", err)
261-
}
262-
b.SetProcessors(processors)
263-
264257
// This should be replaced with static config for otel consumer
265258
// but need to figure out if we want the Queue settings from here.
266259
outputEnabled := b.Config.Output.IsSet() && b.Config.Output.Config().Enabled()
267260
if !outputEnabled {
268261
if b.Manager.Enabled() {
269262
logger.Info("Output is configured through Central Management")
270263
} else {
271-
return nil, fmt.Errorf("no outputs are defined, please define one under the output section")
264+
return nil, nil, fmt.Errorf("no outputs are defined, please define one under the output section")
272265
}
273266
}
274267

@@ -283,18 +276,32 @@ func NewBeatForReceiver(settings instance.Settings, receiverConfig map[string]an
283276

284277
outputFactory := b.MakeOutputFactory(b.Config.Output)
285278

286-
pipelineSettings := pipeline.Settings{
287-
Processors: b.GetProcessors(),
288-
InputQueueSize: b.InputQueueSize,
289-
WaitCloseMode: pipeline.WaitOnPipelineCloseThenForce,
290-
WaitClose: receiverPublisherCloseTimeout,
291-
}
292-
publisher, err := pipeline.LoadWithSettings(b.Info, monitors, b.Config.Pipeline, outputFactory, pipelineSettings)
293-
if err != nil {
294-
return nil, fmt.Errorf("error initializing publisher: %w", err)
295-
}
296-
b.Registry.MustRegisterOutput(b.MakeOutputReloader(publisher.OutputReloader()))
297-
b.Publisher = publisher
279+
return b, func(h component.Host) error {
280+
processingFactory := settings.Processing
281+
if processingFactory == nil {
282+
processingFactory = processing.MakeDefaultBeatSupport(true)
283+
}
284+
285+
processors, err := processingFactory(b.Info, logger.Named("processors"), b.RawConfig)
286+
if err != nil {
287+
err := fmt.Errorf("error creating processors: %w", err)
288+
componentstatus.ReportStatus(h, componentstatus.NewFatalErrorEvent(err))
289+
return err
290+
}
291+
b.SetProcessors(processors)
292+
pipelineSettings := pipeline.Settings{
293+
Processors: b.GetProcessors(),
294+
InputQueueSize: b.InputQueueSize,
295+
WaitCloseMode: pipeline.WaitOnPipelineCloseThenForce,
296+
WaitClose: receiverPublisherCloseTimeout,
297+
}
298+
publisher, err := pipeline.LoadWithSettings(b.Info, monitors, b.Config.Pipeline, outputFactory, pipelineSettings)
299+
if err != nil {
300+
return fmt.Errorf("error initializing publisher: %w", err)
301+
}
302+
b.Registry.MustRegisterOutput(b.MakeOutputReloader(publisher.OutputReloader()))
303+
b.Publisher = publisher
298304

299-
return b, nil
305+
return nil
306+
}, nil
300307
}

x-pack/libbeat/cmd/instance/beat_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ func TestManager(t *testing.T) {
3939
"path.home": tmpDir,
4040
}
4141
t.Run("otel management disabled - key missing", func(t *testing.T) {
42-
beat, err := NewBeatForReceiver(cmd.FilebeatSettings("filebeat"), cfg, consumertest.NewNop(), "testcomponent", zapcore.NewNopCore())
42+
beat, _, err := NewBeatForReceiver(cmd.FilebeatSettings("filebeat"), cfg, consumertest.NewNop(), "testcomponent", zapcore.NewNopCore())
4343
assert.NoError(t, err)
4444
assert.NotNil(t, beat.Manager)
4545
// it should fallback to FallbackManager if key is missing
@@ -53,7 +53,7 @@ func TestManager(t *testing.T) {
5353
defer func() {
5454
management.SetUnderAgent(false) // reset to false
5555
}()
56-
beat, err := NewBeatForReceiver(cmd.FilebeatSettings("filebeat"), tmpCfg, consumertest.NewNop(), "testcomponent", zapcore.NewNopCore())
56+
beat, _, err := NewBeatForReceiver(cmd.FilebeatSettings("filebeat"), tmpCfg, consumertest.NewNop(), "testcomponent", zapcore.NewNopCore())
5757
assert.NoError(t, err)
5858
assert.NotNil(t, beat.Manager)
5959
assert.IsType(t, beat.Manager, &otelmanager.OtelManager{})
@@ -72,7 +72,7 @@ type: "log"`)
7272
defer func() {
7373
management.SetUnderAgent(false) // reset to false
7474
}()
75-
beat, err := NewBeatForReceiver(cmd.FilebeatSettings("filebeat"), tmpCfg, consumertest.NewNop(), "testcomponent", zapcore.NewNopCore())
75+
beat, _, err := NewBeatForReceiver(cmd.FilebeatSettings("filebeat"), tmpCfg, consumertest.NewNop(), "testcomponent", zapcore.NewNopCore())
7676
assert.NoError(t, err)
7777
assert.NotNil(t, beat.Manager)
7878
assert.IsType(t, beat.Manager, &management.FallbackManager{})

x-pack/metricbeat/mbreceiver/factory.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ func createReceiver(ctx context.Context, set receiver.Settings, baseCfg componen
5151
settings.ElasticLicensed = true
5252
settings.Initialize = append(settings.Initialize, include.InitializeModule)
5353

54-
b, err := xpInstance.NewBeatForReceiver(settings, cfg.Beatconfig, consumer, set.ID.String(), set.Logger.Core())
54+
b, cb, err := xpInstance.NewBeatForReceiver(settings, cfg.Beatconfig, consumer, set.ID.String(), set.Logger.Core())
5555
if err != nil {
5656
return nil, fmt.Errorf("error creating %s: %w", Name, err)
5757
}
@@ -61,7 +61,7 @@ func createReceiver(ctx context.Context, set receiver.Settings, baseCfg componen
6161
if err != nil {
6262
return nil, fmt.Errorf("error creating %s: %w", Name, err)
6363
}
64-
return &metricbeatReceiver{BeatReceiver: br}, nil
64+
return &metricbeatReceiver{BeatReceiver: br, cb: cb}, nil
6565
}
6666

6767
// copied from metricbeat cmd.

x-pack/metricbeat/mbreceiver/receiver.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,13 @@ import (
1818
type metricbeatReceiver struct {
1919
xpInstance.BeatReceiver
2020
wg sync.WaitGroup
21+
cb xpInstance.Callback
2122
}
2223

2324
func (mb *metricbeatReceiver) Start(ctx context.Context, host component.Host) error {
25+
if err := mb.cb(host); err != nil {
26+
return err
27+
}
2428
mb.wg.Add(1)
2529
go func() {
2630
defer mb.wg.Done()

0 commit comments

Comments
 (0)