Skip to content

Commit

Permalink
extend reloaded flag to play/restart/initial-burst
Browse files · Browse the repository at this point in the history
  • Loading branch information
dwsutherland committed Sep 5, 2023
1 parent 52dacf1 commit fb47755
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 43 deletions.
49 changes: 28 additions & 21 deletions cylc/flow/data_store_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -528,36 +528,34 @@ def initiate_data_model(self, reloaded=False):
self.__init__(self.schd)

# Static elements
self.generate_definition_elements(reloaded)
self.generate_definition_elements()

# Update workflow statuses and totals (assume needed)
self.update_workflow(reloaded)
self.update_workflow(True)

# Apply current deltas
self.batch_deltas()
self.apply_delta_batch()
# Clear deltas after application
self.clear_delta_store()
self.clear_delta_batch()

if not reloaded:
# Gather this batch of deltas for publish
self.apply_delta_checksum()
self.publish_deltas = self.get_publish_deltas()
# Gather the store as batch of deltas for publishing
self.batch_deltas(True)
self.apply_delta_checksum()
self.publish_deltas = self.get_publish_deltas()

self.updates_pending = False

# Clear deltas after application and publishing
self.clear_delta_store()
# Clear second batch after publishing
self.clear_delta_batch()

def generate_definition_elements(self, reloaded):
def generate_definition_elements(self):
"""Generate static definition data elements.
Populates the tasks, families, and workflow elements
with data from and/or derived from the workflow definition.
Args:
reloaded (bool):
To set workflow reloaded field.
"""
config = self.schd.config
update_time = time()
Expand All @@ -567,7 +565,8 @@ def generate_definition_elements(self, reloaded):
workflow.id = self.workflow_id
workflow.last_updated = update_time
workflow.stamp = f'{workflow.id}@{workflow.last_updated}'
workflow.reloaded = reloaded
# Treat play/restart as hard reload of definition.
workflow.reloaded = True

graph = workflow.edges
graph.leaves[:] = config.leaves
Expand Down Expand Up @@ -1499,7 +1498,7 @@ def insert_db_job(self, row_idx, row):
tp_delta.jobs.append(j_id)
self.updates_pending = True

def update_data_structure(self, reloaded=False):
def update_data_structure(self):
"""Workflow batch updates in the data structure."""
# load database history for flagged nodes
self.apply_task_proxy_db_history()
Expand All @@ -1516,7 +1515,7 @@ def update_data_structure(self, reloaded=False):
self.update_family_proxies()

# Update workflow statuses and totals if needed
self.update_workflow(reloaded)
self.update_workflow()

# Don't process updated deltas of pruned nodes
self.prune_pruned_updated_nodes()
Expand All @@ -1526,11 +1525,7 @@ def update_data_structure(self, reloaded=False):
# Apply all deltas
self.apply_delta_batch()

if reloaded:
self.clear_delta_batch()
self.batch_deltas(reloaded=True)

if self.updates_pending or reloaded:
if self.updates_pending:
self.apply_delta_checksum()
# Gather this batch of deltas for publish
self.publish_deltas = self.get_publish_deltas()
Expand All @@ -1541,6 +1536,18 @@ def update_data_structure(self, reloaded=False):
self.clear_delta_batch()
self.clear_delta_store()

def update_workflow_states(self):
    """Batch workflow state updates."""
    # Bring the workflow element up to date, then run the usual
    # delta pipeline over the result: batch -> apply -> checksum.
    self.update_workflow()
    for stage in (
        self.batch_deltas,
        self.apply_delta_batch,
        self.apply_delta_checksum,
    ):
        stage()
    # Stash the finished batch for the publisher to pick up.
    self.publish_deltas = self.get_publish_deltas()

def prune_data_store(self):
"""Remove flagged nodes and edges not in the set of active paths."""

Expand Down
3 changes: 3 additions & 0 deletions cylc/flow/network/resolvers.py
Original file line number Diff line number Diff line change
Expand Up @@ -568,6 +568,9 @@ async def subscribe_delta(self, root, info, args):
workflow_id=w_id)
delta_store[DELTA_ADDED] = (
self.data_store_mgr.data[w_id])
delta_store[DELTA_ADDED][
WORKFLOW
].reloaded = True

Check warning on line 571 in cylc/flow/network/resolvers.py — Codecov / codecov/patch: added line cylc/flow/network/resolvers.py#L571 was not covered by tests. (View check run for this annotation.)
deltas_queue.put(
(w_id, 'initial_burst', delta_store))
elif w_id in self.delta_store[sub_id]:
Expand Down
38 changes: 16 additions & 22 deletions cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1160,7 +1160,7 @@ async def command_reload_workflow(self) -> None:
self._update_workflow_state()

# Re-initialise data model on reload
self.data_store_mgr.initiate_data_model(reloaded=True)
self.data_store_mgr.initiate_data_model(self.is_reloaded)

# Reset the remote init map to trigger fresh file installation
self.task_job_mgr.task_remote_mgr.remote_init_map.clear()
Expand Down Expand Up @@ -1548,7 +1548,7 @@ async def workflow_shutdown(self):

# Is the workflow ready to shut down now?
if self.pool.can_stop(self.stop_mode):
await self.update_data_structure(self.is_reloaded)
await self.update_data_structure()
self.proc_pool.close()
if self.stop_mode != StopMode.REQUEST_NOW_NOW:
# Wait for process pool to complete,
Expand Down Expand Up @@ -1767,7 +1767,7 @@ async def main_loop(self) -> None:

if has_updated or self.data_store_mgr.updates_pending:
# Update the datastore.
await self.update_data_structure(self.is_reloaded)
await self.update_data_structure()

if has_updated:
if not self.is_reloaded:
Expand Down Expand Up @@ -1838,37 +1838,31 @@ def _update_workflow_state(self):
A cut-down version of update_data_structure which only considers
workflow state changes e.g. status, status message, state totals, etc.
"""
# Publish any existing before potentially creating more
self._publish_deltas()
# update the workflow state in the data store
self.data_store_mgr.update_workflow()

# push out update deltas
self.data_store_mgr.batch_deltas()
self.data_store_mgr.apply_delta_batch()
self.data_store_mgr.apply_delta_checksum()
self.data_store_mgr.publish_deltas = (
self.data_store_mgr.get_publish_deltas()
)
self.server.publish_queue.put(
self.data_store_mgr.publish_deltas)

# Non-async sleep - yield to other threads rather
# than event loop
sleep(0)
self.data_store_mgr.update_workflow_states()
self._publish_deltas()

async def update_data_structure(self, reloaded: bool = False):
"""Update DB, UIS, Summary data elements"""
# Publish any existing before potentially creating more
self._publish_deltas()
# Collect/apply data store updates/deltas
self.data_store_mgr.update_data_structure(reloaded=reloaded)
# Publish updates:
self.data_store_mgr.update_data_structure()
self._publish_deltas()
# Database update
self.workflow_db_mgr.put_task_pool(self.pool)

def _publish_deltas(self):
    """Publish pending deltas."""
    store = self.data_store_mgr
    if not store.publish_pending:
        return
    # Clear the flag before handing the batch to the publisher.
    store.publish_pending = False
    self.server.publish_queue.put(store.publish_deltas)
    # Non-async sleep - yield to other threads rather
    # than event loop
    sleep(0)
# Database update
self.workflow_db_mgr.put_task_pool(self.pool)

def check_workflow_timers(self):
"""Check timers, and abort or run event handlers as configured."""
Expand Down

0 comments on commit fb47755

Please sign in to comment.