From 95fdb4ad3bb823b9d9d5e2ca42f548a1b0114588 Mon Sep 17 00:00:00 2001 From: "ERDC\\RDITLSCW" Date: Wed, 22 Jan 2025 15:24:48 -0600 Subject: [PATCH 01/13] added initial framework for slurm support for status and submit functions --- uit/uit.py | 62 ++++++++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 51 insertions(+), 11 deletions(-) diff --git a/uit/uit.py b/uit/uit.py index dccc663..d934fc1 100644 --- a/uit/uit.py +++ b/uit/uit.py @@ -11,6 +11,7 @@ from functools import wraps from itertools import chain from pathlib import PurePosixPath, Path +from enum import StrEnum, auto from urllib.parse import urljoin, urlencode # noqa: F401 import param @@ -43,6 +44,37 @@ _auth_code = None _server = None +class BatchSystem(StrEnum): + PBS = auto() + SLURM = auto() + +SYSTEMS = { + 'carpenter': BatchSystem.PBS, + 'nautilus': BatchSystem.SLURM +} + +COMMANDS = { + BatchSystem.PBS: { + 'status': { + 'command': 'qstat', + 'full': ' -f', + 'username': ' -u', + 'job_id': ' -x', + }, + 'submit': 'qsub', # might want to do these the same way as status to maintain extensibility and cohesion + 'delete': 'qdel', + }, + BatchSystem.SLURM: { + 'status': 'squeue', + 'squeue': { + 'full': ' -l', + 'username': ' -u', + 'job_id': ' -j', + }, + 'submit': 'sbatch', + 'delete': 'scancel', + } +} class Client(param.Parameterized): """Provides a python abstraction for interacting with the UIT API. @@ -101,6 +133,9 @@ def __init__( self.scope = scope self.port = port + self.commands = None + self.options = None + if self.token is not None: self.param.trigger("token") @@ -176,7 +211,7 @@ def CENTER(self): def headers(self): if self._headers is None: self._headers = {"x-uit-auth-token": self.token} if self.token else None - return self._headers + return self._headers @param.depends("token", watch=True) def get_token_dependent_info(self): @@ -307,6 +342,7 @@ def prepare_connect( self._username = self._userinfo["SYSTEMS"][self._system.upper()]["USERNAME"] self._uit_url = self._uit_urls[login_node] self.connected = True + self.commands = COMMANDS[SYSTEMS[self.system]] return login_node, retry_on_failure @@ -735,21 +771,25 @@ def status( ): username = username if username is not None else self.username - cmd = "qstat" + # cmd will either be "qstat" or "squeue" + cmd = self.commands['status']['command'] + if full: - cmd += " -f" + cmd += self.commands['status']['full'] elif username: - cmd += f" -u {username}" + cmd += self.commands['status']['username'] + cmd += f' {self.username}' if job_id: if isinstance(job_id, (tuple, list)): job_id = " ".join([j.split(".")[0] for j in job_id]) - cmd += f" -x {job_id}" - result = self.call(cmd) - return self._process_status_result( - result, parse=parse, full=full, as_df=as_df - ) + cmd += self.commands['status']['job_id'] + cmd += job_id + result = self.call(cmd) + return self._process_status_result( + result, parse=parse, full=full, as_df=as_df + ) else: # If no jobs are specified then result = self.call(cmd) @@ -759,7 +799,7 @@ def status( if not with_historic: return result1 else: - cmd += " -x" + cmd += self.commands['status']['job_id'] result = self.call(cmd) result2 = self._process_status_result( result, parse=parse, full=full, as_df=as_df @@ -817,7 +857,7 @@ def submit( # Submit the script using call() with qsub command try: - job_id = self.call(f"qsub {remote_name}", working_dir=working_dir) + job_id = self.call(f"{self.commands['submit']} {remote_name}", working_dir=working_dir) except RuntimeError as e: raise RuntimeError( "An exception occurred while submitting job script: {}".format(str(e)) From 4d7393ba6abd8d3b1581f073ef4d555f972c25c3 Mon Sep 17 00:00:00 2001 From: "ERDC\\RDITLSCW" Date: Wed, 12 Mar 2025 10:24:29 -0500 Subject: [PATCH 02/13] full support for slurm --- uit/uit.py | 153 ++++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 111 insertions(+), 42 deletions(-) diff --git a/uit/uit.py b/uit/uit.py index d934fc1..9c77bcd 100644 --- a/uit/uit.py +++ b/uit/uit.py @@ -12,6 +12,7 @@ from itertools import chain from pathlib import PurePosixPath, Path from enum import StrEnum, auto +from io import StringIO from urllib.parse import urljoin, urlencode # noqa: F401 import param @@ -61,15 +62,14 @@ class BatchSystem(StrEnum): 'username': ' -u', 'job_id': ' -x', }, - 'submit': 'qsub', # might want to do these the same way as status to maintain extensibility and cohesion + 'submit': 'qsub', 'delete': 'qdel', }, BatchSystem.SLURM: { - 'status': 'squeue', - 'squeue': { - 'full': ' -l', + 'status': { + 'command': 'squeue -l', 'username': ' -u', - 'job_id': ' -j', + 'job_id': ' -j ', }, 'submit': 'sbatch', 'delete': 'scancel', @@ -133,8 +133,8 @@ def __init__( self.scope = scope self.port = port + self.batch_system = None self.commands = None - self.options = None if self.token is not None: self.param.trigger("token") @@ -342,7 +342,8 @@ def prepare_connect( self._username = self._userinfo["SYSTEMS"][self._system.upper()]["USERNAME"] self._uit_url = self._uit_urls[login_node] self.connected = True - self.commands = COMMANDS[SYSTEMS[self.system]] + self.batch_system = SYSTEMS[self.system] + self.commands = COMMANDS[self.batch_system] return login_node, retry_on_failure @@ -756,7 +757,7 @@ def show_usage(self, parse=True, as_df=False): if not parse: return result - return self._parse_hpc_output(result, as_df) + return self._parse_hpc_output(result, as_df, batch_system=self.batch_system) @_ensure_connected @robust() @@ -774,12 +775,16 @@ def status( # cmd will either be "qstat" or "squeue" cmd = self.commands['status']['command'] - - if full: - cmd += self.commands['status']['full'] - elif username: - cmd += self.commands['status']['username'] - cmd += f' {self.username}' + if self.batch_system == BatchSystem.SLURM: + if username: + cmd += self.commands['status']['username'] + cmd += f' {username}' + else: # Assume PBS + if full: + cmd += self.commands['status']['full'] + elif username: + cmd += self.commands['status']['username'] + cmd += f' {username}' if job_id: if isinstance(job_id, (tuple, list)): @@ -804,8 +809,9 @@ def status( result2 = self._process_status_result( result, parse=parse, full=full, as_df=as_df ) - - if not parse: + if self.batch_system == BatchSystem.SLURM: + return pd.concat((result1, result2)) + elif not parse: return result1, result2 elif as_df: return pd.concat((result1, result2)) @@ -871,11 +877,14 @@ def submit( @_ensure_connected def get_queues(self, update_cache=False): if self._queues is None or update_cache: - self._queues = self._process_get_queues_output(self.call("qstat -Q")) + if self.batch_system == BatchSystem.SLURM: + self._queues = self._process_get_queues_output(self.call("sacctmgr show qos format=Name%20")) + else: + self._queues = self._process_get_queues_output(self.call("qstat -Q")) return self._queues def _process_get_queues_output(self, output): - standard_queues = [] if self.system == "jim" else QUEUES + standard_queues = QUEUES other_queues = set([i.split()[0] for i in output.splitlines()][2:]) - set( standard_queues ) @@ -884,10 +893,43 @@ def _process_get_queues_output(self, output): @_ensure_connected def get_raw_queue_stats(self): - return json.loads(self.call("qstat -Q -f -F json"))["Queue"] + if self.batch_system == BatchSystem.SLURM: + output = "id name max_walltime max_jobs max_nodes" + for queue in json.loads(self.call('sacctmgr show qos --json'))["QOS"]: + max_walltime = str(queue['limits']['max']['wall_clock']['per']['job']['number']) + max_jobs = str(queue['limits']['max']['jobs']['active_jobs']['per']['user']['number']) + max_nodes = -1 + for max_tres in queue['limits']['max']['tres']['per']['job']: + if max_tres['type'] == "node": + max_nodes = max_tres['count'] + output += f"\n{queue['id']} {queue['name']} {max_walltime} {max_jobs} {max_nodes}" + return self._parse_slurm_output(output) + + else: + return json.loads(self.call("qstat -Q -f -F json"))["Queue"] @_ensure_connected def get_node_maxes(self, queues, queues_stats): + if self.batch_system == BatchSystem.SLURM: + return self._slurm_node_maxes(queues, queues_stats) + + else: + return self._pbs_node_maxes(queues, queues_stats) + + def _slurm_node_maxes(self, queues, queues_stats): + ncpus_maxes = dict() + + for q in queues: + max_nodes = str(queues_stats.loc[queues_stats['name'] == f'{q.lower()}', 'max_nodes'].iloc[0]) + ncpus_maxes[q] = ( + max_nodes + if max_nodes != "-1" + else "Not Found" + ) + + return ncpus_maxes + + def _pbs_node_maxes(self, queues, queues_stats): q_sts = {q: queues_stats[q] for q in queues if q in queues_stats.keys()} ncpus_maxes = dict() @@ -902,6 +944,23 @@ def get_node_maxes(self, queues, queues_stats): @_ensure_connected def get_wall_time_maxes(self, queues, queues_stats): + if self.batch_system == BatchSystem.SLURM: + return self._slurm_wall_time_maxes(queues, queues_stats) + else: + return self._pbs_wall_time_maxes(queues, queues_stats) + + def _slurm_wall_time_maxes(self, queues, queues_stats): + wall_time_maxes = dict() + + for q in queues: + max_walltimes = str(queues_stats.loc[queues_stats['name'] == f'{q.lower()}', 'max_walltime'].iloc[0]) + wall_time_maxes[q] = ( + max_walltimes + ) + + return wall_time_maxes + + def _pbs_wall_time_maxes(self, queues, queues_stats): q_sts = {q: queues_stats[q] for q in queues if q in queues_stats.keys()} wall_time_maxes = dict() @@ -945,30 +1004,39 @@ def _process_status_result(self, result, parse, full, as_df): if not parse: return result - if full: - result = self._parse_full_status(result) - if as_df: - return self._as_df(result).T - else: - return result - - columns = ( - "job_id", - "username", - "queue", - "jobname", - "session_id", - "nds", - "tsk", - "requested_memory", - "requested_time", - "status", - "elapsed_time", - ) + if self.batch_system == BatchSystem.SLURM: + # Trimming the top of result so that read_tables works properly + result = result.split('\n', 1)[1] + return self._parse_slurm_output( + result=result + ) + else: + if full: + result = self._parse_full_status(result) + if as_df: + return self._as_df(result).T + else: + return result - return self._parse_hpc_output( - result, as_df, columns=columns, delimiter_char="-" - ) + columns = ( + "job_id", + "username", + "queue", + "jobname", + "session_id", + "nds", + "tsk", + "requested_memory", + "requested_time", + "status", + "elapsed_time", + ) + + return self._parse_hpc_output(result, as_df, columns=columns, delimiter_char="-") + + @staticmethod + def _parse_slurm_output(result): + return pd.read_table(StringIO(result), delim_whitespace=True) @staticmethod def _parse_full_status(status_str): @@ -1037,6 +1105,7 @@ def _parse_hpc_output( delimiter_char="=", num_header_lines=3, ): + if output: delimiter = delimiter or cls._parse_hpc_delimiter( output, delimiter_char=delimiter_char From 6e1d56bd951b53f9c51a9d8bbf63fd5b7223f99f Mon Sep 17 00:00:00 2001 From: "ERDC\\RDITLSCW" Date: Mon, 17 Mar 2025 09:45:56 -0500 Subject: [PATCH 03/13] fixed formatting --- uit/uit.py | 26 +++++++++++++++----------- 1 file changed, 15 insertions(+), 11 deletions(-) diff --git a/uit/uit.py b/uit/uit.py index e326c69..2cb8fee 100644 --- a/uit/uit.py +++ b/uit/uit.py @@ -46,6 +46,9 @@ _server = None class BatchSystem(StrEnum): + + """Provides an interface to check what job system is being run.""" + PBS = auto() SLURM = auto() @@ -62,7 +65,7 @@ class BatchSystem(StrEnum): 'username': ' -u', 'job_id': ' -x', }, - 'submit': 'qsub', + 'submit': 'qsub', 'delete': 'qdel', }, BatchSystem.SLURM: { @@ -77,6 +80,7 @@ class BatchSystem(StrEnum): } class Client(param.Parameterized): + """Provides a python abstraction for interacting with the UIT API. Attributes: @@ -205,7 +209,7 @@ def CENTER(self): def headers(self): if self._headers is None: self._headers = {"x-uit-auth-token": self.token} if self.token else None - return self._headers + return self._headers @param.depends("token", watch=True) def get_token_dependent_info(self): @@ -739,14 +743,14 @@ def status( cmd = self.commands['status']['command'] if self.batch_system == BatchSystem.SLURM: - if username: - cmd += self.commands['status']['username'] + if username: + cmd += self.commands['status']['username'] cmd += f' {username}' - else: # Assume PBS + else: if full: cmd += self.commands['status']['full'] elif username: - cmd += self.commands['status']['username'] + cmd += self.commands['status']['username'] cmd += f' {username}' if job_id: @@ -754,8 +758,8 @@ def status( job_id = " ".join([j.split(".")[0] for j in job_id]) cmd += self.commands['status']['job_id'] cmd += job_id - result = self.call(cmd) - return self._process_status_result(result, parse=parse, full=full, as_df=as_df) + result = self.call(cmd) + return self._process_status_result(result, parse=parse, full=full, as_df=as_df) else: # If no jobs are specified then result = self.call(cmd) @@ -851,7 +855,7 @@ def get_raw_queue_stats(self): max_nodes = max_tres['count'] output += f"\n{queue['id']} {queue['name']} {max_walltime} {max_jobs} {max_nodes}" return self._parse_slurm_output(output) - + else: return json.loads(self.call("qstat -Q -f -F json"))["Queue"] @@ -873,7 +877,7 @@ def _slurm_node_maxes(self, queues, queues_stats): if max_nodes != "-1" else "Not Found" ) - + return ncpus_maxes def _pbs_node_maxes(self, queues, queues_stats): @@ -904,7 +908,7 @@ def _slurm_wall_time_maxes(self, queues, queues_stats): wall_time_maxes[q] = ( max_walltimes ) - + return wall_time_maxes def _pbs_wall_time_maxes(self, queues, queues_stats): From 84c29f0c8214b89f7b124aad0d835c9d26e09182 Mon Sep 17 00:00:00 2001 From: "ERDC\\RDITLSCW" Date: Mon, 17 Mar 2025 09:58:49 -0500 Subject: [PATCH 04/13] fixed formatting --- uit/uit.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/uit/uit.py b/uit/uit.py index 2cb8fee..0e721fb 100644 --- a/uit/uit.py +++ b/uit/uit.py @@ -45,13 +45,13 @@ _auth_code = None _server = None -class BatchSystem(StrEnum): - - """Provides an interface to check what job system is being run.""" +class BatchSystem(StrEnum): + PBS = auto() SLURM = auto() + SYSTEMS = { 'carpenter': BatchSystem.PBS, 'nautilus': BatchSystem.SLURM @@ -79,6 +79,7 @@ class BatchSystem(StrEnum): } } + class Client(param.Parameterized): """Provides a python abstraction for interacting with the UIT API. From 306e3e88494bf8ef7dacfcd4a0279637f821a970 Mon Sep 17 00:00:00 2001 From: "ERDC\\RDITLSCW" Date: Mon, 17 Mar 2025 10:01:43 -0500 Subject: [PATCH 05/13] fixed formatting --- uit/uit.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/uit/uit.py b/uit/uit.py index 0e721fb..fde2b6e 100644 --- a/uit/uit.py +++ b/uit/uit.py @@ -47,7 +47,7 @@ class BatchSystem(StrEnum): - + PBS = auto() SLURM = auto() From b733f65077d60fcf48c259c634bc24fb3bc0033f Mon Sep 17 00:00:00 2001 From: "ERDC\\RDITLSCW" Date: Mon, 17 Mar 2025 10:15:41 -0500 Subject: [PATCH 06/13] updated formatting --- uit/uit.py | 82 +++++++++++++++++++++++------------------------------- 1 file changed, 35 insertions(+), 47 deletions(-) diff --git a/uit/uit.py b/uit/uit.py index fde2b6e..538062f 100644 --- a/uit/uit.py +++ b/uit/uit.py @@ -52,36 +52,32 @@ class BatchSystem(StrEnum): SLURM = auto() -SYSTEMS = { - 'carpenter': BatchSystem.PBS, - 'nautilus': BatchSystem.SLURM -} +SYSTEMS = {"carpenter": BatchSystem.PBS, "nautilus": BatchSystem.SLURM} COMMANDS = { BatchSystem.PBS: { - 'status': { - 'command': 'qstat', - 'full': ' -f', - 'username': ' -u', - 'job_id': ' -x', + "status": { + "command": "qstat", + "full": " -f", + "username": " -u", + "job_id": " -x", }, - 'submit': 'qsub', - 'delete': 'qdel', + "submit": "qsub", + "delete": "qdel", }, BatchSystem.SLURM: { - 'status': { - 'command': 'squeue -l', - 'username': ' -u', - 'job_id': ' -j ', + "status": { + "command": "squeue -l", + "username": " -u", + "job_id": " -j ", }, - 'submit': 'sbatch', - 'delete': 'scancel', - } + "submit": "sbatch", + "delete": "scancel", + }, } class Client(param.Parameterized): - """Provides a python abstraction for interacting with the UIT API. Attributes: @@ -741,23 +737,23 @@ def status( username = username if username is not None else self.username # cmd will either be "qstat" or "squeue" - cmd = self.commands['status']['command'] + cmd = self.commands["status"]["command"] if self.batch_system == BatchSystem.SLURM: if username: - cmd += self.commands['status']['username'] - cmd += f' {username}' + cmd += self.commands["status"]["username"] + cmd += f" {username}" else: if full: - cmd += self.commands['status']['full'] + cmd += self.commands["status"]["full"] elif username: - cmd += self.commands['status']['username'] - cmd += f' {username}' + cmd += self.commands["status"]["username"] + cmd += f" {username}" if job_id: if isinstance(job_id, (tuple, list)): job_id = " ".join([j.split(".")[0] for j in job_id]) - cmd += self.commands['status']['job_id'] + cmd += self.commands["status"]["job_id"] cmd += job_id result = self.call(cmd) return self._process_status_result(result, parse=parse, full=full, as_df=as_df) @@ -768,7 +764,7 @@ def status( if not with_historic: return result1 else: - cmd += self.commands['status']['job_id'] + cmd += self.commands["status"]["job_id"] result = self.call(cmd) result2 = self._process_status_result(result, parse=parse, full=full, as_df=as_df) if self.batch_system == BatchSystem.SLURM: @@ -847,13 +843,13 @@ def _process_get_queues_output(self, output): def get_raw_queue_stats(self): if self.batch_system == BatchSystem.SLURM: output = "id name max_walltime max_jobs max_nodes" - for queue in json.loads(self.call('sacctmgr show qos --json'))["QOS"]: - max_walltime = str(queue['limits']['max']['wall_clock']['per']['job']['number']) - max_jobs = str(queue['limits']['max']['jobs']['active_jobs']['per']['user']['number']) + for queue in json.loads(self.call("sacctmgr show qos --json"))["QOS"]: + max_walltime = str(queue["limits"]["max"]["wall_clock"]["per"]["job"]["number"]) + max_jobs = str(queue["limits"]["max"]["jobs"]["active_jobs"]["per"]["user"]["number"]) max_nodes = -1 - for max_tres in queue['limits']['max']['tres']['per']['job']: - if max_tres['type'] == "node": - max_nodes = max_tres['count'] + for max_tres in queue["limits"]["max"]["tres"]["per"]["job"]: + if max_tres["type"] == "node": + max_nodes = max_tres["count"] output += f"\n{queue['id']} {queue['name']} {max_walltime} {max_jobs} {max_nodes}" return self._parse_slurm_output(output) @@ -872,12 +868,8 @@ def _slurm_node_maxes(self, queues, queues_stats): ncpus_maxes = dict() for q in queues: - max_nodes = str(queues_stats.loc[queues_stats['name'] == f'{q.lower()}', 'max_nodes'].iloc[0]) - ncpus_maxes[q] = ( - max_nodes - if max_nodes != "-1" - else "Not Found" - ) + max_nodes = str(queues_stats.loc[queues_stats["name"] == f"{q.lower()}", "max_nodes"].iloc[0]) + ncpus_maxes[q] = max_nodes if max_nodes != "-1" else "Not Found" return ncpus_maxes @@ -905,10 +897,8 @@ def _slurm_wall_time_maxes(self, queues, queues_stats): wall_time_maxes = dict() for q in queues: - max_walltimes = str(queues_stats.loc[queues_stats['name'] == f'{q.lower()}', 'max_walltime'].iloc[0]) - wall_time_maxes[q] = ( - max_walltimes - ) + max_walltimes = str(queues_stats.loc[queues_stats["name"] == f"{q.lower()}", "max_walltime"].iloc[0]) + wall_time_maxes[q] = max_walltimes return wall_time_maxes @@ -953,10 +943,8 @@ def _process_status_result(self, result, parse, full, as_df): if self.batch_system == BatchSystem.SLURM: # Trimming the top of result so that read_tables works properly - result = result.split('\n', 1)[1] - return self._parse_slurm_output( - result=result - ) + result = result.split("\n", 1)[1] + return self._parse_slurm_output(result=result) else: if full: result = self._parse_full_status(result) From 497ca4907cee8eed36683269f32fa6667098aed2 Mon Sep 17 00:00:00 2001 From: "ERDC\\RDITLSCW" Date: Wed, 19 Mar 2025 15:11:43 -0500 Subject: [PATCH 07/13] addressed feedback --- uit/node_types.csv | 10 +++++----- uit/uit.py | 43 ++++++++++++++++--------------------------- 2 files changed, 21 insertions(+), 32 deletions(-) diff --git a/uit/node_types.csv b/uit/node_types.csv index 9d39b46..7c11ab9 100644 --- a/uit/node_types.csv +++ b/uit/node_types.csv @@ -1,5 +1,5 @@ -system,compute,gpu,bigmem,transfer,mla,highclock -nautilus,128,128,128,1,128,32 -narwhal,128,128,128,1 -warhawk,128,128,128,1,128 -carpenter,192,128,192,1 \ No newline at end of file +system,scheduler,compute,gpu,bigmem,transfer,mla,highclock +nautilus,slurm,128,128,128,1,128,32 +narwhal,pbs,128,128,128,1 +warhawk,pbs,128,128,128,1,128 +carpenter,pbs,192,128,192,1 \ No newline at end of file diff --git a/uit/uit.py b/uit/uit.py index 538062f..fb9cf9a 100644 --- a/uit/uit.py +++ b/uit/uit.py @@ -21,7 +21,7 @@ from werkzeug.serving import make_server from .config import parse_config, DEFAULT_CA_FILE, DEFAULT_CONFIG -from .pbs_script import PbsScript +from .pbs_script import PbsScript, NODE_TYPES from .util import robust, HpcEnv from .exceptions import UITError, MaxRetriesError @@ -45,17 +45,8 @@ _auth_code = None _server = None - -class BatchSystem(StrEnum): - - PBS = auto() - SLURM = auto() - - -SYSTEMS = {"carpenter": BatchSystem.PBS, "nautilus": BatchSystem.SLURM} - COMMANDS = { - BatchSystem.PBS: { + "pbs": { "status": { "command": "qstat", "full": " -f", @@ -64,8 +55,9 @@ class BatchSystem(StrEnum): }, "submit": "qsub", "delete": "qdel", + "list_queues": "qstat -Q", }, - BatchSystem.SLURM: { + "slurm": { "status": { "command": "squeue -l", "username": " -u", @@ -73,6 +65,7 @@ class BatchSystem(StrEnum): }, "submit": "sbatch", "delete": "scancel", + "list_queues": "sacctmgr show qos format=Name%20", }, } @@ -134,7 +127,7 @@ def __init__( self.scope = scope self.port = port - self.batch_system = None + self.scheduler = None self.commands = None if self.token is not None: @@ -323,8 +316,8 @@ def prepare_connect(self, system, login_node, exclude_login_nodes, retry_on_fail self._username = self._userinfo["SYSTEMS"][self._system.upper()]["USERNAME"] self._uit_url = self._uit_urls[login_node] self.connected = True - self.batch_system = SYSTEMS[self.system] - self.commands = COMMANDS[self.batch_system] + self.scheduler = NODE_TYPES[f"{self.system}"]["scheduler"] + self.commands = COMMANDS[self.scheduler] return login_node, retry_on_failure @@ -721,7 +714,7 @@ def show_usage(self, parse=True, as_df=False): if not parse: return result - return self._parse_hpc_output(result, as_df, batch_system=self.batch_system) + return self._parse_hpc_output(result, as_df, scheduler=self.scheduler) @_ensure_connected @robust() @@ -739,7 +732,7 @@ def status( # cmd will either be "qstat" or "squeue" cmd = self.commands["status"]["command"] - if self.batch_system == BatchSystem.SLURM: + if self.scheduler == "slurm": if username: cmd += self.commands["status"]["username"] cmd += f" {username}" @@ -767,7 +760,7 @@ def status( cmd += self.commands["status"]["job_id"] result = self.call(cmd) result2 = self._process_status_result(result, parse=parse, full=full, as_df=as_df) - if self.batch_system == BatchSystem.SLURM: + if self.scheduler == "slurm": return pd.concat((result1, result2)) elif not parse: return result1, result2 @@ -827,10 +820,7 @@ def submit(self, pbs_script, working_dir=None, remote_name="run.pbs", local_temp @_ensure_connected def get_queues(self, update_cache=False): if self._queues is None or update_cache: - if self.batch_system == BatchSystem.SLURM: - self._queues = self._process_get_queues_output(self.call("sacctmgr show qos format=Name%20")) - else: - self._queues = self._process_get_queues_output(self.call("qstat -Q")) + self._queues = self._process_get_queues_output(self.call(self.commands["list_queues"])) return self._queues def _process_get_queues_output(self, output): @@ -841,7 +831,7 @@ def _process_get_queues_output(self, output): @_ensure_connected def get_raw_queue_stats(self): - if self.batch_system == BatchSystem.SLURM: + if self.scheduler == "slurm": output = "id name max_walltime max_jobs max_nodes" for queue in json.loads(self.call("sacctmgr show qos --json"))["QOS"]: max_walltime = str(queue["limits"]["max"]["wall_clock"]["per"]["job"]["number"]) @@ -858,7 +848,7 @@ def get_raw_queue_stats(self): @_ensure_connected def get_node_maxes(self, queues, queues_stats): - if self.batch_system == BatchSystem.SLURM: + if self.scheduler == "slurm": return self._slurm_node_maxes(queues, queues_stats) else: @@ -888,7 +878,7 @@ def _pbs_node_maxes(self, queues, queues_stats): @_ensure_connected def get_wall_time_maxes(self, queues, queues_stats): - if self.batch_system == BatchSystem.SLURM: + if self.scheduler == "slurm": return self._slurm_wall_time_maxes(queues, queues_stats) else: return self._pbs_wall_time_maxes(queues, queues_stats) @@ -941,8 +931,7 @@ def _process_status_result(self, result, parse, full, as_df): if not parse: return result - if self.batch_system == BatchSystem.SLURM: - # Trimming the top of result so that read_tables works properly + if self.scheduler == "slurm": result = result.split("\n", 1)[1] return self._parse_slurm_output(result=result) else: From f17899d9246e3f242e248adb14ad148a5e46a085 Mon Sep 17 00:00:00 2001 From: Mark Lugar <14322382+araglu@users.noreply.github.com> Date: Wed, 19 Mar 2025 17:12:00 -0500 Subject: [PATCH 08/13] Allow get_default to optionally set a default value --- uit/gui_tools/submit.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/uit/gui_tools/submit.py b/uit/gui_tools/submit.py index 66cae46..0ae026e 100644 --- a/uit/gui_tools/submit.py +++ b/uit/gui_tools/submit.py @@ -91,8 +91,13 @@ def set_file_browser(self): self.workdir.file_browser = create_file_browser(self.uit_client, patterns=[]) @staticmethod - def get_default(value, objects): - return value if value in objects else objects[0] + def get_default(value, objects, default=None): + """Verify that value exists in the objects list, otherwise return a default or the first item in the list""" + if value in objects: + return value + if default in objects: + return default + return objects[0] @param.depends("uit_client", watch=True) async def update_hpc_connection_dependent_defaults(self): @@ -107,7 +112,7 @@ async def update_hpc_connection_dependent_defaults(self): self.hpc_subproject = self.get_default(self.hpc_subproject, subprojects) self.workdir.file_path = self.uit_client.WORKDIR.as_posix() self.param.node_type.objects = list(NODE_TYPES[self.uit_client.system].keys()) - self.node_type = self.get_default(self.node_type, self.param.node_type.objects) + self.node_type = self.get_default(self.node_type, self.param.node_type.objects, default="compute") self.param.queue.objects = await self.await_if_async(self.uit_client.get_queues()) self.queue = self.get_default(self.queue, self.param.queue.objects) self.node_maxes = await self.await_if_async( From c053ae0df3edf2ed7425b36f288b27663eff02b6 Mon Sep 17 00:00:00 2001 From: "ERDC\\RDITLSCW" Date: Fri, 21 Mar 2025 13:47:09 -0500 Subject: [PATCH 09/13] async support --- uit/async_client.py | 75 +++++++++++++++++++++++++++++++++++++-------- uit/node_types.csv | 3 +- 2 files changed, 64 insertions(+), 14 deletions(-) diff --git a/uit/async_client.py b/uit/async_client.py index da74389..e49425a 100644 --- a/uit/async_client.py +++ b/uit/async_client.py @@ -29,12 +29,35 @@ ALL_OFF, ) from .util import robust, AsyncHpcEnv -from .pbs_script import PbsScript +from .pbs_script import PbsScript, NODE_TYPES from .exceptions import UITError, MaxRetriesError logger = logging.getLogger(__name__) _ensure_connected = Client._ensure_connected +COMMANDS = { + "pbs": { + "status": { + "command": "qstat", + "full": " -f", + "username": " -u", + "job_id": " -x", + }, + "submit": "qsub", + "delete": "qdel", + "list_queues": "qstat -Q", + }, + "slurm": { + "status": { + "command": "squeue -l", + "username": " -u", + "job_id": " -j ", + }, + "submit": "sbatch", + "delete": "scancel", + "list_queues": "sacctmgr show qos format=Name%20", + }, +} class AsyncClient(Client): """Provides a python abstraction for interacting with the UIT API. @@ -77,6 +100,8 @@ def __init__( delay_token=True, ) self.env = AsyncHpcEnv(self) + self.scheduler=None, + self.commands=None, self._session = None if async_init: self.param.trigger("_async_init") @@ -227,6 +252,9 @@ async def get_userinfo(self): ] self._uit_urls = {k: v for _list in self._uit_urls for d in _list for k, v in d.items()} # noqa: E741 + self.scheduler = NODE_TYPES[self.system]["scheduler"] + self.commands = COMMANDS[self.scheduler] + @_ensure_connected @robust() async def call( @@ -465,7 +493,7 @@ async def show_usage(self, parse=True, as_df=False): if not parse: return result - return self._parse_hpc_output(result, as_df) + return self._parse_hpc_output(result, as_df, scheduler=self.scheduler) @_ensure_connected @robust() @@ -480,17 +508,23 @@ async def status( ): username = username if username is not None else self.username - cmd = "qstat" + cmd = self.commands["status"]["command"] - if full: - cmd += " -f" - elif username: - cmd += f" -u {username}" + if self.scheduler == "slurm": + if username: + cmd += self.commands["status"]["username"] + cmd += f" {username}" + else: + if full: + cmd += self.commands["status"]["full"] + elif username: + cmd += f" {username}" if job_id: if isinstance(job_id, (tuple, list)): job_id = " ".join([j.split(".")[0] for j in job_id]) - cmd += f" -x {job_id}" + cmd += self.commands["status"]["job_id"] + cmd += job_id result = await self.call(cmd) return self._process_status_result(result, parse=parse, full=full, as_df=as_df) else: @@ -500,11 +534,13 @@ async def status( if not with_historic: return result1 else: - cmd += " -x" + cmd += self.commands["status"]["job_id"] result = await self.call(cmd) result2 = self._process_status_result(result, parse=parse, full=full, as_df=as_df) - - if not parse: + + if self.scheduler == "slurm": + return pd.concat((result1, result2)) + elif not parse: return result1, result2 elif as_df: return pd.concat((result1, result2)) @@ -562,12 +598,25 @@ async def submit(self, pbs_script, working_dir=None, remote_name="run.pbs", loca @_ensure_connected async def get_queues(self, update_cache=False): if self._queues is None or update_cache: - self._queues = self._process_get_queues_output(await self.call("qstat -Q")) + self._queues = self._process_get_queues_output(await self.call(self.commands["list_queues"])) return self._queues @_ensure_connected async def get_raw_queue_stats(self): - return json.loads(await self.call("qstat -Q -f -F json"))["Queue"] + if self.scheduler == "slurm": + output = "id name max_walltime max_jobs max_nodes" + for queue in json.loads(await self.call("sacctmgr show qos --json"))["QOS"]: + max_walltime = str(queue["limits"]["max"]["wall_clock"]["per"]["job"]["number"]) + max_jobs = str(queue["limits"]["max"]["jobs"]["active_jobs"]["per"]["user"]["number"]) + max_nodes = -1 + for max_tres in queue["limits"]["max"]["tres"]["per"]["job"]: + if max_tres["type"] == "node": + max_nodes = max_tres["count"] + output += f"\n{queue['id']} {queue['name']} {max_walltime} {max_jobs} {max_nodes}" + return self._parse_slurm_output(output) + + else: + return json.loads(await self.call("qstat -Q -f -F json"))["Queue"] @_ensure_connected async def get_available_modules(self, flatten=False): diff --git a/uit/node_types.csv b/uit/node_types.csv index 7c11ab9..bf5c5ba 100644 --- a/uit/node_types.csv +++ b/uit/node_types.csv @@ -2,4 +2,5 @@ system,scheduler,compute,gpu,bigmem,transfer,mla,highclock nautilus,slurm,128,128,128,1,128,32 narwhal,pbs,128,128,128,1 warhawk,pbs,128,128,128,1,128 -carpenter,pbs,192,128,192,1 \ No newline at end of file +carpenter,pbs,192,128,192,1 +raider,slurm,128,128,128,1,128,32 \ No newline at end of file From 30b9ba999f7ace2e6cfc86ad9d67b3f9bf3494f9 Mon Sep 17 00:00:00 2001 From: "ERDC\\RDITLSCW" Date: Fri, 21 Mar 2025 14:02:04 -0500 Subject: [PATCH 10/13] styling fixes --- uit/async_client.py | 9 +++++---- uit/uit.py | 3 +-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/uit/async_client.py b/uit/async_client.py index e49425a..6a29be3 100644 --- a/uit/async_client.py +++ b/uit/async_client.py @@ -59,6 +59,7 @@ }, } + class AsyncClient(Client): """Provides a python abstraction for interacting with the UIT API. @@ -100,8 +101,8 @@ def __init__( delay_token=True, ) self.env = AsyncHpcEnv(self) - self.scheduler=None, - self.commands=None, + self.scheduler = (None,) + self.commands = (None,) self._session = None if async_init: self.param.trigger("_async_init") @@ -537,9 +538,9 @@ async def status( cmd += self.commands["status"]["job_id"] result = await self.call(cmd) result2 = self._process_status_result(result, parse=parse, full=full, as_df=as_df) - + if self.scheduler == "slurm": - return pd.concat((result1, result2)) + return pd.concat((result1, result2)) elif not parse: return result1, result2 elif as_df: diff --git a/uit/uit.py b/uit/uit.py index fb9cf9a..e1b7f0b 100644 --- a/uit/uit.py +++ b/uit/uit.py @@ -11,7 +11,6 @@ from functools import wraps from itertools import chain from pathlib import PurePosixPath, Path -from enum import StrEnum, auto from io import StringIO from urllib.parse import urljoin, urlencode # noqa: F401 @@ -316,7 +315,7 @@ def prepare_connect(self, system, login_node, exclude_login_nodes, retry_on_fail self._username = self._userinfo["SYSTEMS"][self._system.upper()]["USERNAME"] self._uit_url = self._uit_urls[login_node] self.connected = True - self.scheduler = NODE_TYPES[f"{self.system}"]["scheduler"] + self.scheduler = NODE_TYPES[self.system]["scheduler"] self.commands = COMMANDS[self.scheduler] return login_node, retry_on_failure From 3a61a94336b1257408c36e38ba978b62519af828 Mon Sep 17 00:00:00 2001 From: "ERDC\\RDITLSCW" Date: Fri, 21 Mar 2025 18:03:11 -0500 Subject: [PATCH 11/13] addressed feedback --- uit/async_client.py | 6 +++--- uit/uit.py | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/uit/async_client.py b/uit/async_client.py index 6a29be3..68da94e 100644 --- a/uit/async_client.py +++ b/uit/async_client.py @@ -101,8 +101,8 @@ def __init__( delay_token=True, ) self.env = AsyncHpcEnv(self) - self.scheduler = (None,) - self.commands = (None,) + self.scheduler = None + self.commands = None self._session = None if async_init: self.param.trigger("_async_init") @@ -494,7 +494,7 @@ async def show_usage(self, parse=True, as_df=False): if not parse: return result - return self._parse_hpc_output(result, as_df, scheduler=self.scheduler) + return self._parse_hpc_output(result, as_df) @_ensure_connected @robust() diff --git a/uit/uit.py b/uit/uit.py index e1b7f0b..be1b86f 100644 --- a/uit/uit.py +++ b/uit/uit.py @@ -713,7 +713,7 @@ def show_usage(self, parse=True, as_df=False): if not parse: return result - return self._parse_hpc_output(result, as_df, scheduler=self.scheduler) + return self._parse_hpc_output(result, as_df) @_ensure_connected @robust() From db51d17e7270823f25721f15f1f7aebb56319dcf Mon Sep 17 00:00:00 2001 From: "ERDC\\RDITLSCW" Date: Wed, 23 Apr 2025 14:30:13 -0500 Subject: [PATCH 12/13] addressed feedback --- uit/async_client.py | 5 +---- uit/pbs_script.py | 12 ++++++++---- uit/uit.py | 5 +++-- 3 files changed, 12 insertions(+), 10 deletions(-) diff --git a/uit/async_client.py b/uit/async_client.py index 68da94e..6966223 100644 --- a/uit/async_client.py +++ b/uit/async_client.py @@ -253,9 +253,6 @@ async def get_userinfo(self): ] self._uit_urls = {k: v for _list in self._uit_urls for d in _list for k, v in d.items()} # noqa: E741 - self.scheduler = NODE_TYPES[self.system]["scheduler"] - self.commands = COMMANDS[self.scheduler] - @_ensure_connected @robust() async def call( @@ -587,7 +584,7 @@ async def submit(self, pbs_script, working_dir=None, remote_name="run.pbs", loca # Submit the script using call() with qsub command try: - job_id = await self.call(f"qsub {remote_name}", working_dir=working_dir) + job_id = await self.call(f"{self.commands['submit']} {remote_name}", working_dir=working_dir) except RuntimeError as e: raise RuntimeError("An exception occurred while submitting job script: {}".format(str(e))) diff --git a/uit/pbs_script.py b/uit/pbs_script.py index b689cef..da36717 100644 --- a/uit/pbs_script.py +++ b/uit/pbs_script.py @@ -153,11 +153,15 @@ def parse_time(time_str): return None @staticmethod - def format_time(date_time_obj): - hours = date_time_obj.days * 24 + date_time_obj.seconds // 3600 - minutes = date_time_obj.seconds % 3600 // 60 - seconds = date_time_obj.seconds % 3600 % 60 + def format_time(time_delta_obj): + hours = time_delta_obj.days * 24 + time_delta_obj.seconds // 3600 + minutes = time_delta_obj.seconds % 3600 // 60 + seconds = time_delta_obj.seconds % 3600 % 60 return f"{hours}:{minutes:02}:{seconds:02}" + + @staticmethod + def parse_minutes(minutes): + return PbsScript.format_time(datetime.timedelta(minutes=minutes)) @property def max_time(self): diff --git a/uit/uit.py b/uit/uit.py index be1b86f..b8cd431 100644 --- a/uit/uit.py +++ b/uit/uit.py @@ -833,12 +833,13 @@ def get_raw_queue_stats(self): if self.scheduler == "slurm": output = "id name max_walltime max_jobs max_nodes" for queue in json.loads(self.call("sacctmgr show qos --json"))["QOS"]: - max_walltime = str(queue["limits"]["max"]["wall_clock"]["per"]["job"]["number"]) + minutes = queue["limits"]["max"]["wall_clock"]["per"]["job"]["number"] + max_walltime = PbsScript.parse_minutes(minutes) max_jobs = str(queue["limits"]["max"]["jobs"]["active_jobs"]["per"]["user"]["number"]) max_nodes = -1 for max_tres in queue["limits"]["max"]["tres"]["per"]["job"]: if max_tres["type"] == "node": - max_nodes = max_tres["count"] + max_nodes = max_tres["count"] * int(NODE_TYPES[self.system]['compute']) output += f"\n{queue['id']} {queue['name']} {max_walltime} {max_jobs} {max_nodes}" return self._parse_slurm_output(output) From 1452a065071d47f7432e43f2f1a4815b57ef06c1 Mon Sep 17 00:00:00 2001 From: Mark Lugar <14322382+araglu@users.noreply.github.com> Date: Fri, 16 May 2025 15:16:52 -0500 Subject: [PATCH 13/13] Add Raider and Ruth HPCs This also sorts the list by system name. --- uit/node_types.csv | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/uit/node_types.csv b/uit/node_types.csv index bf5c5ba..3feb066 100644 --- a/uit/node_types.csv +++ b/uit/node_types.csv @@ -1,6 +1,7 @@ system,scheduler,compute,gpu,bigmem,transfer,mla,highclock +carpenter,pbs,192,128,192,1,, +narwhal,pbs,128,128,128,1,, nautilus,slurm,128,128,128,1,128,32 -narwhal,pbs,128,128,128,1 -warhawk,pbs,128,128,128,1,128 -carpenter,pbs,192,128,192,1 -raider,slurm,128,128,128,1,128,32 \ No newline at end of file +raider,slurm,128,128,128,1,128,32 +ruth,pbs,192,128,192,1,64,32 +warhawk,pbs,128,128,128,1,128 \ No newline at end of file