protobuf: dynamic subscriptions #568

Open
oliver-sanders opened this issue Mar 26, 2024 · 1 comment
Labels: efficiency, question

oliver-sanders commented Mar 26, 2024

Background:

We use GraphQL as an API, but we do not use it as a "store". The data goes into a Protobuf serialisation; we then "resolve" GraphQL requests against this serialisation, re-serialising the results as JSON for transfer to the UI.

So GraphQL is the external API for UI-UIS comms, Protobuf is both the store and the internal API for UIS-Flow comms.

Presently, the UIS subscribes to all topics for all active workflows (running, paused, stopping). This is vastly surplus to requirements as (at present) users can only look at one workflow at a time in the UI. This may cause excess load on the UIS and, potentially, high memory usage.

To field requests, we only need to subscribe to the subset of topics required to satisfy active GraphQL subscriptions. E.g., to satisfy the UI when it is showing the tree view for workflow "a", we would need:

  1. A subscription to the WORKFLOW topic for all active workflows. This is required for workflow listing (for the GScan view).
  2. A subscription to the TASK_PROXIES, FAMILY_PROXIES, TASKS and JOBS topics for workflow "a" only (for the tree view).
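
As a rough illustration of the two requirements above, the per-workflow topic set could be derived from the views that are currently open. This is only a sketch: the topic string values, the `VIEW_TOPICS` mapping and the `required_topics` helper are hypothetical names, not part of Cylc.

```python
from typing import Dict, Iterable, Set, Tuple

# Placeholder topic names (assumed values, for illustration only).
WORKFLOW = 'workflow'
TASK_PROXIES = 'task_proxies'
FAMILY_PROXIES = 'family_proxies'
TASKS = 'tasks'
JOBS = 'jobs'

# Hypothetical mapping of UI views to the topics they need.
VIEW_TOPICS: Dict[str, Set[str]] = {
    'gscan': {WORKFLOW},
    'tree': {TASK_PROXIES, FAMILY_PROXIES, TASKS, JOBS},
}


def required_topics(
    active_workflows: Iterable[str],
    open_views: Iterable[Tuple[str, str]],
) -> Dict[str, Set[str]]:
    """Return the topics needed for each active workflow.

    open_views is an iterable of (view_name, workflow_id) pairs.
    """
    # every active workflow needs WORKFLOW for the listing (GScan)
    topics = {w_id: {WORKFLOW} for w_id in active_workflows}
    for view, w_id in open_views:
        if w_id in topics:  # ignore views of workflows that are not active
            topics[w_id] |= VIEW_TOPICS[view]
    return topics
```

With workflows "a" and "b" active and the tree view open on "a", this gives the WORKFLOW topic alone for "b" and the WORKFLOW plus tree-view topics for "a".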

To get started, it might be easier to consider two subscription levels:

  • WORKFLOW - for all active workflows.
  • ALL - for any workflows that match active GraphQL subscriptions/queries.
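
A minimal sketch of how the two levels might be assigned, assuming we already know which workflows are targeted by active GraphQL subscriptions/queries (that is the hard part, and it is not solved here). The function names `target_levels` and `level_changes` are made up for illustration; the level names mirror the keys used in the patch in this comment.

```python
from typing import Dict, Iterable, List, Set, Tuple


def target_levels(
    active_workflows: Iterable[str],
    subscribed_workflows: Set[str],
) -> Dict[str, str]:
    """Pick 'all' for workflows with active GraphQL interest, else 'workflow'."""
    return {
        w_id: 'all' if w_id in subscribed_workflows else 'workflow'
        for w_id in active_workflows
    }


def level_changes(
    current: Dict[str, str],
    target: Dict[str, str],
) -> Tuple[List[str], List[str]]:
    """Return (to_promote, to_demote) workflow IDs.

    Workflows not yet in `current` are assumed to connect at the
    'workflow' level by default, so they only appear if they need 'all'.
    """
    promote = sorted(
        w_id for w_id, level in target.items()
        if level == 'all' and current.get(w_id) != 'all'
    )
    demote = sorted(
        w_id for w_id, level in target.items()
        if level == 'workflow' and current.get(w_id) == 'all'
    )
    return promote, demote
```

Computing the difference between the current and target levels means a reconnect is only triggered for workflows whose level actually changes.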

Getting the data store to handle two different subscription levels is straightforward:

Subscription Level Patch

Cylc UIS:

diff --git a/cylc/uiserver/data_store_mgr.py b/cylc/uiserver/data_store_mgr.py
index 101fc36..578dcac 100644
--- a/cylc/uiserver/data_store_mgr.py
+++ b/cylc/uiserver/data_store_mgr.py
@@ -36,7 +36,7 @@ from concurrent.futures import ThreadPoolExecutor
 from copy import deepcopy
 from pathlib import Path
 import time
-from typing import Dict, Optional, Set
+from typing import Dict, Optional, Set, List
 
 from cylc.flow.exceptions import WorkflowStopped
 from cylc.flow.id import Tokens
@@ -80,16 +80,31 @@ class DataStoreMgr:
     RECONCILE_TIMEOUT = 5.  # seconds
     PENDING_DELTA_CHECK_INTERVAL = 0.5
 
+    SUBSCRIPTION_LEVELS = {
+        'all': (
+            'pb_entire_workflow',
+            {ALL_DELTAS.encode('utf-8'), b'shutdown'},
+        ),
+        'workflow': (
+            'pb_workflow_only',
+            {WORKFLOW},
+        ),
+    }
+
     def __init__(self, workflows_mgr, log):
         self.workflows_mgr = workflows_mgr
         self.log = log
         self.data = {}
         self.w_subs: Dict[str, WorkflowSubscriber] = {}
-        self.topics = {ALL_DELTAS.encode('utf-8'), b'shutdown'}
+        # self.all_topics = {ALL_DELTAS.encode('utf-8'), b'shutdown'}
+        # self.workflow_topics = {WORKFLOW}
         self.loop = None
-        self.executor = ThreadPoolExecutor()
+        self.executor = ThreadPoolExecutor(max_workers=32)
         self.delta_queues = {}
 
+        # last known contact data
+        self._contact_data = {}
+
     @log_call
     async def register_workflow(self, w_id: str, is_active: bool) -> None:
         """Register a new workflow with the data store.
@@ -126,7 +141,7 @@ class DataStoreMgr:
         self._purge_workflow(w_id)
 
     @log_call
-    async def connect_workflow(self, w_id, contact_data):
+    async def connect_workflow(self, w_id, contact_data, sub_level='workflow'):
         """Initiate workflow subscriptions.
 
         Call this when a workflow has started.
@@ -134,8 +149,10 @@ class DataStoreMgr:
         Subscriptions and sync management is instantiated and run in
         a separate thread for each workflow. This is to avoid the sync loop
         blocking the main loop.
-
         """
+        # update last known contact data
+        self._contact_data[w_id] = contact_data
+
         if self.loop is None:
             self.loop = asyncio.get_running_loop()
 
@@ -152,9 +169,13 @@ class DataStoreMgr:
             w_id,
             contact_data['name'],
             contact_data[CFF.HOST],
-            contact_data[CFF.PUBLISH_PORT]
+            contact_data[CFF.PUBLISH_PORT],
+            self.SUBSCRIPTION_LEVELS[sub_level][1],
+        )
+        successful_updates = await self._workflow_update(
+            [w_id],
+            self.SUBSCRIPTION_LEVELS[sub_level][0],
         )
-        successful_updates = await self._entire_workflow_update(ids=[w_id])
 
         if w_id not in successful_updates:
             # something went wrong, undo any changes to allow for subsequent
@@ -192,6 +213,33 @@ class DataStoreMgr:
             self.w_subs[w_id].stop()
             del self.w_subs[w_id]
 
+    async def _update_subscription_level(self, w_id, sub_level):
+        sub = self.w_subs.get(w_id)
+        topics = self.SUBSCRIPTION_LEVELS[sub_level][1]
+        if sub:
+            if sub.topics == topics:
+                # workflow already subscribed at the requested level
+                return True
+            else:
+                self.disconnect_workflow(w_id, update_contact=False)
+                await self.connect_workflow(
+                    w_id,
+                    self._contact_data[w_id],
+                    sub_level,
+                )
+                return True
+        else:
+            # we have to wait for this workflow to be detected before we can
+            # connect to it
+            # TODO: consider awaiting a scan here?
+            return False
+
+    async def premote_subscription_full(self, w_id):
+        return await self._update_subscription_level(w_id, 'all')
+
+    async def demote_subscription_workflow(self, w_id):
+        return await self._update_subscription_level(w_id, 'workflow')
+
     def get_workflows(self):
         """Return all workflows the data store is currently tracking.
 
@@ -221,7 +269,7 @@ class DataStoreMgr:
         if w_id in self.delta_queues:
             del self.delta_queues[w_id]
 
-    def _start_subscription(self, w_id, reg, host, port):
+    def _start_subscription(self, w_id, reg, host, port, topics):
         """Instantiate and run subscriber data-store sync.
 
         Args:
@@ -236,7 +284,7 @@ class DataStoreMgr:
             host=host,
             port=port,
             context=self.workflows_mgr.context,
-            topics=self.topics
+            topics=topics
         )
         self.w_subs[w_id].loop.run_until_complete(
             self.w_subs[w_id].subscribe(
@@ -333,7 +381,8 @@ class DataStoreMgr:
                     workflow_request(
                         self.workflows_mgr.workflows[w_id]['req_client'],
                         'pb_data_elements',
-                        args={'element_type': topic}
+                        args={'element_type': topic},
+                        timeout=self.RECONCILE_TIMEOUT,
                     ),
                     self.loop
                 )
@@ -352,8 +401,9 @@ class DataStoreMgr:
             except Exception as exc:
                 self.log.exception(exc)
 
-    async def _entire_workflow_update(
-        self, ids: Optional[list] = None
+    @log_call
+    async def _workflow_update(
+        self, ids: List[str], req_method: str,
     ) -> Set[str]:
         """Update entire local data-store of workflow(s).
 
@@ -364,9 +414,6 @@ class DataStoreMgr:
         if ids is None:
             ids = []
 
-        # Request new data
-        req_method = 'pb_entire_workflow'
-
         requests = {
             w_id: workflow_request(
                 client=info['req_client'], command=req_method, log=self.log

Cylc Flow:

diff --git a/cylc/flow/data_store_mgr.py b/cylc/flow/data_store_mgr.py
index 744daeb4d..3c519468d 100644
--- a/cylc/flow/data_store_mgr.py
+++ b/cylc/flow/data_store_mgr.py
@@ -2699,6 +2699,29 @@ class DataStoreMgr:
 
         return workflow_msg
 
+    def get_workflow_only(self):
+        """Gather workflow-level data elements into single Protobuf message.
+
+        No tasks / cycles, etc, just workflow stuff.
+
+        Returns:
+            cylc.flow.data_messages_pb2.PbEntireWorkflow
+
+        """
+
+        data = self.data[self.workflow_id]
+
+        workflow_msg = PbEntireWorkflow()
+        workflow_msg.workflow.CopyFrom(data[WORKFLOW])
+        # workflow_msg.tasks.extend(data[TASKS].values())
+        # workflow_msg.task_proxies.extend(data[TASK_PROXIES].values())
+        # workflow_msg.jobs.extend(data[JOBS].values())
+        # workflow_msg.families.extend(data[FAMILIES].values())
+        # workflow_msg.family_proxies.extend(data[FAMILY_PROXIES].values())
+        # workflow_msg.edges.extend(data[EDGES].values())
+
+        return workflow_msg
+
     def get_publish_deltas(self):
         """Return deltas for publishing."""
         all_deltas = DELTAS_MAP[ALL_DELTAS]()
diff --git a/cylc/flow/network/server.py b/cylc/flow/network/server.py
index 5c0704720..10903fe5d 100644
--- a/cylc/flow/network/server.py
+++ b/cylc/flow/network/server.py
@@ -46,6 +46,7 @@ if TYPE_CHECKING:
 # maps server methods to the protobuf message (for client/UIS import)
 PB_METHOD_MAP = {
     'pb_entire_workflow': PbEntireWorkflow,
+    'pb_workflow_only': PbEntireWorkflow,
     'pb_data_elements': DELTAS_MAP
 }
 
@@ -411,11 +412,20 @@ class WorkflowRuntimeServer:
         """Send the entire data-store in a single Protobuf message.
 
         Returns serialised Protobuf message
-
         """
         pb_msg = self.schd.data_store_mgr.get_entire_workflow()
         return pb_msg.SerializeToString()
 
+    @authorise()
+    @expose
+    def pb_workflow_only(self, **_kwargs) -> bytes:
+        """Send only the workflow data, not tasks etc.
+
+        Returns serialised Protobuf message
+        """
+        pb_msg = self.schd.data_store_mgr.get_workflow_only()
+        return pb_msg.SerializeToString()
+
     @authorise()
     @expose
     def pb_data_elements(self, element_type: str, **_kwargs) -> bytes:

However, determining the set of workflows that we require ALL subscriptions for (by looking at GraphQL subs) is somewhat more difficult.

Middleware lets you work out which fields a query is looking at. However, I don't think you can monitor whether a subscription is still active from middleware?

The Tornado integration layer has access to the subscription lifecycle, but does not appear to have access to the parsed request, making it hard to determine what the subscription is looking at. The best I could come up with after a few mins is:

Tornado Patch
diff --git a/cylc/uiserver/websockets/tornado.py b/cylc/uiserver/websockets/tornado.py
index aaf2e28..5b8579f 100644
--- a/cylc/uiserver/websockets/tornado.py
+++ b/cylc/uiserver/websockets/tornado.py
@@ -63,12 +63,14 @@ class TornadoSubscriptionServer(BaseAsyncSubscriptionServer):
         loop=None,
         backend=None,
         middleware=None,
-        auth=None
+        auth=None,
+        data_store_mgr=None,
     ):
         self.loop = loop
         self.backend = backend or None
         self.middleware = middleware
         self.auth = auth
+        self.data_store_mgr = data_store_mgr
         super().__init__(schema, keep_alive)
 
     @staticmethod
@@ -131,6 +133,25 @@ class TornadoSubscriptionServer(BaseAsyncSubscriptionServer):
         # with this id.
         await connection_context.unsubscribe(op_id)
 
+        # Work out what workflows this query is subscribing to by looking at
+        # the request parameters. This is a flawed method which assumes that
+        # the GraphQL variable that controls which workflows we are subscribing
+        # to is called "WorkflowId" or "WorkflowIds", but of course, this
+        # variable could be named anything, its value could be a pattern and it
+        # doesn't have to be an argument at all, it could be hardcoded in the
+        # request.
+        _workflow_ids = []
+        for key, value in params['variable_values'].items():
+            if key.lower() == 'workflowid':
+                _workflow_ids.append(value)
+            elif key.lower() == 'workflowids':
+                _workflow_ids.extend(value)
+        for workflow_id in _workflow_ids:
+            import asyncio
+            while not await self.data_store_mgr.premote_subscription_full(workflow_id):
+                # workflow not in the store yet, wait
+                await asyncio.sleep(1)
+
         params['root_value'] = op_id
         execution_result = self.execute(params)
         try:
@@ -158,7 +179,6 @@ class TornadoSubscriptionServer(BaseAsyncSubscriptionServer):
         with suppress(KeyError):
             connection_context.request_context['sub_statuses'].pop(op_id)
 
-
     async def send_execution_result(self, connection_context, op_id, execution_result):
         # Resolve any pending promises
         if execution_result.data and 'logs' not in execution_result.data:
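
In the hunks shown, a workflow is promoted when a subscription starts, but nothing demotes it again when the subscription ends. One possible way to pair the two is a reference count per workflow, so that a workflow is only demoted once its last subscription has gone. The sketch below is an assumption about how that could look; `SubscriptionRefCounter` is a hypothetical helper, not part of the UIS.

```python
from typing import Dict


class SubscriptionRefCounter:
    """Count active GraphQL subscriptions per workflow ID."""

    def __init__(self) -> None:
        self._counts: Dict[str, int] = {}

    def start(self, w_id: str) -> bool:
        """Record a new subscription.

        Returns True if this is the first one, i.e. the caller should
        promote the workflow to the full subscription level.
        """
        self._counts[w_id] = self._counts.get(w_id, 0) + 1
        return self._counts[w_id] == 1

    def stop(self, w_id: str) -> bool:
        """Record the end of a subscription.

        Returns True if this was the last one, i.e. the caller may
        demote the workflow back to the workflow-only level.
        """
        count = self._counts.get(w_id, 0)
        if count == 0:
            # unknown workflow, nothing to demote
            return False
        if count == 1:
            del self._counts[w_id]
            return True
        self._counts[w_id] = count - 1
        return False
```

The subscription start and stop handlers would call `start()` and `stop()` respectively, and only invoke the promote/demote methods when these return True.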

dwsutherland commented May 29, 2024

> Presently, the UIS subscribes to all topics for all active workflows

Well, no, just two topics: all (which is all data) and shutdown (also, shutdown should always be a topic).
But the all-data topic can be reduced based on the data level.

The way to intercept all the required info would be to modify the async generator (subscriber)
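
To illustrate the general idea (not the actual subscriber code), an async generator can be wrapped so that hooks run when iteration starts and when the generator is closed. The `tracked` wrapper and the `on_start`/`on_stop` callbacks are hypothetical names; in practice the hooks might promote and demote the workflow's subscription level.

```python
import asyncio
from typing import AsyncIterator, Awaitable, Callable, List

Hook = Callable[[], Awaitable[None]]


async def tracked(
    source: AsyncIterator, on_start: Hook, on_stop: Hook
) -> AsyncIterator:
    """Yield everything from source, running hooks around its lifetime."""
    await on_start()
    try:
        async for item in source:
            yield item
    finally:
        # runs when the source is exhausted or the wrapper is closed
        await on_stop()


async def demo() -> List:
    """Consume part of a wrapped generator, then close it explicitly."""
    events: List = []

    async def source():
        for i in range(3):
            yield i

    async def on_start():
        events.append('start')

    async def on_stop():
        events.append('stop')

    gen = tracked(source(), on_start, on_stop)
    async for item in gen:
        events.append(item)
        if item == 1:
            break  # e.g. the client unsubscribes
    await gen.aclose()  # triggers the finally block above
    return events
```

Running `asyncio.run(demo())` records the start hook, the items consumed before the break, and then the stop hook once the generator is closed.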

I hadn't actually read this before my first attempt, but I might adopt some things (levels) and topic adjustments.

One thing we need to be able to handle is general queries hitting the UIS, as we'll want CLI via UIS in at some point and will need to handle arbitrary queries. For this I think we can use that timer concept, keeping a full sync of queried workflows for some amount of time.
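
A rough sketch of what such a timer could look like, assuming each query "touches" the workflows it reads and a periodic task demotes whatever has expired. `FullSyncTimer`, its default `ttl` and the injected `clock` are illustrative assumptions only.

```python
import time
from typing import Callable, Dict, List


class FullSyncTimer:
    """Track how long each queried workflow should stay fully synced."""

    def __init__(
        self,
        ttl: float = 300.0,  # seconds, arbitrary default
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.ttl = ttl
        self._clock = clock
        self._expiry: Dict[str, float] = {}

    def touch(self, w_id: str) -> None:
        """Call when a query hits w_id: (re)start its timer."""
        self._expiry[w_id] = self._clock() + self.ttl

    def is_live(self, w_id: str) -> bool:
        """Return True if w_id should still be fully synced."""
        return self._expiry.get(w_id, float('-inf')) > self._clock()

    def expired(self) -> List[str]:
        """Return and forget workflows whose timer has run out."""
        now = self._clock()
        stale = sorted(
            w_id for w_id, expiry in self._expiry.items() if expiry <= now
        )
        for w_id in stale:
            del self._expiry[w_id]
        return stale
```

A periodic task could call `expired()` and demote each returned workflow, provided no live GraphQL subscription still needs it.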
