Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes.d/7024.feat.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Ensure `next()/previous()` syntax works for the `cylc play --startcp` option.
1 change: 1 addition & 0 deletions changes.d/7048.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fixed `initial cycle point = now` changing on reload/restart.
9 changes: 5 additions & 4 deletions cylc/flow/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -568,6 +568,10 @@ async def reload_workflow(schd: 'Scheduler', reload_global: bool = False):
schd.reload_pending = 'loading the workflow definition'
schd.update_data_store() # update workflow status msg
schd._update_workflow_state()
# Things that can't change on workflow reload:
schd._set_workflow_params(
schd.workflow_db_mgr.pri_dao.select_workflow_params()
)
LOG.info("Reloading the workflow definition.")
config = schd.load_flow_file(is_reload=True)
except (ParsecError, CylcConfigError) as exc:
Expand All @@ -589,10 +593,7 @@ async def reload_workflow(schd: 'Scheduler', reload_global: bool = False):
else:
schd.reload_pending = 'applying the new config'
old_tasks = set(schd.config.get_task_name_list())
# Things that can't change on workflow reload:
schd._set_workflow_params(
schd.workflow_db_mgr.pri_dao.select_workflow_params()
)

schd.apply_new_config(config, is_reload=True)
schd.broadcast_mgr.linearized_ancestors = (
schd.config.get_linearized_ancestors()
Expand Down
52 changes: 25 additions & 27 deletions cylc/flow/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
Dict,
Iterable,
List,
Literal,
Mapping,
Optional,
Set,
Expand Down Expand Up @@ -249,6 +250,17 @@ def interpolate_template(tmpl, params_dict):
raise ParamExpandError('bad template syntax') from None


def _parse_iso_cycle_point(value: str) -> str:
"""Helper for parsing initial/start cycle point option in
datetime cycling mode."""
if value == 'now':
return get_current_time_string()
try:
return ingest_time(value, get_current_time_string())
except IsodatetimeError as exc:
raise WorkflowConfigError(str(exc)) from None


class WorkflowConfig:
"""Class for workflow configuration items and derived quantities."""

Expand Down Expand Up @@ -367,7 +379,8 @@ def __init__(
raise WorkflowConfigError("missing [scheduling][[graph]] section.")
# (The check that 'graph' is defined is below).

# Override the workflow defn with an initial point from the CLI.
# Override the workflow defn with an initial point from the CLI
# or from reload/restart:
icp_str = getattr(self.options, 'icp', None)
if icp_str is not None:
self.cfg['scheduling']['initial cycle point'] = icp_str
Expand Down Expand Up @@ -468,7 +481,9 @@ def __init__(

# after the call to init_cyclers, we can start getting proper points.
init_cyclers(self.cfg)
self.cycling_type = get_interval_cls().get_null().TYPE
self.cycling_type: Literal['integer', 'iso8601'] = (
get_interval_cls().get_null().TYPE
)
self.cycle_point_dump_format = get_dump_format(self.cycling_type)

# Initial point from workflow definition (or CLI override above).
Expand Down Expand Up @@ -550,7 +565,7 @@ def __init__(
self._upg_wflow_event_names()

self.mem_log("config.py: before load_graph()")
self.load_graph()
self._load_graph()
self.mem_log("config.py: after load_graph()")

self._set_completion_expressions()
Expand Down Expand Up @@ -670,10 +685,10 @@ def prelim_process_graph(self) -> None:
all(item in ['graph', '1', 'R1'] for item in graphdict)
):
# Pure acyclic graph, assume integer cycling mode with '1' cycle
self.cfg['scheduling']['cycling mode'] = INTEGER_CYCLING_TYPE
for key in ('initial cycle point', 'final cycle point'):
if key not in self.cfg['scheduling']:
self.cfg['scheduling'][key] = '1'
self.cfg['scheduling']['cycling mode'] = INTEGER_CYCLING_TYPE

def process_utc_mode(self):
"""Set UTC mode from config or from stored value on restart.
Expand Down Expand Up @@ -747,7 +762,6 @@ def process_initial_cycle_point(self) -> None:
Sets:
self.initial_point
self.cfg['scheduling']['initial cycle point']
self.evaluated_icp
Raises:
WorkflowConfigError - if it fails to validate
"""
Expand All @@ -756,22 +770,11 @@ def process_initial_cycle_point(self) -> None:
if orig_icp is None:
orig_icp = '1'
icp = orig_icp
elif self.cycling_type == ISO8601_CYCLING_TYPE:
else:
if orig_icp is None:
raise WorkflowConfigError(
"This workflow requires an initial cycle point.")
if orig_icp == "now":
icp = get_current_time_string()
else:
try:
icp = ingest_time(orig_icp, get_current_time_string())
except IsodatetimeError as exc:
raise WorkflowConfigError(str(exc)) from None
self.evaluated_icp = None
if icp != orig_icp:
# now/next()/previous() was used, need to store
# evaluated point in DB
self.evaluated_icp = icp
icp = _parse_iso_cycle_point(orig_icp)
self.initial_point = get_point(icp).standardise()
self.cfg['scheduling']['initial cycle point'] = str(self.initial_point)

Expand Down Expand Up @@ -807,8 +810,7 @@ def process_start_cycle_point(self) -> None:
)
if startcp:
# Start from a point later than initial point.
if self.options.startcp == 'now':
self.options.startcp = get_current_time_string()
self.options.startcp = _parse_iso_cycle_point(startcp)
self.start_point = get_point(self.options.startcp).standardise()
elif starttask:
# Start from designated task(s).
Expand Down Expand Up @@ -2294,7 +2296,7 @@ def _close_families(l_id, r_id, clf_map):

return lret, rret

def load_graph(self):
def _load_graph(self):
"""Parse and load dependency graph."""
LOG.debug("Parsing the dependency graph")

Expand All @@ -2318,18 +2320,14 @@ def load_graph(self):
section = get_sequence_cls().get_async_expr()
graphdict[section] = graphdict.pop('graph')

icp = self.cfg['scheduling']['initial cycle point']
icp = str(self.initial_point)
fcp = self.cfg['scheduling']['final cycle point']

# Make a stack of sections and graphs [(sec1, graph1), ...]
sections = []
for section, value in self.cfg['scheduling']['graph'].items():
# Substitute initial and final cycle points.
if icp:
section = section.replace("^", icp)
elif "^" in section:
raise WorkflowConfigError("Initial cycle point referenced"
" (^) but not defined.")
section = section.replace("^", icp)
if fcp:
section = section.replace("$", fcp)
elif "$" in section:
Expand Down
2 changes: 1 addition & 1 deletion cylc/flow/cycling/integer.py
Original file line number Diff line number Diff line change
Expand Up @@ -594,7 +594,7 @@ def init_from_cfg(_):
pass


def get_dump_format(cycling_type=None):
def get_dump_format() -> None:
"""Return cycle point string dump format."""
# Not used for integer cycling.
return None
Expand Down
5 changes: 4 additions & 1 deletion cylc/flow/cycling/iso8601.py
Original file line number Diff line number Diff line change
Expand Up @@ -649,6 +649,9 @@ def __lt__(self, other):
def __str__(self):
return self.value

def __repr__(self) -> str:
return f"<{type(self).__name__} {self.value}>"

def __hash__(self) -> int:
return hash(self.value)

Expand Down Expand Up @@ -902,7 +905,7 @@ def init(num_expanded_year_digits=0, custom_dump_format=None, time_zone=None,
return WorkflowSpecifics


def get_dump_format():
def get_dump_format() -> str:
"""Return cycle point string dump format."""
return WorkflowSpecifics.DUMP_FORMAT

Expand Down
28 changes: 24 additions & 4 deletions cylc/flow/cycling/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,21 @@
Each task may have multiple sequences, e.g. 12-hourly and 6-hourly.
"""

from typing import Optional, Type, overload
from typing import (
Literal,
Optional,
Type,
overload,
)

from cylc.flow.cycling import PointBase, integer, iso8601
from metomi.isodatetime.data import Calendar

from cylc.flow.cycling import (
PointBase,
integer,
iso8601,
)


ISO8601_CYCLING_TYPE = iso8601.CYCLER_TYPE_ISO8601
INTEGER_CYCLING_TYPE = integer.CYCLER_TYPE_INTEGER
Expand Down Expand Up @@ -88,8 +98,18 @@ def get_point_cls(cycling_type: Optional[str] = None) -> Type[PointBase]:
return POINTS[cycling_type]


def get_dump_format(cycling_type=None):
"""Return cycle point dump format, or None."""
@overload
def get_dump_format(cycling_type: Literal["integer"]) -> None:
...


@overload
def get_dump_format(cycling_type: Literal["iso8601"]) -> str:
...


def get_dump_format(cycling_type: Literal["integer", "iso8601"]) -> str | None:
"""Return cycle point dump format (None for integer mode)."""
return DUMP_FORMAT_GETTERS[cycling_type]()


Expand Down
9 changes: 5 additions & 4 deletions cylc/flow/option_parsers.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,11 @@ def _update_sources(self, other):
ICP_OPTION = OptionSettings(
["--initial-cycle-point", "--icp"],
help=(
"Set the initial cycle point."
" Required if not defined in flow.cylc."
"\nMay be either an absolute point or an offset: See"
f" {SHORTLINK_TO_ICP_DOCS} (Cylc documentation link)."
"Set the initial cycle point. "
"Required if not defined in flow.cylc.\n"
"Can be an absolute point or an offset relative to the "
"current time - see "
f"{SHORTLINK_TO_ICP_DOCS} (Cylc documentation link)."
),
metavar="CYCLE_POINT or OFFSET",
action='store',
Expand Down
10 changes: 5 additions & 5 deletions cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1228,7 +1228,7 @@ def apply_new_config(self, config, is_reload=False):
})

def _set_workflow_params(
self, params: Iterable[Tuple[str, Optional[str]]]
self, params: Iterable[tuple[str, str | None]]
) -> None:
"""Set workflow params on restart/reload.

Expand All @@ -1240,20 +1240,20 @@ def _set_workflow_params(
* A flag to indicate if the workflow should be paused or not.
* Original workflow run time zone.
"""
LOG.info('LOADING workflow parameters')
LOG.info("LOADING saved workflow parameters")
for key, value in params:
if key == self.workflow_db_mgr.KEY_RUN_MODE:
self.options.run_mode = value or RunMode.LIVE.value
LOG.info(f"+ run mode = {value}")
if value is None:
continue
if key in self.workflow_db_mgr.KEY_INITIAL_CYCLE_POINT_COMPATS:
if key == self.workflow_db_mgr.KEY_INITIAL_CYCLE_POINT:
self.options.icp = value
LOG.info(f"+ initial point = {value}")
elif key in self.workflow_db_mgr.KEY_START_CYCLE_POINT_COMPATS:
elif key == self.workflow_db_mgr.KEY_START_CYCLE_POINT:
self.options.startcp = value
LOG.info(f"+ start point = {value}")
elif key in self.workflow_db_mgr.KEY_FINAL_CYCLE_POINT_COMPATS:
elif key == self.workflow_db_mgr.KEY_FINAL_CYCLE_POINT:
if self.is_restart and self.options.fcp == 'reload':
LOG.debug(f"- final point = {value} (ignored)")
elif self.options.fcp is None:
Expand Down
51 changes: 33 additions & 18 deletions cylc/flow/scheduler_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""Common logic for "cylc play" CLI."""

from ansimarkup import parse as cparse
import asyncio
from copy import deepcopy
from functools import lru_cache
Expand All @@ -24,50 +23,62 @@
from pathlib import Path
from shlex import quote
import sys
from typing import TYPE_CHECKING, Tuple
from typing import (
TYPE_CHECKING,
Tuple,
)

from ansimarkup import parse as cparse
from packaging.version import Version

from cylc.flow import LOG, __version__
from cylc.flow import (
LOG,
__version__,
)
from cylc.flow.exceptions import (
CylcError,
ServiceFileError,
WorkflowStopped,
)
from cylc.flow.scripts.ping import run as cylc_ping
import cylc.flow.flags
from cylc.flow.id import upgrade_legacy_ids
from cylc.flow.host_select import select_workflow_host
from cylc.flow.hostuserutil import is_remote_host
from cylc.flow.id import upgrade_legacy_ids
from cylc.flow.id_cli import parse_ids_async
from cylc.flow.loggingutil import (
close_log,
RotatingLogFileHandler,
close_log,
)
from cylc.flow.network.client import WorkflowRuntimeClient
from cylc.flow.network.log_stream_handler import ProtobufStreamHandler
from cylc.flow.option_parsers import (
ICP_OPTION,
SHORTLINK_TO_ICP_DOCS,
WORKFLOW_ID_ARG_DOC,
CylcOptionParser as COP,
OptionSettings,
Options,
ICP_OPTION,
OptionSettings,
)
from cylc.flow.pathutil import get_workflow_run_scheduler_log_path
from cylc.flow.remote import cylc_server_cmd
from cylc.flow.scheduler import Scheduler, SchedulerError
from cylc.flow.scripts.common import cylc_header
from cylc.flow.run_modes import WORKFLOW_RUN_MODES
from cylc.flow.workflow_db_mgr import WorkflowDatabaseManager
from cylc.flow.workflow_files import (
SUITERC_DEPR_MSG,
get_workflow_srv_dir,
from cylc.flow.scheduler import (
Scheduler,
SchedulerError,
)
from cylc.flow.scripts.common import cylc_header
from cylc.flow.scripts.ping import run as cylc_ping
from cylc.flow.terminal import (
cli_function,
is_terminal,
prompt,
)
from cylc.flow.workflow_db_mgr import WorkflowDatabaseManager
from cylc.flow.workflow_files import (
SUITERC_DEPR_MSG,
get_workflow_srv_dir,
)


if TYPE_CHECKING:
from optparse import Values
Expand Down Expand Up @@ -162,10 +173,14 @@
OptionSettings(
["--start-cycle-point", "--startcp"],
help=(
"Set the start cycle point, which may be after"
" the initial cycle point. If the specified start point is"
" not in the sequence, the next on-sequence point will"
" be used. (Not to be confused with the initial cycle point)"),
"Set the start cycle point, which may be after "
"the initial cycle point. "
"If the specified start point is not in the sequence, the next "
"on-sequence point will be used.\n"
"Can be an absolute point or an offset relative to the "
"current time, like the --initial-cycle-point option - see "
f"{SHORTLINK_TO_ICP_DOCS} (Cylc documentation link)."
),
metavar="CYCLE_POINT",
action='store',
dest="startcp",
Expand Down
Loading
Loading