diff --git a/changes.d/7026.fix.md b/changes.d/7026.fix.md new file mode 100644 index 0000000000..054cf7dd81 --- /dev/null +++ b/changes.d/7026.fix.md @@ -0,0 +1 @@ +Log (re)commencement of xtrigger calls, for old xtriggers. diff --git a/cylc/flow/xtrigger_mgr.py b/cylc/flow/xtrigger_mgr.py index 101a8ee76c..b3cffad9ab 100644 --- a/cylc/flow/xtrigger_mgr.py +++ b/cylc/flow/xtrigger_mgr.py @@ -711,6 +711,7 @@ def call_xtriggers_async(self, itask: 'TaskProxy'): # General case: potentially slow asynchronous function call. if sig in self.sat_xtrig: # Already satisfied, just update the task + LOG.info(f"[{itask}] satisfying xtrigger prerequisite: {sig}") if not itask.state.xtriggers[label]: itask.state.xtriggers[label] = True res = {} @@ -760,7 +761,10 @@ def housekeep(self, itasks): itask, sigs_only=True, unsat_only=True) for sig in list(self.sat_xtrig): if sig not in all_xtrig: + LOG.debug(f"Housekeeping xtrigger result: {sig}") del self.sat_xtrig[sig] + with suppress(KeyError): + del self.t_next_call[sig] self.do_housekeeping = False def all_task_seq_xtriggers_satisfied(self, itask: 'TaskProxy') -> bool: diff --git a/tests/integration/test_xtrigger_mgr.py b/tests/integration/test_xtrigger_mgr.py index 8c33d821e7..59deb5b2c1 100644 --- a/tests/integration/test_xtrigger_mgr.py +++ b/tests/integration/test_xtrigger_mgr.py @@ -27,6 +27,7 @@ from cylc.flow.pathutil import get_workflow_run_dir from cylc.flow.scheduler import Scheduler from cylc.flow.subprocctx import SubFuncContext +from cylc.flow.task_outputs import TASK_OUTPUT_SUCCEEDED async def test_2_xtriggers(flow, start, scheduler, monkeypatch): @@ -95,10 +96,12 @@ async def test_2_xtriggers(flow, start, scheduler, monkeypatch): async def test_1_xtrigger_2_tasks(flow, start, scheduler, mocker): """ - If multiple tasks depend on the same satisfied xtrigger, the DB mgr method - put_xtriggers should only be called once - when the xtrigger gets satisfied + If two tasks depend on the same satisfied xtrigger, put_xtriggers should + only be called once - when the xtrigger gets satisfied. + - https://github.com/cylc/cylc-flow/pull/5908 - See [GitHub #5908](https://github.com/cylc/cylc-flow/pull/5908) + So long as both tasks are active at once: + - https://github.com/cylc/cylc-flow/issues/7027 """ id_ = flow({ @@ -120,8 +123,8 @@ async def test_1_xtrigger_2_tasks(flow, start, scheduler, mocker): # (For clock triggers this is synchronous) schd.xtrigger_mgr.call_xtriggers_async(task) - # It should now be satisfied. - assert task.state.xtriggers == {'wall_clock': True} + # It should now be satisfied. + assert task.state.xtriggers == {'wall_clock': True} # Check one put_xtriggers call only, not two. assert spy.call_count == 1 @@ -133,6 +136,148 @@ async def test_1_xtrigger_2_tasks(flow, start, scheduler, mocker): # loop doesn't run in this test. +async def test_1_xtrigger_2_tasks_async( + flow, start, scheduler, mocker, caplog +): + """ + Like test_1_xtrigger_2_tasks but for async (not clock) xtriggers. + + If two tasks depend on the same satisfied xtrigger, put_xtriggers should + only be called once - when the xtrigger gets satisfied (so long as both + are active at once - https://github.com/cylc/cylc-flow/issues/7027 + + """ + id_ = flow({ + 'scheduling': { + 'cycling mode': 'integer', + 'xtriggers': { + 'echo': 'echo("whatever", succeed=False)', + }, + 'graph': { + 'R1': ''' + @echo => foo & bar + ''' + }, + }, + }) + + schd = scheduler(id_) + spy = mocker.spy(schd.workflow_db_mgr, 'put_xtriggers') + + async with start(schd): + + foo = schd.pool._get_task_by_id('1/foo') + bar = schd.pool._get_task_by_id('1/bar') + + schd.xtrigger_mgr.call_xtriggers_async(foo) + assert "Commencing xtrigger" in caplog.text + caplog.clear() + + schd.xtrigger_mgr.call_xtriggers_async(bar) + assert "Commencing xtrigger" not in caplog.text + caplog.clear() + + satisfy_xtrigger_functions(schd) # mock results + assert "xtrigger succeeded" in caplog.text + caplog.clear() + + schd.xtrigger_mgr.call_xtriggers_async(foo) # process callbacks + assert "satisfying xtrigger" in caplog.text + caplog.clear() + + schd.xtrigger_mgr.call_xtriggers_async(bar) # process callbacks + assert "satisfying xtrigger" in caplog.text + caplog.clear() + + # It should now be satisfied. + assert foo.state.xtriggers == {'echo': True} + assert bar.state.xtriggers == {'echo': True} + + # Check put_xtriggers called once, not twice. + assert spy.call_count == 1 + + +async def test_1_xtrigger_2_tasks_later( + flow, start, scheduler, mocker, caplog +): + """ + If two tasks depend on the same satisfied xtrigger, but are not + both active at the same time, the xtrigger will need to be + satisfied twice - https://github.com/cylc/cylc-flow/issues/7027 + + """ + id_ = flow({ + 'scheduling': { + 'cycling mode': 'integer', + 'xtriggers': { + 'echo': 'echo("whatever", succeed=False)', + }, + 'graph': { + 'R1': ''' + @echo => foo & bar + # spawn bar after @echo is housekept post satisfying foo + foo => bar + ''' + }, + }, + }) + + schd = scheduler(id_) + spy = mocker.spy(schd.workflow_db_mgr, 'put_xtriggers') + + async with start(schd): + + # Get foo at startup + foo = schd.pool._get_task_by_id('1/foo') + + # call the xtrigger + schd.xtrigger_mgr.call_xtriggers_async(foo) + assert "Commencing xtrigger" in caplog.text + caplog.clear() + + # satisfy it + satisfy_xtrigger_functions(schd) # mock results + assert "xtrigger succeeded" in caplog.text + caplog.clear() + + # process callback + schd.xtrigger_mgr.call_xtriggers_async(foo) + assert "satisfying xtrigger" in caplog.text + caplog.clear() + + # foo should now be satisfied. + assert foo.state.xtriggers == {'echo': True} + + # this will delete the xtrigger - nothing else depends on it + schd.xtrigger_mgr.housekeep([foo]) + + # Spawn bar and remove foo + schd.pool.spawn_on_output(foo, TASK_OUTPUT_SUCCEEDED) + + bar = schd.pool._get_task_by_id('1/bar') + + # the xtrigger should be called again for bar + schd.xtrigger_mgr.call_xtriggers_async(bar) + assert "Commencing xtrigger" in caplog.text + caplog.clear() + + # satisfy it + satisfy_xtrigger_functions(schd) # mock results + assert "xtrigger succeeded" in caplog.text + caplog.clear() + + # process callback + schd.xtrigger_mgr.call_xtriggers_async(bar) + assert "satisfying xtrigger" in caplog.text + caplog.clear() + + # bar should now be satisfied. + assert bar.state.xtriggers == {'echo': True} + + # Check put_xtriggers called twice, not once. + assert spy.call_count == 2 + + async def test_xtriggers_restart(flow, start, scheduler, db_select): """It should write satisfied xtriggers to the DB and load on restart.