Skip to content
74 changes: 62 additions & 12 deletions uit/async_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,36 @@
ALL_OFF,
)
from .util import robust, AsyncHpcEnv
from .pbs_script import PbsScript
from .pbs_script import PbsScript, NODE_TYPES
from .exceptions import UITError, MaxRetriesError

logger = logging.getLogger(__name__)
_ensure_connected = Client._ensure_connected

COMMANDS = {
"pbs": {
"status": {
"command": "qstat",
"full": " -f",
"username": " -u",
"job_id": " -x",
},
"submit": "qsub",
"delete": "qdel",
"list_queues": "qstat -Q",
},
"slurm": {
"status": {
"command": "squeue -l",
"username": " -u",
"job_id": " -j ",
},
"submit": "sbatch",
"delete": "scancel",
"list_queues": "sacctmgr show qos format=Name%20",
},
}


class AsyncClient(Client):
"""Provides a python abstraction for interacting with the UIT API.
Expand Down Expand Up @@ -77,6 +101,8 @@ def __init__(
delay_token=True,
)
self.env = AsyncHpcEnv(self)
self.scheduler = (None,)
self.commands = (None,)
self._session = None
if async_init:
self.param.trigger("_async_init")
Expand Down Expand Up @@ -227,6 +253,9 @@ async def get_userinfo(self):
]
self._uit_urls = {k: v for _list in self._uit_urls for d in _list for k, v in d.items()} # noqa: E741

self.scheduler = NODE_TYPES[self.system]["scheduler"]
self.commands = COMMANDS[self.scheduler]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this the right spot for these two lines? I get an error on them whenever I try to submit a new job or delete that job.

In uit.py, these lines are in prepare_connect() which is called by both sync and async connect(), so async_client.py might not need these lines here.

Copy link
Collaborator Author

@samuelcwilliams samuelcwilliams Mar 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I guess I didn't realize it was calling prepare_connect() from the regular client. I'll make the changes ASAP


@_ensure_connected
@robust()
async def call(
Expand Down Expand Up @@ -465,7 +494,7 @@ async def show_usage(self, parse=True, as_df=False):
if not parse:
return result

return self._parse_hpc_output(result, as_df)
return self._parse_hpc_output(result, as_df, scheduler=self.scheduler)

@_ensure_connected
@robust()
Expand All @@ -480,17 +509,23 @@ async def status(
):
username = username if username is not None else self.username

cmd = "qstat"
cmd = self.commands["status"]["command"]

if full:
cmd += " -f"
elif username:
cmd += f" -u {username}"
if self.scheduler == "slurm":
if username:
cmd += self.commands["status"]["username"]
cmd += f" {username}"
else:
if full:
cmd += self.commands["status"]["full"]
elif username:
cmd += f" {username}"

if job_id:
if isinstance(job_id, (tuple, list)):
job_id = " ".join([j.split(".")[0] for j in job_id])
cmd += f" -x {job_id}"
cmd += self.commands["status"]["job_id"]
cmd += job_id
result = await self.call(cmd)
return self._process_status_result(result, parse=parse, full=full, as_df=as_df)
else:
Expand All @@ -500,11 +535,13 @@ async def status(
if not with_historic:
return result1
else:
cmd += " -x"
cmd += self.commands["status"]["job_id"]
result = await self.call(cmd)
result2 = self._process_status_result(result, parse=parse, full=full, as_df=as_df)

if not parse:
if self.scheduler == "slurm":
return pd.concat((result1, result2))
elif not parse:
return result1, result2
elif as_df:
return pd.concat((result1, result2))
Expand Down Expand Up @@ -562,12 +599,25 @@ async def submit(self, pbs_script, working_dir=None, remote_name="run.pbs", loca
@_ensure_connected
async def get_queues(self, update_cache=False):
if self._queues is None or update_cache:
self._queues = self._process_get_queues_output(await self.call("qstat -Q"))
self._queues = self._process_get_queues_output(await self.call(self.commands["list_queues"]))
return self._queues

@_ensure_connected
async def get_raw_queue_stats(self):
return json.loads(await self.call("qstat -Q -f -F json"))["Queue"]
if self.scheduler == "slurm":
output = "id name max_walltime max_jobs max_nodes"
for queue in json.loads(await self.call("sacctmgr show qos --json"))["QOS"]:
max_walltime = str(queue["limits"]["max"]["wall_clock"]["per"]["job"]["number"])
max_jobs = str(queue["limits"]["max"]["jobs"]["active_jobs"]["per"]["user"]["number"])
max_nodes = -1
for max_tres in queue["limits"]["max"]["tres"]["per"]["job"]:
if max_tres["type"] == "node":
max_nodes = max_tres["count"]
output += f"\n{queue['id']} {queue['name']} {max_walltime} {max_jobs} {max_nodes}"
return self._parse_slurm_output(output)

else:
return json.loads(await self.call("qstat -Q -f -F json"))["Queue"]

@_ensure_connected
async def get_available_modules(self, flatten=False):
Expand Down
11 changes: 8 additions & 3 deletions uit/gui_tools/submit.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,13 @@ def set_file_browser(self):
self.workdir.file_browser = create_file_browser(self.uit_client, patterns=[])

@staticmethod
def get_default(value, objects):
return value if value in objects else objects[0]
def get_default(value, objects, default=None):
"""Verify that value exists in the objects list, otherwise return a default or the first item in the list"""
if value in objects:
return value
if default in objects:
return default
return objects[0]

@param.depends("uit_client", watch=True)
async def update_hpc_connection_dependent_defaults(self):
Expand All @@ -107,7 +112,7 @@ async def update_hpc_connection_dependent_defaults(self):
self.hpc_subproject = self.get_default(self.hpc_subproject, subprojects)
self.workdir.file_path = self.uit_client.WORKDIR.as_posix()
self.param.node_type.objects = list(NODE_TYPES[self.uit_client.system].keys())
self.node_type = self.get_default(self.node_type, self.param.node_type.objects)
self.node_type = self.get_default(self.node_type, self.param.node_type.objects, default="compute")
self.param.queue.objects = await self.await_if_async(self.uit_client.get_queues())
self.queue = self.get_default(self.queue, self.param.queue.objects)
self.node_maxes = await self.await_if_async(
Expand Down
11 changes: 6 additions & 5 deletions uit/node_types.csv
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
system,compute,gpu,bigmem,transfer,mla,highclock
nautilus,128,128,128,1,128,32
narwhal,128,128,128,1
warhawk,128,128,128,1,128
carpenter,192,128,192,1
system,scheduler,compute,gpu,bigmem,transfer,mla,highclock
nautilus,slurm,128,128,128,1,128,32
narwhal,pbs,128,128,128,1
warhawk,pbs,128,128,128,1,128
carpenter,pbs,192,128,192,1
raider,slurm,128,128,128,1,128,32
Loading