Skip to content

Commit 9cec816

Browse files
authored
feat(apm-ssi): Add a Remote Config receiver for Workload Selection (#41312)
### What does this PR do? - Adds a Remote Config receiver for Workload Selection - Embeds the `dd-compile-policy` binary (1MB) into the Linux builds of the Agent (Deb/RPM/OCI) ### Motivation ### Describe how you validated your changes ### Additional Notes
1 parent 58f819f commit 9cec816

File tree

16 files changed

+1127
-1
lines changed

16 files changed

+1127
-1
lines changed

.github/CODEOWNERS

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -382,6 +382,7 @@
382382
/comp/snmpscan @DataDog/ndm-core
383383
/comp/softwareinventory @DataDog/windows-products
384384
/comp/syntheticstestscheduler @DataDog/synthetics-executing
385+
/comp/workloadselection @DataDog/injection-platform
385386
# END COMPONENTS
386387

387388
# Additional notification to @iglendd about Agent Telemetry changes for optional approval and governance acknowledgement

Dockerfiles/agent/Dockerfile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,7 @@ RUN --mount=from=artifacts,target=/artifacts \
161161
&& rm -rf usr etc/init lib \
162162
go/ \
163163
opt/datadog-agent/embedded/bin/installer \
164+
opt/datadog-agent/embedded/bin/dd-compile-policy \
164165
opt/datadog-agent/sources \
165166
opt/datadog-agent/embedded/share/doc \
166167
opt/datadog-agent/embedded/share/man \

cmd/agent/subcommands/run/command.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import (
3434
agenttelemetryfx "github.com/DataDog/datadog-agent/comp/core/agenttelemetry/fx"
3535
"github.com/DataDog/datadog-agent/comp/core/autodiscovery/providers/datastreams"
3636
ssistatusfx "github.com/DataDog/datadog-agent/comp/updater/ssistatus/fx"
37+
workloadselectionfx "github.com/DataDog/datadog-agent/comp/workloadselection/fx"
3738

3839
haagentfx "github.com/DataDog/datadog-agent/comp/haagent/fx"
3940
snmpscanfx "github.com/DataDog/datadog-agent/comp/snmpscan/fx"
@@ -534,6 +535,7 @@ func getSharedFxOption() fx.Option {
534535
diagnosefx.Module(),
535536
ipcfx.ModuleReadWrite(),
536537
ssistatusfx.Module(),
538+
workloadselectionfx.Module(),
537539
workloadfilterfx.Module(),
538540
connectivitycheckerfx.Module(),
539541
configstreamfx.Module(),

comp/README.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -806,3 +806,9 @@ versions, installation dates, and other relevant details for inventory tracking.
806806

807807
Package syntheticstestscheduler defines a synthetics scheduler component to run
808808
network tests based on remote config.
809+
810+
### [comp/workloadselection](https://pkg.go.dev/github.com/DataDog/datadog-agent/comp/workloadselection)
811+
812+
*Datadog Team*: injection-platform
813+
814+
Package workloadselection listens to Remote Config to receive & apply workload selection configuration

comp/core/config/setup.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,3 +88,8 @@ func setupConfig(config pkgconfigmodel.BuildableConfig, secretComp secrets.Compo
8888

8989
return nil
9090
}
91+
92+
// GetInstallPath returns the install path for the agent, i.e. the current
// value of pkgconfigsetup.InstallPath.
93+
func GetInstallPath() string {
94+
return pkgconfigsetup.InstallPath
95+
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
// Unless explicitly stated otherwise all files in this repository are licensed
2+
// under the Apache License Version 2.0.
3+
// This product includes software developed at Datadog (https://www.datadoghq.com/).
4+
// Copyright 2025-present Datadog, Inc.
5+
6+
// Package workloadselection listens to Remote Config to receive & apply workload selection configuration
7+
package workloadselection
8+
9+
// team: injection-platform
10+
11+
// Component is the component type. It is an empty interface: the
// workloadselection component declares no methods.
12+
type Component interface{}

comp/workloadselection/fx/fx.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
// Unless explicitly stated otherwise all files in this repository are licensed
2+
// under the Apache License Version 2.0.
3+
// This product includes software developed at Datadog (https://www.datadoghq.com/).
4+
// Copyright 2025-present Datadog, Inc.
5+
6+
// Package fx provides the fx module for the workloadselection component
7+
package fx
8+
9+
import (
10+
uberfx "go.uber.org/fx"
11+
12+
workloadselection "github.com/DataDog/datadog-agent/comp/workloadselection/def"
13+
workloadselectionimpl "github.com/DataDog/datadog-agent/comp/workloadselection/impl"
14+
"github.com/DataDog/datadog-agent/pkg/util/fxutil"
15+
)
16+
17+
// Module defines the fx options for this component
18+
func Module() fxutil.Module {
19+
return fxutil.Component(
20+
fxutil.ProvideComponentConstructor(
21+
workloadselectionimpl.NewComponent,
22+
),
23+
fxutil.ProvideOptional[workloadselection.Component](),
24+
25+
// workloadselection is a component with no public method, therefore nobody depends on it and FX only instantiates
26+
// components when they're needed. Adding a dummy function that takes our Component as a parameter forces
27+
// the instantiation of workloadselection. This means that simply using 'workloadselectionfx.Module()' will run our
28+
// component (which is the expected behavior).
29+
//
30+
// This prevents silent corner case where including 'workloadselection' in the main function would not actually
31+
// instantiate it. This also removes the need for every main using workloadselection to add the line below.
32+
uberfx.Invoke(func(_ workloadselection.Component) {}),
33+
)
34+
}
Lines changed: 244 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,244 @@
1+
// Unless explicitly stated otherwise all files in this repository are licensed
2+
// under the Apache License Version 2.0.
3+
// This product includes software developed at Datadog (https://www.datadoghq.com/).
4+
// Copyright 2025-present Datadog, Inc.
5+
6+
// Package workloadselectionimpl implements the workloadselection component interface
7+
package workloadselectionimpl
8+
9+
import (
10+
"bytes"
11+
"encoding/json"
12+
"fmt"
13+
"os"
14+
"os/exec"
15+
"path/filepath"
16+
"regexp"
17+
"sort"
18+
"strconv"
19+
"strings"
20+
21+
"github.com/DataDog/datadog-agent/comp/core/config"
22+
log "github.com/DataDog/datadog-agent/comp/core/log/def"
23+
rctypes "github.com/DataDog/datadog-agent/comp/remote-config/rcclient/types"
24+
workloadselection "github.com/DataDog/datadog-agent/comp/workloadselection/def"
25+
"github.com/DataDog/datadog-agent/pkg/remoteconfig/state"
26+
)
27+
28+
var (
29+
// configPath is the file the compiled workload selection policy is written to
// (passed as --output-file to the compile policy binary) and removed from
// when no config is received.
configPath = filepath.Join(config.DefaultConfPath, "managed", "rc-orgwide-wls-policy.bin")
30+
// ddPolicyCompileRelativePath is the path of the dd-compile-policy binary
// relative to the install path returned by getInstallPath.
ddPolicyCompileRelativePath = filepath.Join("embedded", "bin", "dd-compile-policy")
31+
// Pattern to extract policy ID from config path: datadog/\d+/<product>/<config_id>/<hash>
32+
policyIDPattern = regexp.MustCompile(`^datadog/\d+/[^/]+/([^/]+)/`)
33+
// Pattern to extract numeric prefix from policy ID: N.<name>
34+
policyPrefixPattern = regexp.MustCompile(`^(\d+)\.`)
35+
36+
// getInstallPath is a variable that can be overridden in tests
37+
getInstallPath = config.GetInstallPath
38+
)
39+
40+
// Requires defines the dependencies for the workloadselection component
41+
type Requires struct {
42+
// Log is the logger used by the component.
Log log.Component
43+
// Config is the configuration NewComponent reads
// "apm_config.workload_selection" from.
Config config.Component
44+
}
45+
46+
// Provides defines the output of the workloadselection component
47+
type Provides struct {
48+
// Comp is the component instance built by NewComponent.
Comp workloadselection.Component
49+
// RCListener holds the Remote Config listener. NewComponent only fills it in
// when workload selection is enabled and the compile policy binary is
// available; otherwise it is left at its zero value.
RCListener rctypes.ListenerProvider
50+
}
51+
52+
// NewComponent creates a new workloadselection component
53+
func NewComponent(reqs Requires) (Provides, error) {
54+
wls := &workloadselectionComponent{
55+
log: reqs.Log,
56+
config: reqs.Config,
57+
}
58+
59+
var rcListener rctypes.ListenerProvider
60+
if reqs.Config.GetBool("apm_config.workload_selection") && wls.isCompilePolicyBinaryAvailable() {
61+
reqs.Log.Debug("Enabling APM SSI Workload Selection listener")
62+
rcListener.ListenerProvider = rctypes.RCListener{
63+
state.ProductApmPolicies: wls.onConfigUpdate,
64+
}
65+
} else {
66+
reqs.Log.Debug("Disabling APM SSI Workload Selection listener as the compile policy binary is not available or workload selection is disabled")
67+
}
68+
69+
provides := Provides{
70+
Comp: wls,
71+
RCListener: rcListener,
72+
}
73+
return provides, nil
74+
}
75+
76+
// workloadselectionComponent is the implementation returned by NewComponent.
// Its methods compile, write and remove the workload selection policy file.
type workloadselectionComponent struct {
77+
// log is the logger taken from Requires.Log.
log log.Component
78+
// config is the configuration taken from Requires.Config.
config config.Component
79+
}
80+
81+
// isCompilePolicyBinaryAvailable checks if the compile policy binary is available
82+
// and executable
83+
func (c *workloadselectionComponent) isCompilePolicyBinaryAvailable() bool {
84+
compilePath := filepath.Join(getInstallPath(), ddPolicyCompileRelativePath)
85+
info, err := os.Stat(compilePath)
86+
if err != nil {
87+
if !os.IsNotExist(err) {
88+
c.log.Warnf("failed to stat APM workload selection compile policy binary: %v", err)
89+
}
90+
return false
91+
}
92+
return info.Mode().IsRegular() && info.Mode()&0111 != 0
93+
}
94+
95+
// compilePolicyBinary compiles the policy binary into a binary file
96+
// readable by the injector
97+
func (c *workloadselectionComponent) compileAndWriteConfig(rawConfig []byte) error {
98+
if err := os.MkdirAll(filepath.Dir(configPath), 0755); err != nil {
99+
return err
100+
}
101+
cmd := exec.Command(filepath.Join(getInstallPath(), ddPolicyCompileRelativePath), "--input-string", string(rawConfig), "--output-file", configPath)
102+
var stdoutBuf, stderrBuf bytes.Buffer
103+
cmd.Stdout = &stdoutBuf
104+
cmd.Stderr = &stderrBuf
105+
if err := cmd.Run(); err != nil {
106+
return fmt.Errorf("error executing dd-policy-compile (%w); out: '%s'; err: '%s'", err, stdoutBuf.String(), stderrBuf.String())
107+
}
108+
return nil
109+
}
110+
111+
// policyConfig represents a config with its ordering information
112+
type policyConfig struct {
113+
// path is the Remote Config path the config was received under.
path string
114+
// order is the numeric prefix of the policy ID (0 when there is none);
// configs are sorted by it before being merged.
order int
115+
// config is the raw config payload.
config []byte
116+
}
117+
118+
// extractPolicyID extracts the policy ID from a config path
119+
// Path format: configs/\d+/<ID>/<gibberish>
120+
func extractPolicyID(path string) string {
121+
matches := policyIDPattern.FindStringSubmatch(path)
122+
if len(matches) > 1 {
123+
return matches[1]
124+
}
125+
return ""
126+
}
127+
128+
// extractOrderFromPolicyID extracts the numeric order from a policy ID
129+
// If policy ID is in format N.<name>, returns N. Otherwise returns 0.
130+
func extractOrderFromPolicyID(policyID string) int {
131+
matches := policyPrefixPattern.FindStringSubmatch(policyID)
132+
if len(matches) > 1 {
133+
if order, err := strconv.Atoi(matches[1]); err == nil {
134+
return order
135+
}
136+
}
137+
return 0
138+
}
139+
140+
// mergeConfigs merges multiple configs by concatenating their policies in order
141+
func mergeConfigs(configs []policyConfig) ([]byte, error) {
142+
type policyJSON struct {
143+
Policies []json.RawMessage `json:"policies"`
144+
}
145+
146+
allPolicies := make([]json.RawMessage, 0)
147+
148+
for _, cfg := range configs {
149+
var parsed policyJSON
150+
if err := json.Unmarshal(cfg.config, &parsed); err != nil {
151+
return nil, fmt.Errorf("failed to parse config from %s: %w", cfg.path, err)
152+
}
153+
allPolicies = append(allPolicies, parsed.Policies...)
154+
}
155+
156+
merged := policyJSON{Policies: allPolicies}
157+
return json.Marshal(merged)
158+
}
159+
160+
// onConfigUpdate is the callback function called by Remote Config when the workload selection config is updated
161+
func (c *workloadselectionComponent) onConfigUpdate(updates map[string]state.RawConfig, applyStateCallback func(string, state.ApplyStatus)) {
162+
c.log.Debugf("workload selection config update received: %d", len(updates))
163+
if len(updates) == 0 {
164+
err := c.removeConfig() // No config received, we have to remove the file. Nothing to acknowledge.
165+
if err != nil {
166+
c.log.Errorf("failed to remove workload selection config: %v", err)
167+
}
168+
return
169+
}
170+
171+
// Build a list of configs with their ordering information
172+
var configs []policyConfig
173+
for path, rawConfig := range updates {
174+
policyID := extractPolicyID(path)
175+
order := extractOrderFromPolicyID(policyID)
176+
177+
c.log.Debugf("Processing config path=%s policyID=%s order=%d", path, policyID, order)
178+
179+
configs = append(configs, policyConfig{
180+
path: path,
181+
order: order,
182+
config: rawConfig.Config,
183+
})
184+
}
185+
186+
// Sort configs by order, then alphabetically by path for deterministic ordering
187+
sort.SliceStable(configs, func(i, j int) bool {
188+
if configs[i].order != configs[j].order {
189+
return configs[i].order < configs[j].order
190+
}
191+
// Secondary sort by path for deterministic ordering when order values are equal
192+
return configs[i].path < configs[j].path
193+
})
194+
195+
// Track error state and apply callbacks on function exit
196+
var processingErr error
197+
defer func() {
198+
for _, cfg := range configs {
199+
if processingErr != nil {
200+
applyStateCallback(cfg.path, state.ApplyStatus{
201+
State: state.ApplyStateError,
202+
Error: processingErr.Error(),
203+
})
204+
} else {
205+
applyStateCallback(cfg.path, state.ApplyStatus{
206+
State: state.ApplyStateAcknowledged,
207+
})
208+
}
209+
}
210+
}()
211+
212+
// Log the ordering for debugging
213+
var orderInfo []string
214+
for _, cfg := range configs {
215+
policyID := extractPolicyID(cfg.path)
216+
orderInfo = append(orderInfo, fmt.Sprintf("%s (order=%d)", policyID, cfg.order))
217+
}
218+
c.log.Debugf("Merging %d workload selection configs in order: %s", len(configs), strings.Join(orderInfo, ", "))
219+
220+
// Merge all configs into one
221+
mergedConfig, err := mergeConfigs(configs)
222+
if err != nil {
223+
c.log.Errorf("failed to merge workload selection configs: %v", err)
224+
processingErr = err
225+
return
226+
}
227+
228+
// Compile and write the merged config
229+
err = c.compileAndWriteConfig(mergedConfig)
230+
if err != nil {
231+
c.log.Errorf("failed to compile workload selection config: %v", err)
232+
processingErr = err
233+
return
234+
}
235+
}
236+
237+
func (c *workloadselectionComponent) removeConfig() error {
238+
// os.RemoveAll does not fail if the path doesn't exist, it returns nil
239+
c.log.Debugf("Removing workload selection config")
240+
if err := os.RemoveAll(configPath); err != nil {
241+
return fmt.Errorf("failed to remove workload selection binary policy: %w", err)
242+
}
243+
return nil
244+
}

0 commit comments

Comments
 (0)