diff --git a/changes.d/7052.feat.md b/changes.d/7052.feat.md
new file mode 100644
index 0000000000..36b20a05f5
--- /dev/null
+++ b/changes.d/7052.feat.md
@@ -0,0 +1,2 @@
+Make the list of files used to determine the success of job log retrieval
+commands configurable.
diff --git a/cylc/flow/cfgspec/globalcfg.py b/cylc/flow/cfgspec/globalcfg.py
index d4328e4e93..1059863fc8 100644
--- a/cylc/flow/cfgspec/globalcfg.py
+++ b/cylc/flow/cfgspec/globalcfg.py
@@ -160,6 +160,12 @@
         logs appearing in their final location (due to the job runner)
         you can configure time intervals here to delay the first and
         subsequent retrieval attempts.
+
+        Job log retrieval will be retried until the expected job files
+        have been retrieved; see
+        :cylc:conf:`global.cylc[platforms][<platform name>]retrieve
+        job log expected files`
+        for details.
     ''')
 }

@@ -1751,6 +1757,33 @@ def default_for(
                 retry delays``.

                 {replaces}
             ''')
+        Conf(
+            'retrieve job log expected files',
+            VDR.V_STRING_LIST,
+            '',
+            desc='''
+                Configure the log files that job log retrieval is expected
+                to return.
+
+                By default, job log retrieval is considered successful once
+                it has retrieved the "job.out" file, and additionally the
+                "job.err" file if the job failed.
+
+                Cylc will repeat job log retrieval according to the configured
+                :cylc:conf:`[..]retrieve job logs retry delays` until the
+                expected file(s) have been retrieved.
+
+                This setting allows you to add further files to that
+                success condition.
+
+                Its purpose is to facilitate working with files written
+                asynchronously by job runners, which may not be created
+                until after the job has succeeded, e.g. job report or
+                accounting files.
+
+                .. versionadded:: 8.7.0
+            ''',
+        )
         Conf('tail command template', VDR.V_STRING,
              'tail -n +1 --follow=name %(filename)s', desc=f'''
diff --git a/cylc/flow/scripts/cat_log.py b/cylc/flow/scripts/cat_log.py
index 535107a7a4..39fe8a5850 100755
--- a/cylc/flow/scripts/cat_log.py
+++ b/cylc/flow/scripts/cat_log.py
@@ -586,7 +586,15 @@ def _main(
             workflow_id, point, task, submit_num
         )

-        job_log_present = (Path(local_log_dir) / "job.out").exists()
+        job_log_present = all(
+            (Path(local_log_dir) / file).exists()
+            for file in [
+                # the files which are used to indicate that job log retrieval
+                # has completed
+                'job.out',
+                *platform['retrieve job log expected files'],
+            ]
+        )
         log_is_remote = (is_remote_platform(platform)
                          and (options.filename != JOB_LOG_ACTIVITY))

diff --git a/cylc/flow/task_events_mgr.py b/cylc/flow/task_events_mgr.py
index 5cfb496345..91d0ec0c61 100644
--- a/cylc/flow/task_events_mgr.py
+++ b/cylc/flow/task_events_mgr.py
@@ -32,6 +32,7 @@
     INFO,
     getLevelName,
 )
+import logging
 import os
 from pathlib import Path
 import shlex
@@ -224,6 +225,7 @@ def log_task_job_activity(
     point: 'str | PointBase',
     name: str,
     submit_num: str | int | None = None,
+    level: int | None = None,
 ):
     """Log an activity for a task job."""
     ctx_str = str(ctx)
@@ -241,9 +243,11 @@ def log_task_job_activity(
         # selection command causes a submission failure, or if a waiting task
         # expires before a job log directory is otherwise needed.
         # (Don't log the exception content, it looks like a bug).
-        LOG.info(ctx_str)
+        level = logging.INFO
     if ctx.cmd and ctx.ret_code:
-        LOG.error(ctx_str)
+        level = logging.ERROR
+    if level is not None:
+        LOG.log(level, ctx_str)


 class EventData(Enum):
@@ -1184,6 +1188,15 @@ def _process_job_logs_retrieval(
         # get a host to run retrieval on
         try:
             platform = get_platform(ctx.platform_name)
+        except PlatformLookupError:
+            log_platform_event(
+                'Unable to retrieve job logs.',
+                {'name': ctx.platform_name},
+                level='warning',
+            )
+            return
+
+        try:
             host = get_host_from_platform(platform, bad_hosts=self.bad_hosts)
         except NoHostsError:
             # All of the platforms hosts have been found to be uncontactable.
@@ -1202,13 +1215,6 @@ def _process_job_logs_retrieval(
             for id_key in id_keys:
                 self.unset_waiting_event_timer(id_key)
             return
-        except PlatformLookupError:
-            log_platform_event(
-                'Unable to retrieve job logs.',
-                {'name': ctx.platform_name},
-                level='warning',
-            )
-            return

         # construct the retrieval command
         ssh_str = str(platform["ssh command"])
@@ -1237,6 +1243,8 @@ def _process_job_logs_retrieval(
             # Local target
             cmd.append(get_workflow_run_job_dir(schd.workflow) + "/")

+        expected_log_files = platform['retrieve job log expected files']
+
         # schedule command
         self.proc_pool.put_command(
             SubProcContext(
@@ -1244,11 +1252,11 @@ def _process_job_logs_retrieval(
             ),
             bad_hosts=self.bad_hosts,
             callback=self._job_logs_retrieval_callback,
-            callback_args=[schd],
-            callback_255=self._job_logs_retrieval_callback_255
+            callback_args=[schd, expected_log_files],
+            callback_255=self._job_logs_retrieval_callback_255,
         )

-    def _job_logs_retrieval_callback_255(self, proc_ctx, schd) -> None:
+    def _job_logs_retrieval_callback_255(self, proc_ctx, *args) -> None:
         """Call back when log job retrieval fails with a 255 error."""
         self.bad_hosts.add(proc_ctx.host)
         for _ in proc_ctx.cmd_kwargs["id_keys"]:
@@ -1256,7 +1264,12 @@ def _job_logs_retrieval_callback_255(self, proc_ctx, schd) -> None:
             timer = self._event_timers[key]
             timer.reset()

-    def _job_logs_retrieval_callback(self, proc_ctx, schd) -> None:
+    def _job_logs_retrieval_callback(
+        self,
+        proc_ctx,
+        schd,
+        expected_log_files,
+    ) -> None:
         """Call back when log job retrieval completes."""
         if (
             (proc_ctx.ret_code and LOG.isEnabledFor(DEBUG))
@@ -1271,6 +1284,7 @@ def _job_logs_retrieval_callback(self, proc_ctx, schd) -> None:
         with suppress(TypeError):
             if id_key.event not in 'succeeded':
                 fnames.append(JOB_LOG_ERR)
+        fnames.extend(expected_log_files or [])
         fname_oks = {}
         for fname in fnames:
             fname_oks[fname] = os.path.exists(get_task_job_log(
@@ -1304,6 +1318,9 @@ def _job_logs_retrieval_callback(self, proc_ctx, schd) -> None:
                     id_key.tokens['cycle'],
                     id_key.tokens['task'],
                     id_key.tokens['job'],
+                    # ensure the list of remaining files to retrieve is logged
+                    # for debugging
+                    level=logging.DEBUG
                 )
             except KeyError as exc:
                 LOG.exception(exc)
diff --git a/tests/integration/test_task_events_mgr.py b/tests/integration/test_task_events_mgr.py
index 9814c58c2d..00e7d99589 100644
--- a/tests/integration/test_task_events_mgr.py
+++ b/tests/integration/test_task_events_mgr.py
@@ -15,32 +15,34 @@
 # along with this program. If not, see <http://www.gnu.org/licenses/>.
 import logging
+from pathlib import Path
 from types import SimpleNamespace
 from typing import Any as Fixture

 import pytest

+from cylc.flow import CYLC_LOG
 from cylc.flow.data_store_mgr import (
     JOBS,
     TASK_STATUS_WAITING,
 )
 from cylc.flow.id import Tokens
+from cylc.flow.network.resolvers import TaskMsg
 from cylc.flow.run_modes import RunMode
 from cylc.flow.scheduler import Scheduler
 from cylc.flow.task_events_mgr import (
     EventKey,
+    TaskEventsManager,
     TaskJobLogsRetrieveContext,
 )
+from cylc.flow.task_job_logs import get_task_job_log
 from cylc.flow.task_state import (
     TASK_STATUS_PREPARING,
     TASK_STATUS_SUBMIT_FAILED,
 )
-from cylc.flow.network.resolvers import TaskMsg
-
 from .test_workflow_events import TEMPLATES

-
 # NOTE: we do not test custom event handlers here because these are tested
 # as a part of workflow validation (now also performed by cylc play)
@@ -368,3 +370,101 @@ async def test_event_email_body(
     assert f'host: {mod_one.host}' in email_body
     assert f'port: {mod_one.server.port}' in email_body
     assert f'owner: {mod_one.owner}' in email_body
+
+
+async def test_job_log_retrieval_success_condition(
+    one: 'Scheduler', start, caplog, mock_glbl_cfg, log_filter
+):
+    """Test the success condition for job log retrieval.
+
+    Job log retrieval may be retried automatically if configured.
+
+    It will stop when the configured list of files has been retrieved.
+    """
+    mock_glbl_cfg(
+        'cylc.flow.platforms.glbl_cfg',
+        '''
+            [platforms]
+                [[localhost]]
+                    retrieve job log expected files = job.forty-two
+        '''
+    )
+    # capture event timer calls
+    _remove_event_timer_calls = []
+    _unset_waiting_event_timer_calls = []
+
+    # called if retrieval complete
+    def _remove_event_timer(id_key):
+        _remove_event_timer_calls.append(id_key)
+
+    # called if retrieval incomplete
+    def _unset_waiting_event_timer(id_key):
+        _unset_waiting_event_timer_calls.append(id_key)
+
+    def job_logs_retrieve(*retrieved_files):
+        """Run simulated job log retrieval.
+
+        Any files specified will be created on the filesystem (simulating
+        their retrieval).
+        """
+        # request job log retrieval
+        ctx = TaskJobLogsRetrieveContext(
+            TaskEventsManager.HANDLER_JOB_LOGS_RETRIEVE, 'localhost', None
+        )
+        id_key = EventKey(
+            handler='job-logs-retrieve',
+            event='succeeded',
+            message='succeeded',
+            tokens=one.pool.get_tasks()[0].tokens,
+        )
+        one.task_events_mgr._process_job_logs_retrieval(
+            one,
+            ctx,
+            [id_key],
+        )
+
+        # simulate job log retrieval
+        ctx, _, callback, callback_args, *__ = one.proc_pool.queuings.popleft()
+        for fname in ('job-activity.log', *retrieved_files):
+            # create the job log dir and any requested files
+            job_log_file = Path(get_task_job_log(
+                one.workflow,
+                id_key.tokens['cycle'],
+                id_key.tokens['task'],
+                id_key.tokens['job'],
+                fname,
+            ))
+            job_log_file.parent.mkdir(parents=True, exist_ok=True)
+            job_log_file.touch()
+        ctx.ret_code = 0
+        callback(ctx, *callback_args)
+
+    async with start(one):
+        one.task_events_mgr.remove_event_timer = _remove_event_timer
+        one.task_events_mgr.unset_waiting_event_timer = (
+            _unset_waiting_event_timer
+        )
+        caplog.set_level(logging.DEBUG, CYLC_LOG)
+
+        # run retrieval -> no files are retrieved
+        caplog.clear()
+        job_logs_retrieve()
+        assert log_filter(
+            contains='File(s) not retrieved: job.forty-two job.out'
+        )
+        assert len(_unset_waiting_event_timer_calls) == 1
+        assert len(_remove_event_timer_calls) == 0
+
+        # run retrieval -> "job.forty-two" is retrieved
+        caplog.clear()
+        job_logs_retrieve('job.forty-two')
+        assert log_filter(contains='File(s) not retrieved: job.out')
+        assert len(_unset_waiting_event_timer_calls) == 2
+        assert len(_remove_event_timer_calls) == 0
+
+        # run retrieval -> "job.out" is retrieved
+        caplog.clear()
+        job_logs_retrieve('job.out')
+        assert not log_filter(contains='File(s) not retrieved')
+        assert len(_unset_waiting_event_timer_calls) == 2
+        assert len(_remove_event_timer_calls) == 1
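
Note for reviewers: a minimal sketch of how a site might use the new setting in `global.cylc`. The platform name (`myhpc`), the retry delays, and the `job.stats` file name are illustrative assumptions, not taken from this change:

    [platforms]
        [[myhpc]]
            retrieve job logs = True
            retrieve job logs retry delays = PT30S, PT1M, PT5M
            # also wait for the job runner's accounting file, not just job.out
            retrieve job log expected files = job.stats

With this in place, retrieval is repeated on the configured schedule until `job.out` and `job.stats` (and `job.err` for failed jobs) have all arrived locally.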
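For clarity, a simplified sketch of the success condition that `_job_logs_retrieval_callback` applies after this change (`cat_log.py` applies the same check minus the `job.err` case). The helper name `retrieval_complete` is illustrative and does not exist in the codebase:

    from pathlib import Path
    from typing import Iterable


    def retrieval_complete(
        job_log_dir: Path,
        job_succeeded: bool,
        expected_files: Iterable[str],
    ) -> bool:
        """Return True once every required job log file exists locally.

        "job.out" is always required, "job.err" is added for failed jobs,
        and any configured "retrieve job log expected files" entries are
        appended to the requirement.
        """
        required = {'job.out', *expected_files}
        if not job_succeeded:
            required.add('job.err')
        return all((job_log_dir / name).exists() for name in required)

If any required file is still missing, the waiting event timer is reset and retrieval is retried after the next configured delay; once all files are present, the timer is removed.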