diff --git a/cylc/flow/data_store_mgr.py b/cylc/flow/data_store_mgr.py
index 17cc8616317..d711e8a4e22 100644
--- a/cylc/flow/data_store_mgr.py
+++ b/cylc/flow/data_store_mgr.py
@@ -528,36 +528,34 @@ def initiate_data_model(self, reloaded=False):
         self.__init__(self.schd)
 
         # Static elements
-        self.generate_definition_elements(reloaded)
+        self.generate_definition_elements()
 
         # Update workflow statuses and totals (assume needed)
-        self.update_workflow(reloaded)
+        self.update_workflow(True)
 
         # Apply current deltas
         self.batch_deltas()
         self.apply_delta_batch()
+        # Clear deltas after application
+        self.clear_delta_store()
+        self.clear_delta_batch()
 
-        if not reloaded:
-            # Gather this batch of deltas for publish
-            self.apply_delta_checksum()
-            self.publish_deltas = self.get_publish_deltas()
+        # Gather the store as batch of deltas for publishing
+        self.batch_deltas(True)
+        self.apply_delta_checksum()
+        self.publish_deltas = self.get_publish_deltas()
 
         self.updates_pending = False
 
-        # Clear deltas after application and publishing
-        self.clear_delta_store()
+        # Clear second batch after publishing
         self.clear_delta_batch()
 
-    def generate_definition_elements(self, reloaded):
+    def generate_definition_elements(self):
         """Generate static definition data elements.
 
         Populates the tasks, families, and workflow elements
         with data from and/or derived from the workflow definition.
 
-        Args:
-            reloaded (bool):
-                To set workflow reloaded field.
-
         """
         config = self.schd.config
         update_time = time()
@@ -567,7 +565,8 @@ def generate_definition_elements(self, reloaded):
         workflow.id = self.workflow_id
         workflow.last_updated = update_time
         workflow.stamp = f'{workflow.id}@{workflow.last_updated}'
-        workflow.reloaded = reloaded
+        # Treat play/restart as hard reload of definition.
+        workflow.reloaded = True
 
         graph = workflow.edges
         graph.leaves[:] = config.leaves
@@ -1499,7 +1498,7 @@ def insert_db_job(self, row_idx, row):
             tp_delta.jobs.append(j_id)
         self.updates_pending = True
 
-    def update_data_structure(self, reloaded=False):
+    def update_data_structure(self):
         """Workflow batch updates in the data structure."""
         # load database history for flagged nodes
         self.apply_task_proxy_db_history()
@@ -1516,7 +1515,7 @@ def update_data_structure(self, reloaded=False):
         self.update_family_proxies()
 
         # Update workflow statuses and totals if needed
-        self.update_workflow(reloaded)
+        self.update_workflow()
 
         # Don't process updated deltas of pruned nodes
         self.prune_pruned_updated_nodes()
@@ -1526,11 +1525,7 @@ def update_data_structure(self, reloaded=False):
         # Apply all deltas
         self.apply_delta_batch()
 
-        if reloaded:
-            self.clear_delta_batch()
-            self.batch_deltas(reloaded=True)
-
-        if self.updates_pending or reloaded:
+        if self.updates_pending:
             self.apply_delta_checksum()
             # Gather this batch of deltas for publish
             self.publish_deltas = self.get_publish_deltas()
@@ -1541,6 +1536,18 @@ def update_data_structure(self, reloaded=False):
         self.clear_delta_batch()
         self.clear_delta_store()
 
+    def update_workflow_states(self):
+        """Batch workflow state updates."""
+
+        # update the workflow state in the data store
+        self.update_workflow()
+
+        # push out update deltas
+        self.batch_deltas()
+        self.apply_delta_batch()
+        self.apply_delta_checksum()
+        self.publish_deltas = self.get_publish_deltas()
+
     def prune_data_store(self):
         """Remove flagged nodes and edges not in the set of active paths."""
 
diff --git a/cylc/flow/network/resolvers.py b/cylc/flow/network/resolvers.py
index eb3ccdfb99d..55016776bfb 100644
--- a/cylc/flow/network/resolvers.py
+++ b/cylc/flow/network/resolvers.py
@@ -568,6 +568,9 @@ async def subscribe_delta(self, root, info, args):
                             workflow_id=w_id)
                         delta_store[DELTA_ADDED] = (
                             self.data_store_mgr.data[w_id])
+                        delta_store[DELTA_ADDED][
+                            WORKFLOW
+                        ].reloaded = True
                         deltas_queue.put(
                             (w_id, 'initial_burst', delta_store))
                     elif w_id in self.delta_store[sub_id]:
diff --git a/cylc/flow/scheduler.py b/cylc/flow/scheduler.py
index 7449e7d274a..eb8853dd5fc 100644
--- a/cylc/flow/scheduler.py
+++ b/cylc/flow/scheduler.py
@@ -1160,7 +1160,7 @@ async def command_reload_workflow(self) -> None:
         self._update_workflow_state()
 
         # Re-initialise data model on reload
-        self.data_store_mgr.initiate_data_model(reloaded=True)
+        self.data_store_mgr.initiate_data_model(self.is_reloaded)
 
         # Reset the remote init map to trigger fresh file installation
         self.task_job_mgr.task_remote_mgr.remote_init_map.clear()
@@ -1548,7 +1548,7 @@ async def workflow_shutdown(self):
 
         # Is the workflow ready to shut down now?
         if self.pool.can_stop(self.stop_mode):
-            await self.update_data_structure(self.is_reloaded)
+            await self.update_data_structure()
             self.proc_pool.close()
             if self.stop_mode != StopMode.REQUEST_NOW_NOW:
                 # Wait for process pool to complete,
@@ -1767,7 +1767,7 @@ async def main_loop(self) -> None:
 
             if has_updated or self.data_store_mgr.updates_pending:
                 # Update the datastore.
-                await self.update_data_structure(self.is_reloaded)
+                await self.update_data_structure()
 
             if has_updated:
                 if not self.is_reloaded:
@@ -1838,28 +1838,24 @@ def _update_workflow_state(self):
         A cut-down version of update_data_structure which only considers
         workflow state changes e.g. status, status message, state totals, etc.
         """
+        # Publish any existing before potentially creating more
+        self._publish_deltas()
         # update the workflow state in the data store
-        self.data_store_mgr.update_workflow()
-
-        # push out update deltas
-        self.data_store_mgr.batch_deltas()
-        self.data_store_mgr.apply_delta_batch()
-        self.data_store_mgr.apply_delta_checksum()
-        self.data_store_mgr.publish_deltas = (
-            self.data_store_mgr.get_publish_deltas()
-        )
-        self.server.publish_queue.put(
-            self.data_store_mgr.publish_deltas)
-
-        # Non-async sleep - yield to other threads rather
-        # than event loop
-        sleep(0)
+        self.data_store_mgr.update_workflow_states()
+        self._publish_deltas()
 
     async def update_data_structure(self, reloaded: bool = False):
         """Update DB, UIS, Summary data elements"""
+        # Publish any existing before potentially creating more
+        self._publish_deltas()
         # Collect/apply data store updates/deltas
-        self.data_store_mgr.update_data_structure(reloaded=reloaded)
-        # Publish updates:
+        self.data_store_mgr.update_data_structure()
+        self._publish_deltas()
+        # Database update
+        self.workflow_db_mgr.put_task_pool(self.pool)
+
+    def _publish_deltas(self):
+        """Publish pending deltas."""
         if self.data_store_mgr.publish_pending:
             self.data_store_mgr.publish_pending = False
             self.server.publish_queue.put(
@@ -1867,8 +1863,6 @@ def update_data_structure(self, reloaded: bool = False):
                 self.data_store_mgr.publish_deltas)
             # Non-async sleep - yield to other threads rather
             # than event loop
             sleep(0)
-        # Database update
-        self.workflow_db_mgr.put_task_pool(self.pool)
 
     def check_workflow_timers(self):
         """Check timers, and abort or run event handlers as configured."""
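For reviewers, here is a minimal standalone sketch of the publish pattern these scheduler hunks converge on: bracket each data-store update with `_publish_deltas()` so pending deltas go out before new ones are batched, and the publish step is a no-op when nothing is staged. The `DataStoreMgrSketch`/`SchedulerSketch` names and their internals are illustrative stand-ins, not the real cylc API:

```python
from queue import Queue
from time import sleep


class DataStoreMgrSketch:
    """Stand-in for DataStoreMgr: batches deltas and flags them for publish."""

    def __init__(self):
        self.publish_pending = False
        self.publish_deltas = []

    def update_workflow_states(self):
        # Mirrors the shape of the new DataStoreMgr.update_workflow_states():
        # update workflow state, batch/apply deltas, stage them for publish.
        self.publish_deltas = ['workflow-state-delta']
        self.publish_pending = True


class SchedulerSketch:
    """Stand-in for Scheduler: brackets updates with _publish_deltas()."""

    def __init__(self):
        self.data_store_mgr = DataStoreMgrSketch()
        self.publish_queue = Queue()

    def _update_workflow_state(self):
        # Publish any existing deltas before potentially creating more,
        # then publish the batch just created (as in the diff above).
        self._publish_deltas()
        self.data_store_mgr.update_workflow_states()
        self._publish_deltas()

    def _publish_deltas(self):
        # Only push to the queue when deltas are actually staged.
        if self.data_store_mgr.publish_pending:
            self.data_store_mgr.publish_pending = False
            self.publish_queue.put(self.data_store_mgr.publish_deltas)
            # Non-async sleep: yield to the publisher thread, not the
            # event loop.
            sleep(0)


schd = SchedulerSketch()
schd._update_workflow_state()
print(schd.publish_queue.get())  # ['workflow-state-delta']
```

Centralising the queue-put and the `sleep(0)` yield in one `_publish_deltas()` helper means every call site gets the same "publish exactly once, only if pending" behaviour, which is what lets the diff drop the duplicated publish logic from `_update_workflow_state` and `update_data_structure`.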