diff --git a/distros/kubernetes/nvsentinel/charts/gpu-health-monitor/templates/daemonset-dcgm-3.x.yaml b/distros/kubernetes/nvsentinel/charts/gpu-health-monitor/templates/daemonset-dcgm-3.x.yaml
index b3448d377..dcfefc303 100644
--- a/distros/kubernetes/nvsentinel/charts/gpu-health-monitor/templates/daemonset-dcgm-3.x.yaml
+++ b/distros/kubernetes/nvsentinel/charts/gpu-health-monitor/templates/daemonset-dcgm-3.x.yaml
@@ -58,6 +58,8 @@ spec:
             - {{ .Values.dcgm.dcgmK8sServiceEnabled | quote }}
             - --metadata-path
             - {{ .Values.global.metadataPath | quote }}
+            - --processing-strategy
+            - {{ .Values.processingStrategy | quote }}
           securityContext:
             runAsUser: 0
           image: "{{ .Values.image.repository }}:{{ .Values.image.tag | default ((.Values.global).image).tag | default .Chart.AppVersion }}-dcgm-3.x"
diff --git a/distros/kubernetes/nvsentinel/charts/gpu-health-monitor/templates/daemonset-dcgm-4.x.yaml b/distros/kubernetes/nvsentinel/charts/gpu-health-monitor/templates/daemonset-dcgm-4.x.yaml
index d277eaf3c..c82071276 100644
--- a/distros/kubernetes/nvsentinel/charts/gpu-health-monitor/templates/daemonset-dcgm-4.x.yaml
+++ b/distros/kubernetes/nvsentinel/charts/gpu-health-monitor/templates/daemonset-dcgm-4.x.yaml
@@ -58,6 +58,8 @@ spec:
            - {{ .Values.dcgm.dcgmK8sServiceEnabled | quote }}
            - --metadata-path
            - {{ .Values.global.metadataPath | quote }}
+            - --processing-strategy
+            - {{ .Values.processingStrategy | quote }}
          securityContext:
            runAsUser: 0
          image: "{{ .Values.image.repository }}:{{ .Values.image.tag | default ((.Values.global).image).tag | default .Chart.AppVersion }}-dcgm-4.x"
diff --git a/distros/kubernetes/nvsentinel/charts/gpu-health-monitor/values.yaml b/distros/kubernetes/nvsentinel/charts/gpu-health-monitor/values.yaml
index f7041a5d2..9f5ee7588 100644
--- a/distros/kubernetes/nvsentinel/charts/gpu-health-monitor/values.yaml
+++ b/distros/kubernetes/nvsentinel/charts/gpu-health-monitor/values.yaml
@@ -53,3 +53,10 @@ tolerations: []
 podAnnotations: {}
 
 verbose: "False"
+
+# Processing strategy for health events.
+# Valid values: EXECUTE_REMEDIATION, STORE_ONLY (default: EXECUTE_REMEDIATION).
+# EXECUTE_REMEDIATION: normal behavior; downstream modules may update cluster state.
+# STORE_ONLY: observability-only; events are persisted/exported but must not modify
+# cluster resources (no node conditions, no quarantine, no drain, no remediation).
+processingStrategy: EXECUTE_REMEDIATION
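For illustration, the behavioral split described in the values.yaml comment can be sketched in a few lines of Python. This is a hypothetical downstream consumer, not NVSentinel code: only the health_event_pb2 module and its enum values come from this change, while store() and execute_remediation() are stand-in stubs.

    # Hypothetical consumer, for illustration only. Only health_event_pb2 and its
    # enum values are taken from this change; store()/execute_remediation() are stubs.
    from gpu_health_monitor.protos import health_event_pb2 as pb


    def store(event) -> None:
        # Both strategies persist/export the event.
        print(f"persisting event from node {event.nodeName}")


    def execute_remediation(event) -> None:
        # Only EXECUTE_REMEDIATION reaches this point.
        print(f"remediating node {event.nodeName}")


    def handle(event) -> None:
        store(event)
        if event.processingStrategy == pb.STORE_ONLY:
            return  # observability only: no node conditions, quarantine, drain, or remediation
        execute_remediation(event)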
diff --git a/health-monitors/gpu-health-monitor/gpu_health_monitor/cli.py b/health-monitors/gpu-health-monitor/gpu_health_monitor/cli.py
index 47d9d6765..f63659846 100644
--- a/health-monitors/gpu-health-monitor/gpu_health_monitor/cli.py
+++ b/health-monitors/gpu-health-monitor/gpu_health_monitor/cli.py
@@ -21,7 +21,7 @@ import csv
 
 from .dcgm_watcher import dcgm
 from .platform_connector import platform_connector
-from gpu_health_monitor.protos import health_event_pb2
+from gpu_health_monitor.protos import health_event_pb2 as platformconnector_pb2
 from gpu_health_monitor.logger import set_default_structured_logger_with_level
 
 
@@ -34,6 +34,7 @@ def _init_event_processor(
     state_file_path: str,
     dcgm_health_conditions_categorization_mapping_config: dict[str, str],
     metadata_path: str,
+    processing_strategy: platformconnector_pb2.ProcessingStrategy,
 ):
     platform_connector_config = config["eventprocessors.platformconnector"]
     match event_processor_name:
@@ -46,6 +47,7 @@ def _init_event_processor(
                 state_file_path=state_file_path,
                 dcgm_health_conditions_categorization_mapping_config=dcgm_health_conditions_categorization_mapping_config,
                 metadata_path=metadata_path,
+                processing_strategy=processing_strategy,
             )
         case _:
             log.fatal(f"Unknown event processor {event_processor_name}")
@@ -69,6 +71,13 @@ def _init_event_processor(
     help="Path to GPU metadata JSON file",
     required=False,
 )
+@click.option(
+    "--processing-strategy",
+    type=str,
+    default="EXECUTE_REMEDIATION",
+    help="Event processing strategy: EXECUTE_REMEDIATION or STORE_ONLY",
+    required=False,
+)
 def cli(
     dcgm_addr,
     dcgm_error_mapping_config_file,
@@ -78,6 +87,7 @@ def cli(
     state_file,
     dcgm_k8s_service_enabled,
     metadata_path,
+    processing_strategy,
 ):
     exit = Event()
     config = configparser.ConfigParser()
@@ -112,6 +122,15 @@ def cli(
                 f"dcgm error {row[0]} dcgm_error_name {dcgm_errors_info_dict[row[0]]} dcgm_error_recommended_action {row[1]}"
             )
 
+    try:
+        processing_strategy_value = platformconnector_pb2.ProcessingStrategy.Value(processing_strategy)
+    except ValueError:
+        valid_strategies = list(platformconnector_pb2.ProcessingStrategy.keys())
+        log.fatal(f"Invalid processing_strategy '{processing_strategy}'. " f"Valid options are: {valid_strategies}")
+        sys.exit(1)
+
+    log.info(f"Event handling strategy configured to: {processing_strategy_value}")
+
     log.info("Initialization completed")
     enabled_event_processor_names = cli_config["EnabledEventProcessors"].split(",")
     enabled_event_processors = []
@@ -126,6 +145,7 @@ def cli(
                 state_file_path,
                 dcgm_health_conditions_categorization_mapping_config,
                 metadata_path,
+                processing_strategy_value,
             )
         )
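The validation added to cli() above relies on the helper methods protobuf generates for every enum wrapper. A minimal sketch of that API, assuming only the ProcessingStrategy enum this change introduces; the actual numeric values live in health_event.proto.

    from gpu_health_monitor.protos import health_event_pb2 as pb

    num = pb.ProcessingStrategy.Value("STORE_ONLY")          # name -> number
    assert pb.ProcessingStrategy.Name(num) == "STORE_ONLY"   # number -> name
    print(pb.ProcessingStrategy.keys())  # e.g. ['EXECUTE_REMEDIATION', 'STORE_ONLY']

    # An unknown name raises ValueError, which cli() turns into log.fatal + sys.exit(1):
    try:
        pb.ProcessingStrategy.Value("BOGUS")
    except ValueError as err:
        print(err)

One small note on the log.info call above: it interpolates the numeric enum value; ProcessingStrategy.Name(processing_strategy_value) would log the symbolic name instead, which may read better in logs.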
" f"Valid options are: {valid_strategies}") + sys.exit(1) + + log.info(f"Event handling strategy configured to: {processing_strategy_value}") + log.info("Initialization completed") enabled_event_processor_names = cli_config["EnabledEventProcessors"].split(",") enabled_event_processors = [] @@ -126,6 +145,7 @@ def cli( state_file_path, dcgm_health_conditions_categorization_mapping_config, metadata_path, + processing_strategy_value, ) ) diff --git a/health-monitors/gpu-health-monitor/gpu_health_monitor/platform_connector/platform_connector.py b/health-monitors/gpu-health-monitor/gpu_health_monitor/platform_connector/platform_connector.py index 53274c696..a69100d92 100644 --- a/health-monitors/gpu-health-monitor/gpu_health_monitor/platform_connector/platform_connector.py +++ b/health-monitors/gpu-health-monitor/gpu_health_monitor/platform_connector/platform_connector.py @@ -48,6 +48,7 @@ def __init__( state_file_path: str, dcgm_health_conditions_categorization_mapping_config: dict[str, str], metadata_path: str, + processing_strategy: platformconnector_pb2.ProcessingStrategy, ) -> None: self._exit = exit self._socket_path = socket_path @@ -62,6 +63,7 @@ def __init__( self.entity_cache: dict[str, CachedEntityState] = {} self.dcgm_health_conditions_categorization_mapping_config = dcgm_health_conditions_categorization_mapping_config self._metadata_reader = MetadataReader(metadata_path) + self._processing_strategy = processing_strategy def read_old_system_bootid_from_state_file(self) -> str: bootid = "" @@ -115,6 +117,7 @@ def clear_dcgm_connectivity_failure(self, timestamp: Timestamp): recommendedAction=platformconnector_pb2.NONE, nodeName=self._node_name, metadata=event_metadata, + processingStrategy=self._processing_strategy, ) health_events.append(health_event) @@ -215,6 +218,7 @@ def health_event_occurred( recommendedAction=recommended_action, nodeName=self._node_name, metadata=event_metadata, + processingStrategy=self._processing_strategy, ) ) severity = ( @@ -278,6 +282,7 @@ def health_event_occurred( recommendedAction=platformconnector_pb2.NONE, nodeName=self._node_name, metadata=event_metadata, + processingStrategy=self._processing_strategy, ) ) severity = ( @@ -372,6 +377,7 @@ def dcgm_connectivity_failed(self): recommendedAction=platformconnector_pb2.CONTACT_SUPPORT, nodeName=self._node_name, metadata=event_metadata, + processingStrategy=self._processing_strategy, ) health_events.append(health_event) metrics.dcgm_health_active_events.labels(event_type=check_name, gpu_id="", severity="fatal").set(1) diff --git a/health-monitors/gpu-health-monitor/gpu_health_monitor/tests/test_platform_connector/test_platform_connector.py b/health-monitors/gpu-health-monitor/gpu_health_monitor/tests/test_platform_connector/test_platform_connector.py index ed9e96f6e..b2c74ba5c 100644 --- a/health-monitors/gpu-health-monitor/gpu_health_monitor/tests/test_platform_connector/test_platform_connector.py +++ b/health-monitors/gpu-health-monitor/gpu_health_monitor/tests/test_platform_connector/test_platform_connector.py @@ -104,6 +104,7 @@ def test_health_event_occurred(self): "statefile", dcgm_health_conditions_categorization_mapping_config, "/tmp/test_metadata.json", + platformconnector_pb2.STORE_ONLY, ) dcgm_health_events = watcher._get_health_status_dict() dcgm_health_events["DCGM_HEALTH_WATCH_INFOROM"] = dcgmtypes.HealthDetails( @@ -238,6 +239,7 @@ def test_health_event_multiple_failures_same_gpu(self): "statefile", dcgm_health_conditions_categorization_mapping_config, "/tmp/test_metadata.json", + 
diff --git a/health-monitors/gpu-health-monitor/gpu_health_monitor/tests/test_platform_connector/test_platform_connector.py b/health-monitors/gpu-health-monitor/gpu_health_monitor/tests/test_platform_connector/test_platform_connector.py
index ed9e96f6e..b2c74ba5c 100644
--- a/health-monitors/gpu-health-monitor/gpu_health_monitor/tests/test_platform_connector/test_platform_connector.py
+++ b/health-monitors/gpu-health-monitor/gpu_health_monitor/tests/test_platform_connector/test_platform_connector.py
@@ -104,6 +104,7 @@ def test_health_event_occurred(self):
             "statefile",
             dcgm_health_conditions_categorization_mapping_config,
             "/tmp/test_metadata.json",
+            platformconnector_pb2.STORE_ONLY,
         )
         dcgm_health_events = watcher._get_health_status_dict()
         dcgm_health_events["DCGM_HEALTH_WATCH_INFOROM"] = dcgmtypes.HealthDetails(
@@ -238,6 +239,7 @@ def test_health_event_multiple_failures_same_gpu(self):
             "statefile",
             dcgm_health_conditions_categorization_mapping_config,
             "/tmp/test_metadata.json",
+            platformconnector_pb2.STORE_ONLY,
         )
 
         # Simulate multiple NvLink failures for GPU 0 (4 links down: 8, 9, 14, 15)
@@ -296,6 +298,8 @@ def test_health_event_multiple_failures_same_gpu(self):
         # Verify the complete aggregated message is preserved
         assert nvlink_failure_event.message == aggregated_message
 
+        assert nvlink_failure_event.processingStrategy == platformconnector_pb2.STORE_ONLY
+
         server.stop(0)
 
     def test_health_event_multiple_gpus_multiple_failures_each(self):
@@ -352,6 +356,7 @@ def test_health_event_multiple_gpus_multiple_failures_each(self):
             "statefile",
             dcgm_health_conditions_categorization_mapping_config,
             "/tmp/test_metadata.json",
+            platformconnector_pb2.STORE_ONLY,
         )
 
         # Simulate multiple NvLink failures for GPU 0 and GPU 1
@@ -413,6 +418,7 @@ def test_health_event_multiple_gpus_multiple_failures_each(self):
         assert "link 15" in gpu0_event.message
         assert gpu0_event.message.count(";") == 3
         assert gpu0_event.message == gpu0_message
+        assert gpu0_event.processingStrategy == platformconnector_pb2.STORE_ONLY
 
         # Verify GPU 1 event
         assert gpu1_event is not None, "NvLink failure event for GPU 1 not found"
@@ -427,6 +433,7 @@ def test_health_event_multiple_gpus_multiple_failures_each(self):
         assert "link 13" in gpu1_event.message
         assert gpu1_event.message.count(";") == 3
         assert gpu1_event.message == gpu1_message
+        assert gpu1_event.processingStrategy == platformconnector_pb2.STORE_ONLY
 
         server.stop(0)
 
@@ -463,6 +470,7 @@ def test_dcgm_connectivity_failed(self):
                 state_file_path=state_file_path,
                 dcgm_health_conditions_categorization_mapping_config=dcgm_health_conditions_categorization_mapping_config,
                 metadata_path="/tmp/test_metadata.json",
+                processing_strategy=platformconnector_pb2.STORE_ONLY,
             )
 
             # Trigger connectivity failure
@@ -482,6 +490,7 @@ def test_dcgm_connectivity_failed(self):
             assert event.recommendedAction == platformconnector_pb2.CONTACT_SUPPORT
             assert event.nodeName == node_name
             assert event.entitiesImpacted == []
+            assert event.processingStrategy == platformconnector_pb2.STORE_ONLY
 
             server.stop(0)
         finally:
@@ -511,6 +520,7 @@ def test_dcgm_connectivity_restored(self):
             state_file_path="statefile",
             dcgm_health_conditions_categorization_mapping_config=dcgm_health_conditions_categorization_mapping_config,
             metadata_path="/tmp/test_metadata.json",
+            processing_strategy=platformconnector_pb2.EXECUTE_REMEDIATION,
         )
 
         timestamp = Timestamp()
@@ -536,6 +546,7 @@ def test_dcgm_connectivity_restored(self):
         assert restored_event.errorCode == []
         assert restored_event.message == "DCGM connectivity reported no errors"
         assert restored_event.recommendedAction == platformconnector_pb2.NONE
+        assert restored_event.processingStrategy == platformconnector_pb2.EXECUTE_REMEDIATION
 
         server.stop(0)
 
@@ -589,6 +600,7 @@ def test_event_retry_and_cache_cleanup_when_platform_connector_down(self):
             state_file_path=state_file_path,
             dcgm_health_conditions_categorization_mapping_config=dcgm_health_conditions_categorization_mapping_config,
             metadata_path="/tmp/test_metadata.json",
+            processing_strategy=platformconnector_pb2.STORE_ONLY,
         )
 
         # Verify cache is empty initially
diff --git a/tests/gpu_health_monitor_test.go b/tests/gpu_health_monitor_test.go
index 0a76cef11..83b99ce07 100644
--- a/tests/gpu_health_monitor_test.go
+++ b/tests/gpu_health_monitor_test.go
@@ -32,12 +32,19 @@ import (
 )
 
 const (
-	dcgmServiceHost      = "nvidia-dcgm.gpu-operator.svc"
-	dcgmServicePort      = "5555"
-	gpuOperatorNamespace = "gpu-operator"
-	dcgmServiceName      = "nvidia-dcgm"
-	dcgmOriginalPort     = 5555
-	dcgmBrokenPort       = 1555
+	dcgmServiceHost               = "nvidia-dcgm.gpu-operator.svc"
+	dcgmServicePort               = "5555"
+	gpuOperatorNamespace          = "gpu-operator"
+	dcgmServiceName               = "nvidia-dcgm"
+	dcgmOriginalPort              = 5555
+	dcgmBrokenPort                = 1555
+	GPUHealthMonitorContainerName = "gpu-health-monitor"
+	GPUHealthMonitorDaemonSetName = "gpu-health-monitor-dcgm-4.x"
+)
+
+const (
+	keyGpuHealthMonitorPodName      contextKey = "gpuHealthMonitorPodName"
+	keyGpuHealthMonitorOriginalArgs contextKey = "originalArgs"
 )
"nvidia-dcgm.gpu-operator.svc" + dcgmServicePort = "5555" + gpuOperatorNamespace = "gpu-operator" + dcgmServiceName = "nvidia-dcgm" + dcgmOriginalPort = 5555 + dcgmBrokenPort = 1555 + GPUHealthMonitorContainerName = "gpu-health-monitor" + GPUHealthMonitorDaemonSetName = "gpu-health-monitor-dcgm-4.x" +) + +const ( + keyGpuHealthMonitorPodName contextKey = "gpuHealthMonitorPodName" + keyGpuHealthMonitorOriginalArgs contextKey = "originalArgs" ) // TestGPUHealthMonitorMultipleErrors verifies GPU health monitor handles multiple concurrent errors @@ -646,3 +653,97 @@ func TestGpuNvlinkWatchSemicolonMessageParsing(t *testing.T) { testEnv.Test(t, feature.Feature()) } + +func TestGpuHealthMonitorStoreOnlyEvents(t *testing.T) { + feature := features.New("GPU Health Monitor - Store Only Events"). + WithLabel("suite", "gpu-health-monitor"). + WithLabel("component", "store-only-events") + + var testNodeName string + var gpuHealthMonitorPodName string + + feature.Setup(func(ctx context.Context, t *testing.T, c *envconf.Config) context.Context { + client, err := c.NewClient() + require.NoError(t, err, "failed to create kubernetes client") + + originalArgs, err := helpers.UpdateDaemonSetArgs(ctx, t, client, GPUHealthMonitorDaemonSetName, GPUHealthMonitorContainerName, map[string]string{ + "--processing-strategy": "STORE_ONLY"}) + require.NoError(t, err, "failed to update GPU health monitor processing strategy") + + gpuHealthMonitorPod, err := helpers.GetDaemonSetPodOnWorkerNode(ctx, t, client, GPUHealthMonitorDaemonSetName, "gpu-health-monitor-dcgm-4.x") + require.NoError(t, err, "failed to find GPU health monitor pod on worker node") + require.NotNil(t, gpuHealthMonitorPod, "GPU health monitor pod should exist on worker node") + + testNodeName = gpuHealthMonitorPod.Spec.NodeName + gpuHealthMonitorPodName = gpuHealthMonitorPod.Name + t.Logf("Using GPU health monitor pod: %s on node: %s", gpuHealthMonitorPodName, testNodeName) + + metadata := helpers.CreateTestMetadata(testNodeName) + helpers.InjectMetadata(t, ctx, client, helpers.NVSentinelNamespace, testNodeName, metadata) + + ctx = context.WithValue(ctx, keyNodeName, testNodeName) + ctx = context.WithValue(ctx, keyGpuHealthMonitorPodName, gpuHealthMonitorPodName) + ctx = context.WithValue(ctx, keyGpuHealthMonitorOriginalArgs, originalArgs) + + restConfig := client.RESTConfig() + + nodeName := ctx.Value(keyNodeName).(string) + podName := ctx.Value(keyGpuHealthMonitorPodName).(string) + + t.Logf("Injecting Inforom error on node %s", nodeName) + cmd := []string{"/bin/sh", "-c", + fmt.Sprintf("dcgmi test --host %s:%s --inject --gpuid 0 -f 84 -v 0", + dcgmServiceHost, dcgmServicePort)} + + stdout, stderr, execErr := helpers.ExecInPod(ctx, restConfig, helpers.NVSentinelNamespace, podName, "", cmd) + require.NoError(t, execErr, "failed to inject Inforom error: %s", stderr) + require.Contains(t, stdout, "Successfully injected", "Inforom error injection failed") + + return ctx + }) + + feature.Assess("Cluster state remains unaffected", func(ctx context.Context, t *testing.T, c *envconf.Config) context.Context { + client, err := c.NewClient() + require.NoError(t, err, "failed to create kubernetes client") + + nodeName := ctx.Value(keyNodeName).(string) + + t.Logf("Checking node condition is not applied on node %s", nodeName) + helpers.EnsureNodeConditionNotPresent(ctx, t, client, nodeName, "GpuInforomWatch") + + t.Log("Verifying node was not cordoned") + helpers.AssertQuarantineState(ctx, t, client, nodeName, helpers.QuarantineAssertion{ + ExpectCordoned: false, 
+
+	feature.Assess("Cluster state remains unaffected", func(ctx context.Context, t *testing.T, c *envconf.Config) context.Context {
+		client, err := c.NewClient()
+		require.NoError(t, err, "failed to create kubernetes client")
+
+		nodeName := ctx.Value(keyNodeName).(string)
+
+		t.Logf("Checking node condition is not applied on node %s", nodeName)
+		helpers.EnsureNodeConditionNotPresent(ctx, t, client, nodeName, "GpuInforomWatch")
+
+		t.Log("Verifying node was not cordoned")
+		helpers.AssertQuarantineState(ctx, t, client, nodeName, helpers.QuarantineAssertion{
+			ExpectCordoned:   false,
+			ExpectAnnotation: false,
+		})
+
+		return ctx
+	})
+
+	feature.Teardown(func(ctx context.Context, t *testing.T, c *envconf.Config) context.Context {
+		client, err := c.NewClient()
+		require.NoError(t, err, "failed to create kubernetes client")
+
+		nodeName := ctx.Value(keyNodeName).(string)
+		originalArgs := ctx.Value(keyGpuHealthMonitorOriginalArgs).([]string)
+		podName := ctx.Value(keyGpuHealthMonitorPodName).(string)
+
+		restConfig := client.RESTConfig()
+
+		t.Logf("Clearing injected errors on node %s before restoring DaemonSet", nodeName)
+		cmd := []string{"/bin/sh", "-c",
+			fmt.Sprintf("dcgmi test --host %s:%s --inject --gpuid 0 -f %s -v %s",
+				dcgmServiceHost, dcgmServicePort, "84", "1")}
+		_, _, _ = helpers.ExecInPod(ctx, restConfig, helpers.NVSentinelNamespace, podName, "", cmd)
+
+		helpers.RestoreDaemonSetArgs(ctx, t, client, GPUHealthMonitorDaemonSetName, GPUHealthMonitorContainerName, originalArgs)
+
+		t.Logf("Cleaning up metadata from node %s", nodeName)
+		helpers.DeleteMetadata(t, ctx, client, helpers.NVSentinelNamespace, nodeName)
+
+		return ctx
+	})
+
+	testEnv.Test(t, feature.Feature())
+}
diff --git a/tests/helpers/syslog-health-monitor.go b/tests/helpers/syslog-health-monitor.go
index 16acb1ef1..88c94d21d 100644
--- a/tests/helpers/syslog-health-monitor.go
+++ b/tests/helpers/syslog-health-monitor.go
@@ -32,7 +32,7 @@ const (
 // helper function to set up syslog health monitor and port forward to it.
 // Returns the node name, pod, stop channel, and original args (for restoration during teardown).
 func SetUpSyslogHealthMonitor(ctx context.Context, t *testing.T,
-	client klient.Client, args map[string]string) (string, *v1.Pod, chan struct{}, []string) {
+	client klient.Client, args map[string]string, setManagedByNVSentinel bool) (string, *v1.Pod, chan struct{}, []string) {
 	var err error
 	var syslogPod *v1.Pod
 
@@ -66,8 +66,8 @@ func SetUpSyslogHealthMonitor(ctx context.Context, t *testing.T,
 	<-readyChan
 	t.Log("Port-forward ready")
 
-	t.Logf("Setting ManagedByNVSentinel=false on node %s", testNodeName)
-	err = SetNodeManagedByNVSentinel(ctx, client, testNodeName, false)
+	t.Logf("Setting ManagedByNVSentinel=%t on node %s", setManagedByNVSentinel, testNodeName)
+	err = SetNodeManagedByNVSentinel(ctx, client, testNodeName, setManagedByNVSentinel)
 	require.NoError(t, err, "failed to set ManagedByNVSentinel label")
 
 	return testNodeName, syslogPod, stopChan, originalArgs
diff --git a/tests/syslog_health_monitor_test.go b/tests/syslog_health_monitor_test.go
index 993574d6e..5b8cf784b 100644
--- a/tests/syslog_health_monitor_test.go
+++ b/tests/syslog_health_monitor_test.go
@@ -21,6 +21,7 @@ import (
 	"context"
 	"strings"
 	"testing"
+	"time"
 
 	"tests/helpers"
 
@@ -49,7 +50,7 @@ func TestSyslogHealthMonitorXIDDetection(t *testing.T) {
 		client, err := c.NewClient()
 		require.NoError(t, err, "failed to create kubernetes client")
 
-		testNodeName, syslogPod, stopChan, originalArgs := helpers.SetUpSyslogHealthMonitor(ctx, t, client, nil)
+		testNodeName, syslogPod, stopChan, originalArgs := helpers.SetUpSyslogHealthMonitor(ctx, t, client, nil, false)
 
 		ctx = context.WithValue(ctx, keySyslogNodeName, testNodeName)
 		ctx = context.WithValue(ctx, keySyslogPodName, syslogPod.Name)
@@ -350,7 +351,7 @@ func TestSyslogHealthMonitorSXIDDetection(t *testing.T) {
 		client, err := c.NewClient()
 		require.NoError(t, err, "failed to create kubernetes client")
 
-		testNodeName, syslogPod, stopChan, originalArgs := helpers.SetUpSyslogHealthMonitor(ctx, t, client, nil)
+		testNodeName, syslogPod, stopChan, originalArgs := helpers.SetUpSyslogHealthMonitor(ctx, t, client, nil, false)
 
 		ctx = context.WithValue(ctx, keySyslogNodeName, testNodeName)
 		ctx = context.WithValue(ctx, keySyslogPodName, syslogPod.Name)
@@ -426,7 +427,7 @@ func TestSyslogHealthMonitorStoreOnlyStrategy(t *testing.T) {
 
 		testNodeName, syslogPod, stopChan, originalArgs := helpers.SetUpSyslogHealthMonitor(ctx, t, client, map[string]string{
 			"--processing-strategy": "STORE_ONLY",
-		})
+		}, true)
 
 		ctx = context.WithValue(ctx, keySyslogNodeName, testNodeName)
 		ctx = context.WithValue(ctx, keyStopChan, stopChan)
@@ -447,8 +448,29 @@ func TestSyslogHealthMonitorStoreOnlyStrategy(t *testing.T) {
 
 		helpers.InjectSyslogMessages(t, helpers.StubJournalHTTPPort, xidMessages)
 
-		t.Log("Verifying no node condition is created")
-		helpers.EnsureNodeConditionNotPresent(ctx, t, client, nodeName, "SysLogsXIDError")
+		// The syslog monitor processes journal messages on a 15-second interval. The 20-second
+		// timeout gives the monitor enough time to process the messages and insert events into
+		// the database before we verify that the node condition is absent.
+		require.Never(t, func() bool {
+			node, err := helpers.GetNodeByName(ctx, client, nodeName)
+			if err != nil {
+				t.Logf("failed to get node %s: %v", nodeName, err)
+				return false
+			}
+
+			for _, condition := range node.Status.Conditions {
+				if condition.Status == v1.ConditionTrue && condition.Reason == "SysLogsXIDErrorIsNotHealthy" {
+					t.Logf("ERROR: Found unexpected node condition: Type=%s, Reason=%s, Status=%s, Message=%s",
+						condition.Type, condition.Reason, condition.Status, condition.Message)
+
+					return true
+				}
+			}
+
+			t.Logf("Node %s correctly does not have a condition with check name '%s'", nodeName, "SysLogsXIDError")
+
+			return false
+		}, 20*time.Second, helpers.WaitInterval, "node %s should NOT have a condition with check name %s", nodeName, "SysLogsXIDError")
 
 		t.Log("Verifying node was not cordoned")
 		helpers.AssertQuarantineState(ctx, t, client, nodeName, helpers.QuarantineAssertion{