Skip to content

Commit ab458e2

Browse files
polling: reset bad hosts on manual poll
* Closes #7029 * This reduces the pressure on #7001 (polling is not attempted if a platform runs out of hosts until the next run of reset-bad hosts) by making it easier for operators to reset bad hosts and recover their workflow in the event of platform outages.
1 parent 65a00ce commit ab458e2

File tree

4 files changed

+100
-14
lines changed

4 files changed

+100
-14
lines changed

cylc/flow/commands.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -430,7 +430,7 @@ async def poll_tasks(schd: 'Scheduler', tasks: Iterable[str]):
430430
matched, unmatched = schd.pool.id_match(ids, only_match_pool=True)
431431
_report_unmatched(unmatched)
432432
itasks = schd.pool.get_itasks(matched)
433-
schd.task_job_mgr.poll_task_jobs(itasks)
433+
schd.task_job_mgr.poll_task_jobs(itasks, manual_request=True)
434434
yield len(unmatched)
435435

436436

cylc/flow/task_job_mgr.py

Lines changed: 39 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -222,7 +222,7 @@ def kill_prep_task(self, itask: 'TaskProxy') -> None:
222222
self._set_retry_timers(itask)
223223
self._prep_submit_task_job_error(itask, '(killed in job prep)', '')
224224

225-
def poll_task_jobs(self, itasks, msg=None):
225+
def poll_task_jobs(self, itasks, msg=None, manual_request=False):
226226
"""Poll jobs of specified tasks.
227227
228228
This method uses _poll_task_jobs_callback() and
@@ -243,7 +243,8 @@ def poll_task_jobs(self, itasks, msg=None):
243243
if itask.state.status != TASK_STATUS_WAITING
244244
],
245245
self._poll_task_jobs_callback,
246-
self._poll_task_jobs_callback_255
246+
self._poll_task_jobs_callback_255,
247+
continue_if_no_good_hosts=manual_request,
247248
)
248249

249250
def prep_submit_task_jobs(
@@ -916,13 +917,34 @@ def _poll_task_job_message_callback(self, itask, cmd_ctx, line):
916917
log_task_job_activity(ctx, self.workflow, itask.point, itask.tdef.name)
917918

918919
def _run_job_cmd(
919-
self, cmd_key, itasks, callback, callback_255
920+
self,
921+
cmd_key,
922+
itasks,
923+
callback,
924+
callback_255,
925+
continue_if_no_good_hosts=False,
920926
):
921927
"""Run job commands, e.g. poll, kill, etc.
922928
923929
Group itasks with their platform_name and host.
924930
Put a job command for each group to the multiprocess pool.
925931
932+
Args:
933+
cmd_key:
934+
Identifier for the command to run.
935+
itasks:
936+
List of task proxies to run the command against.
937+
callback:
938+
Callback to run on command completion.
939+
callback_255:
940+
Callback to run on SSH error.
941+
continue_if_no_good_hosts:
942+
If True, the bad hosts set will be reset in the event that
943+
there are no "good hosts" to run the command on. This should
944+
only be turned on for manual operations, e.g. manual poll. Use
945+
of this option for automatic commands may result in feedback
946+
loops between parallel opererations.
947+
926948
"""
927949
if not itasks:
928950
return
@@ -966,15 +988,22 @@ def _run_job_cmd(
966988
host = get_host_from_platform(
967989
platform, bad_hosts=self.bad_hosts
968990
)
969-
cmd = construct_ssh_cmd(
970-
cmd, platform, host
971-
)
972991
except NoHostsError:
973-
ctx.err = f'No available hosts for {platform["name"]}'
974-
LOG.debug(ctx)
975-
callback_255(ctx, itasks)
976-
continue
992+
if continue_if_no_good_hosts:
993+
# no hosts available for this platform
994+
# -> reset the bad hosts and try again
995+
self.task_events_mgr.reset_bad_hosts()
996+
host = get_host_from_platform(
997+
platform, bad_hosts=self.bad_hosts
998+
)
999+
else:
1000+
ctx.err = f'No available hosts for {platform["name"]}'
1001+
LOG.debug(ctx)
1002+
callback_255(ctx, itasks)
1003+
continue
9771004
else:
1005+
1006+
cmd = construct_ssh_cmd(cmd, platform, host)
9781007
ctx = SubProcContext(cmd_key, cmd, host=host)
9791008

9801009
for itask in sorted(itasks, key=lambda task: task.identity):

tests/integration/conftest.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -453,9 +453,7 @@ def capture_polling():
453453
def _disable_polling(schd: 'Scheduler') -> 'Set[TaskProxy]':
454454
polled_tasks: 'Set[TaskProxy]' = set()
455455

456-
def run_job_cmd(
457-
_1, itasks, _3, _4=None
458-
):
456+
def run_job_cmd(_, itasks, *__, **___):
459457
polled_tasks.update(itasks)
460458
return itasks
461459

tests/integration/test_task_job_mgr.py

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,14 @@
2121
from unittest.mock import Mock
2222

2323
from cylc.flow import CYLC_LOG
24+
from cylc.flow.commands import poll_tasks, run_cmd
2425
from cylc.flow.job_runner_mgr import JOB_FILES_REMOVED_MESSAGE
26+
from cylc.flow.platforms import get_platform
2527
from cylc.flow.scheduler import Scheduler
2628
from cylc.flow.task_state import (
2729
TASK_STATUS_FAILED,
2830
TASK_STATUS_RUNNING,
31+
TASK_STATUS_SUBMITTED,
2932
)
3033

3134

@@ -260,6 +263,62 @@ async def test_poll_job_deleted_log_folder(
260263
)
261264

262265

266+
async def test_manual_poll_resets_bad_hosts(
267+
flow,
268+
scheduler,
269+
start,
270+
mock_glbl_cfg,
271+
):
272+
"""Manual poll should be attempted even if all hosts are "bad".
273+
274+
Automated polling will be skipped in the event that all of a platform's
275+
hosts are in the known "bad hosts" list.
276+
277+
Manual poll (similar to manual trigger), will reset the bad hosts list in
278+
this eventuality in order to ensure the operation is attempted anyway.
279+
280+
See https://github.com/cylc/cylc-flow/issues/7029
281+
"""
282+
mock_glbl_cfg(
283+
'cylc.flow.platforms.glbl_cfg',
284+
'''
285+
[platforms]
286+
[[my-remote]]
287+
hosts = abc
288+
''',
289+
)
290+
id_ = flow({
291+
'scheduling': {
292+
'graph': {
293+
'R1': 'foo',
294+
},
295+
},
296+
'runtime': {
297+
'foo': {
298+
'platform': 'my-remote',
299+
},
300+
},
301+
})
302+
schd: 'Scheduler' = scheduler(id_, run_mode='live')
303+
async with start(schd):
304+
# make it look like the task is submitted
305+
itask = schd.pool.get_tasks()[0]
306+
itask.platform = get_platform('my-remote')
307+
itask.state_reset(TASK_STATUS_SUBMITTED)
308+
309+
# mark all of the hosts as bad hosts
310+
schd.task_events_mgr.bad_hosts.update(itask.platform['hosts'])
311+
312+
# request a manual poll
313+
await run_cmd(poll_tasks(schd, [itask.tokens.relative_id]))
314+
315+
# the bad hosts set should be empty
316+
assert not schd.task_events_mgr.bad_hosts
317+
318+
# and a poll command should have been issued
319+
assert len(schd.proc_pool.queuings)
320+
321+
263322
async def test__prep_submit_task_job_impl_handles_all_old_platform_settings(
264323
flow: Fixture,
265324
scheduler: Fixture,

0 commit comments

Comments
 (0)