diff --git a/distros/kubernetes/nvsentinel/charts/csp-health-monitor/templates/deployment.yaml b/distros/kubernetes/nvsentinel/charts/csp-health-monitor/templates/deployment.yaml index 5d64a7274..4aa3633d4 100644 --- a/distros/kubernetes/nvsentinel/charts/csp-health-monitor/templates/deployment.yaml +++ b/distros/kubernetes/nvsentinel/charts/csp-health-monitor/templates/deployment.yaml @@ -158,6 +158,7 @@ spec: - "--database-client-cert-mount-path={{ .Values.clientCertMountPath }}" - "--uds-path=/run/nvsentinel/nvsentinel.sock" - "--metrics-port=2113" + - "--processing-strategy={{ .Values.processingStrategy }}" resources: {{- toYaml .Values.quarantineTriggerEngine.resources | default .Values.resources | nindent 12 }} ports: diff --git a/distros/kubernetes/nvsentinel/charts/csp-health-monitor/values.yaml b/distros/kubernetes/nvsentinel/charts/csp-health-monitor/values.yaml index 8f69932c5..19dc5df3e 100644 --- a/distros/kubernetes/nvsentinel/charts/csp-health-monitor/values.yaml +++ b/distros/kubernetes/nvsentinel/charts/csp-health-monitor/values.yaml @@ -49,6 +49,13 @@ tolerations: [] podAnnotations: {} +# Processing strategy for health events +# valid values: EXECUTE_REMEDIATION, STORE_ONLY +# default: EXECUTE_REMEDIATION +# EXECUTE_REMEDIATION: normal behavior; downstream modules may update cluster state. +# STORE_ONLY: observability-only behavior; event should be persisted/exported but should not modify cluster resources (i.e., no node conditions, no quarantine, no drain, no remediation). +processingStrategy: EXECUTE_REMEDIATION + # Log verbosity level for the main CSP health monitor container (e.g. "debug", "info", "warn", "error") logLevel: info diff --git a/health-monitors/csp-health-monitor/cmd/maintenance-notifier/main.go b/health-monitors/csp-health-monitor/cmd/maintenance-notifier/main.go index c2c173c28..8ddbaba85 100644 --- a/health-monitors/csp-health-monitor/cmd/maintenance-notifier/main.go +++ b/health-monitors/csp-health-monitor/cmd/maintenance-notifier/main.go @@ -58,6 +58,7 @@ type appConfig struct { udsPath string databaseClientCertMountPath string metricsPort string + processingStrategy string } func parseFlags() *appConfig { @@ -71,6 +72,8 @@ func parseFlags() *appConfig { "Directory where database client tls.crt, tls.key, and ca.crt are mounted.", ) flag.StringVar(&cfg.metricsPort, "metrics-port", defaultMetricsPortSidecar, "Port for the sidecar Prometheus metrics.") + flag.StringVar(&cfg.processingStrategy, "processing-strategy", "EXECUTE_REMEDIATION", + "Event processing strategy: EXECUTE_REMEDIATION or STORE_ONLY") // Parse flags after initialising klog flag.Parse() @@ -213,7 +216,15 @@ func run() error { return fmt.Errorf("kubernetes client setup failed: %w", err) } - engine := trigger.NewEngine(cfg, store, platformConnectorClient, k8sClient) + value, ok := pb.ProcessingStrategy_value[appCfg.processingStrategy] + if !ok { + return fmt.Errorf("invalid processingStrategy %q (expected EXECUTE_REMEDIATION or STORE_ONLY)", + appCfg.processingStrategy) + } + + slog.Info("Event handling strategy configured", "processingStrategy", appCfg.processingStrategy) + + engine := trigger.NewEngine(cfg, store, platformConnectorClient, k8sClient, pb.ProcessingStrategy(value)) slog.Info("Trigger engine starting...") engine.Start(gCtx) diff --git a/health-monitors/csp-health-monitor/pkg/triggerengine/trigger.go b/health-monitors/csp-health-monitor/pkg/triggerengine/trigger.go index da3f93192..a47b78639 100644 --- a/health-monitors/csp-health-monitor/pkg/triggerengine/trigger.go +++ 
b/health-monitors/csp-health-monitor/pkg/triggerengine/trigger.go @@ -56,13 +56,14 @@ const ( // Engine polls the datastore for maintenance events and forwards the // corresponding health signals to NVSentinel through the UDS connector. type Engine struct { - store datastore.Store - udsClient pb.PlatformConnectorClient - config *config.Config - pollInterval time.Duration - k8sClient kubernetes.Interface - monitoredNodes sync.Map // Track which nodes are currently being monitored - monitorInterval time.Duration + store datastore.Store + udsClient pb.PlatformConnectorClient + config *config.Config + pollInterval time.Duration + k8sClient kubernetes.Interface + monitoredNodes sync.Map // Track which nodes are currently being monitored + monitorInterval time.Duration + processingStrategy pb.ProcessingStrategy } // NewEngine constructs a ready-to-run Engine instance. @@ -71,14 +72,16 @@ func NewEngine( store datastore.Store, udsClient pb.PlatformConnectorClient, k8sClient kubernetes.Interface, + processingStrategy pb.ProcessingStrategy, ) *Engine { return &Engine{ - config: cfg, - store: store, - udsClient: udsClient, - pollInterval: time.Duration(cfg.MaintenanceEventPollIntervalSeconds) * time.Second, - k8sClient: k8sClient, - monitorInterval: defaultMonitorInterval, + config: cfg, + store: store, + udsClient: udsClient, + pollInterval: time.Duration(cfg.MaintenanceEventPollIntervalSeconds) * time.Second, + k8sClient: k8sClient, + monitorInterval: defaultMonitorInterval, + processingStrategy: processingStrategy, } } @@ -343,13 +346,14 @@ func (e *Engine) mapMaintenanceEventToHealthEvent( } healthEvent := &pb.HealthEvent{ - Agent: "csp-health-monitor", // Consistent agent name - ComponentClass: event.ResourceType, // e.g., "EC2", "gce_instance" - CheckName: "CSPMaintenance", // Consistent check name - IsFatal: isFatal, - IsHealthy: isHealthy, - Message: message, - RecommendedAction: pb.RecommendedAction(actionEnum), + Agent: "csp-health-monitor", // Consistent agent name + ComponentClass: event.ResourceType, // e.g., "EC2", "gce_instance" + CheckName: "CSPMaintenance", // Consistent check name + IsFatal: isFatal, + IsHealthy: isHealthy, + ProcessingStrategy: e.processingStrategy, + Message: message, + RecommendedAction: pb.RecommendedAction(actionEnum), EntitiesImpacted: []*pb.Entity{ { EntityType: event.ResourceType, @@ -359,9 +363,6 @@ func (e *Engine) mapMaintenanceEventToHealthEvent( Metadata: event.Metadata, // Pass along metadata NodeName: event.NodeName, // K8s node name GeneratedTimestamp: timestamppb.New(time.Now()), - // TODO: Remove hardcoded processing strategy and make it configurable via the config file. 
- // PR: https://github.com/NVIDIA/NVSentinel/pull/641 - ProcessingStrategy: pb.ProcessingStrategy_EXECUTE_REMEDIATION, } return healthEvent, nil diff --git a/health-monitors/csp-health-monitor/pkg/triggerengine/trigger_test.go b/health-monitors/csp-health-monitor/pkg/triggerengine/trigger_test.go index 812fd0a25..97eea0ba0 100644 --- a/health-monitors/csp-health-monitor/pkg/triggerengine/trigger_test.go +++ b/health-monitors/csp-health-monitor/pkg/triggerengine/trigger_test.go @@ -190,7 +190,7 @@ func TestNewEngine(t *testing.T) { mUDSClient := new(MockUDSClient) mockClient := createMockClientWithReadyNodes() - engine := NewEngine(cfg, mStore, mUDSClient, mockClient) + engine := NewEngine(cfg, mStore, mUDSClient, mockClient, pb.ProcessingStrategy_EXECUTE_REMEDIATION) assert.NotNil(t, engine) assert.Equal(t, cfg, engine.config) @@ -204,7 +204,7 @@ func TestMapMaintenanceEventToHealthEvent(t *testing.T) { cfg := newTestConfig() mStore := new(MockDatastore) // Not strictly needed for this func, but engine needs it mUDSClient := new(MockUDSClient) // Not strictly needed for this func, but engine needs it - engine := NewEngine(cfg, mStore, mUDSClient, nil) + engine := NewEngine(cfg, mStore, mUDSClient, nil, pb.ProcessingStrategy_EXECUTE_REMEDIATION) tests := []struct { name string @@ -612,7 +612,7 @@ func TestProcessAndSendTrigger(t *testing.T) { t.Run(tc.name, func(t *testing.T) { mStore := new(MockDatastore) mUDSClient := new(MockUDSClient) - engine := NewEngine(cfg, mStore, mUDSClient, nil) + engine := NewEngine(cfg, mStore, mUDSClient, nil, pb.ProcessingStrategy_EXECUTE_REMEDIATION) tc.setupMocks(mStore, mUDSClient, tc.event, tc.targetDBStatus) @@ -793,7 +793,7 @@ func TestCheckAndTriggerEvents(t *testing.T) { mStore := new(MockDatastore) mUDSClient := new(MockUDSClient) mockClient := createMockClientWithReadyNodes("node-q1", "node-h1", "q-no-node") - engine := NewEngine(cfg, mStore, mUDSClient, mockClient) + engine := NewEngine(cfg, mStore, mUDSClient, mockClient, pb.ProcessingStrategy_EXECUTE_REMEDIATION) if tc.setupMocks != nil { tc.setupMocks(mStore, mUDSClient) @@ -839,7 +839,7 @@ func TestHealthyTriggerWaitsForNodeReady(t *testing.T) { mUDSClient.On("HealthEventOccurredV1", mock.Anything, mock.Anything, mock.Anything).Return(&emptypb.Empty{}, nil).Once() mStore.On("UpdateEventStatus", mock.AnythingOfType("*context.timerCtx"), healthyEvent.EventID, model.StatusHealthyTriggered).Return(nil).Once() - engine := NewEngine(cfg, mStore, mUDSClient, mockClient) + engine := NewEngine(cfg, mStore, mUDSClient, mockClient, pb.ProcessingStrategy_EXECUTE_REMEDIATION) engine.monitorInterval = 3 * time.Second err := engine.checkAndTriggerEvents(ctx) diff --git a/tests/csp_health_monitor_test.go b/tests/csp_health_monitor_test.go index 701d9242f..9ae13584b 100644 --- a/tests/csp_health_monitor_test.go +++ b/tests/csp_health_monitor_test.go @@ -413,3 +413,107 @@ func TestCSPHealthMonitorQuarantineThreshold(t *testing.T) { testEnv.Test(t, feature.Feature()) } + +// TestCSPHealthMonitorStoreOnlyProcessingStrategy verifies the STORE_ONLY processing strategy: +// The event is stored in the database and exported as a CloudEvent, but does not trigger any cordoning or draining. +func TestCSPHealthMonitorStoreOnlyProcessingStrategy(t *testing.T) { + feature := features.New("Processing Strategy"). 
+ WithLabel("suite", "csp-health-monitor") + + var testCtx *helpers.CSPHealthMonitorTestContext + var injectedEventID string + var testInstanceID string + + feature.Setup(func(ctx context.Context, t *testing.T, c *envconf.Config) context.Context { + client, err := c.NewClient() + require.NoError(t, err) + + err = helpers.SetDeploymentArgs(ctx, t, client, "csp-health-monitor", helpers.NVSentinelNamespace, "maintenance-notifier", map[string]string{ + "--processing-strategy": "STORE_ONLY", + }) + require.NoError(t, err) + + helpers.WaitForDeploymentRollout(ctx, t, client, "csp-health-monitor", helpers.NVSentinelNamespace) + + var newCtx context.Context + newCtx, testCtx = helpers.SetupCSPHealthMonitorTest(ctx, t, c, helpers.CSPGCP) + + t.Log("Clearing any existing GCP events from mock API") + require.NoError(t, testCtx.CSPClient.ClearEvents(helpers.CSPGCP), "failed to clear GCP events") + + testInstanceID = fmt.Sprintf("%d", time.Now().UnixNano()) + + t.Logf("Adding GCP instance annotation to node %s (instance_id=%s, zone=us-central1-a)", testCtx.NodeName, testInstanceID) + require.NoError(t, helpers.AddGCPInstanceIDAnnotation(ctx, client, testCtx.NodeName, testInstanceID, "us-central1-a")) + + node, err := helpers.GetNodeByName(ctx, client, testCtx.NodeName) + require.NoError(t, err) + require.Equal(t, testInstanceID, node.Annotations["container.googleapis.com/instance_id"], "GCP instance_id annotation not set") + require.Equal(t, "us-central1-a", node.Labels["topology.kubernetes.io/zone"], "zone label not set") + t.Log("Verified: node annotations and labels are set correctly") + + // Wait for the monitor to complete at least one poll cycle + helpers.WaitForCSPHealthMonitorPoll(t, testCtx.CSPClient, helpers.CSPGCP) + + return newCtx + }) + + feature.Assess("Injecting PENDING maintenance event and verifying node was not cordoned when processing STORE_ONLY strategy", func(ctx context.Context, t *testing.T, c *envconf.Config) context.Context { + t.Log("Injecting GCP maintenance event with PENDING status into mock Cloud Logging API") + + scheduledStart := time.Now().Add(15 * time.Minute) + scheduledEnd := time.Now().Add(75 * time.Minute) + event := helpers.CSPMaintenanceEvent{ + CSP: helpers.CSPGCP, + InstanceID: testInstanceID, + NodeName: testCtx.NodeName, + Zone: "us-central1-a", + ProjectID: "test-project", + Status: "PENDING", + EventTypeCode: "compute.instances.upcomingMaintenance", + MaintenanceType: "SCHEDULED", + ScheduledStart: &scheduledStart, + ScheduledEnd: &scheduledEnd, + Description: "Scheduled maintenance for GCP instance - e2e test", + } + + var err error + injectedEventID, _, err = testCtx.CSPClient.InjectEvent(event) + require.NoError(t, err) + t.Logf("Event injected: ID=%s, instanceID=%s, scheduledStart=%s", injectedEventID, testInstanceID, scheduledStart.Format(time.RFC3339)) + + // Verify event was stored in mock + eventCount, err := testCtx.CSPClient.GetEventCount(helpers.CSPGCP) + require.NoError(t, err, "failed to get event count from mock") + require.Equal(t, 1, eventCount, "expected 1 event in mock store after injection") + t.Logf("Verified: mock store has %d GCP event(s)", eventCount) + + client, err := c.NewClient() + require.NoError(t, err) + + helpers.EnsureNodeConditionNotPresent(ctx, t, client, testCtx.NodeName, "CSPMaintenance") + t.Log("Verifying node was not cordoned when processing STORE_ONLY strategy") + helpers.AssertQuarantineState(ctx, t, client, testCtx.NodeName, helpers.QuarantineAssertion{ + ExpectCordoned: false, + ExpectAnnotation: false, + }) + + 
return ctx + }) + + feature.Teardown(func(ctx context.Context, t *testing.T, c *envconf.Config) context.Context { + client, err := c.NewClient() + require.NoError(t, err) + + helpers.RemoveDeploymentArgs(ctx, client, "csp-health-monitor", helpers.NVSentinelNamespace, "maintenance-notifier", map[string]string{ + "--processing-strategy": "STORE_ONLY", + }) + + helpers.WaitForDeploymentRollout(ctx, t, client, "csp-health-monitor", helpers.NVSentinelNamespace) + + helpers.TeardownCSPHealthMonitorTest(ctx, t, c, testCtx) + + return ctx + }) + testEnv.Test(t, feature.Feature()) +} diff --git a/tests/event_exporter_test.go b/tests/event_exporter_test.go index a11295da1..260a61c60 100644 --- a/tests/event_exporter_test.go +++ b/tests/event_exporter_test.go @@ -22,6 +22,8 @@ import ( "testing" "time" + "tests/helpers" + "github.com/stretchr/testify/require" appsv1 "k8s.io/api/apps/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -29,7 +31,6 @@ import ( "sigs.k8s.io/e2e-framework/klient/wait/conditions" "sigs.k8s.io/e2e-framework/pkg/envconf" "sigs.k8s.io/e2e-framework/pkg/features" - "tests/helpers" ) func TestEventExporterChangeStream(t *testing.T) { @@ -81,7 +82,7 @@ func TestEventExporterChangeStream(t *testing.T) { t.Log("Validating received CloudEvent") require.NotNil(t, receivedEvent) - helpers.ValidateCloudEvent(t, receivedEvent, nodeName, testMessage, "GpuXidError", "79") + helpers.ValidateCloudEvent(t, receivedEvent, nodeName, testMessage, "GpuXidError", "79", "EXECUTE_REMEDIATION") return ctx }) diff --git a/tests/helpers/event_exporter.go b/tests/helpers/event_exporter.go index 523ecd2a9..af8629f74 100644 --- a/tests/helpers/event_exporter.go +++ b/tests/helpers/event_exporter.go @@ -222,6 +222,7 @@ func ValidateCloudEvent( t *testing.T, event map[string]any, expectedNodeName, expectedMessage, expectedCheckName, expectedErrorCode string, + expectedProcessingStrategy string, ) { t.Helper() t.Logf("Validating CloudEvent: %+v", event) @@ -241,5 +242,6 @@ func ValidateCloudEvent( require.Equal(t, expectedCheckName, healthEvent["checkName"]) require.Equal(t, expectedNodeName, healthEvent["nodeName"]) require.Equal(t, expectedMessage, healthEvent["message"]) + require.Equal(t, expectedProcessingStrategy, healthEvent["processingStrategy"]) require.Contains(t, healthEvent["errorCode"], expectedErrorCode) } diff --git a/tests/helpers/kube.go b/tests/helpers/kube.go index fe9fea7ee..f5b4c1ca7 100755 --- a/tests/helpers/kube.go +++ b/tests/helpers/kube.go @@ -2228,3 +2228,206 @@ func VerifyLogFilesUploaded(ctx context.Context, t *testing.T, c klient.Client, t.Logf("✓ Log files verified for node %s", nodeName) } + +// WaitForDaemonSetRollout waits for a DaemonSet to complete its rollout. +// It checks that all pods are updated and ready. 
+func WaitForDaemonSetRollout(ctx context.Context, t *testing.T, client klient.Client, name string) { + t.Helper() + + t.Logf("Waiting for daemonset %s/%s rollout to complete", NVSentinelNamespace, name) + + require.Eventually(t, func() bool { + daemonSet := &appsv1.DaemonSet{} + if err := client.Resources().Get(ctx, name, NVSentinelNamespace, daemonSet); err != nil { + t.Logf("Failed to get daemonset: %v", err) + return false + } + + // Check if all desired pods are scheduled, updated, and ready + if daemonSet.Status.DesiredNumberScheduled == 0 { + t.Logf("DaemonSet has no desired pods scheduled yet") + return false + } + + if daemonSet.Status.UpdatedNumberScheduled != daemonSet.Status.DesiredNumberScheduled { + t.Logf("DaemonSet rollout in progress: %d/%d pods updated", + daemonSet.Status.UpdatedNumberScheduled, daemonSet.Status.DesiredNumberScheduled) + + return false + } + + if daemonSet.Status.NumberReady != daemonSet.Status.DesiredNumberScheduled { + t.Logf("DaemonSet rollout in progress: %d/%d pods ready", + daemonSet.Status.NumberReady, daemonSet.Status.DesiredNumberScheduled) + + return false + } + + t.Logf("DaemonSet %s/%s rollout complete: %d/%d pods ready and updated", + NVSentinelNamespace, name, daemonSet.Status.NumberReady, daemonSet.Status.DesiredNumberScheduled) + + return true + }, EventuallyWaitTimeout, WaitInterval, "daemonset %s/%s rollout should complete", NVSentinelNamespace, name) + + t.Logf("DaemonSet %s/%s rollout completed successfully", NVSentinelNamespace, name) +} + +// SetDeploymentArgs sets or updates arguments for containers in a deployment. +// If containerName is empty, applies to all containers. Otherwise, applies only to the named container. +// Uses retry.RetryOnConflict for automatic retry handling. +// Args is a map of flag to value, e.g., {"--processing-strategy": "STORE_ONLY", "--verbose": ""}. +func SetDeploymentArgs( + ctx context.Context, t *testing.T, + c klient.Client, deploymentName, namespace, containerName string, args map[string]string, +) error { + return retry.RetryOnConflict(retry.DefaultRetry, func() error { + deployment := &appsv1.Deployment{} + if err := c.Resources().Get(ctx, deploymentName, namespace, deployment); err != nil { + return err + } + + if len(deployment.Spec.Template.Spec.Containers) == 0 { + return fmt.Errorf("deployment %s/%s has no containers", namespace, deploymentName) + } + + updatedContainer := false + + for i := range deployment.Spec.Template.Spec.Containers { + container := &deployment.Spec.Template.Spec.Containers[i] + + if containerName != "" && container.Name != containerName { + continue + } + + setArgsOnContainer(t, container, args) + + updatedContainer = true + } + + if containerName != "" && !updatedContainer { + return fmt.Errorf("container %q not found in deployment %s/%s", containerName, namespace, deploymentName) + } + + return c.Resources().Update(ctx, deployment) + }) +} + +func setArgsOnContainer(t *testing.T, container *v1.Container, args map[string]string) { + t.Helper() + t.Logf("Setting args %v on container %s", args, container.Name) + + for flag, value := range args { + found := false + + for j := 0; j < len(container.Args); j++ { + if tryUpdateExistingArg(container, j, flag, value) { + found = true + break + } + } + + if !found { + if value != "" { + container.Args = append(container.Args, flag+"="+value) + } else { + container.Args = append(container.Args, flag) + } + } + } +} + +// tryUpdateExistingArg attempts to update an existing argument at position j. 
+// Returns true if the argument was found and updated. +func tryUpdateExistingArg(container *v1.Container, j int, flag, value string) bool { + existingArg := container.Args[j] + + // Match --flag=value style + if strings.HasPrefix(existingArg, flag+"=") { + if value != "" { + container.Args[j] = flag + "=" + value + } else { + container.Args[j] = flag + } + + return true + } + + // Match --flag or --flag value style + if existingArg == flag { + if value != "" { + if j+1 < len(container.Args) && !strings.HasPrefix(container.Args[j+1], "-") { + container.Args[j+1] = value + } else { + container.Args = append(container.Args[:j+1], append([]string{value}, container.Args[j+1:]...)...) + } + } + + return true + } + + return false +} + +// RemoveDeploymentArgs removes arguments from containers in a deployment. +// If containerName is empty, removes from all containers. Otherwise, removes only from the named container. +// Uses retry.RetryOnConflict for automatic retry handling. +func RemoveDeploymentArgs( + ctx context.Context, c klient.Client, deploymentName, namespace, containerName string, args map[string]string, +) error { + return retry.RetryOnConflict(retry.DefaultRetry, func() error { + deployment := &appsv1.Deployment{} + if err := c.Resources().Get(ctx, deploymentName, namespace, deployment); err != nil { + return err + } + + if len(deployment.Spec.Template.Spec.Containers) == 0 { + return fmt.Errorf("deployment %s/%s has no containers", namespace, deploymentName) + } + + updatedContainer := false + + for i := range deployment.Spec.Template.Spec.Containers { + container := &deployment.Spec.Template.Spec.Containers[i] + + if containerName != "" && container.Name != containerName { + continue + } + + removeArgsFromContainer(container, args) + + updatedContainer = true + } + + if containerName != "" && !updatedContainer { + return fmt.Errorf("container %q not found in deployment %s/%s", containerName, namespace, deploymentName) + } + + return c.Resources().Update(ctx, deployment) + }) +} + +func removeArgsFromContainer(container *v1.Container, args map[string]string) { + for flag := range args { + for j := 0; j < len(container.Args); j++ { + existingArg := container.Args[j] + + // Match --flag=value style + if strings.HasPrefix(existingArg, flag+"=") { + container.Args = append(container.Args[:j], container.Args[j+1:]...) + break + } + + // Match --flag or --flag value style + + if existingArg == flag { + if j+1 < len(container.Args) && !strings.HasPrefix(container.Args[j+1], "-") { + container.Args = append(container.Args[:j], container.Args[j+2:]...) + } else { + container.Args = append(container.Args[:j], container.Args[j+1:]...) + } + + break + } + } + } +}
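
The patch above threads a single processingStrategy value from the Helm chart value, through the --processing-strategy flag on the maintenance-notifier, into every HealthEvent the trigger engine emits. The sketch below is a minimal, self-contained illustration of that flag-to-enum validation and of how a STORE_ONLY consumer might behave; the numeric enum values and the handleEvent helper are illustrative assumptions, not the repository's generated code or API.

package main

import (
	"flag"
	"fmt"
	"log"
)

// processingStrategyValue mirrors the name-to-number map that protoc-gen-go
// generates for the ProcessingStrategy enum (pb.ProcessingStrategy_value).
// The numbers below are assumptions for illustration, not the generated constants.
var processingStrategyValue = map[string]int32{
	"EXECUTE_REMEDIATION": 0,
	"STORE_ONLY":          1,
}

// handleEvent stands in for a hypothetical downstream consumer: under
// STORE_ONLY the event is only persisted/exported, while EXECUTE_REMEDIATION
// would go on to set node conditions, quarantine, or drain.
func handleEvent(strategy, node string) {
	fmt.Printf("persisting and exporting event for node %s\n", node)

	if strategy == "STORE_ONLY" {
		fmt.Println("STORE_ONLY: skipping node condition, quarantine, drain, remediation")
		return
	}

	fmt.Printf("EXECUTE_REMEDIATION: handing node %s to remediation\n", node)
}

func main() {
	strategy := flag.String("processing-strategy", "EXECUTE_REMEDIATION",
		"Event processing strategy: EXECUTE_REMEDIATION or STORE_ONLY")
	flag.Parse()

	// Same validation pattern as the maintenance-notifier change: reject any
	// string that is not a known enum name before constructing the engine.
	if _, ok := processingStrategyValue[*strategy]; !ok {
		log.Fatalf("invalid processingStrategy %q (expected EXECUTE_REMEDIATION or STORE_ONLY)", *strategy)
	}

	handleEvent(*strategy, "node-q1")
}

Operationally, the same switch would typically be flipped per install through the new chart value, e.g. helm upgrade <release> <chart> --set processingStrategy=STORE_ONLY (release and chart names are placeholders), which templates/deployment.yaml then forwards to the container as --processing-strategy.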