@@ -58,6 +58,8 @@ spec:
- {{ .Values.dcgm.dcgmK8sServiceEnabled | quote }}
- --metadata-path
- {{ .Values.global.metadataPath | quote }}
- --processing-strategy
- {{ .Values.processingStrategy | quote }}
securityContext:
runAsUser: 0
image: "{{ .Values.image.repository }}:{{ .Values.image.tag | default ((.Values.global).image).tag | default .Chart.AppVersion }}-dcgm-3.x"
@@ -58,6 +58,8 @@ spec:
- {{ .Values.dcgm.dcgmK8sServiceEnabled | quote }}
- --metadata-path
- {{ .Values.global.metadataPath | quote }}
- --processing-strategy
- {{ .Values.processingStrategy | quote }}
securityContext:
runAsUser: 0
image: "{{ .Values.image.repository }}:{{ .Values.image.tag | default ((.Values.global).image).tag | default .Chart.AppVersion }}-dcgm-4.x"
@@ -53,3 +53,10 @@ tolerations: []
podAnnotations: {}

verbose: "False"

# Processing strategy for health events
# valid values: EXECUTE_REMEDIATION, STORE_ONLY
# default: EXECUTE_REMEDIATION
# EXECUTE_REMEDIATION: normal behavior; downstream modules may update cluster state.
# STORE_ONLY: observability-only behavior; event should be persisted/exported but should not modify cluster resources (i.e., no node conditions, no quarantine, no drain, no remediation).
processingStrategy: EXECUTE_REMEDIATION
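
For context, here is a minimal Python sketch of how this value is expected to be resolved once it reaches the monitor's `--processing-strategy` flag. It assumes the generated `health_event_pb2` module from this repo is importable and exposes `ProcessingStrategy` as a standard protobuf enum; the helper name is illustrative only, the actual logic lives in `gpu_health_monitor/cli.py` below.

```python
# Minimal sketch (not part of the chart): resolving the configured string
# against the protobuf enum, mirroring the validation added to
# gpu_health_monitor/cli.py in this PR. resolve_processing_strategy is an
# illustrative helper name.
import sys

from gpu_health_monitor.protos import health_event_pb2 as platformconnector_pb2


def resolve_processing_strategy(name: str) -> int:
    """Map a strategy name to its enum value, or exit if the name is invalid."""
    try:
        # EnumTypeWrapper.Value() raises ValueError for unknown names.
        return platformconnector_pb2.ProcessingStrategy.Value(name)
    except ValueError:
        valid = list(platformconnector_pb2.ProcessingStrategy.keys())
        print(f"Invalid processing strategy {name!r}; valid options: {valid}", file=sys.stderr)
        sys.exit(1)


# "EXECUTE_REMEDIATION" and "STORE_ONLY" resolve to enum values;
# anything else (e.g. a hypothetical "DRY_RUN") aborts startup with an error.
strategy = resolve_processing_strategy("STORE_ONLY")
```

An invalid value therefore fails fast at container startup rather than being silently ignored.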
22 changes: 21 additions & 1 deletion health-monitors/gpu-health-monitor/gpu_health_monitor/cli.py
@@ -21,7 +21,7 @@
import csv
from .dcgm_watcher import dcgm
from .platform_connector import platform_connector
from gpu_health_monitor.protos import health_event_pb2
from gpu_health_monitor.protos import health_event_pb2 as platformconnector_pb2
from gpu_health_monitor.logger import set_default_structured_logger_with_level


@@ -34,6 +34,7 @@ def _init_event_processor(
state_file_path: str,
dcgm_health_conditions_categorization_mapping_config: dict[str, str],
metadata_path: str,
processing_strategy: platformconnector_pb2.ProcessingStrategy,
):
platform_connector_config = config["eventprocessors.platformconnector"]
match event_processor_name:
@@ -46,6 +47,7 @@
state_file_path=state_file_path,
dcgm_health_conditions_categorization_mapping_config=dcgm_health_conditions_categorization_mapping_config,
metadata_path=metadata_path,
processing_strategy=processing_strategy,
)
case _:
log.fatal(f"Unknown event processor {event_processor_name}")
@@ -69,6 +71,13 @@ def _init_event_processor(
help="Path to GPU metadata JSON file",
required=False,
)
@click.option(
"--processing-strategy",
type=str,
default="EXECUTE_REMEDIATION",
help="Event processing strategy: EXECUTE_REMEDIATION or STORE_ONLY",
required=False,
)
def cli(
dcgm_addr,
dcgm_error_mapping_config_file,
@@ -78,6 +87,7 @@ def cli(
state_file,
dcgm_k8s_service_enabled,
metadata_path,
processing_strategy,
):
exit = Event()
config = configparser.ConfigParser()
@@ -112,6 +122,15 @@ def cli(
f"dcgm error {row[0]} dcgm_error_name {dcgm_errors_info_dict[row[0]]} dcgm_error_recommended_action {row[1]}"
)

try:
processing_strategy_value = platformconnector_pb2.ProcessingStrategy.Value(processing_strategy)
except ValueError:
valid_strategies = list(platformconnector_pb2.ProcessingStrategy.keys())
log.fatal(f"Invalid processing_strategy '{processing_strategy}'. " f"Valid options are: {valid_strategies}")
sys.exit(1)

log.info(f"Event handling strategy configured to: {processing_strategy_value}")

log.info("Initialization completed")
enabled_event_processor_names = cli_config["EnabledEventProcessors"].split(",")
enabled_event_processors = []
@@ -126,6 +145,7 @@
state_file_path,
dcgm_health_conditions_categorization_mapping_config,
metadata_path,
processing_strategy_value,
)
)

@@ -48,6 +48,7 @@ def __init__(
state_file_path: str,
dcgm_health_conditions_categorization_mapping_config: dict[str, str],
metadata_path: str,
processing_strategy: platformconnector_pb2.ProcessingStrategy,
) -> None:
self._exit = exit
self._socket_path = socket_path
@@ -62,6 +63,7 @@ def __init__(
self.entity_cache: dict[str, CachedEntityState] = {}
self.dcgm_health_conditions_categorization_mapping_config = dcgm_health_conditions_categorization_mapping_config
self._metadata_reader = MetadataReader(metadata_path)
self._processing_strategy = processing_strategy

def read_old_system_bootid_from_state_file(self) -> str:
bootid = ""
@@ -115,6 +117,7 @@ def clear_dcgm_connectivity_failure(self, timestamp: Timestamp):
recommendedAction=platformconnector_pb2.NONE,
nodeName=self._node_name,
metadata=event_metadata,
processingStrategy=self._processing_strategy,
)
health_events.append(health_event)

@@ -215,6 +218,7 @@ def health_event_occurred(
recommendedAction=recommended_action,
nodeName=self._node_name,
metadata=event_metadata,
processingStrategy=self._processing_strategy,
)
)
severity = (
@@ -278,6 +282,7 @@ def health_event_occurred(
recommendedAction=platformconnector_pb2.NONE,
nodeName=self._node_name,
metadata=event_metadata,
processingStrategy=self._processing_strategy,
)
)
severity = (
@@ -372,6 +377,7 @@ def dcgm_connectivity_failed(self):
recommendedAction=platformconnector_pb2.CONTACT_SUPPORT,
nodeName=self._node_name,
metadata=event_metadata,
processingStrategy=self._processing_strategy,
)
health_events.append(health_event)
metrics.dcgm_health_active_events.labels(event_type=check_name, gpu_id="", severity="fatal").set(1)
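
Because every health event built by the platform connector above now carries `processingStrategy`, downstream consumers can decide whether to act on an event or only record it. The sketch below is a hypothetical illustration of that contract; `persist_event` and `remediate` are placeholders, not NVSentinel APIs, and the branching only encodes the semantics documented in the chart values (STORE_ONLY events are persisted/exported but never change cluster state).

```python
# Hypothetical downstream handler illustrating the intended processingStrategy
# contract; persist_event() and remediate() are placeholders, not real
# NVSentinel functions.
from gpu_health_monitor.protos import health_event_pb2 as platformconnector_pb2


def persist_event(event) -> None:
    # Placeholder: export/store the event for observability.
    print(f"stored health event for node {event.nodeName}")


def remediate(event) -> None:
    # Placeholder: node conditions, cordon/drain, or other remediation.
    print(f"remediating based on recommended action {event.recommendedAction}")


def handle_health_event(event) -> None:
    # Events are always persisted, regardless of strategy.
    persist_event(event)

    if event.processingStrategy == platformconnector_pb2.STORE_ONLY:
        # Observability-only: no node conditions, no quarantine, no drain.
        return

    # EXECUTE_REMEDIATION: downstream modules may update cluster state.
    remediate(event)
```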
@@ -104,6 +104,7 @@ def test_health_event_occurred(self):
"statefile",
dcgm_health_conditions_categorization_mapping_config,
"/tmp/test_metadata.json",
platformconnector_pb2.STORE_ONLY,
)
dcgm_health_events = watcher._get_health_status_dict()
dcgm_health_events["DCGM_HEALTH_WATCH_INFOROM"] = dcgmtypes.HealthDetails(
@@ -238,6 +239,7 @@ def test_health_event_multiple_failures_same_gpu(self):
"statefile",
dcgm_health_conditions_categorization_mapping_config,
"/tmp/test_metadata.json",
platformconnector_pb2.STORE_ONLY,
)

# Simulate multiple NvLink failures for GPU 0 (4 links down: 8, 9, 14, 15)
@@ -296,6 +298,8 @@ def test_health_event_multiple_failures_same_gpu(self):
# Verify the complete aggregated message is preserved
assert nvlink_failure_event.message == aggregated_message

assert nvlink_failure_event.processingStrategy == platformconnector_pb2.STORE_ONLY

server.stop(0)

def test_health_event_multiple_gpus_multiple_failures_each(self):
@@ -352,6 +356,7 @@ def test_health_event_multiple_gpus_multiple_failures_each(self):
"statefile",
dcgm_health_conditions_categorization_mapping_config,
"/tmp/test_metadata.json",
platformconnector_pb2.STORE_ONLY,
)

# Simulate multiple NvLink failures for GPU 0 and GPU 1
@@ -413,6 +418,7 @@ def test_health_event_multiple_gpus_multiple_failures_each(self):
assert "link 15" in gpu0_event.message
assert gpu0_event.message.count(";") == 3
assert gpu0_event.message == gpu0_message
assert gpu0_event.processingStrategy == platformconnector_pb2.STORE_ONLY

# Verify GPU 1 event
assert gpu1_event is not None, "NvLink failure event for GPU 1 not found"
@@ -427,6 +433,7 @@ def test_health_event_multiple_gpus_multiple_failures_each(self):
assert "link 13" in gpu1_event.message
assert gpu1_event.message.count(";") == 3
assert gpu1_event.message == gpu1_message
assert gpu1_event.processingStrategy == platformconnector_pb2.STORE_ONLY

server.stop(0)

@@ -463,6 +470,7 @@ def test_dcgm_connectivity_failed(self):
state_file_path=state_file_path,
dcgm_health_conditions_categorization_mapping_config=dcgm_health_conditions_categorization_mapping_config,
metadata_path="/tmp/test_metadata.json",
processing_strategy=platformconnector_pb2.STORE_ONLY,
)

# Trigger connectivity failure
@@ -482,6 +490,7 @@ def test_dcgm_connectivity_failed(self):
assert event.recommendedAction == platformconnector_pb2.CONTACT_SUPPORT
assert event.nodeName == node_name
assert event.entitiesImpacted == []
assert event.processingStrategy == platformconnector_pb2.STORE_ONLY

server.stop(0)
finally:
@@ -511,6 +520,7 @@ def test_dcgm_connectivity_restored(self):
state_file_path="statefile",
dcgm_health_conditions_categorization_mapping_config=dcgm_health_conditions_categorization_mapping_config,
metadata_path="/tmp/test_metadata.json",
processing_strategy=platformconnector_pb2.EXECUTE_REMEDIATION,
)

timestamp = Timestamp()
@@ -536,6 +546,7 @@ def test_dcgm_connectivity_restored(self):
assert restored_event.errorCode == []
assert restored_event.message == "DCGM connectivity reported no errors"
assert restored_event.recommendedAction == platformconnector_pb2.NONE
assert restored_event.processingStrategy == platformconnector_pb2.EXECUTE_REMEDIATION

server.stop(0)

@@ -589,6 +600,7 @@ def test_event_retry_and_cache_cleanup_when_platform_connector_down(self):
state_file_path=state_file_path,
dcgm_health_conditions_categorization_mapping_config=dcgm_health_conditions_categorization_mapping_config,
metadata_path="/tmp/test_metadata.json",
processing_strategy=platformconnector_pb2.STORE_ONLY,
)

# Verify cache is empty initially
113 changes: 107 additions & 6 deletions tests/gpu_health_monitor_test.go
@@ -32,12 +32,19 @@ import (
)

const (
dcgmServiceHost = "nvidia-dcgm.gpu-operator.svc"
dcgmServicePort = "5555"
gpuOperatorNamespace = "gpu-operator"
dcgmServiceName = "nvidia-dcgm"
dcgmOriginalPort = 5555
dcgmBrokenPort = 1555
dcgmServiceHost = "nvidia-dcgm.gpu-operator.svc"
dcgmServicePort = "5555"
gpuOperatorNamespace = "gpu-operator"
dcgmServiceName = "nvidia-dcgm"
dcgmOriginalPort = 5555
dcgmBrokenPort = 1555
GPUHealthMonitorContainerName = "gpu-health-monitor"
GPUHealthMonitorDaemonSetName = "gpu-health-monitor-dcgm-4.x"
)

const (
keyGpuHealthMonitorPodName contextKey = "gpuHealthMonitorPodName"
keyGpuHealthMonitorOriginalArgs contextKey = "originalArgs"
)

// TestGPUHealthMonitorMultipleErrors verifies GPU health monitor handles multiple concurrent errors
@@ -646,3 +653,97 @@ func TestGpuNvlinkWatchSemicolonMessageParsing(t *testing.T) {

testEnv.Test(t, feature.Feature())
}

func TestGpuHealthMonitorStoreOnlyEvents(t *testing.T) {
feature := features.New("GPU Health Monitor - Store Only Events").
WithLabel("suite", "gpu-health-monitor").
WithLabel("component", "store-only-events")

var testNodeName string
var gpuHealthMonitorPodName string

feature.Setup(func(ctx context.Context, t *testing.T, c *envconf.Config) context.Context {
client, err := c.NewClient()
require.NoError(t, err, "failed to create kubernetes client")

originalArgs, err := helpers.UpdateDaemonSetArgs(ctx, t, client, GPUHealthMonitorDaemonSetName, GPUHealthMonitorContainerName, map[string]string{
"--processing-strategy": "STORE_ONLY"})
require.NoError(t, err, "failed to update GPU health monitor processing strategy")

gpuHealthMonitorPod, err := helpers.GetDaemonSetPodOnWorkerNode(ctx, t, client, GPUHealthMonitorDaemonSetName, "gpu-health-monitor-dcgm-4.x")
require.NoError(t, err, "failed to find GPU health monitor pod on worker node")
require.NotNil(t, gpuHealthMonitorPod, "GPU health monitor pod should exist on worker node")

testNodeName = gpuHealthMonitorPod.Spec.NodeName
gpuHealthMonitorPodName = gpuHealthMonitorPod.Name
t.Logf("Using GPU health monitor pod: %s on node: %s", gpuHealthMonitorPodName, testNodeName)

metadata := helpers.CreateTestMetadata(testNodeName)
helpers.InjectMetadata(t, ctx, client, helpers.NVSentinelNamespace, testNodeName, metadata)

ctx = context.WithValue(ctx, keyNodeName, testNodeName)
ctx = context.WithValue(ctx, keyGpuHealthMonitorPodName, gpuHealthMonitorPodName)
ctx = context.WithValue(ctx, keyGpuHealthMonitorOriginalArgs, originalArgs)

restConfig := client.RESTConfig()

nodeName := ctx.Value(keyNodeName).(string)
podName := ctx.Value(keyGpuHealthMonitorPodName).(string)

t.Logf("Injecting Inforom error on node %s", nodeName)
cmd := []string{"/bin/sh", "-c",
fmt.Sprintf("dcgmi test --host %s:%s --inject --gpuid 0 -f 84 -v 0",
dcgmServiceHost, dcgmServicePort)}

stdout, stderr, execErr := helpers.ExecInPod(ctx, restConfig, helpers.NVSentinelNamespace, podName, "", cmd)
require.NoError(t, execErr, "failed to inject Inforom error: %s", stderr)
require.Contains(t, stdout, "Successfully injected", "Inforom error injection failed")

return ctx
})

feature.Assess("Cluster state remains unaffected", func(ctx context.Context, t *testing.T, c *envconf.Config) context.Context {
client, err := c.NewClient()
require.NoError(t, err, "failed to create kubernetes client")

nodeName := ctx.Value(keyNodeName).(string)

t.Logf("Checking node condition is not applied on node %s", nodeName)
helpers.EnsureNodeConditionNotPresent(ctx, t, client, nodeName, "GpuInforomWatch")

t.Log("Verifying node was not cordoned")
helpers.AssertQuarantineState(ctx, t, client, nodeName, helpers.QuarantineAssertion{
ExpectCordoned: false,
ExpectAnnotation: false,
})

return ctx
})

feature.Teardown(func(ctx context.Context, t *testing.T, c *envconf.Config) context.Context {
client, err := c.NewClient()
require.NoError(t, err, "failed to create kubernetes client")

nodeName := ctx.Value(keyNodeName).(string)
originalArgs := ctx.Value(keyGpuHealthMonitorOriginalArgs).([]string)
podName := ctx.Value(keyGpuHealthMonitorPodName).(string)

restConfig := client.RESTConfig()

t.Logf("Clearing injected errors on node %s before restoring DaemonSet", nodeName)
cmd := []string{"/bin/sh", "-c",
fmt.Sprintf("dcgmi test --host %s:%s --inject --gpuid 0 -f %s -v %s",
dcgmServiceHost, dcgmServicePort, "84", "1")}
_, _, _ = helpers.ExecInPod(ctx, restConfig, helpers.NVSentinelNamespace, podName, "", cmd)

helpers.RestoreDaemonSetArgs(ctx, t, client, GPUHealthMonitorDaemonSetName, GPUHealthMonitorContainerName, originalArgs)

t.Logf("Cleaning up metadata from node %s", nodeName)
helpers.DeleteMetadata(t, ctx, client, helpers.NVSentinelNamespace, nodeName)

		return ctx
	})

testEnv.Test(t, feature.Feature())
}
6 changes: 3 additions & 3 deletions tests/helpers/syslog-health-monitor.go
@@ -32,7 +32,7 @@ const (
// helper function to set up syslog health monitor and port forward to it.
// Returns the node name, pod, stop channel, and original args (for restoration during teardown).
func SetUpSyslogHealthMonitor(ctx context.Context, t *testing.T,
client klient.Client, args map[string]string) (string, *v1.Pod, chan struct{}, []string) {
client klient.Client, args map[string]string, setManagedByNVSentinel bool) (string, *v1.Pod, chan struct{}, []string) {
var err error

var syslogPod *v1.Pod
@@ -66,8 +66,8 @@ func SetUpSyslogHealthMonitor(ctx context.Context, t *testing.T,
<-readyChan
t.Log("Port-forward ready")

t.Logf("Setting ManagedByNVSentinel=false on node %s", testNodeName)
err = SetNodeManagedByNVSentinel(ctx, client, testNodeName, false)
t.Logf("Setting ManagedByNVSentinel=%t on node %s", setManagedByNVSentinel, testNodeName)
err = SetNodeManagedByNVSentinel(ctx, client, testNodeName, setManagedByNVSentinel)
require.NoError(t, err, "failed to set ManagedByNVSentinel label")

return testNodeName, syslogPod, stopChan, originalArgs