diff --git a/cylc/flow/commands.py b/cylc/flow/commands.py index 01982f233d..5ecd36230a 100644 --- a/cylc/flow/commands.py +++ b/cylc/flow/commands.py @@ -430,7 +430,7 @@ async def poll_tasks(schd: 'Scheduler', tasks: Iterable[str]): matched, unmatched = schd.pool.id_match(ids, only_match_pool=True) _report_unmatched(unmatched) itasks = schd.pool.get_itasks(matched) - schd.task_job_mgr.poll_task_jobs(itasks) + schd.task_job_mgr.poll_task_jobs(itasks, manual_request=True) yield len(unmatched) diff --git a/cylc/flow/task_job_mgr.py b/cylc/flow/task_job_mgr.py index 47f0a17e7f..3451858502 100644 --- a/cylc/flow/task_job_mgr.py +++ b/cylc/flow/task_job_mgr.py @@ -223,7 +223,10 @@ def kill_prep_task(self, itask: 'TaskProxy') -> None: self._prep_submit_task_job_error(itask, '(killed in job prep)', '') def poll_task_jobs( - self, itasks: 'Iterable[TaskProxy]', msg: str | None = None + self, + itasks: 'Iterable[TaskProxy]', + msg: str | None = None, + manual_request: bool = False, ): """Poll jobs of specified tasks. @@ -245,7 +248,8 @@ def poll_task_jobs( if itask.state.status != TASK_STATUS_WAITING ], self._poll_task_jobs_callback, - self._poll_task_jobs_callback_255 + self._poll_task_jobs_callback_255, + continue_if_no_good_hosts=manual_request, ) def prep_submit_task_jobs( @@ -918,13 +922,34 @@ def _poll_task_job_message_callback(self, itask, cmd_ctx, line): log_task_job_activity(ctx, self.workflow, itask.point, itask.tdef.name) def _run_job_cmd( - self, cmd_key, itasks, callback, callback_255 + self, + cmd_key, + itasks, + callback, + callback_255, + continue_if_no_good_hosts=False, ): """Run job commands, e.g. poll, kill, etc. Group itasks with their platform_name and host. Put a job command for each group to the multiprocess pool. + Args: + cmd_key: + Identifier for the command to run. + itasks: + List of task proxies to run the command against. + callback: + Callback to run on command completion. + callback_255: + Callback to run on SSH error. + continue_if_no_good_hosts: + If True, the bad hosts set will be reset in the event that + there are no "good hosts" to run the command on. This should + only be turned on for manual operations, e.g. manual poll. Use + of this option for automatic commands may result in feedback + loops between parallel opererations. + """ if not itasks: return @@ -968,15 +993,22 @@ def _run_job_cmd( host = get_host_from_platform( platform, bad_hosts=self.bad_hosts ) - cmd = construct_ssh_cmd( - cmd, platform, host - ) except NoHostsError: - ctx.err = f'No available hosts for {platform["name"]}' - LOG.debug(ctx) - callback_255(ctx, itasks) - continue + if continue_if_no_good_hosts: + # no hosts available for this platform + # -> reset the bad hosts and try again + self.task_events_mgr.reset_bad_hosts() + host = get_host_from_platform( + platform, bad_hosts=self.bad_hosts + ) + else: + ctx.err = f'No available hosts for {platform["name"]}' + LOG.debug(ctx) + callback_255(ctx, itasks) + continue else: + + cmd = construct_ssh_cmd(cmd, platform, host) ctx = SubProcContext(cmd_key, cmd, host=host) for itask in sorted(itasks, key=lambda task: task.identity): diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 342bb1d58d..2e02523469 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -453,9 +453,7 @@ def capture_polling(): def _disable_polling(schd: 'Scheduler') -> 'Set[TaskProxy]': polled_tasks: 'Set[TaskProxy]' = set() - def run_job_cmd( - _1, itasks, _3, _4=None - ): + def run_job_cmd(_, itasks, *__, **___): polled_tasks.update(itasks) return itasks diff --git a/tests/integration/test_task_job_mgr.py b/tests/integration/test_task_job_mgr.py index d3164d8421..a91083fef0 100644 --- a/tests/integration/test_task_job_mgr.py +++ b/tests/integration/test_task_job_mgr.py @@ -21,11 +21,14 @@ from unittest.mock import Mock from cylc.flow import CYLC_LOG +from cylc.flow.commands import poll_tasks, run_cmd from cylc.flow.job_runner_mgr import JOB_FILES_REMOVED_MESSAGE +from cylc.flow.platforms import get_platform from cylc.flow.scheduler import Scheduler from cylc.flow.task_state import ( TASK_STATUS_FAILED, TASK_STATUS_RUNNING, + TASK_STATUS_SUBMITTED, ) @@ -260,6 +263,62 @@ async def test_poll_job_deleted_log_folder( ) +async def test_manual_poll_resets_bad_hosts( + flow, + scheduler, + start, + mock_glbl_cfg, +): + """Manual poll should be attempted even if all hosts are "bad". + + Automated polling will be skipped in the event that all of a platform's + hosts are in the known "bad hosts" list. + + Manual poll (similar to manual trigger), will reset the bad hosts list in + this eventuality in order to ensure the operation is attempted anyway. + + See https://github.com/cylc/cylc-flow/issues/7029 + """ + mock_glbl_cfg( + 'cylc.flow.platforms.glbl_cfg', + ''' + [platforms] + [[my-remote]] + hosts = abc + ''', + ) + id_ = flow({ + 'scheduling': { + 'graph': { + 'R1': 'foo', + }, + }, + 'runtime': { + 'foo': { + 'platform': 'my-remote', + }, + }, + }) + schd: 'Scheduler' = scheduler(id_, run_mode='live') + async with start(schd): + # make it look like the task is submitted + itask = schd.pool.get_tasks()[0] + itask.platform = get_platform('my-remote') + itask.state_reset(TASK_STATUS_SUBMITTED) + + # mark all of the hosts as bad hosts + schd.task_events_mgr.bad_hosts.update(itask.platform['hosts']) + + # request a manual poll + await run_cmd(poll_tasks(schd, [itask.tokens.relative_id])) + + # the bad hosts set should be empty + assert not schd.task_events_mgr.bad_hosts + + # and a poll command should have been issued + assert len(schd.proc_pool.queuings) + + async def test__prep_submit_task_job_impl_handles_all_old_platform_settings( flow: Fixture, scheduler: Fixture,