2 changes: 1 addition & 1 deletion cylc/flow/commands.py
@@ -430,7 +430,7 @@ async def poll_tasks(schd: 'Scheduler', tasks: Iterable[str]):
matched, unmatched = schd.pool.id_match(ids, only_match_pool=True)
_report_unmatched(unmatched)
itasks = schd.pool.get_itasks(matched)
schd.task_job_mgr.poll_task_jobs(itasks)
schd.task_job_mgr.poll_task_jobs(itasks, manual_request=True)
yield len(unmatched)


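For context, the manual poll path changed here is driven through run_cmd, as in the integration test further down. A hedged usage sketch (the scheduler object and task ID are illustrative, assumed to come from an existing test fixture):

```python
# Usage sketch based on the integration test below; not part of the PR diff.
from cylc.flow.commands import poll_tasks, run_cmd


async def request_manual_poll(schd, task_id='1/foo'):
    # poll_tasks is a command generator; run_cmd drives it, which ends up
    # calling task_job_mgr.poll_task_jobs(itasks, manual_request=True).
    await run_cmd(poll_tasks(schd, [task_id]))
```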
52 changes: 42 additions & 10 deletions cylc/flow/task_job_mgr.py
@@ -223,7 +223,10 @@ def kill_prep_task(self, itask: 'TaskProxy') -> None:
self._prep_submit_task_job_error(itask, '(killed in job prep)', '')

def poll_task_jobs(
self, itasks: 'Iterable[TaskProxy]', msg: str | None = None
self,
itasks: 'Iterable[TaskProxy]',
msg: str | None = None,
manual_request: bool = False,
):
"""Poll jobs of specified tasks.

@@ -245,7 +248,8 @@ def poll_task_jobs(
if itask.state.status != TASK_STATUS_WAITING
],
self._poll_task_jobs_callback,
self._poll_task_jobs_callback_255
self._poll_task_jobs_callback_255,
continue_if_no_good_hosts=manual_request,
)

def prep_submit_task_jobs(
@@ -918,13 +922,34 @@ def _poll_task_job_message_callback(self, itask, cmd_ctx, line):
log_task_job_activity(ctx, self.workflow, itask.point, itask.tdef.name)

def _run_job_cmd(
self, cmd_key, itasks, callback, callback_255
self,
cmd_key,
itasks,
callback,
callback_255,
continue_if_no_good_hosts=False,
):
"""Run job commands, e.g. poll, kill, etc.

Group itasks with their platform_name and host.
Put a job command for each group to the multiprocess pool.

Args:
cmd_key:
Identifier for the command to run.
itasks:
List of task proxies to run the command against.
callback:
Callback to run on command completion.
callback_255:
Callback to run on SSH error.
continue_if_no_good_hosts:
If True, the bad hosts set will be reset in the event that
there are no "good hosts" to run the command on. This should
only be turned on for manual operations, e.g. manual poll. Use
of this option for automatic commands may result in feedback
loops between parallel operations.

"""
if not itasks:
return
@@ -968,15 +993,22 @@ def _run_job_cmd(
host = get_host_from_platform(
platform, bad_hosts=self.bad_hosts
)
cmd = construct_ssh_cmd(
cmd, platform, host
)
except NoHostsError:
ctx.err = f'No available hosts for {platform["name"]}'
LOG.debug(ctx)
callback_255(ctx, itasks)
continue
if continue_if_no_good_hosts:
# no hosts available for this platform
# -> reset the bad hosts and try again
self.task_events_mgr.reset_bad_hosts()
host = get_host_from_platform(
platform, bad_hosts=self.bad_hosts
)
else:
ctx.err = f'No available hosts for {platform["name"]}'
LOG.debug(ctx)
callback_255(ctx, itasks)
continue
Comment on lines +997 to +1008
Both branches here continue to the next iteration of the for loop. host is not used in the first branch. Did you mean to remove the else condition on the line below?

Mildly concerned that, if Ronnie is right (I think he is), the test isn't failing....
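A minimal standalone illustration of the behaviour being flagged (not cylc-flow code; names are illustrative): in a try/except/else, the else suite only runs when no exception was raised, so a host recovered inside the except handler never reaches command construction placed in the else branch.

```python
# Demonstration of try/except/else control flow, mirroring the concern above.
def attempt(recover: bool) -> str:
    try:
        raise LookupError('no good hosts')
    except LookupError:
        if recover:
            host = 'abc'  # recovered, but the else suite below is still skipped
        else:
            return 'gave up'
    else:
        return f'built command for {host}'  # only runs if nothing was raised
    return 'fell through without building a command'


assert attempt(recover=True) == 'fell through without building a command'
assert attempt(recover=False) == 'gave up'
```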

else:

cmd = construct_ssh_cmd(cmd, platform, host)
ctx = SubProcContext(cmd_key, cmd, host=host)

for itask in sorted(itasks, key=lambda task: task.identity):
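The new continue_if_no_good_hosts option boils down to a reset-and-retry pattern for host selection. A minimal standalone sketch of that pattern (not cylc-flow code: select_host and pick_host_for_command are illustrative stand-ins for get_host_from_platform and the surrounding logic, and clearing the set stands in for reset_bad_hosts()):

```python
from typing import List, Optional, Set


class NoHostsError(Exception):
    """Raised when every host for a platform is in the bad set."""


def select_host(hosts: List[str], bad_hosts: Set[str]) -> str:
    """Return the first host not marked bad, else raise NoHostsError."""
    for host in hosts:
        if host not in bad_hosts:
            return host
    raise NoHostsError()


def pick_host_for_command(
    hosts: List[str],
    bad_hosts: Set[str],
    manual_request: bool = False,
) -> Optional[str]:
    """Pick a host, clearing bad_hosts once if this is a manual operation."""
    try:
        return select_host(hosts, bad_hosts)
    except NoHostsError:
        if manual_request:
            # manual operations (e.g. manual poll) forget the bad hosts
            # and try again
            bad_hosts.clear()
            return select_host(hosts, bad_hosts)
        # automatic operations give up, avoiding reset/fail feedback loops
        return None


# With every host marked bad, only the manual request goes ahead:
assert pick_host_for_command(['abc'], {'abc'}) is None
assert pick_host_for_command(['abc'], {'abc'}, manual_request=True) == 'abc'
```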
4 changes: 1 addition & 3 deletions tests/integration/conftest.py
@@ -453,9 +453,7 @@ def capture_polling():
def _disable_polling(schd: 'Scheduler') -> 'Set[TaskProxy]':
polled_tasks: 'Set[TaskProxy]' = set()

def run_job_cmd(
_1, itasks, _3, _4=None
):
def run_job_cmd(_, itasks, *__, **___):
polled_tasks.update(itasks)
return itasks

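The relaxed stub signature above matters because _run_job_cmd now receives an extra keyword argument. A quick sketch (values illustrative, not the fixture itself) of how the *args/**kwargs form absorbs both the old and new call shapes:

```python
# Illustrative stub mirroring the capture_polling fixture's signature.
def run_job_cmd(_, itasks, *__, **___):
    return itasks


# old-style call (positional callbacks only)
assert run_job_cmd(None, ['1/foo'], None, None) == ['1/foo']
# new-style call with the keyword argument introduced in this PR
assert run_job_cmd(
    None, ['1/foo'], None, None, continue_if_no_good_hosts=True
) == ['1/foo']
```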
59 changes: 59 additions & 0 deletions tests/integration/test_task_job_mgr.py
@@ -21,11 +21,14 @@
from unittest.mock import Mock

from cylc.flow import CYLC_LOG
from cylc.flow.commands import poll_tasks, run_cmd
from cylc.flow.job_runner_mgr import JOB_FILES_REMOVED_MESSAGE
from cylc.flow.platforms import get_platform
from cylc.flow.scheduler import Scheduler
from cylc.flow.task_state import (
TASK_STATUS_FAILED,
TASK_STATUS_RUNNING,
TASK_STATUS_SUBMITTED,
)


@@ -260,6 +263,62 @@ async def test_poll_job_deleted_log_folder(
)


async def test_manual_poll_resets_bad_hosts(
flow,
scheduler,
start,
mock_glbl_cfg,
):
"""Manual poll should be attempted even if all hosts are "bad".

Automated polling will be skipped in the event that all of a platform's
hosts are in the known "bad hosts" list.

Manual poll (similar to manual trigger) will reset the bad hosts list in
this case to ensure the operation is attempted anyway.

See https://github.com/cylc/cylc-flow/issues/7029
"""
mock_glbl_cfg(
'cylc.flow.platforms.glbl_cfg',
'''
[platforms]
[[my-remote]]
hosts = abc
''',
)
id_ = flow({
'scheduling': {
'graph': {
'R1': 'foo',
},
},
'runtime': {
'foo': {
'platform': 'my-remote',
},
},
})
schd: 'Scheduler' = scheduler(id_, run_mode='live')
async with start(schd):
# make it look like the task is submitted
itask = schd.pool.get_tasks()[0]
itask.platform = get_platform('my-remote')
itask.state_reset(TASK_STATUS_SUBMITTED)

# mark all of the hosts as bad hosts
schd.task_events_mgr.bad_hosts.update(itask.platform['hosts'])

# request a manual poll
await run_cmd(poll_tasks(schd, [itask.tokens.relative_id]))

# the bad hosts set should be empty
assert not schd.task_events_mgr.bad_hosts

# and a poll command should have been issued
assert len(schd.proc_pool.queuings)


async def test__prep_submit_task_job_impl_handles_all_old_platform_settings(
flow: Fixture,
scheduler: Fixture,