From c37ef7215382de5d8da0f8158bf4855264ea409d Mon Sep 17 00:00:00 2001 From: Tim Pillinger <26465611+wxtim@users.noreply.github.com> Date: Wed, 8 May 2024 13:35:08 +0100 Subject: [PATCH] response to review 1 --- cylc/flow/config.py | 1 - cylc/flow/prerequisite.py | 20 +++++++++++++++----- cylc/flow/run_modes/skip.py | 13 +++++++------ cylc/flow/scheduler.py | 7 +++---- cylc/flow/scheduler_cli.py | 6 +++--- cylc/flow/task_pool.py | 8 ++++++-- cylc/flow/task_proxy.py | 4 ++-- cylc/flow/task_state.py | 13 ++++++------- 8 files changed, 42 insertions(+), 30 deletions(-) diff --git a/cylc/flow/config.py b/cylc/flow/config.py index ad256f253f9..e9bbda63852 100644 --- a/cylc/flow/config.py +++ b/cylc/flow/config.py @@ -82,7 +82,6 @@ ) from cylc.flow.print_tree import print_tree from cylc.flow.task_qualifiers import ALT_QUALIFIERS -from cylc.flow.simulation import configure_sim_modes from cylc.flow.run_modes.nonlive import mode_validate_checks from cylc.flow.subprocctx import SubFuncContext from cylc.flow.task_events_mgr import ( diff --git a/cylc/flow/prerequisite.py b/cylc/flow/prerequisite.py index bfe5a50fc46..558af85c1a8 100644 --- a/cylc/flow/prerequisite.py +++ b/cylc/flow/prerequisite.py @@ -57,6 +57,7 @@ class Prerequisite: MESSAGE_TEMPLATE = r'%s/%s %s' DEP_STATE_SATISFIED = 'satisfied naturally' + DEP_STATE_ARTIFICIAL = 'Artificially satisfied' DEP_STATE_OVERRIDDEN = 'force satisfied' DEP_STATE_UNSATISFIED = False @@ -198,20 +199,24 @@ def _conditional_is_satisfied(self): '"%s":\n%s' % (self.get_raw_conditional_expression(), err_msg)) return res - def satisfy_me(self, outputs: Iterable['Tokens']) -> 'Set[Tokens]': + def satisfy_me(self, outputs: Iterable['Tokens'], mode) -> 'Set[Tokens]': """Attempt to satisfy me with given outputs. Updates cache with the result. Return outputs that match. """ + if mode != 'live': + satisfied_message = self.DEP_STATE_ARTIFICIAL + f' by {mode} mode' + else: + satisfied_message = self.DEP_STATE_SATISFIED valid = set() for output in outputs: prereq = (output['cycle'], output['task'], output['task_sel']) if prereq not in self.satisfied: continue valid.add(output) - self.satisfied[prereq] = self.DEP_STATE_SATISFIED + self.satisfied[prereq] = satisfied_message if self.conditional_expression is None: self._all_satisfied = all(self.satisfied.values()) else: @@ -292,6 +297,11 @@ def get_resolved_dependencies(self): E.G: ['1/foo', '2/bar'] """ - return [f'{point}/{name}' for - (point, name, _), satisfied in self.satisfied.items() if - satisfied == self.DEP_STATE_SATISFIED] + return [ + f'{point}/{name}' for + (point, name, _), satisfied in self.satisfied.items() + if ( + satisfied == self.DEP_STATE_SATISFIED + or satisfied.startswith(self.DEP_STATE_ARTIFICIAL) + ) + ] diff --git a/cylc/flow/run_modes/skip.py b/cylc/flow/run_modes/skip.py index b336e05e578..3c5a985c033 100644 --- a/cylc/flow/run_modes/skip.py +++ b/cylc/flow/run_modes/skip.py @@ -102,12 +102,13 @@ def process_outputs(itask: 'TaskProxy') -> List[str]: # Send the rest of our outputs, unless they are succeed or failed, # which we hold back, to prevent warnings about pre-requisites being # unmet being shown because a "finished" output happens to come first. - for output, message in itask.state.outputs._required.items(): - # Send message unless it be succeeded/failed. - if output in [TASK_OUTPUT_SUCCEEDED, TASK_OUTPUT_FAILED]: - continue - if not conf_outputs or output in conf_outputs: - result.append(message) + if hasattr(itask.state.outputs, '_required'): + for output, message in itask.state.outputs._required.items(): + # Send message unless it be succeeded/failed. + if output in [TASK_OUTPUT_SUCCEEDED, TASK_OUTPUT_FAILED]: + continue + if not conf_outputs or output in conf_outputs: + result.append(message) # Send succeeded/failed last. if TASK_OUTPUT_FAILED in conf_outputs: diff --git a/cylc/flow/scheduler.py b/cylc/flow/scheduler.py index 5bde0e9676a..61b280ccf2a 100644 --- a/cylc/flow/scheduler.py +++ b/cylc/flow/scheduler.py @@ -1500,7 +1500,7 @@ def release_queued_tasks(self) -> bool: pre_prep_tasks, self.server.curve_auth, self.server.client_pub_key_dir, - run_mode=self.config.run_mode() + run_mode=self.get_run_mode() ): if itask.flow_nums: flow = ','.join(str(i) for i in itask.flow_nums) @@ -1745,7 +1745,8 @@ async def _main_loop(self) -> None: if self.xtrigger_mgr.do_housekeeping: self.xtrigger_mgr.housekeep(self.pool.get_tasks()) - + self.pool.clock_expire_tasks() + self.release_queued_tasks() if sim_time_check( self.task_events_mgr, self.pool.get_tasks(), @@ -1753,8 +1754,6 @@ async def _main_loop(self) -> None: ): # A simulated task state change occurred. self.reset_inactivity_timer() - self.pool.clock_expire_tasks() - self.release_queued_tasks() self.broadcast_mgr.expire_broadcast(self.pool.get_min_point()) self.late_tasks_check() diff --git a/cylc/flow/scheduler_cli.py b/cylc/flow/scheduler_cli.py index 5ed69284d1a..f1a0ebbbcf4 100644 --- a/cylc/flow/scheduler_cli.py +++ b/cylc/flow/scheduler_cli.py @@ -132,9 +132,9 @@ help=( f"Run mode: {RunMode.WORKFLOW_MODES} (default live)." " Live mode executes the tasks as defined in the runtime section." - " Simulation, Skip and Dummy partially or wholly ignore" - " the task defined in runtime configuration. Simulation and" - " dummy are designed for testing and Skip for flow control." + " Simulation, skip and dummy modes ignore part of tasks'" + " runtime configurations. Simulation and dummy modes are" + " designed for testing, and skip mode is for flow control." ), metavar="STRING", action='store', dest="run_mode", choices=list(RunMode.WORKFLOW_MODES), diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index e40b4047f9d..b32fbbf538d 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -1398,7 +1398,10 @@ def spawn_on_output(self, itask, output, forced=False): else: tasks = [c_task] for t in tasks: - t.satisfy_me([itask.tokens.duplicate(task_sel=output)]) + t.satisfy_me( + [itask.tokens.duplicate(task_sel=output)], + getattr(itask.tdef, 'run_mode', 'live') + ) self.data_store_mgr.delta_task_prerequisite(t) self.add_to_pool(t) @@ -1521,7 +1524,8 @@ def spawn_on_all_outputs( continue if completed_only: c_task.satisfy_me( - [itask.tokens.duplicate(task_sel=message)] + [itask.tokens.duplicate(task_sel=message)], + itask.tdef.run_mode ) self.data_store_mgr.delta_task_prerequisite(c_task) self.add_to_pool(c_task) diff --git a/cylc/flow/task_proxy.py b/cylc/flow/task_proxy.py index f8662d08442..da1ef196342 100644 --- a/cylc/flow/task_proxy.py +++ b/cylc/flow/task_proxy.py @@ -544,7 +544,7 @@ def state_reset( return False def satisfy_me( - self, task_messages: 'List[Tokens]' + self, task_messages: 'List[Tokens]', mode='live' ) -> 'Set[Tokens]': """Try to satisfy my prerequisites with given output messages. @@ -554,7 +554,7 @@ def satisfy_me( Return a set of unmatched task messages. """ - used = self.state.satisfy_me(task_messages) + used = self.state.satisfy_me(task_messages, mode) return set(task_messages) - used def clock_expire(self) -> bool: diff --git a/cylc/flow/task_state.py b/cylc/flow/task_state.py index 0fd09a6640f..8c106714d69 100644 --- a/cylc/flow/task_state.py +++ b/cylc/flow/task_state.py @@ -175,7 +175,7 @@ class RunMode: MODES = {LIVE, SIMULATION, DUMMY, SKIP, WORKFLOW} - WORKFLOW_MODES = sorted(MODES - {WORKFLOW}) + WORKFLOW_MODES = [LIVE, DUMMY, SIMULATION, SKIP] """Workflow mode not sensible mode for workflow. n.b. converted to a list to ensure ordering doesn't change in @@ -214,16 +214,14 @@ def disable_task_event_handlers(itask): if we don't deliberately enable them: """ mode = itask.tdef.run_mode - if ( + return ( mode == RunMode.SIMULATION or ( mode == RunMode.SKIP and itask.tdef.rtconfig['skip'][ 'disable task event handlers'] is True ) - ): - return True - return False + ) def status_leq(status_a, status_b): @@ -384,7 +382,8 @@ def __call__( def satisfy_me( self, - outputs: Iterable['Tokens'] + outputs: Iterable['Tokens'], + mode, ) -> Set['Tokens']: """Try to satisfy my prerequisites with given outputs. @@ -392,7 +391,7 @@ def satisfy_me( """ valid: Set[Tokens] = set() for prereq in (*self.prerequisites, *self.suicide_prerequisites): - yep = prereq.satisfy_me(outputs) + yep = prereq.satisfy_me(outputs, mode) if yep: valid = valid.union(yep) continue