Skip to content

Commit

Permalink
response to review 1
Browse files Browse the repository at this point in the history
  • Loading branch information
wxtim committed May 8, 2024
1 parent 19faa47 commit c37ef72
Show file tree
Hide file tree
Showing 8 changed files with 42 additions and 30 deletions.
1 change: 0 additions & 1 deletion cylc/flow/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@
)
from cylc.flow.print_tree import print_tree
from cylc.flow.task_qualifiers import ALT_QUALIFIERS
from cylc.flow.simulation import configure_sim_modes
from cylc.flow.run_modes.nonlive import mode_validate_checks
from cylc.flow.subprocctx import SubFuncContext
from cylc.flow.task_events_mgr import (
Expand Down
20 changes: 15 additions & 5 deletions cylc/flow/prerequisite.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ class Prerequisite:
MESSAGE_TEMPLATE = r'%s/%s %s'

DEP_STATE_SATISFIED = 'satisfied naturally'
DEP_STATE_ARTIFICIAL = 'Artificially satisfied'
DEP_STATE_OVERRIDDEN = 'force satisfied'
DEP_STATE_UNSATISFIED = False

Expand Down Expand Up @@ -198,20 +199,24 @@ def _conditional_is_satisfied(self):
'"%s":\n%s' % (self.get_raw_conditional_expression(), err_msg))
return res

def satisfy_me(self, outputs: Iterable['Tokens']) -> 'Set[Tokens]':
def satisfy_me(self, outputs: Iterable['Tokens'], mode) -> 'Set[Tokens]':
"""Attempt to satisfy me with given outputs.
Updates cache with the result.
Return outputs that match.
"""
if mode != 'live':
satisfied_message = self.DEP_STATE_ARTIFICIAL + f' by {mode} mode'
else:
satisfied_message = self.DEP_STATE_SATISFIED
valid = set()
for output in outputs:
prereq = (output['cycle'], output['task'], output['task_sel'])
if prereq not in self.satisfied:
continue
valid.add(output)
self.satisfied[prereq] = self.DEP_STATE_SATISFIED
self.satisfied[prereq] = satisfied_message
if self.conditional_expression is None:
self._all_satisfied = all(self.satisfied.values())
else:
Expand Down Expand Up @@ -292,6 +297,11 @@ def get_resolved_dependencies(self):
E.G: ['1/foo', '2/bar']
"""
return [f'{point}/{name}' for
(point, name, _), satisfied in self.satisfied.items() if
satisfied == self.DEP_STATE_SATISFIED]
return [
f'{point}/{name}' for
(point, name, _), satisfied in self.satisfied.items()
if (
satisfied == self.DEP_STATE_SATISFIED
or satisfied.startswith(self.DEP_STATE_ARTIFICIAL)
)
]
13 changes: 7 additions & 6 deletions cylc/flow/run_modes/skip.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,12 +102,13 @@ def process_outputs(itask: 'TaskProxy') -> List[str]:
# Send the rest of our outputs, unless they are succeed or failed,
# which we hold back, to prevent warnings about pre-requisites being
# unmet being shown because a "finished" output happens to come first.
for output, message in itask.state.outputs._required.items():
# Send message unless it be succeeded/failed.
if output in [TASK_OUTPUT_SUCCEEDED, TASK_OUTPUT_FAILED]:
continue
if not conf_outputs or output in conf_outputs:
result.append(message)
if hasattr(itask.state.outputs, '_required'):
for output, message in itask.state.outputs._required.items():
# Send message unless it be succeeded/failed.
if output in [TASK_OUTPUT_SUCCEEDED, TASK_OUTPUT_FAILED]:
continue
if not conf_outputs or output in conf_outputs:
result.append(message)

# Send succeeded/failed last.
if TASK_OUTPUT_FAILED in conf_outputs:
Expand Down
7 changes: 3 additions & 4 deletions cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1500,7 +1500,7 @@ def release_queued_tasks(self) -> bool:
pre_prep_tasks,
self.server.curve_auth,
self.server.client_pub_key_dir,
run_mode=self.config.run_mode()
run_mode=self.get_run_mode()
):
if itask.flow_nums:
flow = ','.join(str(i) for i in itask.flow_nums)
Expand Down Expand Up @@ -1745,16 +1745,15 @@ async def _main_loop(self) -> None:

if self.xtrigger_mgr.do_housekeeping:
self.xtrigger_mgr.housekeep(self.pool.get_tasks())

self.pool.clock_expire_tasks()
self.release_queued_tasks()
if sim_time_check(
self.task_events_mgr,
self.pool.get_tasks(),
self.workflow_db_mgr,
):
# A simulated task state change occurred.
self.reset_inactivity_timer()
self.pool.clock_expire_tasks()
self.release_queued_tasks()

self.broadcast_mgr.expire_broadcast(self.pool.get_min_point())
self.late_tasks_check()
Expand Down
6 changes: 3 additions & 3 deletions cylc/flow/scheduler_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,9 +132,9 @@
help=(
f"Run mode: {RunMode.WORKFLOW_MODES} (default live)."
" Live mode executes the tasks as defined in the runtime section."
" Simulation, Skip and Dummy partially or wholly ignore"
" the task defined in runtime configuration. Simulation and"
" dummy are designed for testing and Skip for flow control."
" Simulation, skip and dummy modes ignore part of tasks'"
" runtime configurations. Simulation and dummy modes are"
" designed for testing, and skip mode is for flow control."
),
metavar="STRING", action='store', dest="run_mode",
choices=list(RunMode.WORKFLOW_MODES),
Expand Down
8 changes: 6 additions & 2 deletions cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -1398,7 +1398,10 @@ def spawn_on_output(self, itask, output, forced=False):
else:
tasks = [c_task]
for t in tasks:
t.satisfy_me([itask.tokens.duplicate(task_sel=output)])
t.satisfy_me(
[itask.tokens.duplicate(task_sel=output)],
getattr(itask.tdef, 'run_mode', 'live')
)
self.data_store_mgr.delta_task_prerequisite(t)
self.add_to_pool(t)

Expand Down Expand Up @@ -1521,7 +1524,8 @@ def spawn_on_all_outputs(
continue
if completed_only:
c_task.satisfy_me(
[itask.tokens.duplicate(task_sel=message)]
[itask.tokens.duplicate(task_sel=message)],
itask.tdef.run_mode
)
self.data_store_mgr.delta_task_prerequisite(c_task)
self.add_to_pool(c_task)
Expand Down
4 changes: 2 additions & 2 deletions cylc/flow/task_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -544,7 +544,7 @@ def state_reset(
return False

def satisfy_me(
self, task_messages: 'List[Tokens]'
self, task_messages: 'List[Tokens]', mode='live'
) -> 'Set[Tokens]':
"""Try to satisfy my prerequisites with given output messages.
Expand All @@ -554,7 +554,7 @@ def satisfy_me(
Return a set of unmatched task messages.
"""
used = self.state.satisfy_me(task_messages)
used = self.state.satisfy_me(task_messages, mode)
return set(task_messages) - used

def clock_expire(self) -> bool:
Expand Down
13 changes: 6 additions & 7 deletions cylc/flow/task_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ class RunMode:

MODES = {LIVE, SIMULATION, DUMMY, SKIP, WORKFLOW}

WORKFLOW_MODES = sorted(MODES - {WORKFLOW})
WORKFLOW_MODES = [LIVE, DUMMY, SIMULATION, SKIP]
"""Workflow mode not sensible mode for workflow.
n.b. converted to a list to ensure ordering doesn't change in
Expand Down Expand Up @@ -214,16 +214,14 @@ def disable_task_event_handlers(itask):
if we don't deliberately enable them:
"""
mode = itask.tdef.run_mode
if (
return (
mode == RunMode.SIMULATION
or (
mode == RunMode.SKIP
and itask.tdef.rtconfig['skip'][
'disable task event handlers'] is True
)
):
return True
return False
)


def status_leq(status_a, status_b):
Expand Down Expand Up @@ -384,15 +382,16 @@ def __call__(

def satisfy_me(
self,
outputs: Iterable['Tokens']
outputs: Iterable['Tokens'],
mode,
) -> Set['Tokens']:
"""Try to satisfy my prerequisites with given outputs.
Return which outputs I actually depend on.
"""
valid: Set[Tokens] = set()
for prereq in (*self.prerequisites, *self.suicide_prerequisites):
yep = prereq.satisfy_me(outputs)
yep = prereq.satisfy_me(outputs, mode)
if yep:
valid = valid.union(yep)
continue
Expand Down

0 comments on commit c37ef72

Please sign in to comment.