diff --git a/thoth/common/openshift.py b/thoth/common/openshift.py index f9f9e1ea..e4adfb64 100644 --- a/thoth/common/openshift.py +++ b/thoth/common/openshift.py @@ -1081,6 +1081,36 @@ def schedule_package_extract( }, ) + def schedule_sync_job( + self, + document_type: Optional[str], + force_sync: bool = False, + graceful: bool = True, + job_id: Optional[str] = None, + ) -> Optional[str]: + """Schedule graph sync for specific document id.""" + if not self.middletier_namespace: + raise ConfigurationError( + "Unable to schedule graph-sync without middletier namespace being set" + ) + + sync_id = job_id or self.generate_id("sync") + template_parameters = { + "THOTH_SYNC_JOB_ID": sync_id, + "THOTH_DOCUMENT_TYPE": document_type, + "THOTH_SYNC_GRACEFUL": graceful, + "THOTH_SYNC_FORCE_SYNC": force_sync, + } + workflow_parameters = self._assign_workflow_parameters_for_ceph() + + return self._schedule_workflow( + workflow=self.workflow_manager.submit_sync_job, + parameters={ + "template_parameters": template_parameters, + "workflow_parameters": workflow_parameters, + }, + ) + def create_config_map( self, configmap_name: str, diff --git a/thoth/common/workflows.py b/thoth/common/workflows.py index 8852c552..862420e0 100644 --- a/thoth/common/workflows.py +++ b/thoth/common/workflows.py @@ -861,6 +861,27 @@ def submit_build_analysis( return workflow_id + def submit_sync_job( + self, + template_parameters: Optional[Dict[str, str]] = None, + workflow_parameters: Optional[Dict[str, Any]] = None, + ) -> Optional[str]: + """Submit graph-sync workflow.""" + if not self.openshift.middletier_namespace: + raise ConfigurationError("Middletier namespace was not provided.") + + template_parameters = template_parameters or {} + workflow_parameters = workflow_parameters or {} + + workflow_id: Optional[str] = self.submit_workflow_from_template( + self.openshift.middletier_namespace, + label_selector="template=sync", + template_parameters=template_parameters, + workflow_parameters=workflow_parameters, + workflow_namespace=self.openshift.graph_namespace, + ) + return workflow_id + def submit_graph_sync( self, template_parameters: Optional[Dict[str, str]] = None,