Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions x-pack/filebeat/fbreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func createReceiver(ctx context.Context, set receiver.Settings, baseCfg componen
settings.ElasticLicensed = true
settings.Initialize = append(settings.Initialize, include.InitializeModule)

b, err := xpInstance.NewBeatForReceiver(settings, cfg.Beatconfig, consumer, set.ID.String(), set.Logger.Core())
b, cb, err := xpInstance.NewBeatForReceiver(settings, cfg.Beatconfig, consumer, set.ID.String(), set.Logger.Core())
if err != nil {
return nil, fmt.Errorf("error creating %s: %w", Name, err)
}
Expand All @@ -56,7 +56,7 @@ func createReceiver(ctx context.Context, set receiver.Settings, baseCfg componen
return nil, fmt.Errorf("error creating %s:%w", Name, err)
}

return &filebeatReceiver{BeatReceiver: br}, nil
return &filebeatReceiver{BeatReceiver: br, cb: cb}, nil
}

// copied from filebeat cmd.
Expand Down
4 changes: 4 additions & 0 deletions x-pack/filebeat/fbreceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,13 @@ import (
type filebeatReceiver struct {
xpInstance.BeatReceiver
wg sync.WaitGroup
cb xpInstance.Callback
}

func (fb *filebeatReceiver) Start(ctx context.Context, host component.Host) error {
if err := fb.cb(host); err != nil {
return err
}
fb.wg.Add(1)
go func() {
defer fb.wg.Done()
Expand Down
101 changes: 54 additions & 47 deletions x-pack/libbeat/cmd/instance/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"fmt"
"time"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componentstatus"
"go.opentelemetry.io/collector/consumer"
"go.uber.org/zap/zapcore"

Expand All @@ -33,28 +35,30 @@ import (
"github.com/elastic/go-ucfg"
)

type Callback func(component.Host) error

// This is the timeout for the beat's internal publishing pipeline to close when shutting down the receiver. Closing
// requires flushing the event queue, and if this doesn't happen within the timeout, data may be lost depending on
// input type.
const receiverPublisherCloseTimeout = 5 * time.Second

// NewBeatForReceiver creates a Beat that will be used in the context of an otel receiver
func NewBeatForReceiver(settings instance.Settings, receiverConfig map[string]any, consumer consumer.Logs, componentID string, core zapcore.Core) (*instance.Beat, error) {
func NewBeatForReceiver(settings instance.Settings, receiverConfig map[string]any, consumer consumer.Logs, componentID string, core zapcore.Core) (*instance.Beat, Callback, error) {
b, err := instance.NewBeat(settings.Name,
settings.IndexPrefix,
settings.Version,
settings.ElasticLicensed,
settings.Initialize)
if err != nil {
return nil, err
return nil, nil, err
}

b.Info.ComponentID = componentID
b.Info.LogConsumer = consumer

// begin code similar to configure
if err = plugin.Initialize(); err != nil {
return nil, fmt.Errorf("error initializing plugins: %w", err)
return nil, nil, fmt.Errorf("error initializing plugins: %w", err)
}

b.InputQueueSize = settings.InputQueueSize
Expand All @@ -67,7 +71,7 @@ func NewBeatForReceiver(settings instance.Settings, receiverConfig map[string]an

tmp, err := ucfg.NewFrom(receiverConfig, cfOpts...)
if err != nil {
return nil, fmt.Errorf("error converting receiver config to ucfg: %w", err)
return nil, nil, fmt.Errorf("error converting receiver config to ucfg: %w", err)
}

cfg := (*config.C)(tmp)
Expand All @@ -77,16 +81,16 @@ func NewBeatForReceiver(settings instance.Settings, receiverConfig map[string]an
}{}

if err := cfg.Unpack(&partialConfig); err != nil {
return nil, fmt.Errorf("error extracting default paths: %w", err)
return nil, nil, fmt.Errorf("error extracting default paths: %w", err)
}
p := paths.New()
if err := p.InitPaths(&partialConfig.Path); err != nil {
return nil, fmt.Errorf("error initializing default paths: %w", err)
return nil, nil, fmt.Errorf("error initializing default paths: %w", err)
}
b.Paths = p
} else {
if err := instance.InitPaths(cfg); err != nil {
return nil, fmt.Errorf("error initializing paths: %w", err)
return nil, nil, fmt.Errorf("error initializing paths: %w", err)
}
b.Paths = paths.Paths
}
Expand All @@ -95,7 +99,7 @@ func NewBeatForReceiver(settings instance.Settings, receiverConfig map[string]an
// options.
store, err := instance.LoadKeystore(cfg, b.Info.Beat, b.Paths)
if err != nil {
return nil, fmt.Errorf("could not initialize the keystore: %w", err)
return nil, nil, fmt.Errorf("could not initialize the keystore: %w", err)
}

if settings.DisableConfigResolver {
Expand All @@ -120,13 +124,13 @@ func NewBeatForReceiver(settings instance.Settings, receiverConfig map[string]an
b.Beat.Keystore = store
err = cloudid.OverwriteSettings(cfg)
if err != nil {
return nil, fmt.Errorf("error overwriting cloudid settings: %w", err)
return nil, nil, fmt.Errorf("error overwriting cloudid settings: %w", err)
}

b.RawConfig = cfg
err = cfg.Unpack(&b.Config)
if err != nil {
return nil, fmt.Errorf("error unpacking config data: %w", err)
return nil, nil, fmt.Errorf("error unpacking config data: %w", err)
}

logpConfig := logp.Config{}
Expand All @@ -139,28 +143,28 @@ func NewBeatForReceiver(settings instance.Settings, receiverConfig map[string]an
}

if err := b.Config.Logging.Unpack(&logpConfig); err != nil {
return nil, fmt.Errorf("error unpacking beats logging config: %w\n%v", err, b.Config.Logging)
return nil, nil, fmt.Errorf("error unpacking beats logging config: %w\n%v", err, b.Config.Logging)
}

b.Info.Logger, err = logp.ConfigureWithCoreLocal(logpConfig, core)
if err != nil {
return nil, fmt.Errorf("error configuring beats logp: %w", err)
return nil, nil, fmt.Errorf("error configuring beats logp: %w", err)
}
// extracting it here for ease of use
logger := b.Info.Logger

instrumentation, err := instrumentation.New(cfg, b.Info.Beat, b.Info.Version, logger)
if err != nil {
return nil, fmt.Errorf("error setting up instrumentation: %w", err)
return nil, nil, fmt.Errorf("error setting up instrumentation: %w", err)
}
b.Instrumentation = instrumentation

if err := instance.PromoteOutputQueueSettings(b); err != nil {
return nil, fmt.Errorf("could not promote output queue settings: %w", err)
return nil, nil, fmt.Errorf("could not promote output queue settings: %w", err)
}

if err := features.UpdateFromConfig(b.RawConfig); err != nil {
return nil, fmt.Errorf("could not parse features: %w", err)
return nil, nil, fmt.Errorf("could not parse features: %w", err)
}
b.RegisterHostname(features.FQDN())

Expand All @@ -171,7 +175,7 @@ func NewBeatForReceiver(settings instance.Settings, receiverConfig map[string]an
}

if err := common.SetTimestampPrecision(b.Config.TimestampPrecision); err != nil {
return nil, fmt.Errorf("error setting timestamp precision: %w", err)
return nil, nil, fmt.Errorf("error setting timestamp precision: %w", err)
}

// log paths values to help with troubleshooting
Expand All @@ -180,15 +184,15 @@ func NewBeatForReceiver(settings instance.Settings, receiverConfig map[string]an
metaPath := b.Paths.Resolve(paths.Data, "meta.json")
err = b.LoadMeta(metaPath)
if err != nil {
return nil, fmt.Errorf("error loading meta data: %w", err)
return nil, nil, fmt.Errorf("error loading meta data: %w", err)
}

logger.Infof("Beat ID: %v", b.Info.ID)

// Try to get the host's FQDN and set it.
h, err := sysinfo.Host()
if err != nil {
return nil, fmt.Errorf("failed to get host information: %w", err)
return nil, nil, fmt.Errorf("failed to get host information: %w", err)
}

fqdnLookupCtx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
Expand All @@ -211,7 +215,7 @@ func NewBeatForReceiver(settings instance.Settings, receiverConfig map[string]an
oCfg, _ := cfg.Child("management.otel", -1)
m, err := management.NewManager(oCfg, b.Registry, logger)
if err != nil {
return nil, fmt.Errorf("error creating new manager: %w", err)
return nil, nil, fmt.Errorf("error creating new manager: %w", err)
}
b.Manager = m

Expand All @@ -233,12 +237,12 @@ func NewBeatForReceiver(settings instance.Settings, receiverConfig map[string]an
b.GenerateUserAgent()

if err := b.Manager.CheckRawConfig(b.RawConfig); err != nil {
return nil, fmt.Errorf("error checking raw config: %w", err)
return nil, nil, fmt.Errorf("error checking raw config: %w", err)
}

b.Beat.BeatConfig, err = b.BeatConfig()
if err != nil {
return nil, fmt.Errorf("error setting BeatConfig: %w", err)
return nil, nil, fmt.Errorf("error setting BeatConfig: %w", err)
}

imFactory := settings.IndexManagement
Expand All @@ -247,28 +251,17 @@ func NewBeatForReceiver(settings instance.Settings, receiverConfig map[string]an
}
b.IdxSupporter, err = imFactory(logger, b.Info, b.RawConfig)
if err != nil {
return nil, fmt.Errorf("error setting index supporter: %w", err)
}

processingFactory := settings.Processing
if processingFactory == nil {
processingFactory = processing.MakeDefaultBeatSupport(true)
return nil, nil, fmt.Errorf("error setting index supporter: %w", err)
}

processors, err := processingFactory(b.Info, logger.Named("processors"), b.RawConfig)
if err != nil {
return nil, fmt.Errorf("error creating processors: %w", err)
}
b.SetProcessors(processors)

// This should be replaced with static config for otel consumer
// but need to figure out if we want the Queue settings from here.
outputEnabled := b.Config.Output.IsSet() && b.Config.Output.Config().Enabled()
if !outputEnabled {
if b.Manager.Enabled() {
logger.Info("Output is configured through Central Management")
} else {
return nil, fmt.Errorf("no outputs are defined, please define one under the output section")
return nil, nil, fmt.Errorf("no outputs are defined, please define one under the output section")
}
}

Expand All @@ -283,18 +276,32 @@ func NewBeatForReceiver(settings instance.Settings, receiverConfig map[string]an

outputFactory := b.MakeOutputFactory(b.Config.Output)

pipelineSettings := pipeline.Settings{
Processors: b.GetProcessors(),
InputQueueSize: b.InputQueueSize,
WaitCloseMode: pipeline.WaitOnPipelineCloseThenForce,
WaitClose: receiverPublisherCloseTimeout,
}
publisher, err := pipeline.LoadWithSettings(b.Info, monitors, b.Config.Pipeline, outputFactory, pipelineSettings)
if err != nil {
return nil, fmt.Errorf("error initializing publisher: %w", err)
}
b.Registry.MustRegisterOutput(b.MakeOutputReloader(publisher.OutputReloader()))
b.Publisher = publisher
return b, func(h component.Host) error {
processingFactory := settings.Processing
if processingFactory == nil {
processingFactory = processing.MakeDefaultBeatSupport(true)
}

processors, err := processingFactory(b.Info, logger.Named("processors"), b.RawConfig)
if err != nil {
err := fmt.Errorf("error creating processors: %w", err)
componentstatus.ReportStatus(h, componentstatus.NewFatalErrorEvent(err))
return err
}
b.SetProcessors(processors)
pipelineSettings := pipeline.Settings{
Processors: b.GetProcessors(),
InputQueueSize: b.InputQueueSize,
WaitCloseMode: pipeline.WaitOnPipelineCloseThenForce,
WaitClose: receiverPublisherCloseTimeout,
}
publisher, err := pipeline.LoadWithSettings(b.Info, monitors, b.Config.Pipeline, outputFactory, pipelineSettings)
if err != nil {
return fmt.Errorf("error initializing publisher: %w", err)
}
b.Registry.MustRegisterOutput(b.MakeOutputReloader(publisher.OutputReloader()))
b.Publisher = publisher

return b, nil
return nil
}, nil
}
6 changes: 3 additions & 3 deletions x-pack/libbeat/cmd/instance/beat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func TestManager(t *testing.T) {
"path.home": tmpDir,
}
t.Run("otel management disabled - key missing", func(t *testing.T) {
beat, err := NewBeatForReceiver(cmd.FilebeatSettings("filebeat"), cfg, consumertest.NewNop(), "testcomponent", zapcore.NewNopCore())
beat, _, err := NewBeatForReceiver(cmd.FilebeatSettings("filebeat"), cfg, consumertest.NewNop(), "testcomponent", zapcore.NewNopCore())
assert.NoError(t, err)
assert.NotNil(t, beat.Manager)
// it should fallback to FallbackManager if key is missing
Expand All @@ -53,7 +53,7 @@ func TestManager(t *testing.T) {
defer func() {
management.SetUnderAgent(false) // reset to false
}()
beat, err := NewBeatForReceiver(cmd.FilebeatSettings("filebeat"), tmpCfg, consumertest.NewNop(), "testcomponent", zapcore.NewNopCore())
beat, _, err := NewBeatForReceiver(cmd.FilebeatSettings("filebeat"), tmpCfg, consumertest.NewNop(), "testcomponent", zapcore.NewNopCore())
assert.NoError(t, err)
assert.NotNil(t, beat.Manager)
assert.IsType(t, beat.Manager, &otelmanager.OtelManager{})
Expand All @@ -72,7 +72,7 @@ type: "log"`)
defer func() {
management.SetUnderAgent(false) // reset to false
}()
beat, err := NewBeatForReceiver(cmd.FilebeatSettings("filebeat"), tmpCfg, consumertest.NewNop(), "testcomponent", zapcore.NewNopCore())
beat, _, err := NewBeatForReceiver(cmd.FilebeatSettings("filebeat"), tmpCfg, consumertest.NewNop(), "testcomponent", zapcore.NewNopCore())
assert.NoError(t, err)
assert.NotNil(t, beat.Manager)
assert.IsType(t, beat.Manager, &management.FallbackManager{})
Expand Down
4 changes: 2 additions & 2 deletions x-pack/metricbeat/mbreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func createReceiver(ctx context.Context, set receiver.Settings, baseCfg componen
settings.ElasticLicensed = true
settings.Initialize = append(settings.Initialize, include.InitializeModule)

b, err := xpInstance.NewBeatForReceiver(settings, cfg.Beatconfig, consumer, set.ID.String(), set.Logger.Core())
b, cb, err := xpInstance.NewBeatForReceiver(settings, cfg.Beatconfig, consumer, set.ID.String(), set.Logger.Core())
if err != nil {
return nil, fmt.Errorf("error creating %s: %w", Name, err)
}
Expand All @@ -61,7 +61,7 @@ func createReceiver(ctx context.Context, set receiver.Settings, baseCfg componen
if err != nil {
return nil, fmt.Errorf("error creating %s: %w", Name, err)
}
return &metricbeatReceiver{BeatReceiver: br}, nil
return &metricbeatReceiver{BeatReceiver: br, cb: cb}, nil
}

// copied from metricbeat cmd.
Expand Down
4 changes: 4 additions & 0 deletions x-pack/metricbeat/mbreceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,13 @@ import (
type metricbeatReceiver struct {
xpInstance.BeatReceiver
wg sync.WaitGroup
cb xpInstance.Callback
}

func (mb *metricbeatReceiver) Start(ctx context.Context, host component.Host) error {
if err := mb.cb(host); err != nil {
return err
}
mb.wg.Add(1)
go func() {
defer mb.wg.Done()
Expand Down
Loading