Merged
2 changes: 2 additions & 0 deletions dev-requirements.txt
@@ -8,7 +8,9 @@ Werkzeug
# For testing
tox
pytest
pytest-mock
coverage
responses>=0.23.0

#Building Docs
docutils
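The new test dependencies (pytest-mock and responses>=0.23.0) point at tests that stub HTTP traffic instead of calling Dockstore's TRS API for real. A minimal, hypothetical sketch of that pattern with the responses library — the endpoint URL and payload below are illustrative, not taken from this PR's tests:

import requests
import responses

TRS_TOOL_URL = "https://dockstore.org/api/ga4gh/trs/v2/tools/example-workflow"

@responses.activate
def test_trs_tool_lookup_is_stubbed():
    # Register a canned TRS response; no network traffic leaves the test.
    responses.add(
        responses.GET,
        TRS_TOOL_URL,
        json={"id": "#workflow/github.com/org/repo/name", "toolclass": {"name": "Workflow"}},
        status=200,
    )
    payload = requests.get(TRS_TOOL_URL).json()
    assert payload["toolclass"]["name"] == "Workflow"

pytest-mock would presumably cover the non-HTTP side, for example patching the Galaxy client object handed to the importer.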
49 changes: 46 additions & 3 deletions planemo/commands/cmd_workflow_job_init.py
@@ -9,9 +9,12 @@
from planemo import options
from planemo.cli import command_function
from planemo.galaxy.workflows import (
DOCKSTORE_TRS_BASE,
get_workflow_from_invocation_id,
is_trs_identifier,
job_template_with_metadata,
new_workflow_associated_path,
TRS_WORKFLOWS_PREFIX,
)
from planemo.io import can_write_to_path

@@ -70,6 +73,41 @@ def _build_commented_yaml(job, metadata):
return commented


def _trs_id_to_job_filename(trs_id: str) -> str:
"""Generate a job filename from a TRS ID.

Args:
trs_id: TRS ID in various formats:
- workflow/github.com/org/repo/name[/version]
- #workflow/github.com/org/repo/name[/version]
- trs://workflow/github.com/org/repo/name[/version]
- https://dockstore.org/api/ga4gh/trs/v2/tools/#workflow/...

Returns:
A job filename like "workflow-name-job.yml"
"""
# Strip common prefixes
identifier = trs_id
if identifier.startswith(DOCKSTORE_TRS_BASE):
identifier = identifier[len(DOCKSTORE_TRS_BASE) :]
if identifier.startswith(TRS_WORKFLOWS_PREFIX):
identifier = identifier[len(TRS_WORKFLOWS_PREFIX) :]
if identifier.startswith("#"):
identifier = identifier[1:]

# Parse the TRS ID path: workflow/github.com/org/repo/name[/version]
parts = identifier.split("/")
if len(parts) >= 5:
        # The workflow name is the fifth path component (index 4); any trailing version is ignored
workflow_name = parts[4]
# Sanitize the name for use as filename
workflow_name = workflow_name.replace(" ", "-").replace("/", "-")
return f"{workflow_name}-job.yml"
else:
# Fallback to a generic name
return "workflow-job.yml"


@click.command("workflow_job_init")
@options.required_workflow_arg()
@options.force_option()
@@ -102,9 +140,14 @@ def cli(ctx, workflow_identifier, output=None, **kwds):
job, metadata = job_template_with_metadata(workflow_identifier, **kwds)

if output is None:
output = new_workflow_associated_path(
path_basename if kwds["from_invocation"] else workflow_identifier, suffix="job"
)
if kwds["from_invocation"]:
output = new_workflow_associated_path(path_basename, suffix="job")
elif is_trs_identifier(workflow_identifier):
# Generate output filename from TRS ID
# Extract workflow name from TRS ID (e.g., workflow/github.com/org/repo/name -> name-job.yml)
output = _trs_id_to_job_filename(workflow_identifier)
else:
output = new_workflow_associated_path(workflow_identifier, suffix="job")
if not can_write_to_path(output, **kwds):
ctx.exit(1)

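For reference, the filename mapping that _trs_id_to_job_filename implements can be checked directly. A small sketch, assuming the bare and "#"-prefixed forms listed in the docstring; the org, repo, and workflow names are made up for illustration:

from planemo.commands.cmd_workflow_job_init import _trs_id_to_job_filename

# Bare TRS path: the fifth component is taken as the workflow name.
assert _trs_id_to_job_filename("workflow/github.com/example-org/example-repo/my-workflow") == "my-workflow-job.yml"
# A leading "#" and a trailing version are both tolerated.
assert _trs_id_to_job_filename("#workflow/github.com/example-org/example-repo/my-workflow/v1.0") == "my-workflow-job.yml"
# Anything that does not look like a TRS path falls back to a generic name.
assert _trs_id_to_job_filename("not-a-trs-id") == "workflow-job.yml"

In practice this means planemo workflow_job_init pointed at a TRS identifier should write something like my-workflow-job.yml in the working directory rather than deriving the output path from the raw identifier.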
89 changes: 64 additions & 25 deletions planemo/galaxy/config.py
@@ -42,7 +42,9 @@
from planemo.docker import docker_host_args
from planemo.galaxy.workflows import (
get_toolshed_url_for_tool_id,
install_shed_repos_for_workflow_id,
remote_runnable_to_workflow_id,
TRS_WORKFLOWS_PREFIX,
)
from planemo.io import (
communicate,
@@ -67,7 +69,9 @@
from .run import setup_venv
from .workflows import (
find_tool_ids,
GalaxyTrsImporter,
import_workflow,
import_workflow_from_trs,
install_shed_repos,
MAIN_TOOLSHED_URL,
)
@@ -546,7 +550,9 @@ def _all_tool_paths(
runnables: List["Runnable"], galaxy_root: Optional[str] = None, extra_tools: Optional[List[str]] = None
) -> Set[str]:
extra_tools = extra_tools or []
all_tool_paths = {r.path for r in runnables if r.has_tools and not r.data_manager_conf_path}
all_tool_paths = {
r.path for r in runnables if r.has_tools and not r.data_manager_conf_path and not r.is_remote_workflow_uri
}
extra_tools = _expand_paths(galaxy_root, extra_tools=extra_tools)
all_tool_paths.update(extra_tools)
for tool_id in get_tool_ids_for_runnables(runnables):
@@ -814,10 +820,38 @@ def ready():

def install_workflows(self):
for runnable in self.runnables:
if runnable.type.name in ["galaxy_workflow", "cwl_workflow"] and not runnable.is_remote_workflow_uri:
# Install local workflows and TRS workflows, but skip already-imported Galaxy workflows
is_importable = runnable.type.name in ["galaxy_workflow", "cwl_workflow"]
is_trs = runnable.uri.startswith(TRS_WORKFLOWS_PREFIX)

if is_importable and (not runnable.is_remote_workflow_uri or is_trs):
self._install_workflow(runnable)

def _install_workflow(self, runnable):
# Check if this is a TRS workflow
if runnable.uri.startswith(TRS_WORKFLOWS_PREFIX):
# Import from TRS using Galaxy's TRS API
importer = GalaxyTrsImporter(self.user_gi)
workflow = import_workflow_from_trs(runnable.uri, importer)
self._workflow_ids[runnable.uri] = workflow["id"]

# Install required tools from the toolshed if shed_install is enabled
if self._kwds.get("shed_install") and (
self._kwds.get("engine") != "external_galaxy" or self._kwds.get("galaxy_admin_key")
):
workflow_repos = install_shed_repos_for_workflow_id(
workflow["id"],
self.user_gi,
self.gi,
self._kwds.get("ignore_dependency_problems", False),
self._kwds.get("install_tool_dependencies", False),
self._kwds.get("install_resolver_dependencies", True),
self._kwds.get("install_repository_dependencies", True),
self._kwds.get("install_most_recent_revision", False),
)
self.installed_repos[runnable.uri], self.updated_repos[runnable.uri] = workflow_repos
return

if self._kwds.get("shed_install") and (
self._kwds.get("engine") != "external_galaxy" or self._kwds.get("galaxy_admin_key")
):
@@ -839,7 +873,12 @@ def _install_workflow(self, runnable):
self._workflow_ids[runnable.path] = workflow["id"]

def workflow_id_for_runnable(self, runnable):
if runnable.is_remote_workflow_uri:
if runnable.uri.startswith(TRS_WORKFLOWS_PREFIX):
# TRS workflows are imported and their IDs are stored by URI
workflow_id = self._workflow_ids.get(runnable.uri)
if not workflow_id:
raise ValueError(f"TRS workflow not imported: {runnable.uri}")
elif runnable.is_remote_workflow_uri:
workflow_id = remote_runnable_to_workflow_id(runnable)
else:
workflow_id = self.workflow_id(runnable.path)
@@ -1137,43 +1176,43 @@ def _find_galaxy_root(ctx, **kwds):


def _find_test_data(runnables, **kwds):
# Find test data directory associated with path.
test_data = kwds.get("test_data", None)
if test_data:
return os.path.abspath(test_data)

test_data_search_path = "."
runnables = [r for r in runnables if r.has_tools]
runnables = [r for r in runnables if r.has_tools and not r.is_remote_workflow_uri]
if len(runnables) > 0:
test_data_search_path = runnables[0].test_data_search_path

# Find test data directory associated with path.
test_data = kwds.get("test_data", None)
test_data = _search_tool_path_for(test_data_search_path, "test-data")
if test_data:
return os.path.abspath(test_data)
else:
test_data = _search_tool_path_for(test_data_search_path, "test-data")
if test_data:
return test_data
return test_data
warn(NO_TEST_DATA_MESSAGE)
return None


def _find_tool_data_table(runnables, test_data_dir, **kwds) -> Optional[List[str]]:
tool_data_table = kwds.get("tool_data_table", None)
if tool_data_table:
return [os.path.abspath(table_path) for table_path in tool_data_table]

tool_data_search_path = "."
runnables = [r for r in runnables if r.has_tools]
runnables = [r for r in runnables if r.has_tools and not r.is_remote_workflow_uri]
if len(runnables) > 0:
tool_data_search_path = runnables[0].tool_data_search_path

tool_data_table = kwds.get("tool_data_table", None)
extra_paths = [test_data_dir] if test_data_dir else []
tool_data_table = _search_tool_path_for(
tool_data_search_path,
"tool_data_table_conf.xml.test",
extra_paths,
) or _search_tool_path_for( # if all else fails just use sample
tool_data_search_path, "tool_data_table_conf.xml.sample"
)
if tool_data_table:
return [os.path.abspath(table_path) for table_path in tool_data_table]
else:
extra_paths = [test_data_dir] if test_data_dir else []
tool_data_table = _search_tool_path_for(
tool_data_search_path,
"tool_data_table_conf.xml.test",
extra_paths,
) or _search_tool_path_for( # if all else fails just use sample
tool_data_search_path, "tool_data_table_conf.xml.sample"
)
if tool_data_table:
return [tool_data_table]
return [tool_data_table]
return None


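The decision logic added to install_workflows() above is compact but easy to misread, so the following standalone sketch just restates the condition and the three cases it distinguishes. The helper and the runnable type strings are illustrative, not part of the PR:

def should_install(runnable_type: str, is_remote_uri: bool, is_trs: bool) -> bool:
    # Mirrors: is_importable and (not runnable.is_remote_workflow_uri or is_trs)
    is_importable = runnable_type in ("galaxy_workflow", "cwl_workflow")
    return is_importable and (not is_remote_uri or is_trs)

# Local .ga/.cwl workflow files are imported into the target Galaxy, as before.
assert should_install("galaxy_workflow", is_remote_uri=False, is_trs=False)
# Remote workflow URIs refer to workflows that already exist on the server, so they are skipped.
assert not should_install("galaxy_workflow", is_remote_uri=True, is_trs=False)
# TRS identifiers count as remote URIs too, but they are imported via Galaxy's TRS importer.
assert should_install("galaxy_workflow", is_remote_uri=True, is_trs=True)
# Non-workflow runnables never go through this path.
assert not should_install("galaxy_tool", is_remote_uri=False, is_trs=False)

Once imported, _install_workflow stores the new workflow ID under runnable.uri, which is why workflow_id_for_runnable checks the TRS prefix first before falling back to remote-URI decoding or the local-path lookup.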