Skip to content

Commit 332d3d0

Browse files
sdc50 and araglu authored
Fix Intermediate File Transfer Support (#43)
* add intermediate files to submit stage
  enable syncing dirs in addition to files
* enable process_results functions
* Capture connection errors during status update
  Occasional "Not Authenticated" errors pop up here.
* Apply formatting

---------

Co-authored-by: Mark Lugar <14322382+araglu@users.noreply.github.com>
1 parent d2cef08 commit 332d3d0

File tree

3 files changed

+61
-15
lines changed

3 files changed

+61
-15
lines changed

uit_plus_job/models.py

Lines changed: 33 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -18,7 +18,7 @@
1818
from django.contrib.auth.models import User
1919
from social_django.utils import load_strategy
2020
from tethys_apps.base.function_extractor import TethysFunctionExtractor
21-
from uit.exceptions import UITError
21+
from uit.exceptions import MaxRetriesError, UITError
2222
from uit import AsyncClient, PbsScript, PbsJob, PbsArrayJob
2323
from uit.pbs_script import PbsDirective
2424
from tethys_compute.models.tethys_job import TethysJob
@@ -368,11 +368,11 @@ def process_intermediate_results_function(self):
368368
@process_intermediate_results_function.setter
369369
def process_intermediate_results_function(self, function):
370370
if isinstance(function, str):
371-
self._process_results_function = function
371+
self._process_intermediate_results_function = function
372372
return
373373
module_path = inspect.getmodule(function).__name__.split(".")
374374
module_path.append(function.__name__)
375-
self._process_results_function = ".".join(module_path)
375+
self._process_intermediate_results_function = ".".join(module_path)
376376

377377
@property
378378
def remote_workspace_id(self):
@@ -547,8 +547,12 @@ async def update_status(self, status=None, *args, **kwargs):
547547

548548
# Update status if status not given and still pending/running
549549
elif update_needed and self.is_time_to_update():
550-
await self._update_status(*args, **kwargs)
551-
self._last_status_update = timezone.now()
550+
try:
551+
await self._update_status(*args, **kwargs)
552+
self._last_status_update = timezone.now()
553+
except (MaxRetriesError, UITError) as e:
554+
log.info(f"Unable to connect to {self.system} for user {self.user} due to the following error: {e}")
555+
return
552556

553557
# Post-process status after update if old status was pending/running
554558
if update_needed:
@@ -617,18 +621,20 @@ def intermediate_transfer_interval_exceeded(self):
617621

618622
async def process_results(self):
619623
"""Process the results using the UIT Plus Python client."""
620-
log.debug("Started processing results for job: {}".format(self))
624+
log.debug(f"Started processing results for job: {self}")
621625
await self.get_remote_files(self.transfer_output_files)
622626
self.completion_time = timezone.now()
623627
self._status = "COM"
624628
await self._safe_save()
625-
log.debug("Finished processing results for job: {}".format(self))
629+
if self.process_results_function:
630+
self.process_results_function(self)
631+
log.debug(f"Finished processing results for job: {self}")
626632

627633
async def get_intermediate_results(self):
628634
"""Retrieve intermediate result files from the supercomputer."""
629635
if await self.get_remote_files(self.transfer_intermediate_files):
630636
if self.process_intermediate_results_function:
631-
self.process_intermediate_results_function()
637+
self.process_intermediate_results_function(self)
632638

633639
def resolve_paths(self, paths):
634640
resolved_paths = []
@@ -640,22 +646,29 @@ def resolve_paths(self, paths):
640646
resolved_paths.append(self.pbs_job.resolve_path(p))
641647
return resolved_paths
642648

643-
async def get_remote_files(self, remote_filenames):
649+
async def get_remote_files(self, remote_paths):
644650
"""Transfer files from a directory on the super computer.
645651
646652
Args:
647-
remote_filenames (List[str]): Files to retrieve from remote_dir
653+
remote_paths (List[str]): Files to retrieve from remote_dir
648654
649655
Returns:
650656
bool: True if all file transfers succeed.
651657
"""
658+
remote_dirnames = []
659+
if isinstance(remote_paths, dict):
660+
remote_dirnames = remote_paths.get("dirs", [])
661+
remote_filenames = remote_paths.get("files", [])
662+
else:
663+
remote_filenames = remote_paths
652664

653665
# Ensure the local transfer directory exists
654666
workspace = Path(self.workspace)
655667
success = True
656-
remote_paths = self.resolve_paths(remote_filenames)
668+
remote_file_paths = self.resolve_paths(remote_filenames)
669+
remote_dir_paths = self.resolve_paths(remote_dirnames)
657670

658-
for remote_path in remote_paths:
671+
for remote_path in remote_file_paths:
659672
rel_path = remote_path.relative_to(self.working_dir)
660673
local_path = workspace / rel_path
661674
local_path.parent.mkdir(parents=True, exist_ok=True)
@@ -668,6 +681,14 @@ async def get_remote_files(self, remote_filenames):
668681
success = False
669682
log.error("Failed to get remote file: {}".format(str(e)))
670683

684+
for remote_dir in remote_dir_paths:
685+
rel_path = remote_dir.relative_to(self.working_dir)
686+
local_dir = workspace / rel_path
687+
try:
688+
await self.client.get_dir(remote_dir=remote_dir, local_dir=local_dir)
689+
except Exception as e:
690+
log.exception(e)
691+
671692
return success
672693

673694
@_ensure_connected

uit_plus_job/submit_stage.py

Lines changed: 25 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -710,13 +710,38 @@ def action_button(self):
710710

711711
return row
712712

713+
@property
714+
def transfer_input_files(self):
715+
return None
716+
717+
@property
718+
def transfer_intermediate_files(self):
719+
return None
720+
713721
@property
714722
def transfer_output_files(self):
715723
return None
716724

725+
@property
726+
def intermediate_transfer_interval(self):
727+
return 0
728+
729+
@property
730+
def process_intermediate_results_function(job):
731+
pass
732+
733+
@property
734+
def process_results_function(job):
735+
pass
736+
717737
async def submit(self, custom_logs=None):
718738
self.job.script = self.pbs_script # update script to ensure it reflects any UI updates
719739
job = await database_sync_to_async(UitPlusJob.instance_from_pbs_job)(self.job, self.tethys_user)
720740
job.custom_logs = custom_logs or self.custom_logs
741+
job.transfer_input_files = self.transfer_input_files
742+
job.transfer_intermediate_files = self.transfer_intermediate_files
721743
job.transfer_output_files = self.transfer_output_files
744+
job.intermediate_transfer_interval = self.intermediate_transfer_interval
745+
job.process_intermediate_results_function = self.process_intermediate_results_function
746+
job.process_results_function = self.process_results_function
722747
await job.execute()

uit_plus_job/tests/integrated_tests/test_models.py

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -345,7 +345,7 @@ def test_get_remote_file_no_local_path(self, mock_client):
345345
remote_dir = "WORKDIR"
346346
mock_client.get_file.return_value = {"success": True}
347347
self.assertFalse(
348-
asyncio.run(self.uitplusjob.get_remote_files(remote_dir=remote_dir, remote_filenames=remote_files_names))
348+
asyncio.run(self.uitplusjob.get_remote_files(remote_dir=remote_dir, remote_paths=remote_files_names))
349349
)
350350

351351
@mock.patch("uit_plus_job.models.log")
@@ -356,7 +356,7 @@ def test_get_remote_file_io_error(self, mock_client, mock_log):
356356
mock_client.get_file.side_effect = IOError
357357

358358
# call the method
359-
ret = asyncio.run(self.uitplusjob.get_remote_files(remote_dir=remote_dir, remote_filenames=remote_files_names))
359+
ret = asyncio.run(self.uitplusjob.get_remote_files(remote_dir=remote_dir, remote_paths=remote_files_names))
360360

361361
# test results
362362
self.assertFalse(ret)
@@ -370,7 +370,7 @@ def test_get_remote_file(self, mock_client, mock_os):
370370
mock_os.path.join.side_effect = ["local_path", "remote_path"]
371371
mock_client.get_file.return_value = {"success": True}
372372
mock_os.path.exists.return_value = True
373-
ret = asyncio.run(self.uitplusjob.get_remote_files(remote_dir=remote_dir, remote_filenames=remote_files_names))
373+
ret = asyncio.run(self.uitplusjob.get_remote_files(remote_dir=remote_dir, remote_paths=remote_files_names))
374374

375375
# test results
376376
self.assertTrue(ret)

0 commit comments

Comments
 (0)