Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: no validation of outputs from graph singletons #6583

Merged
1 change: 1 addition & 0 deletions changes.d/6583.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fixed a bug where undefined custom outputs were not caught by validation if no tasks triggered off them.
29 changes: 29 additions & 0 deletions cylc/flow/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@
get_trigger_completion_variable_maps,
trigger_to_completion_variable,
)
from cylc.flow.task_qualifiers import TASK_QUALIFIERS
from cylc.flow.run_modes import RunMode
from cylc.flow.task_trigger import TaskTrigger, Dependency
from cylc.flow.taskdef import TaskDef
Expand Down Expand Up @@ -1844,6 +1845,7 @@ def generate_triggers(self, lexpression, left_nodes, right, seq,

triggers = {}
xtrig_labels = set()

for left in left_nodes:
if left.startswith('@'):
xtrig_labels.add(left[1:])
Expand Down Expand Up @@ -2266,6 +2268,10 @@ def load_graph(self):
parser.workflow_state_polling_tasks)
self._proc_triggers(parser, seq, task_triggers)

# Checking for undefined outputs for terminal tasks. Tasks with
# dependencies are checked in generate_triggers:
self.check_terminal_outputs(parser.terminals)

self.set_required_outputs(task_output_opt)

# Detect use of xtrigger names with '@' prefix (creates a task).
Expand All @@ -2278,6 +2284,29 @@ def load_graph(self):
for tdef in self.taskdefs.values():
tdef.tweak_outputs()

def check_terminal_outputs(self, terminals: Iterable[str]) -> None:
    """Check that outputs named on terminal graph nodes are registered.

    A "terminal" is a node at the end of a graph string, such as "end"
    in `start => middle => end`. A "terminal output" is the output named
    after the colon on such a node, e.g. "x" in `end:x`.

    Nodes without a ":" are ignored, as are nodes whose output is one of
    the built-in TASK_QUALIFIERS. Any other output must appear in the
    task's `[runtime][<task>][outputs]` configuration.

    Args:
        terminals: Terminal node strings as written in the graph,
            e.g. "foo", "foo:x" or "foo:x?".

    Raises:
        WorkflowConfigError: If a custom output is not defined.
    """
    for terminal in terminals:
        if ':' not in terminal:
            # Bare task name: no output to validate.
            continue

        # Split on the LAST colon: the output qualifier always trails the
        # node, whereas anything before it may itself contain colons.
        task, _, output = terminal.rpartition(':')
        # "?" is the optional output indicator.
        # NOTE(review): "!" is presumably the suicide-trigger marker.
        task = task.strip("!")
        output = output.strip("?")

        if output in TASK_QUALIFIERS:
            # Built-in qualifiers never need registering.
            continue

        if output not in self.cfg['runtime'][task]['outputs']:
            raise WorkflowConfigError(
                f"Undefined custom output: {task}:{output}"
            )

def _proc_triggers(self, parser, seq, task_triggers):
"""Define graph edges, taskdefs, and triggers, from graph sections."""
suicides = 0
Expand Down
35 changes: 25 additions & 10 deletions cylc/flow/graph_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ class GraphParser:
_RE_OFFSET = r'\[[\w\-\+\^:]+\]'
_RE_QUAL = QUALIFIER + r'[\w\-]+' # task or fam trigger
_RE_OPT = r'\??' # optional output indicator
_RE_ANDOR = re.compile(r'\s*[&|]\s*')

REC_QUAL = re.compile(_RE_QUAL)

Expand Down Expand Up @@ -470,10 +471,23 @@ def parse_graph(self, graph_string: str) -> None:
pairs.add((chain[i], chain[i + 1]))

# Get a set of RH nodes which are not at the LH of another pair:
terminals = {p[1] for p in pairs}.difference({p[0] for p in pairs})
# terminals = {p[1] for p in pairs}.difference({p[0] for p in pairs})

check_terminals: Dict[str, str] = {}
lefts: Set[str] = set()
rights: Set[str] = set()

for pair in sorted(pairs, key=lambda p: str(p[0])):
self._proc_dep_pair(pair, terminals)
self._proc_dep_pair(pair, check_terminals, lefts, rights)
self.terminals = rights.difference(lefts)
for right in self.terminals:
left = check_terminals.get(right)
if left:
raise GraphParseError(
'Invalid cycle point offsets only on right hand'
' side of dependency (must be on left hand side):'
f'{left} => {right}'
)

@classmethod
def _report_invalid_lines(cls, lines: List[str]) -> None:
Expand Down Expand Up @@ -504,7 +518,9 @@ def _report_invalid_lines(cls, lines: List[str]) -> None:
def _proc_dep_pair(
self,
pair: Tuple[Optional[str], str],
terminals: Set[str],
check_terminals: Dict[str, str],
_lefts: Set[str],
_rights: Set[str],
) -> None:
"""Process a single dependency pair 'left => right'.

Expand Down Expand Up @@ -540,12 +556,8 @@ def _proc_dep_pair(
raise GraphParseError(mismatch_msg.format(right))

# Raise error for cycle point offsets at the end of chains
if '[' in right and left and (right in terminals):
# This right hand side is at the end of a chain:
raise GraphParseError(
'Invalid cycle point offsets only on right hand '
'side of a dependency (must be on left hand side):'
f' {left} => {right}')
if '[' in right and left:
check_terminals[right] = left

# Split right side on AND.
rights = right.split(self.__class__.OP_AND)
Expand All @@ -566,12 +578,15 @@ def _proc_dep_pair(
raise GraphParseError(
f"Null task name in graph: {left} => {right}")

_rights.update(*([rights] or []))

for left in lefts:
# Extract information about all nodes on the left.

if left:
info = self.__class__.REC_NODES.findall(left)
expr = left
for _left in info:
_lefts.add(''.join(_left))

else:
# There is no left-hand-side task.
Expand Down
3 changes: 2 additions & 1 deletion tests/integration/reftests/test_pre_initial.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ async def test_drop(flow, scheduler, reftest):
}


async def test_over_bracketed(flow, scheduler, reftest):
async def test_over_bracketed(flow, scheduler, reftest, validate):
"""Test nested conditional simplification for pre-initial cycling."""
wid = flow({
'scheduling': {
Expand All @@ -108,6 +108,7 @@ async def test_over_bracketed(flow, scheduler, reftest):
},
},
})
validate(wid)
schd = scheduler(wid, paused_start=False)

assert await reftest(schd) == {
Expand Down
6 changes: 3 additions & 3 deletions tests/integration/test_dbstatecheck.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ async def checker(
},
'runtime': {
'bad': {'simulation': {'fail cycle points': '1000'}},
'output': {'outputs': {'trigger': 'message'}}
'output': {'outputs': {'trigger': 'message', 'custom_output': 'foo'}}
}
})
schd: Scheduler = mod_scheduler(wid, paused_start=False)
Expand Down Expand Up @@ -119,13 +119,13 @@ def test_output(checker):
'output',
'10000101T0000Z',
"{'submitted': 'submitted', 'started': 'started', 'succeeded': "
"'succeeded', 'trigger': 'message'}",
"'succeeded', 'trigger': 'message', 'custom_output': 'foo'}",
],
[
'output',
'10010101T0000Z',
"{'submitted': 'submitted', 'started': 'started', 'succeeded': "
"'succeeded', 'trigger': 'message'}",
"'succeeded', 'trigger': 'message', 'custom_output': 'foo'}",
],
]
assert result == expect
Expand Down
6 changes: 1 addition & 5 deletions tests/integration/test_optional_outputs.py
Original file line number Diff line number Diff line change
Expand Up @@ -288,11 +288,7 @@ def implicit_completion_config(mod_flow, mod_validate):
},
'runtime': {
'root': {
'outputs': {
'x': 'xxx',
'y': 'yyy',
'z': 'zzz',
}
'outputs': {x: f'{x * 3}' for x in 'abcdefghijklxyz'}
}
}
})
Expand Down
1 change: 1 addition & 0 deletions tests/integration/validate/test_outputs.py
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,7 @@ def test_completion_expression_invalid(
'outputs': {
'x': 'xxx',
'y': 'yyy',
'file-1': 'asdf'
},
},
},
Expand Down
46 changes: 41 additions & 5 deletions tests/unit/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.

import os
import sys
from optparse import Values
from typing import (
TYPE_CHECKING, Any, Callable, Dict, List, Optional, Tuple, Type)
import pytest
import logging
from textwrap import dedent
from types import SimpleNamespace
from contextlib import suppress

Expand All @@ -47,6 +47,10 @@

from cylc.flow.cycling.iso8601 import ISO8601Point


param = pytest.param


if TYPE_CHECKING:
from pathlib import Path
Fixture = Any
Expand Down Expand Up @@ -1175,13 +1179,16 @@ def WorkflowConfig__assert_err_raised():
WorkflowConfig__assert_err_raised()


def test_undefined_custom_output(tmp_flow_config: Callable):
@pytest.mark.parametrize(
'graph', (('foo:x => bar'), ('foo:x'))
)
def test_undefined_custom_output(graph: str, tmp_flow_config: Callable):
"""Test error on undefined custom output referenced in graph."""
id_ = 'custom_out1'
flow_file = tmp_flow_config(id_, """
flow_file = tmp_flow_config(id_, f"""
[scheduling]
[[graph]]
R1 = "foo:x => bar"
R1 = "{graph}"
[runtime]
[[foo, bar]]
""")
Expand Down Expand Up @@ -1700,7 +1707,6 @@ def test_cylc_env_at_parsing(

def test_force_workflow_compat_mode(tmp_path):
fpath = (tmp_path / 'flow.cylc')
from textwrap import dedent
fpath.write_text(dedent("""
[scheduler]
allow implicit tasks = true
Expand All @@ -1713,3 +1719,33 @@ def test_force_workflow_compat_mode(tmp_path):
WorkflowConfig('foo', str(fpath), {})
# It succeeds with compat mode:
WorkflowConfig('foo', str(fpath), {}, force_compat_mode=True)


@pytest.mark.parametrize(
    'registered_outputs, tasks_and_outputs, fails',
    (
        param([], ['foo:x'], True, id='output-unregistered'),
        param([], ['foo:x?'], True, id='optional-output-unregistered'),
        param([], ['foo'], False, id='no-modifier-unregistered'),
        param(['x'], ['foo:x'], False, id='output-registered'),
        param([], ['foo:succeed'], False, id='alt-default-ok'),
        param([], ['foo:failed'], False, id='default-ok'),
    )
)
def test_check_outputs(tmp_path, registered_outputs, tasks_and_outputs, fails):
    """Test check_terminal_outputs against registered/unregistered outputs.

    Loads a minimal one-task workflow, overrides the outputs registered
    for "foo", then checks whether the given terminal nodes are rejected
    with "Undefined custom output" or accepted (returning None).
    """
    flow_file = tmp_path / 'flow.cylc'
    flow_file.write_text(dedent("""
[scheduler]
allow implicit tasks = true
[scheduling]
[[graph]]
R1 = foo
"""))
    config = WorkflowConfig('', flow_file, '')
    config.cfg['runtime']['foo']['outputs'] = registered_outputs

    if not fails:
        # Valid input: the check completes silently.
        assert config.check_terminal_outputs(tasks_and_outputs) is None
        return

    with pytest.raises(WorkflowConfigError, match='Undefined custom output'):
        config.check_terminal_outputs(tasks_and_outputs)
12 changes: 5 additions & 7 deletions tests/unit/test_graph_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
from cylc.flow.graph_parser import GraphParser
from cylc.flow.task_outputs import (
TASK_OUTPUT_SUBMITTED,
TASK_OUTPUT_SUBMIT_FAILED,
TASK_OUTPUT_STARTED,
TASK_OUTPUT_SUCCEEDED,
TASK_OUTPUT_FAILED
Expand Down Expand Up @@ -810,7 +809,6 @@ def test_cannot_be_required():
gp.parse_graph('a:submit-failed => b')



@pytest.mark.parametrize(
'graph, error',
[
Expand Down Expand Up @@ -969,13 +967,13 @@ def test_RHS_AND(graph: str, expected_triggers: Dict[str, List[str]]):
@pytest.mark.parametrize(
'args, err',
(
# Error if offset in terminal RHS:
param((('a', 'b[-P42M]'), {'b[-P42M]'}), 'Invalid cycle point offset'),
# No error if offset in NON-terminal RHS:
param((('a', 'b[-P42M]'), {}), None),
param((('a', 'b[-P42M]'), {}, set(), set()), None),
# Check the left hand side if this has a non-terminal RHS:
param((('a &', 'b[-P42M]'), {}), 'Null task name in graph'),
)
param(
(('a &', 'b[-P42M]'), {}, set(), set()), 'Null task name in graph'
),
),
)
def test_proc_dep_pair(args, err):
"""
Expand Down
Loading