Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions catalog/output/workflows.json
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
"ploidy": "ANY",
"taxonomyId": "11158",
"trsId": "#workflow/github.com/iwc-workflows/generic-non-segmented-viral-variant-calling/main/versions/v0.1",
"workflowDescription": "Variant calling and consensus sequence generation for batches of Illumina PE sequenced viruses with uncomplicated and stable genome structure (like e.g. Morbilliviruses). It can handle both ampliconic and non-ampliconic data.",
"workflowDescription": "Variant calling and consensus sequence generation for batches of Illumina PE sequenced viruses with uncomplicated and stable genome structure (like e.g. Morbilliviruses).",
"workflowName": "Variant calling and consensus construction from paired end short read data of non-segmented viral genomes"
},
{
Expand Down Expand Up @@ -323,7 +323,7 @@
"ploidy": "ANY",
"taxonomyId": "11158",
"trsId": "#workflow/github.com/iwc-workflows/generic-non-segmented-viral-variant-calling/main/versions/v0.1",
"workflowDescription": "Variant calling and consensus sequence generation for batches of Illumina PE sequenced viruses with uncomplicated and stable genome structure (like e.g. Morbilliviruses). It can handle both ampliconic and non-ampliconic data.",
"workflowDescription": "Variant calling and consensus sequence generation for batches of Illumina PE sequenced viruses with uncomplicated and stable genome structure (like e.g. Morbilliviruses).",
"workflowName": "Variant calling and consensus construction from paired end short read data of non-segmented viral genomes"
},
{
Expand Down Expand Up @@ -360,7 +360,7 @@
"taxonomyId": "2",
"trsId": "#workflow/github.com/iwc-workflows/amr_gene_detection/main/versions/v1.1.5",
"workflowDescription": "Antimicrobial resistance gene detection from assembled bacterial genomes",
"workflowName": "AMR gene detection"
"workflowName": "amr_gene_detection"
},
{
"iwcId": "lncrnas-annotation-main",
Expand Down
174 changes: 151 additions & 23 deletions catalog/py_package/catalog_build/iwc_manifest_to_workflows_yaml.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import argparse
import json
import os
import re
import subprocess
from typing import Dict
import time
from typing import Dict, List

import requests
import yaml
Expand Down Expand Up @@ -33,7 +35,7 @@
)


def read_existing_yaml(workflows_path):
def read_existing_yaml(workflows_path: str) -> Dict[str, Workflow]:
if os.path.exists(workflows_path):
with open(workflows_path) as fh:
workflows = Workflows.model_validate(yaml.safe_load(fh)).workflows
Expand All @@ -44,7 +46,9 @@ def read_existing_yaml(workflows_path):
return by_trs_id


def get_workflow_categories_from_collections(collections):
def get_workflow_categories_from_collections(
collections: List[str],
) -> List[WorkflowCategoryId]:
return sorted(
list(
set(
Expand All @@ -57,10 +61,10 @@ def get_workflow_categories_from_collections(collections):
)


def get_input_types(workflow_definition):
def get_input_types(workflow_definition: dict) -> List[WorkflowParameter]:
# get all input types
INPUT_TYPES = ["data_input", "data_collection_input", "parameter_input"]
inputs: list[WorkflowParameter] = []
inputs: List[WorkflowParameter] = []
for step in workflow_definition["steps"].values():
step_label = step["label"]
step_type = step["type"]
Expand Down Expand Up @@ -94,18 +98,74 @@ def get_input_types(workflow_definition):
return inputs


def generate_current_workflows():
def verify_trs_version_exists(trs_id: str, skip_validation: bool = False) -> bool:
"""Check if a workflow version exists on Dockstore via TRS API."""
if skip_validation:
return True

# Parse the TRS ID to extract components
match = re.match(
r"#workflow/github\.com/iwc-workflows/([^/]+)/([^/]+)/versions/v(.+)", trs_id
)
if not match:
print(f"Warning: Cannot parse TRS ID for validation: {trs_id}")
return True # We can't look this up, but someone put it in -- don't fail

repo, workflow_name, version = match.groups()

# The workflow ID format for Dockstore is the full TRS ID without the version part
workflow_id = f"#workflow/github.com/iwc-workflows/{repo}/{workflow_name}"
# URL encode the workflow ID and version
encoded_id = requests.utils.quote(workflow_id, safe="")
encoded_version = requests.utils.quote(f"v{version}", safe="")

dockstore_url = f"https://dockstore.org/api/ga4gh/trs/v2/tools/{encoded_id}/versions/{encoded_version}"

try:
response = requests.get(dockstore_url, timeout=10)
if response.status_code == 200:
return True
elif response.status_code == 404:
return False
else:
print(
f"Warning: Unexpected status {response.status_code} checking {trs_id} at Dockstore"
)
return True # Don't drop workflows on weirdness
except requests.RequestException as e:
print(f"Warning: Error checking version {trs_id}: {e}")
return True
finally:
# Don't slam dockstore
time.sleep(0.1)


def generate_current_workflows(skip_validation: bool = False) -> Dict[str, Workflow]:
manifest_data = requests.get(URL).json()
by_trs_id: Dict[str, Workflow] = {}
version_warnings = []

for repo in manifest_data:
for workflow in repo["workflows"]:
if "tests" not in workflow:
# probably fixed on main branch of iwc ?
# this branch is pretty out of date
continue

trs_id = (
f"{workflow['trsID']}/versions/v{workflow['definition']['release']}"
)

if not verify_trs_version_exists(trs_id, skip_validation):
# This is just informational - we'll keep the workflow with whatever
# version is already in workflows.yml (handled in merge_into_existing)
version_warnings.append(
f"Info: IWC manifest has v{workflow['definition']['release']} for {workflow['trsID']} but it's not on Dockstore yet"
)

workflow_input = Workflow(
active=False,
trs_id=f"{workflow['trsID']}/versions/v{workflow['definition']['release']}",
trs_id=trs_id,
workflow_name=workflow["definition"]["name"],
categories=get_workflow_categories_from_collections(
workflow["collections"]
Expand All @@ -118,6 +178,12 @@ def generate_current_workflows():
parameters=get_input_types(workflow["definition"]),
)
by_trs_id[workflow["trsID"]] = workflow_input

if version_warnings and not skip_validation:
print("\nVersion status notes:")
for warning in version_warnings:
print(f" {warning}")

return by_trs_id


Expand Down Expand Up @@ -149,29 +215,82 @@ def add_missing_parameters(
existing_workflow_input.parameters.append(param)


def merge_into_existing(workflows_path):
def merge_into_existing(
workflows_path: str, skip_validation: bool = False
) -> Dict[str, Workflow]:
existing = read_existing_yaml(workflows_path)
current = generate_current_workflows()
current = generate_current_workflows(skip_validation)
merged: Dict[str, Workflow] = {}
invalid_versions = []
versions_kept = []

for versionless_trs_id, current_workflow_input in current.items():
existing_workflow_input = existing.get(versionless_trs_id)
if existing_workflow_input:
# we'll keep whatever has been specified in the brc repo,
# and only update values that are in the iwc manifest
exisiting_dict = existing_workflow_input.model_dump()
new_dict = current_workflow_input.model_dump()
for key in MANIFEST_SOURCE_OF_TRUTH:
exisiting_dict[key] = new_dict[key]
ensure_parameters_exist(current_workflow_input, existing_workflow_input)
updated_existing_workflow = Workflow(**exisiting_dict)
add_missing_parameters(current_workflow_input, updated_existing_workflow)
current_workflow_input = updated_existing_workflow
if not existing_workflow_input:
merged[versionless_trs_id] = current_workflow_input
continue

iwc_version_valid = verify_trs_version_exists(
current_workflow_input.trs_id, skip_validation
)
existing_version_valid = verify_trs_version_exists(
existing_workflow_input.trs_id, skip_validation
)

# Decide which version to use
if not iwc_version_valid and existing_version_valid:
# IWC version not on Dockstore yet, but existing version is valid
versions_kept.append(
f"Keeping {existing_workflow_input.trs_id} (IWC has newer unreleased version)"
)
current_workflow_input.trs_id = existing_workflow_input.trs_id
elif not existing_version_valid:
# Existing version is invalid (manually edited to bad version)
if iwc_version_valid:
print(
f"Error: Invalid version {existing_workflow_input.trs_id} doesn't exist on Dockstore"
)
print(f" -> Reverting to IWC version: {current_workflow_input.trs_id}")
invalid_versions.append(existing_workflow_input.trs_id)
else:
# Both versions are invalid - this shouldn't happen often
print(
f"Error: Neither existing nor IWC version exists on Dockstore for {versionless_trs_id}"
)
# Keep what we have
current_workflow_input.trs_id = existing_workflow_input.trs_id

# Build the merged workflow
existing_dict = existing_workflow_input.model_dump()
new_dict = current_workflow_input.model_dump()

# Update manifest-controlled fields
for key in MANIFEST_SOURCE_OF_TRUTH:
existing_dict[key] = new_dict[key]

ensure_parameters_exist(current_workflow_input, existing_workflow_input)
updated_existing_workflow = Workflow(**existing_dict)
add_missing_parameters(current_workflow_input, updated_existing_workflow)
current_workflow_input = updated_existing_workflow
merged[versionless_trs_id] = current_workflow_input

if versions_kept and not skip_validation:
print(
f"\nKept {len(versions_kept)} existing versions (newer IWC versions not on Dockstore yet)"
)
for msg in versions_kept:
print(f" {msg}")

if invalid_versions:
print(f"\nFixed {len(invalid_versions)} invalid versions in workflows.yml")

return merged


def to_workflows_yaml(workflows_path: str, exclude_other: bool):
by_trs_id = merge_into_existing(workflows_path)
def to_workflows_yaml(
workflows_path: str, exclude_other: bool, skip_validation: bool = False
):
by_trs_id = merge_into_existing(workflows_path, skip_validation)
# sort by trs id, should play nicer with git diffs
sorted_workflows = list(dict(sorted(by_trs_id.items())).values())
if exclude_other:
Expand Down Expand Up @@ -209,5 +328,14 @@ def to_workflows_yaml(workflows_path: str, exclude_other: bool):
action="store_true",
help="Exclude other items from processing.",
)
parser.add_argument(
"--skip-validation",
action="store_true",
help="Skip validation of workflow versions against TRS API.",
)
args = parser.parse_args()
to_workflows_yaml(args.workflows_path, exclude_other=args.exclude_other)
to_workflows_yaml(
args.workflows_path,
exclude_other=args.exclude_other,
skip_validation=args.skip_validation,
)
Loading
Loading