2 changes: 2 additions & 0 deletions changes.d/7052.feat.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Make the list of files used to determine the success of job log retrieval
commands configurable.
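
For example, a platform whose job runner writes a report file asynchronously
could be configured like this (a sketch; the platform name and the
"job.report" file are illustrative):

    [platforms]
        [[my-hpc]]
            retrieve job logs = True
            retrieve job logs retry delays = PT30S, PT1M, PT5M
            retrieve job log expected files = job.report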
33 changes: 33 additions & 0 deletions cylc/flow/cfgspec/globalcfg.py
@@ -160,6 +160,12 @@
logs appearing in their final location (due to the job runner)
you can configure time intervals here to delay the first and
subsequent retrieval attempts.

Job log retrieval will be retried until the expected job files have
been retried, see
:cylc:conf:`
global.cylc[platforms][<platform name>]retrieve job log expected files`
for details
''')
}

@@ -1751,6 +1757,33 @@ def default_for(
retry delays``.
{replaces}
''')
Conf(
'retrieve job log expected files',
VDR.V_STRING_LIST,
'',
desc='''
Configure the log files that job log retrieval is expected to
return.

By default, job log retrieval is considered successful once
it has retrieved the "job.out" file, and additionally the
"job.err" file if the job failed.

Cylc will repeat job log retrieval according to the configured
:cylc:conf:`[..]retrieve job logs retry delays` until the
expected file(s) have been retrieved.

This setting allows you to add further files to this
success condition.

The purpose of this setting is to facilitate working with
files that are written asynchronously by job runners and may
not be created until after the job has succeeded, e.g. job
report or accounting files.

.. versionadded:: 8.7.0
''',
)
Conf('tail command template',
VDR.V_STRING, 'tail -n +1 --follow=name %(filename)s',
desc=f'''
10 changes: 9 additions & 1 deletion cylc/flow/scripts/cat_log.py
@@ -586,7 +586,15 @@ def _main(
workflow_id, point, task, submit_num
)

job_log_present = (Path(local_log_dir) / "job.out").exists()
job_log_present = all(
(Path(local_log_dir) / file).exists()
for file in [
# the files which are used to indicate that job log retrieval
# has completed
'job.out',
*platform['retrieve job log expected files'],
]
)
Comment on lines +589 to +597
@ChrisPaulBennett

With this PR, the condition we use to determine whether job log retrieval has completed can also take into account these configured files. The cylc cat-log logic should also respect this.
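
A minimal sketch of that shared success condition (the job_logs_retrieved
helper is hypothetical, not part of this PR):

    from pathlib import Path

    def job_logs_retrieved(local_log_dir, platform, job_failed=False):
        """Hypothetical helper: True once all expected job logs exist locally."""
        expected = ['job.out', *platform['retrieve job log expected files']]
        if job_failed:
            # the scheduler additionally expects job.err for failed jobs
            expected.append('job.err')
        return all((Path(local_log_dir) / name).exists() for name in expected)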


log_is_remote = (is_remote_platform(platform)
and (options.filename != JOB_LOG_ACTIVITY))
43 changes: 30 additions & 13 deletions cylc/flow/task_events_mgr.py
@@ -32,6 +32,7 @@
INFO,
getLevelName,
)
import logging
import os
from pathlib import Path
import shlex
@@ -224,6 +225,7 @@ def log_task_job_activity(
point: 'str | PointBase',
name: str,
submit_num: str | int | None = None,
level: int | None = None,
):
"""Log an activity for a task job."""
ctx_str = str(ctx)
@@ -241,9 +243,11 @@
# selection command causes a submission failure, or if a waiting task
# expires before a job log directory is otherwise needed.
# (Don't log the exception content, it looks like a bug).
LOG.info(ctx_str)
level = logging.INFO
if ctx.cmd and ctx.ret_code:
LOG.error(ctx_str)
level = logging.ERROR
if level is not None:
LOG.log(level, ctx_str)


class EventData(Enum):
@@ -1184,6 +1188,15 @@ def _process_job_logs_retrieval(
# get a host to run retrieval on
try:
platform = get_platform(ctx.platform_name)
except PlatformLookupError:
log_platform_event(
'Unable to retrieve job logs.',
{'name': ctx.platform_name},
level='warning',
)
return

try:
host = get_host_from_platform(platform, bad_hosts=self.bad_hosts)
@oliver-sanders oliver-sanders Oct 21, 2025

Split this try block into two so tools like mypy can tell that platform cannot be undefined.

(see the before part of the diff)
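
Roughly, the restructuring looks like this (simplified; not the verbatim
diff):

    # before: one try block, so in the NoHostsError handler mypy cannot
    # prove that `platform` was ever bound
    try:
        platform = get_platform(ctx.platform_name)
        host = get_host_from_platform(platform, bad_hosts=self.bad_hosts)
    except PlatformLookupError:
        return
    except NoHostsError:
        ...  # `platform` is "possibly unbound" here for type checkers

    # after: two blocks; `platform` is definitely bound once the first
    # block has succeeded
    try:
        platform = get_platform(ctx.platform_name)
    except PlatformLookupError:
        return
    try:
        host = get_host_from_platform(platform, bad_hosts=self.bad_hosts)
    except NoHostsError:
        ...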

except NoHostsError:
# All of the platform's hosts have been found to be uncontactable.
@@ -1202,13 +1215,6 @@
for id_key in id_keys:
self.unset_waiting_event_timer(id_key)
return
except PlatformLookupError:
log_platform_event(
'Unable to retrieve job logs.',
{'name': ctx.platform_name},
level='warning',
)
return

# construct the retrieval command
ssh_str = str(platform["ssh command"])
@@ -1237,26 +1243,33 @@
# Local target
cmd.append(get_workflow_run_job_dir(schd.workflow) + "/")

expected_log_files = platform['retrieve job log expected files']

# schedule command
self.proc_pool.put_command(
SubProcContext(
ctx, cmd, env=dict(os.environ), id_keys=id_keys, host=host
),
bad_hosts=self.bad_hosts,
callback=self._job_logs_retrieval_callback,
callback_args=[schd],
callback_255=self._job_logs_retrieval_callback_255
callback_args=[schd, expected_log_files],
callback_255=self._job_logs_retrieval_callback_255,
)

def _job_logs_retrieval_callback_255(self, proc_ctx, schd) -> None:
def _job_logs_retrieval_callback_255(self, proc_ctx, *args) -> None:
"""Call back when log job retrieval fails with a 255 error."""
self.bad_hosts.add(proc_ctx.host)
for _ in proc_ctx.cmd_kwargs["id_keys"]:
for key in proc_ctx.cmd_kwargs['id_keys']:
timer = self._event_timers[key]
timer.reset()

def _job_logs_retrieval_callback(self, proc_ctx, schd) -> None:
def _job_logs_retrieval_callback(
self,
proc_ctx,
schd,
expected_log_files,
) -> None:
"""Call back when log job retrieval completes."""
if (
(proc_ctx.ret_code and LOG.isEnabledFor(DEBUG))
@@ -1271,6 +1284,7 @@ def _job_logs_retrieval_callback(self, proc_ctx, schd) -> None:
with suppress(TypeError):
if id_key.event not in 'succeeded':
fnames.append(JOB_LOG_ERR)
fnames.extend(expected_log_files or [])
fname_oks = {}
for fname in fnames:
fname_oks[fname] = os.path.exists(get_task_job_log(
@@ -1304,6 +1318,9 @@ def _job_logs_retrieval_callback(self, proc_ctx, schd) -> None:
id_key.tokens['cycle'],
id_key.tokens['task'],
id_key.tokens['job'],
# ensure the list of remaining files to retrieve is logged
# for debugging
level=logging.DEBUG
)
except KeyError as exc:
LOG.exception(exc)
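The new level parameter lets callers demote routine activity messages; a
usage sketch (the leading ctx and workflow arguments are assumed from the
existing call sites):

    import logging

    # force DEBUG so the "File(s) not retrieved: ..." activity line does
    # not clutter the workflow log at the default INFO level
    log_task_job_activity(
        ctx,
        schd.workflow,
        id_key.tokens['cycle'],
        id_key.tokens['task'],
        id_key.tokens['job'],
        level=logging.DEBUG,
    )
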
106 changes: 103 additions & 3 deletions tests/integration/test_task_events_mgr.py
@@ -15,32 +15,34 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.

import logging
from pathlib import Path
from types import SimpleNamespace
from typing import Any as Fixture

import pytest

from cylc.flow import CYLC_LOG
from cylc.flow.data_store_mgr import (
JOBS,
TASK_STATUS_WAITING,
)
from cylc.flow.id import Tokens
from cylc.flow.network.resolvers import TaskMsg
from cylc.flow.run_modes import RunMode
from cylc.flow.scheduler import Scheduler
from cylc.flow.task_events_mgr import (
EventKey,
TaskEventsManager,
TaskJobLogsRetrieveContext,
)
from cylc.flow.task_job_logs import get_task_job_log
from cylc.flow.task_state import (
TASK_STATUS_PREPARING,
TASK_STATUS_SUBMIT_FAILED,
)

from cylc.flow.network.resolvers import TaskMsg

from .test_workflow_events import TEMPLATES


# NOTE: we do not test custom event handlers here because these are tested
# as a part of workflow validation (now also performed by cylc play)

@@ -368,3 +370,101 @@ async def test_event_email_body(
assert f'host: {mod_one.host}' in email_body
assert f'port: {mod_one.server.port}' in email_body
assert f'owner: {mod_one.owner}' in email_body


async def test_job_log_retrieval_success_condition(
one: 'Scheduler', start, caplog, mock_glbl_cfg, log_filter
):
"""Test the success condition for job log retrieval.

Job log retrieval may be retried automatically if configured.

It will stop when the configured list of files has been retrieved.
"""
mock_glbl_cfg(
'cylc.flow.platforms.glbl_cfg',
'''
[platforms]
[[localhost]]
retrieve job log expected files = job.forty-two
'''
)
# capture event timer calls
_remove_event_timer_calls = []
_unset_waiting_event_timer_calls = []

# called if retrieval complete
def _remove_event_timer(id_key):
_remove_event_timer_calls.append(id_key)

# called if retrieval incomplete
def _unset_waiting_event_timer(id_key):
_unset_waiting_event_timer_calls.append(id_key)

def job_logs_retrieve(*retrieved_files):
"""Run simulated job log retrieval.

Any files specified will be created on the filesystem (simulating their
retrieval).
"""
# request job log retrieval
ctx = TaskJobLogsRetrieveContext(
TaskEventsManager.HANDLER_JOB_LOGS_RETRIEVE, 'localhost', None
)
id_key = EventKey(
handler='job-logs-retrieve',
event='succeeded',
message='succeeded',
tokens=one.pool.get_tasks()[0].tokens,
)
one.task_events_mgr._process_job_logs_retrieval(
one,
ctx,
[id_key],
)

# simulate job log retrieval
ctx, _, callback, callback_args, *__ = one.proc_pool.queuings.popleft()
for fname in ('job-activity.log', *retrieved_files):
# create the job log dir and any requested files
job_log_file = Path(get_task_job_log(
one.workflow,
id_key.tokens['cycle'],
id_key.tokens['task'],
id_key.tokens['job'],
fname,
))
job_log_file.parent.mkdir(parents=True, exist_ok=True)
job_log_file.touch()
ctx.ret_code = 0
callback(ctx, *callback_args)

async with start(one):
one.task_events_mgr.remove_event_timer = _remove_event_timer
one.task_events_mgr.unset_waiting_event_timer = (
_unset_waiting_event_timer
)
caplog.set_level(logging.DEBUG, CYLC_LOG)

# run retrieval -> no files are retrieved
caplog.clear()
job_logs_retrieve()
assert log_filter(
contains='File(s) not retrieved: job.forty-two job.out'
)
assert len(_unset_waiting_event_timer_calls) == 1
assert len(_remove_event_timer_calls) == 0

# run retrieval -> "job.forty-two" is retrieved
caplog.clear()
job_logs_retrieve('job.forty-two')
assert log_filter(contains='File(s) not retrieved: job.out')
assert len(_unset_waiting_event_timer_calls) == 2
assert len(_remove_event_timer_calls) == 0

# run retrieval -> "job.out" is retrieved
caplog.clear()
job_logs_retrieve('job.out')
assert not log_filter(contains='File(s) not retrieved')
assert len(_unset_waiting_event_timer_calls) == 2
assert len(_remove_event_timer_calls) == 1