Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[AMLII-2146] logs check feature #30924

Draft
wants to merge 30 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
928f5f5
Add initial files for logs check feature prototype (WIP)
andrewqian2001datadog Nov 3, 2024
4051007
fix arguments
andrewqian2001datadog Nov 4, 2024
f4f6459
add source provider WIP
andrewqian2001datadog Nov 5, 2024
d6e6611
Merge branch 'main' into AMLII-2146
andrewqian2001datadog Nov 5, 2024
73a89cc
add command logs-analyze, and add function addFileSource
andrewqian2001datadog Nov 6, 2024
7481a1f
fix logs analyze to not instantiate config_sources repeatedly
andrewqian2001datadog Nov 6, 2024
03a3b48
remove unnecessary code
andrewqian2001datadog Nov 6, 2024
a27ae34
add logs-analyze to list of subcommands
andrewqian2001datadog Nov 6, 2024
610d9cd
add print statements
andrewqian2001datadog Nov 6, 2024
f735759
change path to absolute path
andrewqian2001datadog Nov 6, 2024
8ef0a84
add source provider to launcher
andrewqian2001datadog Nov 6, 2024
306daba
apply singleton pattern to sourceProvider so that command.go and logs…
andrewqian2001datadog Nov 6, 2024
735c95f
add print statements
andrewqian2001datadog Nov 7, 2024
d8b370f
call set up launchers
andrewqian2001datadog Nov 7, 2024
87d027a
print statements
andrewqian2001datadog Nov 7, 2024
91655b8
remove unnecessary code from ProcessorOnlyProvider, struct now only co…
andrewqian2001datadog Nov 7, 2024
f9156de
remove singleton thingy, doesnt work
andrewqian2001datadog Nov 7, 2024
c03612b
WIP
andrewqian2001datadog Nov 7, 2024
1a58640
WIP
andrewqian2001datadog Nov 7, 2024
e80c5f1
WIP
andrewqian2001datadog Nov 7, 2024
8f97449
remove code that requires the agent to be launched
andrewqian2001datadog Nov 11, 2024
93e6e9b
remove more code that relies on agent being started
andrewqian2001datadog Nov 11, 2024
980dc6e
remove more code that relies on agent being started
andrewqian2001datadog Nov 11, 2024
47045a2
remove more code that relies on agent being started
andrewqian2001datadog Nov 11, 2024
5b7cdee
remove more code that relies on agent being started
andrewqian2001datadog Nov 11, 2024
90b786c
remove all code that relies on log agent being started
andrewqian2001datadog Nov 11, 2024
73c6f80
TEST
andrewqian2001datadog Nov 11, 2024
a8797d3
working (i think) version of logs check feature (no print no stdout yet)
andrewqian2001datadog Nov 12, 2024
1be05de
reduce GetInstance calls
andrewqian2001datadog Nov 12, 2024
531d747
remove print
andrewqian2001datadog Nov 12, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 83 additions & 0 deletions cmd/agent/subcommands/logsanalyze/command.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Package logscheck implements 'agent logs-analyze'.
package logscheck

import (
"fmt"

"go.uber.org/fx"

"github.com/spf13/cobra"

"github.com/DataDog/datadog-agent/cmd/agent/command"

"github.com/DataDog/datadog-agent/comp/core"
"github.com/DataDog/datadog-agent/comp/core/config"
"github.com/DataDog/datadog-agent/comp/logs/agent/agentimpl"
"github.com/DataDog/datadog-agent/pkg/logs/sources"
"github.com/DataDog/datadog-agent/pkg/util/fxutil"
)

const defaultCoreConfigPath = "bin/agent/dist/datadog.yaml"

// const defaultCoreConfigPath = "dev/dist/conf.d/random_logs.d/conf.yaml"

// CliParams holds the command-line arguments for the logs-analyze subcommand.
type CliParams struct {
	*command.GlobalParams

	// LogConfigPath represents the path to the logs configuration file.
	LogConfigPath string

	// CoreConfigPath represents the path to the core configuration file.
	CoreConfigPath string

	// ConfigSource is the shared (singleton) source registry that file
	// sources are added to; the launcher set up by agentimpl.SetUpLaunchers
	// reads from the same instance.
	ConfigSource *sources.ConfigSources
}

// Commands returns a slice of subcommands for the 'agent' command.
func Commands(globalParams *command.GlobalParams) []*cobra.Command {
	params := &CliParams{
		GlobalParams:   globalParams,
		CoreConfigPath: defaultCoreConfigPath, // default until overridden by --core-config
		ConfigSource:   sources.GetInstance(),
	}

	logsAnalyzeCmd := &cobra.Command{
		Use:   "logs-analyze",
		Short: "Analyze logs configuration in isolation",
		Long:  `Run the Datadog agent in logs check mode with a specific configuration file.`,
		RunE: func(_ *cobra.Command, args []string) error {
			// The logs configuration file is the single required positional argument.
			if len(args) == 0 {
				return fmt.Errorf("log config file path is required")
			}
			params.LogConfigPath = args[0]
			return fxutil.OneShot(runLogsAnalyze,
				core.Bundle(),
				fx.Supply(params),
				fx.Supply(command.GetDefaultCoreBundleParams(params.GlobalParams)),
			)
		},
	}

	// Add flag for core config (optional)
	logsAnalyzeCmd.Flags().StringVarP(&params.CoreConfigPath, "core-config", "C", defaultCoreConfigPath, "Path to the core configuration file (optional)")

	return []*cobra.Command{logsAnalyzeCmd}
}

// runLogsAnalyze handles the logs check operation: it sets up the
// processor-only launchers, then registers the logs configuration file and
// the core configuration file with the shared source provider so the file
// launcher can pick them up.
func runLogsAnalyze(config config.Component, cliParams *CliParams) error {
	agentimpl.SetUpLaunchers(config)

	// Send both configuration paths to the source provider; keep the two
	// registrations in one place so their error handling cannot drift apart.
	configFiles := []struct {
		kind string
		path string
	}{
		{"log", cliParams.LogConfigPath},
		{"core", cliParams.CoreConfigPath},
	}
	for _, cf := range configFiles {
		if err := cliParams.ConfigSource.AddFileSource(cf.path); err != nil {
			return fmt.Errorf("failed to add %s config source: %w", cf.kind, err)
		}
	}

	return nil
}
2 changes: 2 additions & 0 deletions cmd/agent/subcommands/subcommands.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
cmdintegrations "github.com/DataDog/datadog-agent/cmd/agent/subcommands/integrations"
cmdjmx "github.com/DataDog/datadog-agent/cmd/agent/subcommands/jmx"
cmdlaunchgui "github.com/DataDog/datadog-agent/cmd/agent/subcommands/launchgui"
cmdlogsanalyze "github.com/DataDog/datadog-agent/cmd/agent/subcommands/logsanalyze"
cmdprocesschecks "github.com/DataDog/datadog-agent/cmd/agent/subcommands/processchecks"
cmdremoteconfig "github.com/DataDog/datadog-agent/cmd/agent/subcommands/remoteconfig"
cmdrun "github.com/DataDog/datadog-agent/cmd/agent/subcommands/run"
Expand Down Expand Up @@ -56,6 +57,7 @@ func AgentSubcommands() []command.SubcommandFactory {
cmdhostname.Commands,
cmdimport.Commands,
cmdlaunchgui.Commands,
cmdlogsanalyze.Commands,
cmdremoteconfig.Commands,
cmdrun.Commands,
cmdsecret.Commands,
Expand Down
1 change: 1 addition & 0 deletions comp/logs/agent/agentimpl/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ func (a *logAgent) startPipeline() {
a.diagnosticMessageReceiver,
a.launchers,
)
fmt.Println("andrewq agentimpl/agent.go startPipeline XDDDD?")
starter.Start()

if !sds.ShouldBlockCollectionUntilSDSConfiguration(a.config) {
Expand Down
2 changes: 1 addition & 1 deletion comp/logs/agent/agentimpl/agent_core_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import (
// NewAgent returns a new Logs Agent
func (a *logAgent) SetupPipeline(processingRules []*config.ProcessingRule, wmeta optional.Option[workloadmeta.Component], integrationsLogs integrations.Component) {
health := health.RegisterLiveness("logs-agent")

// 1.
// setup the auditor
// We pass the health handle to the auditor because it's the end of the pipeline and the most
// critical part. Arguably it could also be plugged to the destination.
Expand Down
50 changes: 50 additions & 0 deletions comp/logs/agent/agentimpl/logs_analyze_init.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.

//go:build !serverless

package agentimpl

import (
"fmt"
"time"

configComponent "github.com/DataDog/datadog-agent/comp/core/config"
"github.com/DataDog/datadog-agent/comp/logs/agent/config"
"github.com/DataDog/datadog-agent/pkg/logs/launchers"
filelauncher "github.com/DataDog/datadog-agent/pkg/logs/launchers/file"
"github.com/DataDog/datadog-agent/pkg/logs/pipeline"
"github.com/DataDog/datadog-agent/pkg/logs/sources"
"github.com/DataDog/datadog-agent/pkg/logs/tailers"
)

// SetUpLaunchers creates the launchers and sets up the tailers that tail the
// files provided by the logs-analyze command. It returns an error when the
// global processing rules cannot be read from the configuration (the original
// silently returned, hiding misconfiguration from the caller). Existing
// callers that ignore the return value remain valid Go.
func SetUpLaunchers(conf configComponent.Component) error {
	processingRules, err := config.GlobalProcessingRules(conf)
	if err != nil {
		return fmt.Errorf("could not get processing rules: %w", err)
	}

	// Processor-only pipeline: no auditor, diagnostic receiver, or hostname
	// component is supplied because nothing is shipped anywhere.
	pipelineProvider := pipeline.NewProcessorOnlyProvider(nil, processingRules, conf, nil)
	// setup the launchers
	lnchrs := launchers.NewLaunchers(nil, pipelineProvider, nil, nil)

	// File launcher settings — TODO(review): confirm these match the values
	// the full logs agent uses for its file launcher.
	fileLimits := 500
	fileValidatePodContainer := false
	fileScanPeriod := 10 * time.Second
	fileWildcardSelectionMode := "by_name"
	fileLauncher := filelauncher.NewLauncher(
		fileLimits,
		filelauncher.DefaultSleepDuration,
		fileValidatePodContainer,
		fileScanPeriod,
		fileWildcardSelectionMode,
		nil,
		nil)

	// The singleton source provider is shared with the logs-analyze command,
	// which adds the file sources that this launcher will tail.
	sourceProvider := sources.GetInstance()
	tracker := tailers.NewTailerTracker()
	fileLauncher.Start(sourceProvider, pipelineProvider, nil, tracker)
	lnchrs.AddLauncher(fileLauncher)
	return nil
}
5 changes: 5 additions & 0 deletions pkg/logs/launchers/file/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
package file

import (
"fmt"
"regexp"
"time"

Expand Down Expand Up @@ -89,8 +90,11 @@ func NewLauncher(tailingLimit int, tailerSleepDuration time.Duration, validatePo
// Start starts the Launcher: it subscribes to file-type source additions and
// removals from the source provider, registers its tailer container with the
// tracker, and runs the main loop in a goroutine.
func (s *Launcher) Start(sourceProvider launchers.SourceProvider, pipelineProvider pipeline.Provider, registry auditor.Registry, tracker *tailers.TailerTracker) {
	s.pipelineProvider = pipelineProvider
	s.addedSources, s.removedSources = sourceProvider.SubscribeForType(config.FileType)
	// NOTE(review): leftover debug prints below — remove before merge, and
	// drop the "fmt" import that was added for them.
	fmt.Println("launcher added sources is ", s.addedSources)
	fmt.Printf("launcher added sources channel address is %p \n", s.addedSources)
	s.registry = registry
	tracker.Add(s.tailers)
	fmt.Println("launcher.go tailers are ", s.tailers)
	go s.run()
}

Expand Down Expand Up @@ -249,6 +253,7 @@ func (s *Launcher) cleanUpRotatedTailers() {

// addSource keeps track of the new source and launches new tailers for this source.
func (s *Launcher) addSource(source *sources.LogSource) {
	// Debug print removed: it was leftover development output on a hot path.
	s.activeSources = append(s.activeSources, source)
	s.launchTailers(source)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/logs/launchers/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
)

// Launcher implementations launch logs pipelines in response to sources, and
// mange those pipelines' lifetime.
// manage those pipelines' lifetime.
//
// Launchers are started when the logs-agent starts, or when they are added to
// the agent, and stopped when it stops.
Expand Down
78 changes: 78 additions & 0 deletions pkg/logs/pipeline/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/DataDog/datadog-agent/pkg/logs/client"
"github.com/DataDog/datadog-agent/pkg/logs/diagnostic"
"github.com/DataDog/datadog-agent/pkg/logs/message"
"github.com/DataDog/datadog-agent/pkg/logs/processor"
"github.com/DataDog/datadog-agent/pkg/logs/sds"
"github.com/DataDog/datadog-agent/pkg/logs/status/statusinterface"
"github.com/DataDog/datadog-agent/pkg/util/log"
Expand Down Expand Up @@ -56,6 +57,10 @@ type provider struct {
cfg pkgconfigmodel.Reader
}

// processorOnlyProvider is a pipeline Provider backed by a single processor
// with no auditor, sender, or destinations. It is built by
// NewProcessorOnlyProvider for flows (such as the logs-analyze command) that
// only need message processing.
type processorOnlyProvider struct {
	processor *processor.Processor
}

// NewProvider returns a new Provider
func NewProvider(numberOfPipelines int, auditor auditor.Auditor, diagnosticMessageReceiver diagnostic.MessageReceiver, processingRules []*config.ProcessingRule, endpoints *config.Endpoints, destinationsContext *client.DestinationsContext, status statusinterface.Status, hostname hostnameinterface.Component, cfg pkgconfigmodel.Reader) Provider {
return newProvider(numberOfPipelines, auditor, diagnosticMessageReceiver, processingRules, endpoints, destinationsContext, false, status, hostname, cfg)
Expand All @@ -66,6 +71,20 @@ func NewServerlessProvider(numberOfPipelines int, auditor auditor.Auditor, diagn
return newProvider(numberOfPipelines, auditor, diagnosticMessageReceiver, processingRules, endpoints, destinationsContext, true, status, hostname, cfg)
}

// NewProcessorOnlyProvider returns a new Provider containing only a single
// processor: messages are read from its input channel, processed, and written
// to a strategy channel that has no consumer attached here.
func NewProcessorOnlyProvider(diagnosticMessageReceiver diagnostic.MessageReceiver, processingRules []*config.ProcessingRule, cfg pkgconfigmodel.Reader, hostname hostnameinterface.Component) Provider {
	strategyInput := make(chan *message.Message, config.ChanSize)
	// NOTE(review): this reuses the serverless JSON encoder — confirm that is
	// the intended output encoding for this non-serverless provider.
	encoder := processor.JSONServerlessEncoder
	inputChan := make(chan *message.Message, config.ChanSize)
	pipelineID := 0 // single pipeline, so the ID is fixed at 0
	// Named proc (not processor) to avoid shadowing the processor package.
	proc := processor.New(cfg, inputChan, strategyInput, processingRules,
		encoder, diagnosticMessageReceiver, hostname, pipelineID)

	return &processorOnlyProvider{
		processor: proc,
	}
}

// NewMockProvider creates a new provider that will not provide any pipelines.
func NewMockProvider() Provider {
return &provider{}
Expand Down Expand Up @@ -100,6 +119,10 @@ func (p *provider) Start() {
}
}

// Start starts the wrapped processor.
func (p *processorOnlyProvider) Start() {
	p.processor.Start()
}

// Stop stops all pipelines in parallel,
// this call blocks until all pipelines are stopped
func (p *provider) Stop() {
Expand All @@ -112,6 +135,10 @@ func (p *provider) Stop() {
p.outputChan = nil
}

// Stop stops the wrapped processor; it blocks until the processor has stopped.
func (p *processorOnlyProvider) Stop() {
	p.processor.Stop()
}

// return true if all processor SDS scanners are active.
func (p *provider) reconfigureSDS(config []byte, orderType sds.ReconfigureOrderType) (bool, error) {
var responses []chan sds.ReconfigureResponse
Expand Down Expand Up @@ -151,25 +178,67 @@ func (p *provider) reconfigureSDS(config []byte, orderType sds.ReconfigureOrderT
return allScannersActive, rerr
}

// reconfigureSDS sends a reconfiguration order to the single processor and
// reports whether its SDS scanner is active after the order is applied.
func (p *processorOnlyProvider) reconfigureSDS(config []byte, orderType sds.ReconfigureOrderType) (bool, error) {
	// Build the order with a dedicated response channel for this request.
	responseChan := make(chan sds.ReconfigureResponse)
	order := sds.ReconfigureOrder{
		Type:         orderType,
		Config:       config,
		ResponseChan: responseChan,
	}

	log.Debug("Sending SDS reconfiguration order:", string(order.Type))
	p.processor.ReconfigChan <- order

	// Block until the processor answers, then collect any error it reported.
	resp := <-responseChan
	var rerr error
	if resp.Err != nil {
		rerr = multierror.Append(rerr, resp.Err)
	}

	return resp.IsActive, rerr
}

// ReconfigureSDSStandardRules stores the SDS standard rules for the given provider.
func (p *provider) ReconfigureSDSStandardRules(standardRules []byte) (bool, error) {
return p.reconfigureSDS(standardRules, sds.StandardRules)
}

// ReconfigureSDSStandardRules stores the SDS standard rules for the given provider.
func (p *processorOnlyProvider) ReconfigureSDSStandardRules(standardRules []byte) (bool, error) {
	return p.reconfigureSDS(standardRules, sds.StandardRules)
}

// ReconfigureSDSAgentConfig reconfigures the pipeline with the given
// configuration received through Remote Configuration.
// Return true if all SDS scanners are active after applying this configuration.
func (p *provider) ReconfigureSDSAgentConfig(config []byte) (bool, error) {
return p.reconfigureSDS(config, sds.AgentConfig)
}

// ReconfigureSDSAgentConfig reconfigures the pipeline with the given
// configuration received through Remote Configuration.
// Returns true if the processor's SDS scanner is active after applying this
// configuration.
func (p *processorOnlyProvider) ReconfigureSDSAgentConfig(config []byte) (bool, error) {
	return p.reconfigureSDS(config, sds.AgentConfig)
}

// StopSDSProcessing reconfigures the pipeline removing the SDS scanning
// from the processing steps.
func (p *provider) StopSDSProcessing() error {
_, err := p.reconfigureSDS(nil, sds.StopProcessing)
return err
}

// StopSDSProcessing reconfigures the pipeline removing the SDS scanning
// from the processing steps.
func (p *processorOnlyProvider) StopSDSProcessing() error {
	_, err := p.reconfigureSDS(nil, sds.StopProcessing)
	return err
}

// NextPipelineChan returns the next pipeline input channel
func (p *provider) NextPipelineChan() chan *message.Message {
pipelinesLen := len(p.pipelines)
Expand All @@ -181,6 +250,10 @@ func (p *provider) NextPipelineChan() chan *message.Message {
return nextPipeline.InputChan
}

// NextPipelineChan returns nil: this provider does not expose a pipeline
// input channel.
// NOTE(review): a send on a nil channel blocks forever — confirm that no
// caller of NextPipelineChan on this provider sends to the result.
func (p *processorOnlyProvider) NextPipelineChan() chan *message.Message {
	return nil
}

// Flush flushes synchronously all the contained pipeline of this provider.
func (p *provider) Flush(ctx context.Context) {
for _, p := range p.pipelines {
Expand All @@ -192,3 +265,8 @@ func (p *provider) Flush(ctx context.Context) {
}
}
}

// Flush synchronously flushes the single wrapped processor of this provider.
func (p *processorOnlyProvider) Flush(ctx context.Context) {
	p.processor.Flush(ctx)
}
Loading
Loading