diff --git a/cylc/flow/flow_mgr.py b/cylc/flow/flow_mgr.py index 67f816982ec..64ca069dd22 100644 --- a/cylc/flow/flow_mgr.py +++ b/cylc/flow/flow_mgr.py @@ -160,7 +160,7 @@ def get_flow_num( if flow_num in self.flows: if meta is not None: - LOG.warning( + LOG.debug( f'Ignoring flow metadata "{meta}":' f' {flow_num} is not a new flow' ) diff --git a/cylc/flow/prerequisite.py b/cylc/flow/prerequisite.py index 8b5c2cd7941..a6aeebcf7f2 100644 --- a/cylc/flow/prerequisite.py +++ b/cylc/flow/prerequisite.py @@ -123,6 +123,13 @@ def __init__(self, point: 'PointBase'): # * `False` (prerequisite unsatisfied). self._cached_satisfied: Optional[bool] = None + def dump(self): + print(f"POINT {self.point}") + for k, v in self._satisfied.items(): + print(f"_SAT: {k}, {v}") + print(f"COND: {self.conditional_expression}") + print(f"ALL SAT: {self._all_satisfied}") + def instantaneous_hash(self) -> int: """Generate a hash of this prerequisite in its current state. diff --git a/cylc/flow/scheduler.py b/cylc/flow/scheduler.py index dec3a279ce9..a770e4d6c47 100644 --- a/cylc/flow/scheduler.py +++ b/cylc/flow/scheduler.py @@ -1546,10 +1546,14 @@ def start_job_submission(self, itasks: 'Iterable[TaskProxy]') -> bool: flow = ','.join(str(i) for i in itask.flow_nums) else: flow = FLOW_NONE - log( - f"{itask.identity} -triggered off " - f"{itask.state.get_resolved_dependencies()} in flow {flow}" - ) + if itask.is_manual_submit: + off = f"[] in flow {flow}" + else: + off = ( + f"{itask.state.get_resolved_dependencies()}" + f" in flow {flow}" + ) + log(f"{itask.identity} -triggered off {off}") # one or more tasks were passed through the submission pipeline return True diff --git a/cylc/flow/task_job_mgr.py b/cylc/flow/task_job_mgr.py index 7e1cfa02cc4..8719856a7ab 100644 --- a/cylc/flow/task_job_mgr.py +++ b/cylc/flow/task_job_mgr.py @@ -469,7 +469,7 @@ def submit_livelike_task_jobs( 'platform_name': itask.platform['name'], 'job_runner_name': itask.summary['job_runner_name'], }) - + # reset the is_manual_submit flag in case of retries itask.is_manual_submit = False if ri_map[install_target] == REMOTE_FILE_INSTALL_255: diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index 4732f43d587..f4d744b38fc 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -1345,10 +1345,12 @@ def hold_tasks(self, items: Iterable[str]) -> int: ) for itask in itasks: self.hold_active_task(itask) + # Set inactive tasks to be held: for tdef, cycle in inactive_tasks: self.data_store_mgr.delta_task_held(tdef.name, cycle, True) self.tasks_to_hold.add((tdef.name, cycle)) + self.workflow_db_mgr.put_tasks_to_hold(self.tasks_to_hold) LOG.debug(f"Tasks to hold: {self.tasks_to_hold}") return len(unmatched) @@ -1717,6 +1719,7 @@ def spawn_task( point: 'PointBase', flow_nums: Set[int], flow_wait: bool = False, + force: bool = False ) -> Optional[TaskProxy]: """Return a new task proxy for the given flow if possible. @@ -1751,7 +1754,8 @@ def spawn_task( return None if ( - prev_status is not None + not force + and prev_status is not None and not itask.state.outputs.get_completed_outputs() ): # If itask has any history in this flow but no completed outputs @@ -1762,7 +1766,7 @@ def spawn_task( LOG.debug(f"Not respawning {point}/{name} - task was removed") return None - if prev_status in TASK_STATUSES_FINAL: + if not force and prev_status in TASK_STATUSES_FINAL: # Task finished previously. msg = f"[{point}/{name}:{prev_status}] already finished" if itask.is_complete(): @@ -1921,7 +1925,8 @@ def set_prereqs_and_outputs( prereqs: List[str], flow: List[str], flow_wait: bool = False, - flow_descr: Optional[str] = None + flow_descr: Optional[str] = None, + flow_nums: Optional[Set[int]] = None, ): """Set prerequisites or outputs of target tasks. @@ -1955,11 +1960,22 @@ def set_prereqs_and_outputs( items: task ID match patterns prereqs: prerequisites to set outputs: outputs to set - flow: flow numbers for spawned or merged tasks + flow: raw input flow numbers for spawned or merged tasks + flow_nums: if actual flow numbers have already been computed flow_wait: wait for flows to catch up before continuing flow_descr: description of new flow """ + if not flow_nums: + if not flow: + # default: assign to all active flows + flow_nums = self._get_active_flow_nums() + else: + flow_nums = self._get_flow_nums(flow, flow_descr) + + if flow_nums is None: + return + # Get matching pool tasks and inactive task definitions. itasks, inactive_tasks, unmatched = self.filter_task_proxies( items, @@ -1967,17 +1983,19 @@ def set_prereqs_and_outputs( warn_no_active=False, ) - flow_nums = self._get_flow_nums(flow, flow_descr) - # Set existing task proxies. for itask in itasks: + # TODO can flow be 'none' now? if flow == ['none'] and itask.flow_nums != set(): LOG.error( f"[{itask}] ignoring 'flow=none' set: task already has" f" {repr_flow_nums(itask.flow_nums, full=True)}" ) continue - self.merge_flows(itask, flow_nums) + if flow: + self.merge_flows(itask, flow_nums) + # else keep existing flows + if prereqs: self._set_prereqs_itask(itask, prereqs, flow_nums) else: @@ -1987,9 +2005,6 @@ def set_prereqs_and_outputs( self._set_outputs_itask(itask, outputs) # Spawn and set inactive tasks. - if not flow: - # default: assign to all active flows - flow_nums = self._get_active_flow_nums() for tdef, point in inactive_tasks: if prereqs: self._set_prereqs_tdef( @@ -2002,8 +2017,9 @@ def set_prereqs_and_outputs( if trans is not None: self._set_outputs_itask(trans, outputs) - if self.compute_runahead(): - self.release_runahead_tasks() + # for "cylc play --start-tasks" compute runahead after spawning + self.compute_runahead() + self.release_runahead_tasks() def _set_outputs_itask( self, @@ -2076,26 +2092,39 @@ def _set_prereqs_itask( # No prereqs matched. return False if ( - self.runahead_limit_point is not None + itask.state.is_runahead + and self.runahead_limit_point is not None and itask.point <= self.runahead_limit_point ): - self.rh_release_and_queue(itask) - self.data_store_mgr.delta_task_prerequisite(itask) + # Release from runahead, and queue it. + # TODO? self.rh_release_and_queue(itask) + self.spawn_to_rh_limit( + itask.tdef, + itask.tdef.next_point(itask.point), + itask.flow_nums + ) + return True def _set_prereqs_tdef( self, point, taskdef, prereqs, flow_nums, flow_wait - ): + ) -> Optional[TaskProxy]: """Spawn an inactive task and set prerequisites on it.""" itask = self.spawn_task( - taskdef.name, point, flow_nums, flow_wait=flow_wait + taskdef.name, point, flow_nums, flow_wait=flow_wait, force=True ) if itask is None: - return + return None + + self.db_add_new_flow_rows(itask) + + # TODO check flow-wait (see master force_trigger_tasks) if self._set_prereqs_itask(itask, prereqs, flow_nums): self.add_to_pool(itask) + return itask + def _get_active_flow_nums(self) -> 'FlowNums': """Return all active flow numbers. @@ -2125,7 +2154,8 @@ def _get_flow_nums( active tasks. """ - if flow == [FLOW_NONE]: + # TODO check is None possible or should be prevented getting here? + if flow is None or flow == [FLOW_NONE]: return set() if flow == [FLOW_ALL]: return self._get_active_flow_nums() @@ -2202,6 +2232,14 @@ def _force_trigger(self, itask: 'TaskProxy', on_resume: bool = False): # if so check/spawn if xtrigger sequential. self.check_spawn_psx_task(itask) + def _force_trigger_if_ready(self, itask, on_resume): + if not itask.prereqs_are_satisfied(): + return + itask.is_manual_submit = True + itask.reset_try_timers() + self.data_store_mgr.delta_task_prerequisite(itask) + self._force_trigger(itask, on_resume) + def force_trigger_tasks( self, items: Iterable[str], @@ -2210,10 +2248,16 @@ def force_trigger_tasks( flow_descr: Optional[str] = None, on_resume: bool = False ): - """Manually trigger tasks. + """Manually trigger a selected group of tasks. + + Satisfy any off-group prerequisites, in the group. Tasks with only + off-group prerequisites will run immediately. Other prerequisites + will be respected within the group. + + # TODO: check the following (presumably there are tests): If a task did not run before in the flow: - - trigger it, and spawn on outputs unless flow-wait is set. + - run it, and spawn on outputs unless flow-wait is set. (but load the previous outputs from the DB) If a task ran before in the flow: @@ -2229,6 +2273,10 @@ def force_trigger_tasks( existing_tasks, inactive, unmatched = self.filter_task_proxies( items, inactive=True, warn_no_active=False, ) + all_ids = ( + list(inactive) + + [(itask.tdef.name, itask.point) for itask in existing_tasks] + ) flow_nums = self._get_flow_nums(flow, flow_descr) @@ -2243,8 +2291,27 @@ def force_trigger_tasks( if itask.state(TASK_STATUS_PREPARING, *TASK_STATUSES_ACTIVE): LOG.error(f"[{itask}] ignoring trigger - already active") continue + for pre in itask.state.prerequisites: + # satisfy off-group prerequisites + for ( + p_point, p_name, p_out + ), p_state in pre._satisfied.items(): + if ( + not p_state and + (p_name, get_point(p_point)) not in all_ids + ): + # off-group + itask.satisfy_me( + [ + Tokens( + cycle=p_point, + task=p_name, + task_sel=p_out + ) + ] + ) self.merge_flows(itask, flow_nums) - self._force_trigger(itask, on_resume) + self._force_trigger_if_ready(itask, on_resume) # Spawn and trigger inactive tasks. if not flow: @@ -2252,34 +2319,24 @@ def force_trigger_tasks( flow_nums = self._get_active_flow_nums() for tdef, point in inactive: - if not self.can_be_spawned(tdef.name, point): - continue - submit_num, _, prev_fwait = ( - self._get_task_history(tdef.name, point, flow_nums) - ) - itask = TaskProxy( - self.tokens, - tdef, - point, - flow_nums, - flow_wait=flow_wait, - submit_num=submit_num, - sequential_xtrigger_labels=( - self.xtrigger_mgr.xtriggers.sequential_xtrigger_labels - ), - ) - if itask is None: - continue - - self.db_add_new_flow_rows(itask) - - if prev_fwait: - # update completed outputs from the DB - self._load_historical_outputs(itask) - - # run it (or run it again for incomplete flow-wait) - self.add_to_pool(itask) - self._force_trigger(itask, on_resume) + if tdef.is_parentless(point): + # parentless: set pre=all to spawn into task pool + jtask = self._set_prereqs_tdef( + point, tdef, ["all"], flow_nums, flow_wait + ) + else: + # set off-group prerequisites + off_flow_prereqs = [] + for pid in tdef.get_triggers(point): + p_point = pid.get_point(point) + p_name = pid.task_name + if (p_name, get_point(p_point)) not in all_ids: + off_flow_prereqs.append(f"{p_point}/{p_name}") + jtask = self._set_prereqs_tdef( + point, tdef, off_flow_prereqs, flow_nums, flow_wait + ) + if jtask is not None: + self._force_trigger_if_ready(jtask, on_resume) def spawn_parentless_sequential_xtriggers(self): """Spawn successor(s) of parentless wall clock satisfied tasks.""" @@ -2457,18 +2514,6 @@ def match_inactive_tasks( continue point_str = cast('str', tokens['cycle']) - name_str = cast('str', tokens['task']) - if name_str not in self.config.taskdefs: - if self.config.find_taskdefs(name_str): - # It's a family name; was not matched by active tasks - LOG.warning( - f"No active tasks in the family {name_str}" - f' matching: {id_}' - ) - else: - LOG.warning(self.ERR_TMPL_NO_TASKID_MATCH.format(name_str)) - unmatched_tasks.append(id_) - continue try: point_str = standardise_point_string(point_str) except PointParsingError as exc: @@ -2476,18 +2521,32 @@ def match_inactive_tasks( f"{id_} - invalid cycle point: {point_str} ({exc})") unmatched_tasks.append(id_) continue - point = get_point(point_str) - taskdef = self.config.taskdefs[name_str] - if taskdef.is_valid_point(point): - matched_tasks.add((taskdef, point)) - else: - LOG.warning( - self.ERR_PREFIX_TASK_NOT_ON_SEQUENCE.format( - taskdef.name, point - ) - ) + + name_str = cast('str', tokens['task']) + + members = self.config.find_taskdefs(name_str) + if not members: + LOG.warning(self.ERR_TMPL_NO_TASKID_MATCH.format(name_str)) unmatched_tasks.append(id_) continue + + point = get_point(point_str) + for name in [m.name for m in members]: + try: + taskdef = self.config.taskdefs[name] + except KeyError: + # family name + continue + if taskdef.is_valid_point(point): + matched_tasks.add((taskdef, point)) + else: + LOG.warning( + self.ERR_PREFIX_TASK_NOT_ON_SEQUENCE.format( + taskdef.name, point + ) + ) + unmatched_tasks.append(id_) + continue return matched_tasks, unmatched_tasks def match_taskdefs( diff --git a/cylc/flow/task_proxy.py b/cylc/flow/task_proxy.py index 8d9134a6ac4..caecbff6706 100644 --- a/cylc/flow/task_proxy.py +++ b/cylc/flow/task_proxy.py @@ -334,6 +334,15 @@ def __str__(self) -> str: f"{id_}{repr_flow_nums(self.flow_nums)}:{self.state}" ) + def __eq__(self, other) -> bool: + """Task proxy equality is based point/name only.""" + # (Needed e.g. for adding to task_pool.tasks_to_trigger sets) + return self.identity == other.identity + + def __hash__(self): + """Task proxy equality is based point/name only.""" + return hash(self.identity) + def copy_to_reload_successor( self, reload_successor: 'TaskProxy', diff --git a/cylc/flow/taskdef.py b/cylc/flow/taskdef.py index 49784822127..d88e55096a9 100644 --- a/cylc/flow/taskdef.py +++ b/cylc/flow/taskdef.py @@ -325,6 +325,22 @@ def get_parent_points(self, point): parent_points.add(trig.get_parent_point(point)) return parent_points + def get_triggers(self, point): + """Return my triggers, at point.""" + triggers = set() + for seq in self.sequences: + if not seq.is_valid(point): + continue + if seq in self.dependencies: + # task has prereqs in this sequence + for dep in self.dependencies[seq]: + # TODO? + if dep.suicide: + continue + for trig in dep.task_triggers: + triggers.add(trig) + return triggers + def has_only_abs_triggers(self, point): """Return whether I have only absolute triggers at point.""" if not self.has_abs_triggers: diff --git a/tests/functional/cylc-show/06-past-present-future.t b/tests/functional/cylc-show/06-past-present-future.t index a67636bc613..73a749f523b 100644 --- a/tests/functional/cylc-show/06-past-present-future.t +++ b/tests/functional/cylc-show/06-past-present-future.t @@ -42,11 +42,12 @@ state: succeeded prerequisites: (n/a for past tasks) __END__ +# Note trigger command satisfies off-flow prerequisites. TEST_NAME="${TEST_NAME_BASE}-show.present" contains_ok "${WORKFLOW_RUN_DIR}/show-c.txt" <<__END__ state: running prerequisites: ('⨯': not satisfied) - ⨯ 1/b succeeded + ✓ 1/b succeeded __END__ TEST_NAME="${TEST_NAME_BASE}-show.future" diff --git a/tests/functional/cylc-trigger/01-queued/reference.log b/tests/functional/cylc-trigger/01-queued/reference.log index 1a9f846c98d..499c919d40d 100644 --- a/tests/functional/cylc-trigger/01-queued/reference.log +++ b/tests/functional/cylc-trigger/01-queued/reference.log @@ -1,4 +1,4 @@ Initial point: 1 Final point: 1 1/foo -triggered off [] -1/bar -triggered off ['1/foo'] +1/bar -triggered off [] diff --git a/tests/functional/spawn-on-demand/02-merge/reference.log b/tests/functional/spawn-on-demand/02-merge/reference.log index e150aa34f34..162bcb6ea75 100644 --- a/tests/functional/spawn-on-demand/02-merge/reference.log +++ b/tests/functional/spawn-on-demand/02-merge/reference.log @@ -1,6 +1,6 @@ Initial point: 1 Final point: 3 -1/foo -triggered off ['0/foo'] +1/foo -triggered off [] 2/foo -triggered off ['1/foo'] 1/bar -triggered off ['1/foo'] 3/foo -triggered off ['2/foo'] diff --git a/tests/functional/spawn-on-demand/10-retrigger/reference.log b/tests/functional/spawn-on-demand/10-retrigger/reference.log index 7c9a0a19599..ad8394c752e 100644 --- a/tests/functional/spawn-on-demand/10-retrigger/reference.log +++ b/tests/functional/spawn-on-demand/10-retrigger/reference.log @@ -3,5 +3,5 @@ Final point: 1 1/foo -triggered off [] 1/oops -triggered off ['1/foo'] 1/triggerer -triggered off ['1/foo'] -1/oops -triggered off ['1/foo'] +1/oops -triggered off [] 1/bar -triggered off ['1/oops'] diff --git a/tests/integration/test_task_pool.py b/tests/integration/test_task_pool.py index ffb255c0cf5..82df00e0754 100644 --- a/tests/integration/test_task_pool.py +++ b/tests/integration/test_task_pool.py @@ -348,9 +348,9 @@ async def test_match_taskdefs( id="Name globs hold active tasks only" # (active means n=0 here) ), param( - ['1/FAM', '2/FAM', '6/FAM'], ['1/bar', '2/bar'], - ["No active tasks in the family FAM matching: 6/FAM"], - id="Family names hold active tasks only" + ['1/FAM', '2/FAM', '6/FAM'], ['1/bar', '2/bar', '6/bar'], + [], + id="Family names hold active and future tasks" ), param( ['1/grogu', 'H/foo', '20/foo', '1/pub'], [], @@ -500,7 +500,7 @@ async def test_trigger_states( ): """It should only trigger tasks in compatible states.""" - async with start(one): + async with start(one): #, level=logging.DEBUG): task = one.pool.filter_task_proxies(['1/one'])[0][0] # reset task a to the provided state diff --git a/tests/unit/test_flow_mgr.py b/tests/unit/test_flow_mgr.py index c9171b02073..33a623de173 100644 --- a/tests/unit/test_flow_mgr.py +++ b/tests/unit/test_flow_mgr.py @@ -48,7 +48,7 @@ def test_all( db_mgr = WorkflowDatabaseManager() flow_mgr = FlowMgr(db_mgr) - caplog.set_level(logging.INFO, CYLC_LOG) + caplog.set_level(logging.DEBUG, CYLC_LOG) meta = "the quick brown fox" assert flow_mgr.get_flow_num(None, meta) == 1