From 803c1a38e7529bb9e9d5c0b1713c834220a81f7e Mon Sep 17 00:00:00 2001
From: allencwang <9057208+allenwang28@users.noreply.github.com>
Date: Thu, 4 Sep 2025 17:16:14 +0000
Subject: [PATCH 1/9] fix for slurm 24

---
 torchx/schedulers/slurm_scheduler.py | 11 ++++++++++-
 1 file changed, 10 insertions(+), 1 deletion(-)

diff --git a/torchx/schedulers/slurm_scheduler.py b/torchx/schedulers/slurm_scheduler.py
index fde0fbf96..0982fb8fb 100644
--- a/torchx/schedulers/slurm_scheduler.py
+++ b/torchx/schedulers/slurm_scheduler.py
@@ -670,7 +670,16 @@ def _describe_squeue(self, app_id: str) -> Optional[DescribeAppResponse]:
             if state == AppState.PENDING:
                 # NOTE: torchx launched jobs points to exactly one host
                 # otherwise, scheduled_nodes could be a node list expression (eg. 'slurm-compute-node[0-20,21,45-47]')
-                hostname = job_resources.get("scheduled_nodes", "")
+                if job_resources is not None:
+                    hostname = job_resources.get("scheduled_nodes", "")
+                    # If scheduled_nodes not found in job_resources, try nodes.list
+                    if not hostname and "nodes" in job_resources:
+                        nodes_info = job_resources.get("nodes", {})
+                        if isinstance(nodes_info, dict):
+                            hostname = nodes_info.get("list", "")
+                else:
+                    # For pending jobs where job_resources is None, check top-level fields
+                    hostname = job.get("nodes", "") or job.get("scheduled_nodes", "")
 
                 role.num_replicas += 1
                 role_status.replicas.append(

From 883ee81cbf17e5352dc89c3d34a305a5d20ecba0 Mon Sep 17 00:00:00 2001
From: allencwang <9057208+allenwang28@users.noreply.github.com>
Date: Fri, 5 Sep 2025 14:19:26 +0000
Subject: [PATCH 2/9] a few more fixes

---
 torchx/schedulers/slurm_scheduler.py          | 72 +++++++++++++------
 .../schedulers/test/slurm_scheduler_test.py   | 59 ++++++++++++++-
 2 files changed, 107 insertions(+), 24 deletions(-)

diff --git a/torchx/schedulers/slurm_scheduler.py b/torchx/schedulers/slurm_scheduler.py
index 0982fb8fb..aff746b63 100644
--- a/torchx/schedulers/slurm_scheduler.py
+++ b/torchx/schedulers/slurm_scheduler.py
@@ -594,15 +594,26 @@ def _describe_sacct(self, app_id: str) -> Optional[DescribeAppResponse]:
         msg = ""
         app_state = AppState.UNKNOWN
         for row in reader:
-            job_id, *parts = row["JobID"].split("+")
+            # Handle both "+" (heterogeneous) and "." (regular) job ID formats
+            job_id_full = row["JobID"]
+
+            # Split on both "+" and "." to handle different SLURM configurations
+            if "+" in job_id_full:
+                job_id, *parts = job_id_full.split("+")
+                is_subjob = len(parts) > 0 and "." in parts[0]
+            else:
+                job_id, *parts = job_id_full.split(".")
+                is_subjob = len(parts) > 0
+
             if job_id != app_id:
                 continue
-            if len(parts) > 0 and "." in parts[0]:
-                # we only care about the worker not the child jobs
+
+            if is_subjob:
+                # we only care about the main job not the child jobs (.batch, .0, etc.)
continue - state = row["State"] - msg = state + msg = row["State"] + state = msg.split()[0].rstrip() app_state = appstate_from_slurm_state(state) role, _, replica_id = row["JobName"].rpartition("-") @@ -695,24 +706,28 @@ def _describe_squeue(self, app_id: str) -> Optional[DescribeAppResponse]: # where each replica is a "sub-job" so `allocated_nodes` will always be 1 # but we deal with jobs that have not been launched with torchx # which can have multiple hosts per sub-job (count them as replicas) - node_infos = job_resources.get("allocated_nodes", []) - - if not isinstance(node_infos, list): - # NOTE: in some versions of slurm jobs[].job_resources.allocated_nodes - # is not a list of individual nodes, but a map of the nodelist specs - # in this case just use jobs[].job_resources.nodes - hostname = job_resources.get("nodes") - role.num_replicas += 1 - role_status.replicas.append( - ReplicaStatus( - id=int(replica_id), - role=role_name, - state=state, - hostname=hostname, + nodes_data = job_resources.get("nodes", {}) + + if "allocation" in nodes_data and isinstance(nodes_data["allocation"], list): + # SLURM 24.11+ format: nodes.allocation is a list + for node_info in nodes_data["allocation"]: + hostname = node_info["name"] + cpu = int(node_info["cpus"]["used"]) + memMB = int(node_info["memory"]["allocated"]) // 1024 # Convert to MB + + role.resource = Resource(cpu=cpu, memMB=memMB, gpu=-1) + role.num_replicas += 1 + role_status.replicas.append( + ReplicaStatus( + id=int(replica_id), + role=role_name, + state=state, + hostname=hostname, + ) ) - ) - else: - for node_info in node_infos: + elif "allocated_nodes" in job_resources and isinstance(job_resources["allocated_nodes"], list): + # Legacy format: allocated_nodes is a list + for node_info in job_resources["allocated_nodes"]: # NOTE: we expect resource specs for all the nodes to be the same # NOTE: use allocated (not used/requested) memory since # users may only specify --cpu, in which case slurm @@ -735,6 +750,19 @@ def _describe_squeue(self, app_id: str) -> Optional[DescribeAppResponse]: hostname=hostname, ) ) + else: + # Fallback: use hostname from nodes.list + hostname = nodes_data.get("list", "") + role.num_replicas += 1 + role_status.replicas.append( + ReplicaStatus( + id=int(replica_id), + role=role_name, + state=state, + hostname=hostname, + ) + ) + return DescribeAppResponse( app_id=app_id, diff --git a/torchx/schedulers/test/slurm_scheduler_test.py b/torchx/schedulers/test/slurm_scheduler_test.py index ef7f3383e..5cac2d775 100644 --- a/torchx/schedulers/test/slurm_scheduler_test.py +++ b/torchx/schedulers/test/slurm_scheduler_test.py @@ -7,7 +7,8 @@ # pyre-strict import datetime -import importlib +from importlib import resources +import json import os import subprocess import tempfile @@ -455,7 +456,7 @@ def test_describe_sacct_running( def test_describe_squeue(self) -> None: with ( - importlib.resources.path(__package__, "slurm-squeue-output.json") as path, + resources.path(__package__, "slurm-squeue-output.json") as path, open(path) as fp, ): mock_output = fp.read() @@ -1048,3 +1049,57 @@ def test_no_gpu_resources(self) -> None: ).materialize() self.assertNotIn("--gpus-per-node", " ".join(sbatch)) self.assertNotIn("--gpus-per-task", " ".join(sbatch)) + + def test_describe_squeue_handles_none_job_resources(self): + """Test that describe handles job_resources=None without crashing (i.e. 
for SLURM 24.11.5).""" + + # Mock SLURM 24.11.5 response with job_resources=None + mock_job_data = { + "jobs": [{ + "name": "test-job-0", + "job_state": ["PENDING"], + "job_resources": None, # This was causing the crash + "nodes": "", + "scheduled_nodes": "", + "command": "/bin/echo", + "current_working_directory": "/tmp" + }] + } + + with patch('subprocess.check_output') as mock_subprocess: + mock_subprocess.return_value = json.dumps(mock_job_data) + + scheduler = SlurmScheduler("test") + result = scheduler._describe_squeue("123") + + # Should not crash and should return a valid response + assert result is not None + assert result.app_id == "123" + assert result.state == AppState.PENDING + + + def test_describe_sacct_handles_dot_separated_job_ids(self): + """Test that _describe_sacct handles job IDs with '.' separators (not just '+').""" + sacct_output = """JobID|JobName|Partition|Account|AllocCPUS|State|ExitCode +89|mesh0-0|all|root|8|CANCELLED by 2166|0:0 +89.batch|batch||root|8|CANCELLED|0:15 +89.0|process_allocator||root|8|CANCELLED|0:15 + """ + + with patch('subprocess.check_output') as mock_subprocess: + mock_subprocess.return_value = sacct_output + + scheduler = SlurmScheduler("test") + result = scheduler._describe_sacct("89") + print("result: ", result) + + # Should process only the main job "89", not the sub-jobs + assert result is not None + assert result.app_id == "89" + assert result.state == AppState.CANCELLED + assert result.msg == "CANCELLED by 2166" + + # Should have one role "mesh0" with one replica "0" + assert len(result.roles) == 1 + assert result.roles[0].name == "mesh0" + assert result.roles[0].num_replicas == 1 \ No newline at end of file From b3a4f14083a2e0f1c0b9a3b28d9e80d8755de037 Mon Sep 17 00:00:00 2001 From: allencwang <9057208+allenwang28@users.noreply.github.com> Date: Fri, 5 Sep 2025 14:36:39 +0000 Subject: [PATCH 3/9] add comments --- torchx/schedulers/slurm_scheduler.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/torchx/schedulers/slurm_scheduler.py b/torchx/schedulers/slurm_scheduler.py index aff746b63..a58f3aa8e 100644 --- a/torchx/schedulers/slurm_scheduler.py +++ b/torchx/schedulers/slurm_scheduler.py @@ -613,7 +613,8 @@ def _describe_sacct(self, app_id: str) -> Optional[DescribeAppResponse]: continue msg = row["State"] - state = msg.split()[0].rstrip() + # Remove truncation indicator (CANCELLED+) and extract base state from verbose formats + state = msg.split()[0].rstrip("+") app_state = appstate_from_slurm_state(state) role, _, replica_id = row["JobName"].rpartition("-") @@ -681,6 +682,8 @@ def _describe_squeue(self, app_id: str) -> Optional[DescribeAppResponse]: if state == AppState.PENDING: # NOTE: torchx launched jobs points to exactly one host # otherwise, scheduled_nodes could be a node list expression (eg. 
'slurm-compute-node[0-20,21,45-47]') + + # SLURM 24.11.5+ returns job_resources=None for pending jobs (issue #1101) if job_resources is not None: hostname = job_resources.get("scheduled_nodes", "") # If scheduled_nodes not found in job_resources, try nodes.list @@ -708,6 +711,7 @@ def _describe_squeue(self, app_id: str) -> Optional[DescribeAppResponse]: # which can have multiple hosts per sub-job (count them as replicas) nodes_data = job_resources.get("nodes", {}) + # SLURM 24.11+ changed from allocated_nodes to nodes.allocation structure if "allocation" in nodes_data and isinstance(nodes_data["allocation"], list): # SLURM 24.11+ format: nodes.allocation is a list for node_info in nodes_data["allocation"]: From 65a417d5ada89204b01f33859b25712b025ec048 Mon Sep 17 00:00:00 2001 From: allencwang <9057208+allenwang28@users.noreply.github.com> Date: Fri, 5 Sep 2025 14:39:48 +0000 Subject: [PATCH 4/9] a few more comments --- torchx/schedulers/slurm_scheduler.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/torchx/schedulers/slurm_scheduler.py b/torchx/schedulers/slurm_scheduler.py index a58f3aa8e..8f43d8d0f 100644 --- a/torchx/schedulers/slurm_scheduler.py +++ b/torchx/schedulers/slurm_scheduler.py @@ -570,6 +570,8 @@ def describe(self, app_id: str) -> Optional[DescribeAppResponse]: return self._describe_sacct(app_id) def _describe_sacct(self, app_id: str) -> Optional[DescribeAppResponse]: + # NOTE: Handles multiple job ID formats due to SLURM version differences. + # Different clusters use heterogeneous (+) vs regular (.) job ID formats. try: output = subprocess.check_output( ["sacct", "--parsable2", "-j", app_id], @@ -641,6 +643,9 @@ def _describe_sacct(self, app_id: str) -> Optional[DescribeAppResponse]: ) def _describe_squeue(self, app_id: str) -> Optional[DescribeAppResponse]: + # NOTE: This method contains multiple compatibility checks for different SLURM versions + # due to API format changes across versions (20.02, 23.02, 24.05, 24.11+). + # squeue errors out with 'slurm_load_jobs error: Invalid job id specified' # if the job does not exist or is finished (e.g. not in PENDING or RUNNING state) output = subprocess.check_output( From e7289671efa49edb50b2d00a5e733aec53223278 Mon Sep 17 00:00:00 2001 From: allencwang <9057208+allenwang28@users.noreply.github.com> Date: Fri, 5 Sep 2025 14:48:26 +0000 Subject: [PATCH 5/9] linter --- torchx/schedulers/slurm_scheduler.py | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/torchx/schedulers/slurm_scheduler.py b/torchx/schedulers/slurm_scheduler.py index 8f43d8d0f..80d1d0245 100644 --- a/torchx/schedulers/slurm_scheduler.py +++ b/torchx/schedulers/slurm_scheduler.py @@ -598,7 +598,7 @@ def _describe_sacct(self, app_id: str) -> Optional[DescribeAppResponse]: for row in reader: # Handle both "+" (heterogeneous) and "." (regular) job ID formats job_id_full = row["JobID"] - + # Split on both "+" and "." to handle different SLURM configurations if "+" in job_id_full: job_id, *parts = job_id_full.split("+") @@ -606,10 +606,10 @@ def _describe_sacct(self, app_id: str) -> Optional[DescribeAppResponse]: else: job_id, *parts = job_id_full.split(".") is_subjob = len(parts) > 0 - + if job_id != app_id: continue - + if is_subjob: # we only care about the main job not the child jobs (.batch, .0, etc.) 
                 continue
@@ -717,13 +717,17 @@ def _describe_squeue(self, app_id: str) -> Optional[DescribeAppResponse]:
                 nodes_data = job_resources.get("nodes", {})
 
                 # SLURM 24.11+ changed from allocated_nodes to nodes.allocation structure
-                if "allocation" in nodes_data and isinstance(nodes_data["allocation"], list):
+                if "allocation" in nodes_data and isinstance(
+                    nodes_data["allocation"], list
+                ):
                     # SLURM 24.11+ format: nodes.allocation is a list
                     for node_info in nodes_data["allocation"]:
                         hostname = node_info["name"]
                         cpu = int(node_info["cpus"]["used"])
-                        memMB = int(node_info["memory"]["allocated"]) // 1024  # Convert to MB
-
+                        memMB = (
+                            int(node_info["memory"]["allocated"]) // 1024
+                        )  # Convert to MB
+
                         role.resource = Resource(cpu=cpu, memMB=memMB, gpu=-1)
                         role.num_replicas += 1
                         role_status.replicas.append(
@@ -734,7 +738,9 @@ def _describe_squeue(self, app_id: str) -> Optional[DescribeAppResponse]:
                             hostname=hostname,
                         )
                     )
-                elif "allocated_nodes" in job_resources and isinstance(job_resources["allocated_nodes"], list):
+                elif "allocated_nodes" in job_resources and isinstance(
+                    job_resources["allocated_nodes"], list
+                ):
                     # Legacy format: allocated_nodes is a list
                     for node_info in job_resources["allocated_nodes"]:
                         # NOTE: we expect resource specs for all the nodes to be the same
@@ -772,7 +778,6 @@ def _describe_squeue(self, app_id: str) -> Optional[DescribeAppResponse]:
                     )
                 )
 
-
         return DescribeAppResponse(
             app_id=app_id,

From 87bd3e4c5e3c105a8103eb811550a0f110e8c80c Mon Sep 17 00:00:00 2001
From: allencwang <9057208+allenwang28@users.noreply.github.com>
Date: Fri, 5 Sep 2025 14:52:12 +0000
Subject: [PATCH 6/9] fix test linter

---
 .../schedulers/test/slurm_scheduler_test.py | 45 ++++++++++---------
 1 file changed, 23 insertions(+), 22 deletions(-)

diff --git a/torchx/schedulers/test/slurm_scheduler_test.py b/torchx/schedulers/test/slurm_scheduler_test.py
index 5cac2d775..631df2436 100644
--- a/torchx/schedulers/test/slurm_scheduler_test.py
+++ b/torchx/schedulers/test/slurm_scheduler_test.py
@@ -7,13 +7,13 @@
 # pyre-strict
 
 import datetime
-from importlib import resources
 import json
 import os
 import subprocess
 import tempfile
 import unittest
 from contextlib import contextmanager
+from importlib import resources
 from typing import Generator
 from unittest.mock import call, MagicMock, patch
@@ -1052,32 +1052,33 @@ def test_no_gpu_resources(self) -> None:
     def test_describe_squeue_handles_none_job_resources(self):
         """Test that describe handles job_resources=None without crashing (i.e.
for SLURM 24.11.5).""" - + # Mock SLURM 24.11.5 response with job_resources=None mock_job_data = { - "jobs": [{ - "name": "test-job-0", - "job_state": ["PENDING"], - "job_resources": None, # This was causing the crash - "nodes": "", - "scheduled_nodes": "", - "command": "/bin/echo", - "current_working_directory": "/tmp" - }] + "jobs": [ + { + "name": "test-job-0", + "job_state": ["PENDING"], + "job_resources": None, # This was causing the crash + "nodes": "", + "scheduled_nodes": "", + "command": "/bin/echo", + "current_working_directory": "/tmp", + } + ] } - - with patch('subprocess.check_output') as mock_subprocess: + + with patch("subprocess.check_output") as mock_subprocess: mock_subprocess.return_value = json.dumps(mock_job_data) - + scheduler = SlurmScheduler("test") result = scheduler._describe_squeue("123") - + # Should not crash and should return a valid response assert result is not None assert result.app_id == "123" assert result.state == AppState.PENDING - def test_describe_sacct_handles_dot_separated_job_ids(self): """Test that _describe_sacct handles job IDs with '.' separators (not just '+').""" sacct_output = """JobID|JobName|Partition|Account|AllocCPUS|State|ExitCode @@ -1085,21 +1086,21 @@ def test_describe_sacct_handles_dot_separated_job_ids(self): 89.batch|batch||root|8|CANCELLED|0:15 89.0|process_allocator||root|8|CANCELLED|0:15 """ - - with patch('subprocess.check_output') as mock_subprocess: + + with patch("subprocess.check_output") as mock_subprocess: mock_subprocess.return_value = sacct_output - + scheduler = SlurmScheduler("test") result = scheduler._describe_sacct("89") print("result: ", result) - + # Should process only the main job "89", not the sub-jobs assert result is not None assert result.app_id == "89" assert result.state == AppState.CANCELLED assert result.msg == "CANCELLED by 2166" - + # Should have one role "mesh0" with one replica "0" assert len(result.roles) == 1 assert result.roles[0].name == "mesh0" - assert result.roles[0].num_replicas == 1 \ No newline at end of file + assert result.roles[0].num_replicas == 1 From 4bb027f7042ede7eafb0a163bb4bf1afd1062a25 Mon Sep 17 00:00:00 2001 From: allencwang <9057208+allenwang28@users.noreply.github.com> Date: Fri, 5 Sep 2025 14:57:32 +0000 Subject: [PATCH 7/9] pyre --- torchx/schedulers/test/slurm_scheduler_test.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/torchx/schedulers/test/slurm_scheduler_test.py b/torchx/schedulers/test/slurm_scheduler_test.py index 631df2436..fa9ebe512 100644 --- a/torchx/schedulers/test/slurm_scheduler_test.py +++ b/torchx/schedulers/test/slurm_scheduler_test.py @@ -245,7 +245,6 @@ def test_dryrun_multi_role(self, mock_version: MagicMock) -> None: ) script = req.materialize() - print(script) self.assertEqual( script, f"""#!/bin/bash @@ -1050,7 +1049,7 @@ def test_no_gpu_resources(self) -> None: self.assertNotIn("--gpus-per-node", " ".join(sbatch)) self.assertNotIn("--gpus-per-task", " ".join(sbatch)) - def test_describe_squeue_handles_none_job_resources(self): + def test_describe_squeue_handles_none_job_resources(self) -> None: """Test that describe handles job_resources=None without crashing (i.e. 
for SLURM 24.11.5).""" # Mock SLURM 24.11.5 response with job_resources=None @@ -1079,7 +1078,7 @@ def test_describe_squeue_handles_none_job_resources(self): assert result.app_id == "123" assert result.state == AppState.PENDING - def test_describe_sacct_handles_dot_separated_job_ids(self): + def test_describe_sacct_handles_dot_separated_job_ids(self) -> None: """Test that _describe_sacct handles job IDs with '.' separators (not just '+').""" sacct_output = """JobID|JobName|Partition|Account|AllocCPUS|State|ExitCode 89|mesh0-0|all|root|8|CANCELLED by 2166|0:0 @@ -1092,7 +1091,6 @@ def test_describe_sacct_handles_dot_separated_job_ids(self): scheduler = SlurmScheduler("test") result = scheduler._describe_sacct("89") - print("result: ", result) # Should process only the main job "89", not the sub-jobs assert result is not None From 2c91835451be2b4ab40e8e21c7f901f1adc4b863 Mon Sep 17 00:00:00 2001 From: allencwang <9057208+allenwang28@users.noreply.github.com> Date: Fri, 5 Sep 2025 15:23:17 +0000 Subject: [PATCH 8/9] fix test failure --- torchx/schedulers/slurm_scheduler.py | 10 ++++++- .../schedulers/test/slurm_scheduler_test.py | 26 +++++++++++++++++++ 2 files changed, 35 insertions(+), 1 deletion(-) diff --git a/torchx/schedulers/slurm_scheduler.py b/torchx/schedulers/slurm_scheduler.py index 80d1d0245..e74afb6ac 100644 --- a/torchx/schedulers/slurm_scheduler.py +++ b/torchx/schedulers/slurm_scheduler.py @@ -767,7 +767,15 @@ def _describe_squeue(self, app_id: str) -> Optional[DescribeAppResponse]: ) else: # Fallback: use hostname from nodes.list - hostname = nodes_data.get("list", "") + if isinstance(nodes_data, str): + hostname = nodes_data + else: + hostname = ( + nodes_data.get("list", "") + if isinstance(nodes_data, dict) + else "" + ) + role.num_replicas += 1 role_status.replicas.append( ReplicaStatus( diff --git a/torchx/schedulers/test/slurm_scheduler_test.py b/torchx/schedulers/test/slurm_scheduler_test.py index fa9ebe512..fb1096454 100644 --- a/torchx/schedulers/test/slurm_scheduler_test.py +++ b/torchx/schedulers/test/slurm_scheduler_test.py @@ -1102,3 +1102,29 @@ def test_describe_sacct_handles_dot_separated_job_ids(self) -> None: assert len(result.roles) == 1 assert result.roles[0].name == "mesh0" assert result.roles[0].num_replicas == 1 + + def test_describe_squeue_nodes_as_string(self) -> None: + """Test when job_resources.nodes is a string (hostname) not a dict.""" + mock_job_data = { + "jobs": [ + { + "name": "test-job-0", + "job_state": ["RUNNING"], + "job_resources": { + "nodes": "compute-node-123" # String, not dict + # No allocated_nodes field + }, + "command": "/bin/echo", + "current_working_directory": "/tmp", + } + ] + } + + with patch("subprocess.check_output") as mock_subprocess: + mock_subprocess.return_value = json.dumps(mock_job_data) + + scheduler = SlurmScheduler("test") + result = scheduler._describe_squeue("123") + + assert result is not None + assert result.roles_statuses[0].replicas[0].hostname == "compute-node-123" From c277dae423b36b9d24dd5387b5524d77a6256813 Mon Sep 17 00:00:00 2001 From: allencwang <9057208+allenwang28@users.noreply.github.com> Date: Fri, 5 Sep 2025 15:53:42 +0000 Subject: [PATCH 9/9] CI: retrigger