diff --git a/dev-requirements.txt b/dev-requirements.txt index 45f813d09..aadece56d 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -8,7 +8,9 @@ Werkzeug # For testing tox pytest +pytest-mock coverage +responses>=0.23.0 #Building Docs docutils diff --git a/planemo/commands/cmd_workflow_job_init.py b/planemo/commands/cmd_workflow_job_init.py index cc90840cf..462529ea8 100644 --- a/planemo/commands/cmd_workflow_job_init.py +++ b/planemo/commands/cmd_workflow_job_init.py @@ -9,9 +9,12 @@ from planemo import options from planemo.cli import command_function from planemo.galaxy.workflows import ( + DOCKSTORE_TRS_BASE, get_workflow_from_invocation_id, + is_trs_identifier, job_template_with_metadata, new_workflow_associated_path, + TRS_WORKFLOWS_PREFIX, ) from planemo.io import can_write_to_path @@ -70,6 +73,41 @@ def _build_commented_yaml(job, metadata): return commented +def _trs_id_to_job_filename(trs_id: str) -> str: + """Generate a job filename from a TRS ID. + + Args: + trs_id: TRS ID in various formats: + - workflow/github.com/org/repo/name[/version] + - #workflow/github.com/org/repo/name[/version] + - trs://workflow/github.com/org/repo/name[/version] + - https://dockstore.org/api/ga4gh/trs/v2/tools/#workflow/... + + Returns: + A job filename like "workflow-name-job.yml" + """ + # Strip common prefixes + identifier = trs_id + if identifier.startswith(DOCKSTORE_TRS_BASE): + identifier = identifier[len(DOCKSTORE_TRS_BASE) :] + if identifier.startswith(TRS_WORKFLOWS_PREFIX): + identifier = identifier[len(TRS_WORKFLOWS_PREFIX) :] + if identifier.startswith("#"): + identifier = identifier[1:] + + # Parse the TRS ID path: workflow/github.com/org/repo/name[/version] + parts = identifier.split("/") + if len(parts) >= 5: + # Get the workflow name (5th element) and optionally version + workflow_name = parts[4] + # Sanitize the name for use as filename + workflow_name = workflow_name.replace(" ", "-").replace("/", "-") + return f"{workflow_name}-job.yml" + else: + # Fallback to a generic name + return "workflow-job.yml" + + @click.command("workflow_job_init") @options.required_workflow_arg() @options.force_option() @@ -102,9 +140,14 @@ def cli(ctx, workflow_identifier, output=None, **kwds): job, metadata = job_template_with_metadata(workflow_identifier, **kwds) if output is None: - output = new_workflow_associated_path( - path_basename if kwds["from_invocation"] else workflow_identifier, suffix="job" - ) + if kwds["from_invocation"]: + output = new_workflow_associated_path(path_basename, suffix="job") + elif is_trs_identifier(workflow_identifier): + # Generate output filename from TRS ID + # Extract workflow name from TRS ID (e.g., workflow/github.com/org/repo/name -> name-job.yml) + output = _trs_id_to_job_filename(workflow_identifier) + else: + output = new_workflow_associated_path(workflow_identifier, suffix="job") if not can_write_to_path(output, **kwds): ctx.exit(1) diff --git a/planemo/galaxy/config.py b/planemo/galaxy/config.py index d0e85f981..fc4fdb919 100644 --- a/planemo/galaxy/config.py +++ b/planemo/galaxy/config.py @@ -42,7 +42,9 @@ from planemo.docker import docker_host_args from planemo.galaxy.workflows import ( get_toolshed_url_for_tool_id, + install_shed_repos_for_workflow_id, remote_runnable_to_workflow_id, + TRS_WORKFLOWS_PREFIX, ) from planemo.io import ( communicate, @@ -67,7 +69,9 @@ from .run import setup_venv from .workflows import ( find_tool_ids, + GalaxyTrsImporter, import_workflow, + import_workflow_from_trs, install_shed_repos, MAIN_TOOLSHED_URL, ) @@ 
-546,7 +550,9 @@ def _all_tool_paths( runnables: List["Runnable"], galaxy_root: Optional[str] = None, extra_tools: Optional[List[str]] = None ) -> Set[str]: extra_tools = extra_tools or [] - all_tool_paths = {r.path for r in runnables if r.has_tools and not r.data_manager_conf_path} + all_tool_paths = { + r.path for r in runnables if r.has_tools and not r.data_manager_conf_path and not r.is_remote_workflow_uri + } extra_tools = _expand_paths(galaxy_root, extra_tools=extra_tools) all_tool_paths.update(extra_tools) for tool_id in get_tool_ids_for_runnables(runnables): @@ -814,10 +820,38 @@ def ready(): def install_workflows(self): for runnable in self.runnables: - if runnable.type.name in ["galaxy_workflow", "cwl_workflow"] and not runnable.is_remote_workflow_uri: + # Install local workflows and TRS workflows, but skip already-imported Galaxy workflows + is_importable = runnable.type.name in ["galaxy_workflow", "cwl_workflow"] + is_trs = runnable.uri.startswith(TRS_WORKFLOWS_PREFIX) + + if is_importable and (not runnable.is_remote_workflow_uri or is_trs): self._install_workflow(runnable) def _install_workflow(self, runnable): + # Check if this is a TRS workflow + if runnable.uri.startswith(TRS_WORKFLOWS_PREFIX): + # Import from TRS using Galaxy's TRS API + importer = GalaxyTrsImporter(self.user_gi) + workflow = import_workflow_from_trs(runnable.uri, importer) + self._workflow_ids[runnable.uri] = workflow["id"] + + # Install required tools from the toolshed if shed_install is enabled + if self._kwds.get("shed_install") and ( + self._kwds.get("engine") != "external_galaxy" or self._kwds.get("galaxy_admin_key") + ): + workflow_repos = install_shed_repos_for_workflow_id( + workflow["id"], + self.user_gi, + self.gi, + self._kwds.get("ignore_dependency_problems", False), + self._kwds.get("install_tool_dependencies", False), + self._kwds.get("install_resolver_dependencies", True), + self._kwds.get("install_repository_dependencies", True), + self._kwds.get("install_most_recent_revision", False), + ) + self.installed_repos[runnable.uri], self.updated_repos[runnable.uri] = workflow_repos + return + if self._kwds.get("shed_install") and ( self._kwds.get("engine") != "external_galaxy" or self._kwds.get("galaxy_admin_key") ): @@ -839,7 +873,12 @@ def _install_workflow(self, runnable): self._workflow_ids[runnable.path] = workflow["id"] def workflow_id_for_runnable(self, runnable): - if runnable.is_remote_workflow_uri: + if runnable.uri.startswith(TRS_WORKFLOWS_PREFIX): + # TRS workflows are imported and their IDs are stored by URI + workflow_id = self._workflow_ids.get(runnable.uri) + if not workflow_id: + raise ValueError(f"TRS workflow not imported: {runnable.uri}") + elif runnable.is_remote_workflow_uri: workflow_id = remote_runnable_to_workflow_id(runnable) else: workflow_id = self.workflow_id(runnable.path) @@ -1137,43 +1176,43 @@ def _find_galaxy_root(ctx, **kwds): def _find_test_data(runnables, **kwds): + # Find test data directory associated with path. + test_data = kwds.get("test_data", None) + if test_data: + return os.path.abspath(test_data) + test_data_search_path = "." - runnables = [r for r in runnables if r.has_tools] + runnables = [r for r in runnables if r.has_tools and not r.is_remote_workflow_uri] if len(runnables) > 0: test_data_search_path = runnables[0].test_data_search_path - # Find test data directory associated with path. 
- test_data = kwds.get("test_data", None) + test_data = _search_tool_path_for(test_data_search_path, "test-data") if test_data: - return os.path.abspath(test_data) - else: - test_data = _search_tool_path_for(test_data_search_path, "test-data") - if test_data: - return test_data + return test_data warn(NO_TEST_DATA_MESSAGE) return None def _find_tool_data_table(runnables, test_data_dir, **kwds) -> Optional[List[str]]: + tool_data_table = kwds.get("tool_data_table", None) + if tool_data_table: + return [os.path.abspath(table_path) for table_path in tool_data_table] + tool_data_search_path = "." - runnables = [r for r in runnables if r.has_tools] + runnables = [r for r in runnables if r.has_tools and not r.is_remote_workflow_uri] if len(runnables) > 0: tool_data_search_path = runnables[0].tool_data_search_path - tool_data_table = kwds.get("tool_data_table", None) + extra_paths = [test_data_dir] if test_data_dir else [] + tool_data_table = _search_tool_path_for( + tool_data_search_path, + "tool_data_table_conf.xml.test", + extra_paths, + ) or _search_tool_path_for( # if all else fails just use sample + tool_data_search_path, "tool_data_table_conf.xml.sample" + ) if tool_data_table: - return [os.path.abspath(table_path) for table_path in tool_data_table] - else: - extra_paths = [test_data_dir] if test_data_dir else [] - tool_data_table = _search_tool_path_for( - tool_data_search_path, - "tool_data_table_conf.xml.test", - extra_paths, - ) or _search_tool_path_for( # if all else fails just use sample - tool_data_search_path, "tool_data_table_conf.xml.sample" - ) - if tool_data_table: - return [tool_data_table] + return [tool_data_table] return None diff --git a/planemo/galaxy/workflows.py b/planemo/galaxy/workflows.py index 93abd7b81..23d7e4953 100644 --- a/planemo/galaxy/workflows.py +++ b/planemo/galaxy/workflows.py @@ -11,8 +11,18 @@ Dict, List, Optional, + Protocol, + runtime_checkable, + Tuple, + TYPE_CHECKING, ) -from urllib.parse import urlparse +from urllib.parse import ( + quote, + urlparse, +) + +if TYPE_CHECKING: + from bioblend.galaxy import GalaxyInstance import requests import yaml @@ -39,9 +49,276 @@ FAILED_REPOSITORIES_MESSAGE = "Failed to install one or more repositories." GALAXY_WORKFLOWS_PREFIX = "gxid://workflows/" GALAXY_WORKFLOW_INSTANCE_PREFIX = "gxid://workflow-instance/" +TRS_WORKFLOWS_PREFIX = "trs://" MAIN_TOOLSHED_URL = "https://toolshed.g2.bx.psu.edu" +def parse_trs_id(trs_id: str) -> Optional[Dict[str, str]]: + """Parse a TRS ID into a full TRS URL. 
+ + Args: + trs_id: TRS ID in format: [#]workflow/github.com/org/repo/workflow_name[/version] + Examples: + - workflow/github.com/org/repo/main + - #workflow/github.com/org/repo/main/v0.1.14 + - workflow/github.com/iwc-workflows/parallel-accession-download/main + + Returns: + Dict with key 'trs_url' containing the full TRS API URL, + or None if invalid + """ + # Remove leading # if present + if trs_id.startswith("#"): + trs_id = trs_id[1:] + + # Expected format: workflow/github.com/org/repo[/workflow_name][/version] + # Some workflows use the repo name as the workflow name (4 parts for tool ID) + # Others have a separate workflow name (5 parts for tool ID) + parts = trs_id.split("/") + if len(parts) < 4: + return None + + artifact_type = parts[0] # workflow or tool + service = parts[1] # github.com + owner = parts[2] + repo = parts[3] + + # Determine if we have a workflow name and/or version + # Format could be: + # workflow/github.com/org/repo (4 parts) - no workflow name, no version + # workflow/github.com/org/repo/workflow_name (5 parts) - with workflow name, no version + # workflow/github.com/org/repo/workflow_name/version (6 parts) - with both + # workflow/github.com/org/repo/workflow_name/versions/version (7 parts) - full URL format + if len(parts) == 4: + workflow_name = None + version = None + elif len(parts) == 5: + # 5th part is the workflow name (e.g., "main" in most cases) + workflow_name = parts[4] + version = None + elif len(parts) >= 6: + # 6+ parts: 5th is workflow name, 6th might be "versions" keyword or version + workflow_name = parts[4] + # Check if this is full URL format with "versions" keyword + if len(parts) >= 7 and parts[5] == "versions": + # Full URL format: .../workflow_name/versions/version + version = parts[6] + else: + # Short format: .../workflow_name/version + version = parts[5] + else: + workflow_name = None + version = None + + # Build the TRS tool ID + # Format: #workflow/github.com/org/repo[/workflow_name] + if workflow_name: + trs_tool_id = f"#{artifact_type}/{service}/{owner}/{repo}/{workflow_name}" + else: + trs_tool_id = f"#{artifact_type}/{service}/{owner}/{repo}" + # URL-encode the tool ID for API calls + encoded_tool_id = quote(trs_tool_id, safe="") + + # Build the full TRS URL + # Dockstore is the primary TRS server for GitHub workflows + trs_base_url = "https://dockstore.org/api/ga4gh/trs/v2/tools/" + + if version: + # Specific version requested + trs_url = f"{trs_base_url}{trs_tool_id}/versions/{version}" + else: + # No version specified - fetch latest version from Dockstore + # Galaxy requires /versions/{version_id} in TRS URLs, so we must provide a version + default_version = workflow_name if workflow_name else "main" + + try: + # Query Dockstore API to get available versions (using encoded URL) + versions_url = f"{trs_base_url}{encoded_tool_id}/versions" + response = requests.get(versions_url, timeout=10) + response.raise_for_status() + versions = response.json() + + if versions and len(versions) > 0: + # Get the first version (usually the latest/default) + latest_version = versions[0].get("name") or versions[0].get("id") + if latest_version: + trs_url = f"{trs_base_url}{trs_tool_id}/versions/{latest_version}" + else: + # Use default version as fallback + trs_url = f"{trs_base_url}{trs_tool_id}/versions/{default_version}" + else: + # No versions found, use default version + trs_url = f"{trs_base_url}{trs_tool_id}/versions/{default_version}" + except Exception: + # If we can't fetch versions, use default version + # Galaxy requires the 
/versions/ part in TRS URLs + trs_url = f"{trs_base_url}{trs_tool_id}/versions/{default_version}" + + return {"trs_url": trs_url} + + +def parse_trs_uri(trs_uri: str) -> Optional[Dict[str, str]]: + """Parse a TRS URI into a full TRS URL. + + Args: + trs_uri: TRS URI in format: trs://[#]workflow/github.com/org/repo/workflow_name[/version] + or trs:// + + Returns: + Dict with key 'trs_url' containing the full TRS API URL, + or None if invalid + """ + if not trs_uri.startswith(TRS_WORKFLOWS_PREFIX): + return None + + # Remove trs:// prefix + trs_content = trs_uri[len(TRS_WORKFLOWS_PREFIX) :] + + # Parse as a TRS ID (workflow/... or #workflow/...) to resolve versions + return parse_trs_id(trs_content) + + +@runtime_checkable +class TrsImporter(Protocol): + """Interface for importing workflows from TRS.""" + + def import_from_trs(self, trs_url: str) -> Dict[str, Any]: + """Import a workflow from a TRS URL. + + Args: + trs_url: Full TRS URL to import from + + Returns: + Workflow dict with 'id' and other metadata + """ + ... + + +class GalaxyTrsImporter: + """Import TRS workflows via Galaxy API.""" + + def __init__(self, gi: "GalaxyInstance"): + """Initialize with Galaxy instance. + + Args: + gi: BioBlend GalaxyInstance for API access + """ + self._gi = gi + + def import_from_trs(self, trs_url: str) -> Dict[str, Any]: + """Import a workflow from a TRS URL via Galaxy API. + + Args: + trs_url: Full TRS URL to import from + + Returns: + Workflow dict with 'id' and other metadata + """ + trs_payload = {"archive_source": "trs_tool", "trs_url": trs_url} + url = self._gi.workflows._make_url() + return self._gi.workflows._post(url=url, payload=trs_payload) + + +def import_workflow_from_trs(trs_uri: str, importer: TrsImporter) -> Dict[str, Any]: + """Import a workflow from a TRS endpoint. 
+ + Args: + trs_uri: TRS URI in format trs://[#]workflow/github.com/org/repo/workflow_name[/version] + importer: TrsImporter implementation to use for importing + + Returns: + Workflow dict with 'id' and other metadata + """ + trs_info = parse_trs_uri(trs_uri) + if not trs_info: + raise ValueError(f"Invalid TRS URI: {trs_uri}") + + return importer.import_from_trs(trs_info["trs_url"]) + + +DOCKSTORE_TRS_BASE = "https://dockstore.org/api/ga4gh/trs/v2/tools/" + + +def _resolve_trs_url(trs_id: str) -> str: + """Resolve a TRS identifier to a full TRS URL.""" + if trs_id.startswith(DOCKSTORE_TRS_BASE): + return trs_id + if trs_id.startswith(TRS_WORKFLOWS_PREFIX): + trs_info = parse_trs_uri(trs_id) + if not trs_info: + raise ValueError(f"Invalid TRS URI: {trs_id}") + return trs_info["trs_url"] + # It's a short TRS ID + trs_info = parse_trs_id(trs_id) + if not trs_info: + raise ValueError(f"Invalid TRS ID: {trs_id}") + return trs_info["trs_url"] + + +def _encode_trs_url(trs_url: str) -> str: + """URL-encode the tool ID portion of a TRS URL for API calls.""" + if not trs_url.startswith(DOCKSTORE_TRS_BASE): + return trs_url + tool_id_and_version = trs_url[len(DOCKSTORE_TRS_BASE) :] + if "/versions/" in tool_id_and_version: + tool_id, version = tool_id_and_version.split("/versions/", 1) + return f"{DOCKSTORE_TRS_BASE}{quote(tool_id, safe='')}/versions/{version}" + return f"{DOCKSTORE_TRS_BASE}{quote(tool_id_and_version, safe='')}" + + +def _find_primary_descriptor(files: List[Dict[str, Any]]) -> Optional[Dict[str, Any]]: + """Find the primary descriptor file from a list of TRS files.""" + for f in files: + if f.get("file_type") == "PRIMARY_DESCRIPTOR": + return f + for f in files: + if f.get("path", "").endswith((".ga", ".gxwf.yml", ".gxwf.yaml")): + return f + return files[0] if files else None + + +def fetch_workflow_from_trs(trs_id: str) -> Dict[str, Any]: + """Fetch a workflow definition directly from a TRS endpoint. 
+ + Args: + trs_id: TRS ID in format: [#]workflow/github.com/org/repo/workflow_name[/version] + or a full TRS URL + + Returns: + Workflow definition dict (gxformat2 or GA format) + + Raises: + ValueError: If the TRS ID is invalid or workflow cannot be fetched + """ + trs_url = _encode_trs_url(_resolve_trs_url(trs_id)) + files_url = f"{trs_url}/GALAXY/files" + + try: + response = requests.get(files_url, timeout=30) + response.raise_for_status() + files = response.json() + + primary_file = _find_primary_descriptor(files) + if not primary_file: + raise ValueError(f"No workflow file found at TRS endpoint: {trs_url}") + + descriptor_url = f"{trs_url}/GALAXY/descriptor/{primary_file.get('path', '')}" + file_response = requests.get(descriptor_url, timeout=30) + file_response.raise_for_status() + content = file_response.json().get("content", "") + + if not content: + raise ValueError(f"Empty workflow content from TRS endpoint: {trs_url}") + + try: + return json.loads(content) + except json.JSONDecodeError: + return yaml.safe_load(content) + + except requests.RequestException as e: + raise ValueError(f"Failed to fetch workflow from TRS endpoint {trs_url}: {e}") + + @lru_cache(maxsize=None) def guess_tool_shed_url(tool_shed_fqdn: str) -> Optional[str]: if tool_shed_fqdn in MAIN_TOOLSHED_URL: @@ -105,6 +382,46 @@ def load_shed_repos(runnable): return tools +def _install_shed_repos_from_tools_info( + tools_info: List[Dict[str, Any]], + admin_gi: "GalaxyInstance", + ignore_dependency_problems: bool, + install_tool_dependencies: bool = False, + install_resolver_dependencies: bool = True, + install_repository_dependencies: bool = True, + install_most_recent_revision: bool = False, +) -> Tuple[Optional[List[Any]], Optional[List[Any]]]: + """Common logic for installing tool shed repositories from a tools_info list.""" + if not tools_info: + return None, None + + install_tool_manager = shed_tools.InstallRepositoryManager(admin_gi) + install_results = install_tool_manager.install_repositories( + tools_info, + default_install_tool_dependencies=install_tool_dependencies, + default_install_resolver_dependencies=install_resolver_dependencies, + default_install_repository_dependencies=install_repository_dependencies, + ) + if install_most_recent_revision: # for workflow autoupdates we also need the most recent tool versions + update_results = install_tool_manager.update_repositories( + tools_info, + default_install_tool_dependencies=install_tool_dependencies, + default_install_resolver_dependencies=install_resolver_dependencies, + default_install_repository_dependencies=install_repository_dependencies, + ) + install_results.errored_repositories.extend(update_results.errored_repositories) + updated_repos = update_results.installed_repositories + else: + updated_repos = None + + if install_results.errored_repositories: + if ignore_dependency_problems: + warn(FAILED_REPOSITORIES_MESSAGE) + else: + raise Exception(FAILED_REPOSITORIES_MESSAGE) + return install_results.installed_repositories, updated_repos + + def install_shed_repos( runnable, admin_gi, @@ -115,34 +432,82 @@ def install_shed_repos( install_most_recent_revision=False, ): tools_info = load_shed_repos(runnable) - if tools_info: - install_tool_manager = shed_tools.InstallRepositoryManager(admin_gi) - install_results = install_tool_manager.install_repositories( - tools_info, - default_install_tool_dependencies=install_tool_dependencies, - default_install_resolver_dependencies=install_resolver_dependencies, - 
default_install_repository_dependencies=install_repository_dependencies, - ) - if install_most_recent_revision: # for workflow autoupdates we also need the most recent tool versions - update_results = install_tool_manager.update_repositories( - tools_info, - default_install_tool_dependencies=install_tool_dependencies, - default_install_resolver_dependencies=install_resolver_dependencies, - default_install_repository_dependencies=install_repository_dependencies, + return _install_shed_repos_from_tools_info( + tools_info, + admin_gi, + ignore_dependency_problems, + install_tool_dependencies, + install_resolver_dependencies, + install_repository_dependencies, + install_most_recent_revision, + ) + + +def install_shed_repos_for_workflow_id( + workflow_id: str, + user_gi: "GalaxyInstance", + admin_gi: "GalaxyInstance", + ignore_dependency_problems: bool, + install_tool_dependencies: bool = False, + install_resolver_dependencies: bool = True, + install_repository_dependencies: bool = True, + install_most_recent_revision: bool = False, +) -> Tuple[Optional[List[Any]], Optional[List[Any]]]: + """Install tool shed repositories for a workflow that's already in Galaxy. + + This is used for TRS workflows that are imported via Galaxy's TRS API. + We fetch the workflow definition from Galaxy and extract tool requirements. + """ + # Fetch the workflow from Galaxy to get the GA format + workflow_dict = user_gi.workflows.export_workflow_dict(workflow_id) + + # Use ephemeris to generate the tool list from the workflow + with tempfile.NamedTemporaryFile(mode="w", suffix=".ga", delete=False) as wf_file: + json.dump(workflow_dict, wf_file) + wf_file.flush() + wf_path = wf_file.name + + try: + with tempfile.NamedTemporaryFile(mode="w", suffix=".yml", delete=False) as tool_file: + tool_path = tool_file.name + + try: + # Generate tool list from the GA workflow + generate_tool_list_from_ga_workflow_files.generate_tool_list_from_workflow( + [wf_path], "Tools from TRS workflow", tool_path ) - install_results.errored_repositories.extend(update_results.errored_repositories) - updated_repos = update_results.installed_repositories - else: - updated_repos = None - if install_results.errored_repositories: - if ignore_dependency_problems: - warn(FAILED_REPOSITORIES_MESSAGE) - else: - raise Exception(FAILED_REPOSITORIES_MESSAGE) - return install_results.installed_repositories, updated_repos - else: - return None, None + # Load the generated tool list + with open(tool_path) as f: + tools_data = yaml.safe_load(f) + tools_info = tools_data.get("tools", []) if tools_data else [] + + # Add tool shed URLs + for repo in tools_info: + tool_shed = repo.get("tool_shed") + if tool_shed: + tool_shed_url = guess_tool_shed_url(tool_shed) + if tool_shed_url: + repo["tool_shed_url"] = tool_shed_url + + # Use common installation logic + return _install_shed_repos_from_tools_info( + tools_info, + admin_gi, + ignore_dependency_problems, + install_tool_dependencies, + install_resolver_dependencies, + install_repository_dependencies, + install_most_recent_revision, + ) + finally: + # Clean up tool list file + if os.path.exists(tool_path): + os.unlink(tool_path) + finally: + # Clean up workflow file + if os.path.exists(wf_path): + os.unlink(wf_path) def import_workflow(path, admin_gi, user_gi, from_path=False): @@ -375,23 +740,19 @@ def _collection_elements_for_type(collection_type): ] -def job_template_with_metadata(workflow_path, **kwds): - """Return a job template with metadata for each input. 
+def _build_template_and_metadata_from_inputs( + all_inputs: List[Dict[str, Any]], +) -> Tuple[Dict[str, Any], Dict[str, Any]]: + """Build job template and metadata from normalized workflow inputs. - Returns a tuple of (template_dict, metadata_dict) where metadata_dict - contains type, doc (description), optional status, default value, and format for each input label. - """ - if kwds.get("from_invocation"): - # For invocation-based templates, we don't have metadata - return _job_inputs_template_from_invocation(workflow_path, kwds["galaxy_url"], kwds["galaxy_user_key"]), {} - - try: - all_inputs = inputs_normalized(workflow_path=workflow_path) - except Exception: - raise Exception("Input workflow could not be successfully normalized - try linting with planemo workflow_lint.") + Args: + all_inputs: List of normalized input step definitions from gxformat2 - template = {} - metadata = {} + Returns: + Tuple of (template_dict, metadata_dict) + """ + template: Dict[str, Any] = {} + metadata: Dict[str, Any] = {} for input_step in all_inputs: i_label = input_label(input_step) input_type = input_step["type"] @@ -439,6 +800,97 @@ def job_template_with_metadata(workflow_path, **kwds): return template, metadata +def _job_template_with_metadata_from_dict(workflow_dict: Dict[str, Any]): + """Generate job template and metadata from a workflow dictionary. + + This handles both GA format (.ga) and gxformat2 format workflows. + + Args: + workflow_dict: Workflow definition dict + + Returns: + Tuple of (template_dict, metadata_dict) + """ + # Write workflow to temp file and use inputs_normalized + # This handles both GA and gxformat2 formats consistently + is_ga_format = ( + workflow_dict.get("a_galaxy_workflow", False) + or "steps" in workflow_dict + and isinstance(workflow_dict.get("steps"), dict) + ) + + suffix = ".ga" if is_ga_format else ".gxwf.yml" + + with tempfile.NamedTemporaryFile(mode="w", suffix=suffix, delete=False) as wf_file: + if is_ga_format: + json.dump(workflow_dict, wf_file) + else: + yaml.dump(workflow_dict, wf_file) + wf_file.flush() + wf_path = wf_file.name + + try: + all_inputs = inputs_normalized(workflow_path=wf_path) + except Exception: + raise Exception("Input workflow could not be successfully normalized from TRS endpoint.") + finally: + if os.path.exists(wf_path): + os.unlink(wf_path) + + return _build_template_and_metadata_from_inputs(all_inputs) + + +def is_trs_identifier(identifier: str) -> bool: + """Check if the identifier is a TRS ID or TRS URI. + + Args: + identifier: The workflow identifier to check + + Returns: + True if it's a TRS ID or URI, False otherwise + """ + # Full Dockstore TRS URL + if identifier.startswith(DOCKSTORE_TRS_BASE): + return True + # TRS URI + if identifier.startswith(TRS_WORKFLOWS_PREFIX): + return True + # Short TRS ID format + if identifier.startswith(("workflow/", "tool/", "#workflow/", "#tool/")) and "/github.com/" in identifier: + return True + return False + + +def job_template_with_metadata(workflow_path, **kwds): + """Return a job template with metadata for each input. + + Returns a tuple of (template_dict, metadata_dict) where metadata_dict + contains type, doc (description), optional status, default value, and format for each input label. 
+ + The workflow_path can be: + - A local file path to a workflow file + - A TRS ID (e.g., workflow/github.com/org/repo/workflow_name) + - A TRS URI (e.g., trs://workflow/github.com/org/repo/workflow_name) + - A full Dockstore TRS URL + """ + if kwds.get("from_invocation"): + # For invocation-based templates, we don't have metadata + return _job_inputs_template_from_invocation(workflow_path, kwds["galaxy_url"], kwds["galaxy_user_key"]), {} + + # Check if this is a TRS identifier + if is_trs_identifier(workflow_path): + # Fetch workflow from TRS and write to temp file for processing + workflow_dict = fetch_workflow_from_trs(workflow_path) + return _job_template_with_metadata_from_dict(workflow_dict) + + try: + all_inputs = inputs_normalized(workflow_path=workflow_path) + except Exception: + raise Exception("Input workflow could not be successfully normalized - try linting with planemo workflow_lint.") + + return _build_template_and_metadata_from_inputs(all_inputs) + + def new_workflow_associated_path(workflow_path, suffix="tests"): """Generate path for test or job YAML file next to workflow.""" base, input_ext = os.path.splitext(workflow_path) @@ -580,6 +1032,11 @@ def _job_outputs_template_from_invocation(invocation_id, galaxy_url, galaxy_api_ __all__ = ( - "import_workflow", "describe_outputs", + "DOCKSTORE_TRS_BASE", + "fetch_workflow_from_trs", + "import_workflow", + "import_workflow_from_trs", + "is_trs_identifier", + "TRS_WORKFLOWS_PREFIX", ) diff --git a/planemo/runnable.py b/planemo/runnable.py index 79cd50396..4c5e6eaf0 100644 --- a/planemo/runnable.py +++ b/planemo/runnable.py @@ -54,6 +54,7 @@ TEST_FILE_NOT_LIST_MESSAGE = "Invalid test definition file [%s] - file must contain a list of tests" TEST_FIELD_MISSING_MESSAGE = "Invalid test definition [test #%d in %s] -defintion must field [%s]." 
GALAXY_TOOLS_PREFIX = "gxid://tools/" +TRS_WORKFLOWS_PREFIX = "trs://" class RunnableType(Enum): @@ -115,7 +116,12 @@ def has_path(self): @property def is_remote_workflow_uri(self) -> bool: - return self.uri.startswith((GALAXY_WORKFLOWS_PREFIX, GALAXY_WORKFLOW_INSTANCE_PREFIX)) + return self.uri.startswith((GALAXY_WORKFLOWS_PREFIX, GALAXY_WORKFLOW_INSTANCE_PREFIX, TRS_WORKFLOWS_PREFIX)) + + @property + def is_trs_workflow_uri(self) -> bool: + """Check if this is a TRS workflow URI.""" + return self.uri.startswith(TRS_WORKFLOWS_PREFIX) @property def test_data_search_path(self) -> str: diff --git a/planemo/runnable_resolve.py b/planemo/runnable_resolve.py index ca8c9793b..0828ca613 100644 --- a/planemo/runnable_resolve.py +++ b/planemo/runnable_resolve.py @@ -16,6 +16,7 @@ for_path, for_uri, GALAXY_TOOLS_PREFIX, + TRS_WORKFLOWS_PREFIX, ) @@ -24,14 +25,38 @@ def for_runnable_identifier(ctx, runnable_identifier, kwds: Dict[str, Any]): # could be a URI, path, or alias current_profile = kwds.get("profile") runnable_identifier = translate_alias(ctx, runnable_identifier, current_profile) - if not runnable_identifier.startswith((GALAXY_WORKFLOWS_PREFIX, GALAXY_WORKFLOW_INSTANCE_PREFIX)): + + # Check if it's a full Dockstore TRS URL + if runnable_identifier.startswith("https://dockstore.org/api/ga4gh/trs/v2/tools/"): + # Extract the TRS tool ID from the URL + # Format: https://dockstore.org/api/ga4gh/trs/v2/tools/#workflow/github.com/org/repo/workflow_name[/versions/version] + trs_base = "https://dockstore.org/api/ga4gh/trs/v2/tools/" + trs_path = runnable_identifier[len(trs_base) :] + # Convert to TRS URI by wrapping with trs:// + runnable_identifier = f"{TRS_WORKFLOWS_PREFIX}{trs_path}" + return for_uri(runnable_identifier) + + # Check if it's a TRS ID - convert to TRS URI (don't download) + # Support both formats: workflow/... and #workflow/... 
+ is_trs_id = ( + runnable_identifier.startswith(("workflow/", "tool/", "#workflow/", "#tool/")) + and "/github.com/" in runnable_identifier + ) + if is_trs_id: + # This is a TRS ID, convert to TRS URI + runnable_identifier = f"{TRS_WORKFLOWS_PREFIX}{runnable_identifier}" + return for_uri(runnable_identifier) + + if not runnable_identifier.startswith( + (GALAXY_WORKFLOWS_PREFIX, GALAXY_WORKFLOW_INSTANCE_PREFIX, TRS_WORKFLOWS_PREFIX) + ): runnable_identifier = uri_to_path(ctx, runnable_identifier) if os.path.exists(runnable_identifier): runnable = for_path(runnable_identifier) else: # assume galaxy workflow or tool id if "/repos/" in runnable_identifier: runnable_identifier = f"{GALAXY_TOOLS_PREFIX}{runnable_identifier}" - elif not runnable_identifier.startswith("gxid://"): + elif not runnable_identifier.startswith("gxid://") and not runnable_identifier.startswith("trs://"): runnable_identifier = f"{GALAXY_WORKFLOWS_PREFIX}{runnable_identifier}" runnable = for_uri(runnable_identifier) return runnable diff --git a/tests/data/input_accession_single_end.txt b/tests/data/input_accession_single_end.txt new file mode 100644 index 000000000..cab34ea36 --- /dev/null +++ b/tests/data/input_accession_single_end.txt @@ -0,0 +1 @@ +SRR000001 diff --git a/tests/data/wf3-job.yml b/tests/data/wf3-job.yml new file mode 100644 index 000000000..b82420a94 --- /dev/null +++ b/tests/data/wf3-job.yml @@ -0,0 +1,3 @@ +input1: + class: File + path: hello.txt diff --git a/tests/fake_trs.py b/tests/fake_trs.py new file mode 100644 index 000000000..ef5dc2758 --- /dev/null +++ b/tests/fake_trs.py @@ -0,0 +1,33 @@ +"""Test fakes for TRS functionality.""" + +from typing import ( + Any, + Dict, + List, + Optional, +) + + +class FakeTrsImporter: + """Test fake that records imports without calling Galaxy.""" + + def __init__(self, return_workflow: Optional[Dict[str, Any]] = None): + """Initialize fake importer. + + Args: + return_workflow: Workflow dict to return from imports (default: {"id": "fake_wf_id"}) + """ + self.imported_urls: List[str] = [] + self._return = return_workflow or {"id": "fake_wf_id"} + + def import_from_trs(self, trs_url: str) -> Dict[str, Any]: + """Record the import and return fake workflow. 
+ + Args: + trs_url: TRS URL being imported + + Returns: + Fake workflow dict + """ + self.imported_urls.append(trs_url) + return self._return diff --git a/tests/fixtures/dockstore/versions_empty.json b/tests/fixtures/dockstore/versions_empty.json new file mode 100644 index 000000000..fe51488c7 --- /dev/null +++ b/tests/fixtures/dockstore/versions_empty.json @@ -0,0 +1 @@ +[] diff --git a/tests/fixtures/dockstore/versions_iwc_parallel_accession.json b/tests/fixtures/dockstore/versions_iwc_parallel_accession.json new file mode 100644 index 000000000..1531c4470 --- /dev/null +++ b/tests/fixtures/dockstore/versions_iwc_parallel_accession.json @@ -0,0 +1,356 @@ +[ + { + "author": [ + "IWC" + ], + "containerfile": false, + "descriptor_type": [ + "GALAXY" + ], + "descriptor_type_version": { + "GALAXY": [ + "gxformat1" + ] + }, + "id": "#workflow/github.com/iwc-workflows/parallel-accession-download/main:main", + "images": [], + "included_apps": [], + "is_production": false, + "meta_version": "2024-06-10 13:43:22.0", + "name": "main", + "signed": false, + "url": "https://dockstore.org/api/ga4gh/trs/v2/tools/%23workflow%2Fgithub.com%2Fiwc-workflows%2Fparallel-accession-download%2Fmain/versions/main", + "verified": false, + "verified_source": [] + }, + { + "author": [ + "IWC" + ], + "containerfile": false, + "descriptor_type": [ + "GALAXY" + ], + "descriptor_type_version": { + "GALAXY": [ + "gxformat1" + ] + }, + "id": "#workflow/github.com/iwc-workflows/parallel-accession-download/main:v0.1.14", + "images": [], + "included_apps": [], + "is_production": true, + "meta_version": "2024-06-10 13:43:22.0", + "name": "v0.1.14", + "signed": false, + "url": "https://dockstore.org/api/ga4gh/trs/v2/tools/%23workflow%2Fgithub.com%2Fiwc-workflows%2Fparallel-accession-download%2Fmain/versions/v0.1.14", + "verified": false, + "verified_source": [] + }, + { + "author": [ + "IWC" + ], + "containerfile": false, + "descriptor_type": [ + "GALAXY" + ], + "descriptor_type_version": { + "GALAXY": [ + "gxformat1" + ] + }, + "id": "#workflow/github.com/iwc-workflows/parallel-accession-download/main:v0.1.13", + "images": [], + "included_apps": [], + "is_production": false, + "meta_version": "2024-04-22 11:45:28.0", + "name": "v0.1.13", + "signed": false, + "url": "https://dockstore.org/api/ga4gh/trs/v2/tools/%23workflow%2Fgithub.com%2Fiwc-workflows%2Fparallel-accession-download%2Fmain/versions/v0.1.13", + "verified": false, + "verified_source": [] + }, + { + "author": [ + "IWC" + ], + "containerfile": false, + "descriptor_type": [ + "GALAXY" + ], + "descriptor_type_version": { + "GALAXY": [ + "gxformat1" + ] + }, + "id": "#workflow/github.com/iwc-workflows/parallel-accession-download/main:v0.1.12", + "images": [], + "included_apps": [], + "is_production": false, + "meta_version": "2024-04-16 13:56:19.0", + "name": "v0.1.12", + "signed": false, + "url": "https://dockstore.org/api/ga4gh/trs/v2/tools/%23workflow%2Fgithub.com%2Fiwc-workflows%2Fparallel-accession-download%2Fmain/versions/v0.1.12", + "verified": false, + "verified_source": [] + }, + { + "author": [ + "IWC" + ], + "containerfile": false, + "descriptor_type": [ + "GALAXY" + ], + "descriptor_type_version": { + "GALAXY": [ + "gxformat1" + ] + }, + "id": "#workflow/github.com/iwc-workflows/parallel-accession-download/main:v0.1.11", + "images": [], + "included_apps": [], + "is_production": false, + "meta_version": "2024-04-09 08:38:01.0", + "name": "v0.1.11", + "signed": false, + "url": 
"https://dockstore.org/api/ga4gh/trs/v2/tools/%23workflow%2Fgithub.com%2Fiwc-workflows%2Fparallel-accession-download%2Fmain/versions/v0.1.11", + "verified": false, + "verified_source": [] + }, + { + "author": [ + "IWC" + ], + "containerfile": false, + "descriptor_type": [ + "GALAXY" + ], + "descriptor_type_version": { + "GALAXY": [ + "gxformat1" + ] + }, + "id": "#workflow/github.com/iwc-workflows/parallel-accession-download/main:v0.1.10", + "images": [], + "included_apps": [], + "is_production": false, + "meta_version": "2024-03-11 05:01:16.0", + "name": "v0.1.10", + "signed": false, + "url": "https://dockstore.org/api/ga4gh/trs/v2/tools/%23workflow%2Fgithub.com%2Fiwc-workflows%2Fparallel-accession-download%2Fmain/versions/v0.1.10", + "verified": false, + "verified_source": [] + }, + { + "author": [ + "IWC" + ], + "containerfile": false, + "descriptor_type": [ + "GALAXY" + ], + "descriptor_type_version": { + "GALAXY": [ + "gxformat1" + ] + }, + "id": "#workflow/github.com/iwc-workflows/parallel-accession-download/main:v0.1.9", + "images": [], + "included_apps": [], + "is_production": false, + "meta_version": "2023-11-27 09:42:47.0", + "name": "v0.1.9", + "signed": false, + "url": "https://dockstore.org/api/ga4gh/trs/v2/tools/%23workflow%2Fgithub.com%2Fiwc-workflows%2Fparallel-accession-download%2Fmain/versions/v0.1.9", + "verified": false, + "verified_source": [] + }, + { + "author": [ + "IWC" + ], + "containerfile": false, + "descriptor_type": [ + "GALAXY" + ], + "descriptor_type_version": { + "GALAXY": [ + "gxformat1" + ] + }, + "id": "#workflow/github.com/iwc-workflows/parallel-accession-download/main:v0.1.8", + "images": [], + "included_apps": [], + "is_production": false, + "meta_version": "2023-11-20 08:41:07.0", + "name": "v0.1.8", + "signed": false, + "url": "https://dockstore.org/api/ga4gh/trs/v2/tools/%23workflow%2Fgithub.com%2Fiwc-workflows%2Fparallel-accession-download%2Fmain/versions/v0.1.8", + "verified": false, + "verified_source": [] + }, + { + "author": [ + "IWC" + ], + "containerfile": false, + "descriptor_type": [ + "GALAXY" + ], + "descriptor_type_version": { + "GALAXY": [ + "gxformat1" + ] + }, + "id": "#workflow/github.com/iwc-workflows/parallel-accession-download/main:v0.1.7", + "images": [], + "included_apps": [], + "is_production": false, + "meta_version": "2023-11-16 12:15:07.0", + "name": "v0.1.7", + "signed": false, + "url": "https://dockstore.org/api/ga4gh/trs/v2/tools/%23workflow%2Fgithub.com%2Fiwc-workflows%2Fparallel-accession-download%2Fmain/versions/v0.1.7", + "verified": false, + "verified_source": [] + }, + { + "author": [ + "IWC" + ], + "containerfile": false, + "descriptor_type": [ + "GALAXY" + ], + "descriptor_type_version": { + "GALAXY": [ + "gxformat1" + ] + }, + "id": "#workflow/github.com/iwc-workflows/parallel-accession-download/main:v0.1.6", + "images": [], + "included_apps": [], + "is_production": false, + "meta_version": "2023-10-03 13:42:30.0", + "name": "v0.1.6", + "signed": false, + "url": "https://dockstore.org/api/ga4gh/trs/v2/tools/%23workflow%2Fgithub.com%2Fiwc-workflows%2Fparallel-accession-download%2Fmain/versions/v0.1.6", + "verified": false, + "verified_source": [] + }, + { + "author": [ + "IWC" + ], + "containerfile": false, + "descriptor_type": [ + "GALAXY" + ], + "descriptor_type_version": { + "GALAXY": [ + "gxformat1" + ] + }, + "id": "#workflow/github.com/iwc-workflows/parallel-accession-download/main:v0.1.5", + "images": [], + "included_apps": [], + "is_production": false, + "meta_version": "2023-09-12 13:46:51.0", + "name": 
"v0.1.5", + "signed": false, + "url": "https://dockstore.org/api/ga4gh/trs/v2/tools/%23workflow%2Fgithub.com%2Fiwc-workflows%2Fparallel-accession-download%2Fmain/versions/v0.1.5", + "verified": false, + "verified_source": [] + }, + { + "author": [ + "IWC" + ], + "containerfile": false, + "descriptor_type": [ + "GALAXY" + ], + "descriptor_type_version": { + "GALAXY": [ + "gxformat1" + ] + }, + "id": "#workflow/github.com/iwc-workflows/parallel-accession-download/main:v0.1.4", + "images": [], + "included_apps": [], + "is_production": false, + "meta_version": "2023-09-05 08:29:57.0", + "name": "v0.1.4", + "signed": false, + "url": "https://dockstore.org/api/ga4gh/trs/v2/tools/%23workflow%2Fgithub.com%2Fiwc-workflows%2Fparallel-accession-download%2Fmain/versions/v0.1.4", + "verified": false, + "verified_source": [] + }, + { + "author": [], + "containerfile": false, + "descriptor_type": [ + "GALAXY" + ], + "descriptor_type_version": { + "GALAXY": [ + "gxformat1" + ] + }, + "id": "#workflow/github.com/iwc-workflows/parallel-accession-download/main:v0.1.3", + "images": [], + "included_apps": [], + "is_production": false, + "meta_version": "2022-02-04 12:20:44.0", + "name": "v0.1.3", + "signed": false, + "url": "https://dockstore.org/api/ga4gh/trs/v2/tools/%23workflow%2Fgithub.com%2Fiwc-workflows%2Fparallel-accession-download%2Fmain/versions/v0.1.3", + "verified": false, + "verified_source": [] + }, + { + "author": [], + "containerfile": false, + "descriptor_type": [ + "GALAXY" + ], + "descriptor_type_version": { + "GALAXY": [ + "gxformat1" + ] + }, + "id": "#workflow/github.com/iwc-workflows/parallel-accession-download/main:v0.1.2", + "images": [], + "included_apps": [], + "is_production": false, + "meta_version": "2021-12-20 17:05:06.0", + "name": "v0.1.2", + "signed": false, + "url": "https://dockstore.org/api/ga4gh/trs/v2/tools/%23workflow%2Fgithub.com%2Fiwc-workflows%2Fparallel-accession-download%2Fmain/versions/v0.1.2", + "verified": false, + "verified_source": [] + }, + { + "author": [], + "containerfile": false, + "descriptor_type": [ + "GALAXY" + ], + "descriptor_type_version": { + "GALAXY": [ + "gxformat1" + ] + }, + "id": "#workflow/github.com/iwc-workflows/parallel-accession-download/main:v0.1.1", + "images": [], + "included_apps": [], + "is_production": false, + "meta_version": "2021-07-26 10:22:30.0", + "name": "v0.1.1", + "signed": false, + "url": "https://dockstore.org/api/ga4gh/trs/v2/tools/%23workflow%2Fgithub.com%2Fiwc-workflows%2Fparallel-accession-download%2Fmain/versions/v0.1.1", + "verified": false, + "verified_source": [] + } +] \ No newline at end of file diff --git a/tests/fixtures/dockstore/versions_jmchilton_example1.json b/tests/fixtures/dockstore/versions_jmchilton_example1.json new file mode 100644 index 000000000..04f61f7a3 --- /dev/null +++ b/tests/fixtures/dockstore/versions_jmchilton_example1.json @@ -0,0 +1,24 @@ +[ + { + "author": [], + "containerfile": false, + "descriptor_type": [ + "GALAXY" + ], + "descriptor_type_version": { + "GALAXY": [ + "gxformat2" + ] + }, + "id": "#workflow/github.com/jmchilton/galaxy-workflow-dockstore-example-1/mycoolworkflow:master", + "images": [], + "included_apps": [], + "is_production": false, + "meta_version": "2019-11-25 18:25:53.0", + "name": "master", + "signed": false, + "url": "https://dockstore.org/api/ga4gh/trs/v2/tools/%23workflow%2Fgithub.com%2Fjmchilton%2Fgalaxy-workflow-dockstore-example-1%2Fmycoolworkflow/versions/master", + "verified": false, + "verified_source": [] + } +] \ No newline at end of file diff --git 
a/tests/test_run.py b/tests/test_run.py index bafad4e2b..68189befc 100644 --- a/tests/test_run.py +++ b/tests/test_run.py @@ -2,6 +2,8 @@ import os +import pytest + from .test_utils import ( CliTestCase, CWL_DRAFT3_DIR, @@ -175,3 +177,36 @@ def test_run_export_invocation(self): with zipfile.ZipFile(export_path, "r") as zip_ref: # Should contain some files for a valid RO-Crate assert len(zip_ref.namelist()) > 0 + + @pytest.mark.skipif( + target_galaxy_branch() == "release_22.05", + reason="Skipping test on Galaxy 22.05, TRS import not supported.", + ) + @skip_if_environ("PLANEMO_SKIP_GALAXY_TESTS") + @mark.tests_galaxy_branch + def test_run_trs_id(self): + """Test importing and running a workflow using a TRS ID from GitHub.""" + with self._isolate() as f: + # Use a TRS ID format: #workflow/github.com/org/repo/workflow_name[/version] + # Testing with a simple workflow that uses the cat tool + # This workflow exists on Dockstore and has a "master" version + trs_id = "#workflow/github.com/jmchilton/galaxy-workflow-dockstore-example-1/mycoolworkflow" + cat = os.path.join(PROJECT_TEMPLATES_DIR, "demo", "cat.xml") + wf_job = os.path.join(TEST_DATA_DIR, "wf3-job.yml") + + test_cmd = [ + "--verbose", + "run", + "--no_dependency_resolution", + "--extra_tools", + cat, + "--galaxy_branch", + target_galaxy_branch(), + "--test_data", + TEST_DATA_DIR, + trs_id, + wf_job, + ] + self._check_exit_code(test_cmd) + assert os.path.exists(os.path.join(f, "tool_test_output.html")) + assert os.path.exists(os.path.join(f, "tool_test_output.json")) diff --git a/tests/test_trs_id.py b/tests/test_trs_id.py new file mode 100644 index 000000000..1e949a6a0 --- /dev/null +++ b/tests/test_trs_id.py @@ -0,0 +1,265 @@ +"""Tests for TRS ID resolution functionality.""" + +import json +from pathlib import Path +from unittest.mock import ( + Mock, + patch, +) + +import responses + +import planemo.cli +from planemo.galaxy.workflows import ( + import_workflow_from_trs, + parse_trs_id, + parse_trs_uri, + TRS_WORKFLOWS_PREFIX, +) +from planemo.runnable import RunnableType +from planemo.runnable_resolve import for_runnable_identifier +from .fake_trs import FakeTrsImporter + +FIXTURES = Path(__file__).parent / "fixtures" / "dockstore" + + +class TestTRSIdParsing: + """Test TRS ID parsing to full URLs.""" + + @responses.activate + def test_parse_trs_id_workflow_without_version(self): + """Test parsing a workflow TRS ID without specific version fetches latest.""" + # Load fixture data from recorded API response + fixture = json.loads((FIXTURES / "versions_iwc_parallel_accession.json").read_text()) + + responses.add( + responses.GET, + "https://dockstore.org/api/ga4gh/trs/v2/tools/%23workflow%2Fgithub.com%2Fiwc-workflows%2Fparallel-accession-download%2Fmain/versions", + json=fixture, + status=200, + ) + + trs_id = "workflow/github.com/iwc-workflows/parallel-accession-download/main" + result = parse_trs_id(trs_id) + + assert result is not None + # Should fetch the first version from the list + first_version = fixture[0]["name"] + assert ( + result["trs_url"] + == f"https://dockstore.org/api/ga4gh/trs/v2/tools/#workflow/github.com/iwc-workflows/parallel-accession-download/main/versions/{first_version}" + ) + + def test_parse_trs_id_workflow_with_version(self): + """Test parsing a workflow TRS ID with specific version.""" + trs_id = "workflow/github.com/iwc-workflows/parallel-accession-download/main/v0.1.14" + result = parse_trs_id(trs_id) + + assert result is not None + assert ( + result["trs_url"] + == 
"https://dockstore.org/api/ga4gh/trs/v2/tools/#workflow/github.com/iwc-workflows/parallel-accession-download/main/versions/v0.1.14" + ) + + def test_parse_trs_id_with_hash_prefix(self): + """Test parsing a TRS ID with # prefix.""" + trs_id = "#workflow/github.com/iwc-workflows/parallel-accession-download/main/v0.1.14" + result = parse_trs_id(trs_id) + + assert result is not None + assert ( + result["trs_url"] + == "https://dockstore.org/api/ga4gh/trs/v2/tools/#workflow/github.com/iwc-workflows/parallel-accession-download/main/versions/v0.1.14" + ) + + def test_parse_trs_id_tool(self): + """Test parsing a tool TRS ID.""" + trs_id = "tool/github.com/galaxyproject/example-tool/main/v1.0" + result = parse_trs_id(trs_id) + + assert result is not None + assert ( + result["trs_url"] + == "https://dockstore.org/api/ga4gh/trs/v2/tools/#tool/github.com/galaxyproject/example-tool/main/versions/v1.0" + ) + + @responses.activate + def test_parse_trs_id_version_fetch_failure(self): + """Test parsing when version fetch fails falls back to default version.""" + # Simulate a failed API request + responses.add( + responses.GET, + "https://dockstore.org/api/ga4gh/trs/v2/tools/%23workflow%2Fgithub.com%2Forg%2Frepo%2Fmain/versions", + json={"error": "Not found"}, + status=404, + ) + + trs_id = "workflow/github.com/org/repo/main" + result = parse_trs_id(trs_id) + + assert result is not None + # Should fallback to using workflow_name as default version (Galaxy requires /versions/) + assert ( + result["trs_url"] + == "https://dockstore.org/api/ga4gh/trs/v2/tools/#workflow/github.com/org/repo/main/versions/main" + ) + + def test_parse_trs_id_invalid(self): + """Test parsing invalid TRS IDs.""" + assert parse_trs_id("invalid") is None + assert parse_trs_id("workflow/github.com/org") is None # Too few parts + + +class TestTRSUriParsing: + """Test TRS URI parsing.""" + + @patch("planemo.galaxy.workflows.requests.get") + def test_parse_trs_uri_workflow(self, mock_get): + """Test parsing a workflow TRS URI.""" + # Mock the Dockstore API response + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = [{"name": "v0.1.14"}] + mock_get.return_value = mock_response + + trs_uri = "trs://workflow/github.com/iwc-workflows/parallel-accession-download/main" + result = parse_trs_uri(trs_uri) + + assert result is not None + assert ( + result["trs_url"] + == "https://dockstore.org/api/ga4gh/trs/v2/tools/#workflow/github.com/iwc-workflows/parallel-accession-download/main/versions/v0.1.14" + ) + + def test_parse_trs_uri_with_version(self): + """Test parsing a TRS URI with version.""" + trs_uri = "trs://workflow/github.com/org/repo/main/v0.1.14" + result = parse_trs_uri(trs_uri) + + assert result is not None + assert ( + result["trs_url"] + == "https://dockstore.org/api/ga4gh/trs/v2/tools/#workflow/github.com/org/repo/main/versions/v0.1.14" + ) + + def test_parse_trs_uri_from_full_url(self): + """Test parsing a TRS URI created from a full Dockstore URL.""" + # This simulates what happens when user provides a full URL + trs_uri = "trs://#workflow/github.com/iwc-workflows/parallel-accession-download/main/versions/v0.1.14" + result = parse_trs_uri(trs_uri) + + assert result is not None + expected_url = "https://dockstore.org/api/ga4gh/trs/v2/tools/#workflow/github.com/iwc-workflows/parallel-accession-download/main/versions/v0.1.14" + assert result["trs_url"] == expected_url + + def test_parse_trs_uri_invalid(self): + """Test parsing an invalid TRS URI.""" + assert parse_trs_uri("invalid") is None + 
assert parse_trs_uri("trs://invalid") is None + assert parse_trs_uri("gxid://workflows/abc") is None + + +class TestTRSWorkflowImport: + """Test TRS workflow import.""" + + def test_import_workflow_from_trs(self): + """Test importing a workflow from TRS.""" + fake = FakeTrsImporter(return_workflow={"id": "test_wf_id", "name": "Test Workflow"}) + trs_uri = "trs://workflow/github.com/org/repo/main/v0.1.14" + + result = import_workflow_from_trs(trs_uri, fake) + + assert result is not None + assert result["id"] == "test_wf_id" + assert result["name"] == "Test Workflow" + # Verify the importer was called with the correct URL + assert len(fake.imported_urls) == 1 + assert ( + fake.imported_urls[0] + == "https://dockstore.org/api/ga4gh/trs/v2/tools/#workflow/github.com/org/repo/main/versions/v0.1.14" + ) + + def test_import_workflow_from_trs_with_version(self): + """Test importing a workflow from TRS with specific version.""" + fake = FakeTrsImporter(return_workflow={"id": "test_wf_id"}) + trs_uri = "trs://workflow/github.com/org/repo/main/v0.1.14" + + result = import_workflow_from_trs(trs_uri, fake) + + assert result is not None + assert len(fake.imported_urls) == 1 + assert ( + fake.imported_urls[0] + == "https://dockstore.org/api/ga4gh/trs/v2/tools/#workflow/github.com/org/repo/main/versions/v0.1.14" + ) + + def test_import_workflow_from_trs_invalid_uri(self): + """Test importing from an invalid TRS URI raises ValueError.""" + fake = FakeTrsImporter() + trs_uri = "invalid_uri" + + try: + import_workflow_from_trs(trs_uri, fake) + assert False, "Should have raised ValueError" + except ValueError as e: + assert "Invalid TRS URI" in str(e) + # Verify no imports were attempted + assert len(fake.imported_urls) == 0 + + +class TestTRSIdIntegration: + """Test TRS ID integration with for_runnable_identifier.""" + + def test_for_runnable_identifier_with_trs_id(self, tmp_path): + """Test that for_runnable_identifier creates TRS URIs.""" + ctx = planemo.cli.PlanemoCliContext() + ctx.planemo_directory = str(tmp_path) + + trs_id = "workflow/github.com/iwc-workflows/parallel-accession-download/main" + runnable = for_runnable_identifier(ctx, trs_id, {}) + + assert runnable is not None + assert runnable.type == RunnableType.galaxy_workflow + assert runnable.uri == f"{TRS_WORKFLOWS_PREFIX}{trs_id}" + assert runnable.is_trs_workflow_uri is True + assert runnable.is_remote_workflow_uri is True + + def test_for_runnable_identifier_with_hash_prefix(self, tmp_path): + """Test that for_runnable_identifier handles # prefix.""" + ctx = planemo.cli.PlanemoCliContext() + ctx.planemo_directory = str(tmp_path) + + trs_id = "#workflow/github.com/iwc-workflows/parallel-accession-download/main/v0.1.14" + runnable = for_runnable_identifier(ctx, trs_id, {}) + + assert runnable is not None + assert runnable.type == RunnableType.galaxy_workflow + assert runnable.uri == f"{TRS_WORKFLOWS_PREFIX}{trs_id}" + assert runnable.is_trs_workflow_uri is True + + def test_for_runnable_identifier_with_tool_trs_id(self, tmp_path): + """Test that for_runnable_identifier handles tool TRS IDs.""" + ctx = planemo.cli.PlanemoCliContext() + ctx.planemo_directory = str(tmp_path) + + trs_id = "tool/github.com/galaxyproject/example/v1.0" + runnable = for_runnable_identifier(ctx, trs_id, {}) + + assert runnable is not None + assert runnable.uri == f"{TRS_WORKFLOWS_PREFIX}{trs_id}" + assert runnable.is_trs_workflow_uri is True + + def test_for_runnable_identifier_with_full_dockstore_url(self, tmp_path): + """Test that for_runnable_identifier handles full 
Dockstore URLs.""" + ctx = planemo.cli.PlanemoCliContext() + ctx.planemo_directory = str(tmp_path) + + full_url = "https://dockstore.org/api/ga4gh/trs/v2/tools/#workflow/github.com/iwc-workflows/parallel-accession-download/main/versions/v0.1.14" + runnable = for_runnable_identifier(ctx, full_url, {}) + + assert runnable is not None + assert runnable.type == RunnableType.galaxy_workflow + # Should extract the path after the base URL + expected_uri = "trs://#workflow/github.com/iwc-workflows/parallel-accession-download/main/versions/v0.1.14" + assert runnable.uri == expected_uri + assert runnable.is_trs_workflow_uri is True diff --git a/tox.ini b/tox.ini index c48ec27b5..c8bc1e9f7 100644 --- a/tox.ini +++ b/tox.ini @@ -30,10 +30,11 @@ deps = lint_docs: -rdev-requirements.txt lint_docs,quick,unit: -rrequirements.txt lint_docstrings: flake8_docstrings - unit: pytest + unit: pytest # unit: pytest-timeout unit: coverage unit: flask + unit: responses mypy: mypy setenv =
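
A minimal sketch of the resolution rules implemented by parse_trs_id/parse_trs_uri above: a versioned identifier maps onto a Dockstore TRS URL by pure string handling, while an unversioned one queries Dockstore's /versions endpoint and falls back to the workflow name if that lookup fails. The example IDs below come from the new test fixtures.

    from planemo.galaxy.workflows import parse_trs_id, parse_trs_uri

    # Versioned short ID: resolved without any network traffic.
    info = parse_trs_id(
        "workflow/github.com/iwc-workflows/parallel-accession-download/main/v0.1.14"
    )
    assert info["trs_url"] == (
        "https://dockstore.org/api/ga4gh/trs/v2/tools/"
        "#workflow/github.com/iwc-workflows/parallel-accession-download/main/versions/v0.1.14"
    )

    # trs:// URIs are stripped of the prefix and routed through the same parser.
    assert parse_trs_uri("trs://workflow/github.com/org/repo/main/v0.1.14")["trs_url"].endswith(
        "#workflow/github.com/org/repo/main/versions/v0.1.14"
    )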
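
How a bare TRS ID flows through for_runnable_identifier: it is wrapped into a trs:// URI (nothing is downloaded at resolution time) and the resulting Runnable reports itself as a remote TRS workflow. A sketch mirroring the new tests; the planemo_directory value is only a placeholder scratch path.

    import planemo.cli
    from planemo.runnable import RunnableType
    from planemo.runnable_resolve import for_runnable_identifier

    ctx = planemo.cli.PlanemoCliContext()
    ctx.planemo_directory = "/tmp/planemo-scratch"  # placeholder scratch directory

    runnable = for_runnable_identifier(
        ctx, "workflow/github.com/iwc-workflows/parallel-accession-download/main", {}
    )
    assert runnable.type == RunnableType.galaxy_workflow
    assert runnable.uri == "trs://workflow/github.com/iwc-workflows/parallel-accession-download/main"
    assert runnable.is_trs_workflow_uri and runnable.is_remote_workflow_uri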
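
The TrsImporter protocol keeps the Galaxy call behind a narrow seam: GalaxyTrsImporter POSTs an {"archive_source": "trs_tool", "trs_url": ...} payload to Galaxy's workflows endpoint, while unit tests inject the recording fake. A sketch of both sides, assuming the fake is importable from tests/fake_trs.py as added here; the Galaxy URL and API key are placeholders.

    from planemo.galaxy.workflows import GalaxyTrsImporter, import_workflow_from_trs
    from tests.fake_trs import FakeTrsImporter

    # Unit-test style: no Galaxy needed, the fake just records the resolved URL.
    fake = FakeTrsImporter(return_workflow={"id": "wf-123"})
    wf = import_workflow_from_trs("trs://workflow/github.com/org/repo/main/v0.1.14", fake)
    assert wf["id"] == "wf-123"
    assert fake.imported_urls[0].endswith("/versions/v0.1.14")

    # Against a live Galaxy the same call goes through BioBlend (placeholders):
    # from bioblend.galaxy import GalaxyInstance
    # gi = GalaxyInstance("https://galaxy.example.org", key="...")
    # wf = import_workflow_from_trs("trs://workflow/github.com/org/repo/main/v0.1.14", GalaxyTrsImporter(gi))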
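
fetch_workflow_from_trs is the network path used by job_template_with_metadata for TRS identifiers: it lists <trs_url>/GALAXY/files, picks the primary descriptor, fetches <trs_url>/GALAXY/descriptor/<path>, and parses the content as JSON or YAML. A sketch only; it needs live access to Dockstore, and the keys present depend on whether the registered workflow is native .ga or gxformat2.

    from planemo.galaxy.workflows import fetch_workflow_from_trs

    workflow_dict = fetch_workflow_from_trs(
        "workflow/github.com/iwc-workflows/parallel-accession-download/main/v0.1.14"
    )
    # A .ga (gxformat1) workflow exposes "a_galaxy_workflow" and a "steps" mapping.
    print(sorted(workflow_dict)[:5])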
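
For planemo workflow_job_init, is_trs_identifier decides whether the positional argument is a TRS reference; the default output name is then derived by the command module's private _trs_id_to_job_filename helper from the fifth path segment of the TRS ID. A sketch of that behavior:

    from planemo.commands.cmd_workflow_job_init import _trs_id_to_job_filename
    from planemo.galaxy.workflows import is_trs_identifier

    assert is_trs_identifier("#workflow/github.com/iwc-workflows/parallel-accession-download/main")
    assert not is_trs_identifier("my-workflow.ga")

    # IWC-style IDs use "main" as the workflow name, so the generated job file is main-job.yml.
    assert (
        _trs_id_to_job_filename("workflow/github.com/iwc-workflows/parallel-accession-download/main")
        == "main-job.yml"
    )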