Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ data:
{{- if .healthEvent.errorCode }}
errorCode = [{{- range $index, $code := .healthEvent.errorCode }}{{- if $index }}, {{ end }}{{ $code | quote }}{{- end }}]
{{- end }}
{{- if .healthEvent.processingStrategy }}
processingStrategy = {{ .healthEvent.processingStrategy | quote }}
{{- end }}
{{- end }}
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ spec:
- "--health-probe-bind-address=:8081"
- "--max-concurrent-reconciles={{ .Values.maxConcurrentReconciles }}"
- "--resync-period={{ .Values.resyncPeriod }}"
- "--processing-strategy={{ .Values.processingStrategy }}"
resources:
{{- toYaml .Values.resources | nindent 12 }}
ports:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,3 +104,9 @@ volumes:
path: /var/run/nvsentinel
type: DirectoryOrCreate

# Processing strategy for health events
# valid values: EXECUTE_REMEDIATION, STORE_ONLY
# default: EXECUTE_REMEDIATION
# EXECUTE_REMEDIATION: normal behavior; downstream modules may update cluster state.
# STORE_ONLY: observability-only behavior; event should be persisted/exported but should not modify cluster resources (i.e., no node conditions, no quarantine, no drain, no remediation).
processingStrategy: EXECUTE_REMEDIATION
6 changes: 6 additions & 0 deletions health-monitors/kubernetes-object-monitor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,11 @@ var (
"unix:///var/run/nvsentinel.sock",
"Platform Connector gRPC socket",
)
processingStrategyFlag = flag.String(
"processing-strategy",
"EXECUTE_REMEDIATION",
"Event processing strategy: EXECUTE_REMEDIATION or STORE_ONLY",
)
)

func main() {
Expand All @@ -93,6 +98,7 @@ func run() error {
ResyncPeriod: *resyncPeriod,
MaxConcurrentReconciles: *maxConcurrentReconciles,
PlatformConnectorSocket: *platformConnectorSocket,
ProcessingStrategy: *processingStrategyFlag,
}

components, err := initializer.InitializeAll(ctx, params)
Expand Down
2 changes: 2 additions & 0 deletions health-monitors/kubernetes-object-monitor/pkg/config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ type HealthEventSpec struct {
Message string `toml:"message"`
RecommendedAction string `toml:"recommendedAction"`
ErrorCode []string `toml:"errorCode"`
// override the processing strategy for the policy
ProcessingStrategy string `toml:"processingStrategy"`
}

func (r *ResourceSpec) GVK() string {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ type Params struct {
ResyncPeriod time.Duration
MaxConcurrentReconciles int
PlatformConnectorSocket string
ProcessingStrategy string
}

type Components struct {
Expand Down Expand Up @@ -79,7 +80,16 @@ func InitializeAll(ctx context.Context, params Params) (*Components, error) {
}

pcClient := pb.NewPlatformConnectorClient(conn)
pub := publisher.New(pcClient)

strategyValue, ok := pb.ProcessingStrategy_value[params.ProcessingStrategy]
if !ok {
conn.Close()
return nil, fmt.Errorf("unexpected processingStrategy value: %q", params.ProcessingStrategy)
}

slog.Info("Event handling strategy configured", "processingStrategy", params.ProcessingStrategy)

pub := publisher.New(pcClient, pb.ProcessingStrategy(strategyValue))

mgr, err := createManager(params)
if err != nil {
Expand All @@ -104,7 +114,8 @@ func InitializeAll(ctx context.Context, params Params) (*Components, error) {
return nil, fmt.Errorf("failed to create policy evaluator: %w", err)
}

if err := registerControllers(mgr, evaluator, pub, cfg.Policies, params.MaxConcurrentReconciles); err != nil {
if err := registerControllers(mgr, evaluator, pub, cfg.Policies,
params.MaxConcurrentReconciles); err != nil {
conn.Close()
return nil, fmt.Errorf("failed to register controllers: %w", err)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,17 +34,30 @@ const (
)

type Publisher struct {
pcClient pb.PlatformConnectorClient
pcClient pb.PlatformConnectorClient
processingStrategy pb.ProcessingStrategy
}

func New(client pb.PlatformConnectorClient) *Publisher {
func New(client pb.PlatformConnectorClient, processingStrategy pb.ProcessingStrategy) *Publisher {
return &Publisher{
pcClient: client,
pcClient: client,
processingStrategy: processingStrategy,
}
}

func (p *Publisher) PublishHealthEvent(ctx context.Context,
policy *config.Policy, nodeName string, isHealthy bool) error {
strategy := p.processingStrategy

if policy.HealthEvent.ProcessingStrategy != "" {
value, ok := pb.ProcessingStrategy_value[policy.HealthEvent.ProcessingStrategy]
if !ok {
return fmt.Errorf("unexpected processingStrategy value: %q", policy.HealthEvent.ProcessingStrategy)
}

strategy = pb.ProcessingStrategy(value)
}

event := &pb.HealthEvent{
Version: 1,
Agent: agentName,
Expand All @@ -57,6 +70,7 @@ func (p *Publisher) PublishHealthEvent(ctx context.Context,
NodeName: nodeName,
RecommendedAction: mapRecommendedAction(policy.HealthEvent.RecommendedAction),
ErrorCode: policy.HealthEvent.ErrorCode,
ProcessingStrategy: strategy,
}

healthEvents := &pb.HealthEvents{
Expand Down
46 changes: 46 additions & 0 deletions tests/data/k8s-rule-override.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

apiVersion: v1
data:
config.toml: |
[[policies]]
name = "node-test-condition"
enabled = true

[policies.resource]
group = ""
version = "v1"
kind = "Node"

[policies.predicate]
expression = '''

resource.status.conditions.filter(c, c.type == "TestCondition" && c.status == "False").size() > 0
'''

[policies.healthEvent]
componentClass = "Node"
isFatal = false
message = "Node test condition is not ready"
recommendedAction = "CONTACT_SUPPORT"
errorCode = ["NODE_TEST_CONDITION_NOT_READY"]
processingStrategy = "STORE_ONLY"
kind: ConfigMap
metadata:
labels:
app.kubernetes.io/instance: nvsentinel
app.kubernetes.io/name: kubernetes-object-monitor
name: kubernetes-object-monitor
namespace: nvsentinel
2 changes: 2 additions & 0 deletions tests/health_events_analyzer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1678,6 +1678,8 @@ func TestHealthEventsAnalyzerProcessingStrategyRuleOverride(t *testing.T) {
})

feature.Teardown(func(ctx context.Context, t *testing.T, c *envconf.Config) context.Context {
helpers.SendHealthyEvent(ctx, t, testCtx.NodeName)

return helpers.TeardownHealthEventsAnalyzer(ctx, t, c, testCtx.NodeName, testCtx.ConfigMapBackup)
})

Expand Down
50 changes: 18 additions & 32 deletions tests/helpers/health_events_analyzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,15 +72,22 @@ func SetupHealthEventsAnalyzerTest(ctx context.Context,

clearHealthEventsAnalyzerConditions(ctx, t, gpuNodeName)

t.Log("Backing up current health-events-analyzer configmap")
if configMapPath != "" {
t.Log("Backing up current health-events-analyzer configmap")

backupData, err := BackupConfigMap(ctx, client, "health-events-analyzer-config", NVSentinelNamespace)
require.NoError(t, err)
t.Log("Backup created in memory")
backupData, err := BackupConfigMap(ctx, client, "health-events-analyzer-config", NVSentinelNamespace)
require.NoError(t, err)
t.Log("Backup created in memory")

testCtx.ConfigMapBackup = backupData

testCtx.ConfigMapBackup = backupData
err = createConfigMapFromFilePath(ctx, client, configMapPath, "health-events-analyzer-config", NVSentinelNamespace)
require.NoError(t, err)
}

err = applyHealthEventsAnalyzerConfigAndRestart(ctx, t, client, configMapPath)
t.Logf("Restarting %s deployment", HEALTH_EVENTS_ANALYZER_DEPLOYMENT_NAME)

err = RestartDeployment(ctx, t, client, HEALTH_EVENTS_ANALYZER_DEPLOYMENT_NAME, NVSentinelNamespace)
require.NoError(t, err)

return ctx, testCtx
Expand Down Expand Up @@ -167,29 +174,6 @@ func clearHealthEventsAnalyzerConditions(ctx context.Context, t *testing.T, node
SendHealthEvent(ctx, t, event)
}

func applyHealthEventsAnalyzerConfigAndRestart(
ctx context.Context, t *testing.T, client klient.Client, configMapPath string,
) error {
t.Helper()
t.Logf("Applying health-events-analyzer configmap: %s", configMapPath)

err := createConfigMapFromFilePath(ctx, client, configMapPath, "health-events-analyzer-config", NVSentinelNamespace)
if err != nil {
return err
}

t.Log("Restarting health-events-analyzer deployment")

err = RestartDeployment(ctx, t, client, "health-events-analyzer", NVSentinelNamespace)
if err != nil {
return err
}

WaitForDeploymentRollout(ctx, t, client, HEALTH_EVENTS_ANALYZER_DEPLOYMENT_NAME, NVSentinelNamespace)

return nil
}

func TriggerMultipleRemediationsCycle(ctx context.Context, t *testing.T, client klient.Client, nodeName string) {
xidsToInject := []string{ERRORCODE_79, ERRORCODE_48}

Expand Down Expand Up @@ -260,10 +244,12 @@ func restoreHealthEventsAnalyzerConfig(ctx context.Context, t *testing.T, c *env
client, err := c.NewClient()
require.NoError(t, err)

t.Log("Restoring configmap from memory")
if configMapBackup != nil {
t.Log("Restoring configmap from memory")

err = createConfigMapFromBytes(ctx, client, configMapBackup, "health-events-analyzer-config", NVSentinelNamespace)
require.NoError(t, err)
err = createConfigMapFromBytes(ctx, client, configMapBackup, "health-events-analyzer-config", NVSentinelNamespace)
require.NoError(t, err)
}

err = RestartDeployment(ctx, t, client, "health-events-analyzer", NVSentinelNamespace)
require.NoError(t, err)
Expand Down
43 changes: 43 additions & 0 deletions tests/helpers/kube.go
Original file line number Diff line number Diff line change
Expand Up @@ -2545,6 +2545,13 @@ func RestoreDeploymentArgs(
t *testing.T, ctx context.Context, c klient.Client,
deploymentName, namespace, containerName string, originalArgs []string,
) error {
if originalArgs == nil {
return nil
}

t.Helper()
t.Logf("Restoring args %v for deployment %s/%s container %s", originalArgs, namespace, deploymentName, containerName)

return retry.RetryOnConflict(retry.DefaultRetry, func() error {
deployment := &appsv1.Deployment{}
if err := c.Resources().Get(ctx, deploymentName, namespace, deployment); err != nil {
Expand Down Expand Up @@ -2573,3 +2580,39 @@ func RestoreDeploymentArgs(
return c.Resources().Update(ctx, deployment)
})
}

// DeleteExistingNodeEvents deletes Kubernetes node events for a given node name and event type and reason.
// This is useful for cleaning up test events that might interfere with subsequent tests.
func DeleteExistingNodeEvents(ctx context.Context, t *testing.T, c klient.Client,
nodeName, eventType, eventReason string) error {
t.Helper()
t.Logf("Deleting events for node %s with type=%s, reason=%s", nodeName, eventType, eventReason)

eventList, err := GetNodeEvents(ctx, c, nodeName, eventType)
if err != nil {
return fmt.Errorf("failed to get events for node %s: %w", nodeName, err)
}

deletedCount := 0

for _, event := range eventList.Items {
if eventReason != "" && event.Reason != eventReason {
continue
}

// Delete the event (events are in default namespace)
err := c.Resources().WithNamespace("default").Delete(ctx, &event)
if err != nil {
t.Logf("Warning: failed to delete event %s: %v", event.Name, err)
continue
}

deletedCount++

t.Logf("Deleted event: %s (type=%s, reason=%s)", event.Name, event.Type, event.Reason)
}

t.Logf("Deleted %d event(s) for node %s", deletedCount, nodeName)

return nil
}
78 changes: 78 additions & 0 deletions tests/helpers/kubernetes_object_monitor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package helpers

import (
"context"
"testing"

"github.com/stretchr/testify/require"
"sigs.k8s.io/e2e-framework/klient"
"sigs.k8s.io/e2e-framework/pkg/envconf"
)

const (
K8S_DEPLOYMENT_NAME = "kubernetes-object-monitor"
K8S_CONTAINER_NAME = "kubernetes-object-monitor"
)

type KubernetesObjectMonitorTestContext struct {
NodeName string
ConfigMapBackup []byte
TestNamespace string
}

func TeardownKubernetesObjectMonitor(
ctx context.Context, t *testing.T, c *envconf.Config, configMapBackup []byte, originalArgs []string,
) {
t.Helper()

client, err := c.NewClient()
require.NoError(t, err)

if configMapBackup != nil {
t.Log("Restoring configmap from memory")

err = createConfigMapFromBytes(ctx, client, configMapBackup, "kubernetes-object-monitor", NVSentinelNamespace)
require.NoError(t, err)

err = RestartDeployment(ctx, t, client, K8S_DEPLOYMENT_NAME, NVSentinelNamespace)
require.NoError(t, err)
}

err = RestoreDeploymentArgs(t, ctx, client, K8S_DEPLOYMENT_NAME, NVSentinelNamespace, K8S_CONTAINER_NAME, originalArgs)
require.NoError(t, err)

WaitForDeploymentRollout(ctx, t, client, K8S_DEPLOYMENT_NAME, NVSentinelNamespace)
}

func UpdateKubernetesObjectMonitorConfigMap(ctx context.Context, t *testing.T, client klient.Client,
configMapPath string, configName string) {
t.Helper()

if configMapPath == "" {
t.Fatalf("configMapPath is empty")
}

t.Logf("Updating configmap %s", configName)

err := createConfigMapFromFilePath(ctx, client, configMapPath, configName, NVSentinelNamespace)
require.NoError(t, err)

t.Logf("Restarting %s deployment", K8S_DEPLOYMENT_NAME)

err = RestartDeployment(ctx, t, client, K8S_DEPLOYMENT_NAME, NVSentinelNamespace)
require.NoError(t, err)
}
Loading
Loading