Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
204 changes: 197 additions & 7 deletions internal/pkg/api/handleOpAMP.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,11 @@ import (
"errors"
"fmt"
"net/http"
"strings"
"time"

"gopkg.in/yaml.v3"

"github.com/gofrs/uuid/v5"
"github.com/open-telemetry/opamp-go/protobufs"
oaServer "github.com/open-telemetry/opamp-go/server"
Expand All @@ -38,13 +41,17 @@ type OpAMPT struct {
cfg *config.Server
bulk bulk.Bulk
cache cache.Cache
bc *checkin.Bulk
bc checkinBulk

srv oaServer.OpAMPServer
handler oaServer.HTTPHandlerFunc
connCtx oaServer.ConnContext
}

type checkinBulk interface {
CheckIn(id string, opts ...checkin.Option) error
}

func NewOpAMPT(
ctx context.Context,
cfg *config.Server,
Expand Down Expand Up @@ -233,12 +240,16 @@ func (oa *OpAMPT) enrollAgent(zlog zerolog.Logger, agentID string, aToS *protobu
// description is only sent if any of its fields change.
meta := localMetadata{}
meta.Elastic.Agent.ID = agentID
agentType := ""
if aToS.AgentDescription != nil {
// Extract agent version
for _, ia := range aToS.AgentDescription.IdentifyingAttributes {
switch attribute.Key(ia.Key) {
case semconv.ServiceVersionKey:
meta.Elastic.Agent.Version = ia.GetValue().GetStringValue()
case semconv.ServiceNameKey:
agentType = ia.GetValue().GetStringValue()
meta.Elastic.Agent.Name = agentType
}
}
zlog.Debug().Str("opamp.agent.version", meta.Elastic.Agent.Version).Msg("extracted agent version")
Expand All @@ -250,6 +261,9 @@ func (oa *OpAMPT) enrollAgent(zlog zerolog.Logger, agentID string, aToS *protobu
hostname := nia.GetValue().GetStringValue()
meta.Host.Name = hostname
meta.Host.Hostname = hostname
case semconv.OSTypeKey:
osType := nia.GetValue().GetStringValue()
meta.Os.Platform = osType
}
}
zlog.Debug().Str("hostname", meta.Host.Hostname).Msg("extracted hostname")
Expand All @@ -261,15 +275,32 @@ func (oa *OpAMPT) enrollAgent(zlog zerolog.Logger, agentID string, aToS *protobu
return nil, fmt.Errorf("failed to marshal local metadata: %w", err)
}

identifyingAttributes, err := ProtobufKVToRawMessage(aToS.AgentDescription.IdentifyingAttributes)
if err != nil {
return nil, fmt.Errorf("failed to marshal identifying attributes: %w", err)
}

nonIdentifyingAttributes, err := ProtobufKVToRawMessage(aToS.AgentDescription.NonIdentifyingAttributes)
if err != nil {
return nil, fmt.Errorf("failed to marshal non-identifying attributes: %w", err)
}

agent := model.Agent{
ESDocument: model.ESDocument{Id: agentID},
Active: true,
EnrolledAt: now.UTC().Format(time.RFC3339),
PolicyID: rec.PolicyID,
Agent: &model.AgentMetadata{
ID: agentID,
ID: agentID,
Version: meta.Elastic.Agent.Version,
Type: agentType,
},
LocalMetadata: data,
LocalMetadata: data,
PolicyRevisionIdx: 1,
IdentifyingAttributes: identifyingAttributes,
NonIdentifyingAttributes: nonIdentifyingAttributes,
Type: "OPAMP",
Tags: []string{agentType},
}

data, err = json.Marshal(agent)
Expand All @@ -291,14 +322,42 @@ func (oa *OpAMPT) updateAgent(zlog zerolog.Logger, agent *model.Agent, aToS *pro

initialOpts := make([]checkin.Option, 0)

status := "online"

// Extract the health status from the health message if it exists.
if aToS.Health != nil {
initialOpts = append(initialOpts, checkin.WithStatus(aToS.Health.Status))
if !aToS.Health.Healthy {
status = "error"
} else if aToS.Health.Status == "StatusRecoverableError" {
status = "degraded"
}

// Extract the unhealthy reason from the health message if it exists.
// Extract the last_checkin_message from the health message if it exists.
if aToS.Health.LastError != "" {
unhealthyReason := []string{aToS.Health.LastError}
initialOpts = append(initialOpts, checkin.WithUnhealthyReason(&unhealthyReason))
initialOpts = append(initialOpts, checkin.WithMessage(aToS.Health.LastError))
} else {
initialOpts = append(initialOpts, checkin.WithMessage(aToS.Health.Status))
}
healthBytes, err := json.Marshal(aToS.Health)
if err != nil {
return fmt.Errorf("failed to marshal health: %w", err)
}
initialOpts = append(initialOpts, checkin.WithHealth(healthBytes))
}

initialOpts = append(initialOpts, checkin.WithStatus(status))
initialOpts = append(initialOpts, checkin.WithSequenceNum(aToS.SequenceNum))

capabilities := decodeCapabilities(aToS.Capabilities)
initialOpts = append(initialOpts, checkin.WithCapabilities(capabilities))

if aToS.EffectiveConfig != nil {
effectiveConfigBytes, err := ParseEffectiveConfig(aToS.EffectiveConfig)
if err != nil {
return fmt.Errorf("failed to parse effective config: %w", err)
}
if effectiveConfigBytes != nil {
initialOpts = append(initialOpts, checkin.WithEffectiveConfig(effectiveConfigBytes))
}
}

Expand All @@ -310,10 +369,141 @@ type localMetadata struct {
Agent struct {
ID string `json:"id,omitempty"`
Version string `json:"version,omitempty"`
Name string `json:"name,omitempty"`
} `json:"agent,omitempty"`
} `json:"elastic,omitempty"`
Host struct {
Hostname string `json:"hostname,omitempty"`
Name string `json:"name,omitempty"`
} `json:"host,omitempty"`
Os struct {
Platform string `json:"platform,omitempty"`
} `json:"os,omitempty"`
}

func ParseEffectiveConfig(effectiveConfig *protobufs.EffectiveConfig) ([]byte, error) {
if effectiveConfig.ConfigMap != nil && effectiveConfig.ConfigMap.ConfigMap[""] != nil {
configMap := effectiveConfig.ConfigMap.ConfigMap[""]

if len(configMap.Body) != 0 {
bodyBytes := configMap.Body

obj := make(map[string]interface{})
if err := yaml.Unmarshal(bodyBytes, &obj); err != nil {
return nil, fmt.Errorf("unmarshal effective config failure: %w", err)
}
redactSensitive(obj)
effectiveConfigBytes, err := json.Marshal(obj)
if err != nil {
return nil, fmt.Errorf("failed to marshal effective config: %w", err)
}
return effectiveConfigBytes, nil
}
}
return nil, nil
}

func redactSensitive(v interface{}) {
const redacted = "[REDACTED]"
switch typed := v.(type) {
case map[string]interface{}:
for key, val := range typed {
if isSensitiveKey(key) {
typed[key] = redacted
continue
}
redactSensitive(val)
}
case map[interface{}]interface{}:
for rawKey, val := range typed {
key, ok := rawKey.(string)
if ok && isSensitiveKey(key) {
typed[rawKey] = redacted
continue
}
redactSensitive(val)
}
case []interface{}:
for i := range typed {
redactSensitive(typed[i])
}
}
}

func isSensitiveKey(key string) bool {
key = strings.ToLower(strings.TrimSpace(key))
if key == "" {
return false
}
for _, token := range []string{
"password",
"passwd",
"pass",
"secret",
"token",
"apikey",
"api_key",
"access_key",
"private_key",
"credential",
"credentials",
} {
if key == token || strings.Contains(key, token) {
return true
}
}
return false
}

func ProtobufKVToRawMessage(kv []*protobufs.KeyValue) (json.RawMessage, error) {
// 1. Build an intermediate map to represent the JSON object
data := make(map[string]interface{}, len(kv))
for _, item := range kv {
if item.Value == nil {
continue
}
switch v := item.Value.GetValue().(type) {
case *protobufs.AnyValue_StringValue:
if v.StringValue != "" {
data[item.Key] = v.StringValue
}
case *protobufs.AnyValue_IntValue:
data[item.Key] = v.IntValue
case *protobufs.AnyValue_DoubleValue:
data[item.Key] = v.DoubleValue
case *protobufs.AnyValue_BoolValue:
data[item.Key] = v.BoolValue
case *protobufs.AnyValue_BytesValue:
if len(v.BytesValue) > 0 {
data[item.Key] = v.BytesValue
}
}
}

// 2. Marshal the map into bytes
b, err := json.Marshal(data)
if err != nil {
return nil, err
}

return json.RawMessage(b), nil
}

// decodeCapabilities converts capability bitmask to human-readable strings
func decodeCapabilities(caps uint64) []string {
var result []string
capMap := map[uint64]string{
uint64(protobufs.AgentCapabilities_AgentCapabilities_ReportsStatus): "ReportsStatus",
uint64(protobufs.AgentCapabilities_AgentCapabilities_AcceptsRemoteConfig): "AcceptsRemoteConfig",
uint64(protobufs.AgentCapabilities_AgentCapabilities_ReportsEffectiveConfig): "ReportsEffectiveConfig",
uint64(protobufs.AgentCapabilities_AgentCapabilities_ReportsHealth): "ReportsHealth",
uint64(protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents): "ReportsAvailableComponents",
uint64(protobufs.AgentCapabilities_AgentCapabilities_AcceptsRestartCommand): "AcceptsRestartCommand",
}
for mask, name := range capMap {
if caps&mask != 0 {
result = append(result, name)
}
}
return result
}
Loading