-
Notifications
You must be signed in to change notification settings - Fork 241
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Allow importing on workers #5098
base: master
Are you sure you want to change the base?
Changes from 3 commits
f0ee54b
8fee9a9
62a0cec
9d72793
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,4 +1,5 @@ | ||
"""Implemented support for Common Workflow Language (CWL) for Toil.""" | ||
import argparse | ||
# Copyright (C) 2015 Curoverse, Inc | ||
# Copyright (C) 2015-2021 Regents of the University of California | ||
# Copyright (C) 2019-2020 Seven Bridges | ||
|
@@ -2404,7 +2405,7 @@ def __init__( | |
subjob_name="_wrapper", | ||
local=True, | ||
) | ||
self.cwltool = remove_pickle_problems(tool) | ||
self.cwltool = tool | ||
self.cwljob = cwljob | ||
self.runtime_context = runtime_context | ||
self.conditional = conditional | ||
|
@@ -2441,7 +2442,7 @@ def __init__( | |
conditional: Union[Conditional, None] = None, | ||
): | ||
"""Store the context for later execution.""" | ||
self.cwltool = remove_pickle_problems(tool) | ||
self.cwltool = tool | ||
self.conditional = conditional or Conditional() | ||
|
||
if runtime_context.builder: | ||
|
@@ -2790,6 +2791,32 @@ def get_container_engine(runtime_context: cwltool.context.RuntimeContext) -> str | |
return "singularity" | ||
return "docker" | ||
|
||
def makeRootJob(
    tool: Process,
    jobobj: CWLObjectType,
    runtime_context: cwltool.context.RuntimeContext,
    initialized_job_order: CWLObjectType,
    options: Namespace,
    toil: Toil,
) -> CWLNamedJob:
    """
    Create the Toil root Job object for the CWL tool.

    Same as makeJob() except this also handles the file-import logic.

    Actually creates what might be a subgraph of two jobs. The second of
    which may be the follow on of the first. If only one job is created,
    it is returned twice.

    :param tool: CWL tool to run.
    :param jobobj: CWL job object passed through to makeJob().
    :param runtime_context: cwltool runtime context.
    :param initialized_job_order: job order whose input files need importing.
    :param options: parsed command-line options; run_imports_on_workers
        selects where the imports happen.
    :param toil: Toil context manager, used to reach the job store when
        importing on the leader.
    :return: the root job of the workflow graph.
    """
    if options.run_imports_on_workers:
        # Defer imports to a worker; CWLImportJob will chain the actual
        # workflow as a child job once the imports complete.
        return CWLImportJob(initialized_job_order, tool, runtime_context, options)
    else:
        # Import on the leader, then build the workflow graph directly.
        import_workflow_inputs(
            toil._jobStore,
            options,
            initialized_job_order=initialized_job_order,
            tool=tool,
        )
        # Toplevel job: no parent name and no conditional needed. The
        # follow-on returned by makeJob() is not needed here.
        root_job, _ = makeJob(tool, jobobj, runtime_context, None, None)
        root_job.cwljob = initialized_job_order
        return root_job
|
||
|
||
def makeJob( | ||
tool: Process, | ||
|
@@ -2805,6 +2832,9 @@ def makeJob( | |
""" | ||
Create the correct Toil Job object for the CWL tool. | ||
|
||
Actually creates what might be a subgraph of two jobs. The second of which may be the follow on of the first. | ||
If only one job is created, it is returned twice. | ||
|
||
Types: workflow, job, or job wrapper for dynamic resource requirements. | ||
|
||
:return: "wfjob, followOn" if the input tool is a workflow, and "job, job" otherwise | ||
|
@@ -3111,7 +3141,11 @@ def hasChild(self, c: Job) -> Any: | |
|
||
|
||
def remove_pickle_problems(obj: ProcessType) -> ProcessType: | ||
"""Doc_loader does not pickle correctly, causing Toil errors, remove from objects.""" | ||
""" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This function is only called in one place now, other than itself. Is it safe to remove and was the issue resolved? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it's still needed as I ran into this issue while creating the CWLImportJob. Something internal in the CWL tool object is unpickleable and must be removed. Before it was being called in every Job initialization, but now I moved it to be ran only once on the leader. |
||
Doc_loader does not pickle correctly, causing Toil errors, remove from objects. | ||
|
||
See github issue: https://github.com/mypyc/mypyc/issues/804 | ||
""" | ||
if hasattr(obj, "doc_loader"): | ||
obj.doc_loader = None | ||
if isinstance(obj, cwltool.workflow.WorkflowStep): | ||
|
@@ -3143,7 +3177,7 @@ def __init__( | |
self.cwlwf = cwlwf | ||
self.cwljob = cwljob | ||
self.runtime_context = runtime_context | ||
self.cwlwf = remove_pickle_problems(self.cwlwf) | ||
self.cwlwf = self.cwlwf | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No-op. |
||
self.conditional = conditional or Conditional() | ||
|
||
def run( | ||
|
@@ -3332,6 +3366,111 @@ def run( | |
return UnresolvedDict(outobj) | ||
|
||
|
||
class CWLSetupJob(CWLNamedJob):
    """
    Job that takes a CWL tool and a job order with all files already
    imported and makes a CWLWorkflow as a child to run it.
    """

    def __init__(
        self,
        initialized_job_order: Promised[CWLObjectType],
        tool: Promised[Process],
        runtime_context: cwltool.context.RuntimeContext,
    ):
        """Store the (possibly promised) tool and job order for run()."""
        super().__init__()
        self.initialized_job_order = initialized_job_order
        self.tool = tool
        self.runtime_context = runtime_context

    def run(self, file_store: AbstractFileStore) -> Any:
        """
        Resolve the promises, build the workflow subgraph, and run it as a child.

        :return: Returns a CWL object that represents the output of the workflow.
        """
        job_order = unwrap(self.initialized_job_order)
        resolved_tool = unwrap(self.tool)

        # Toplevel of the real workflow: no parent name, no conditional.
        workflow_job, _ = makeJob(
            resolved_tool, job_order, self.runtime_context, None, None
        )
        self.addChild(workflow_job)
        workflow_job.cwljob = job_order

        return workflow_job.rv()
|
||
|
||
class CWLImportJob(CWLNamedJob):
    """
    Job to do file imports on a worker instead of a leader.

    Assumes all local and cloud files are accessible.

    This class is only used when runImportsOnWorkers is enabled.
    """

    def __init__(
        self,
        initialized_job_order: CWLObjectType,
        tool: Process,
        runtime_context: cwltool.context.RuntimeContext,
        options: Namespace,
    ):
        """Schedule remotely with the user-requested amount of import disk."""
        super().__init__(local=False, disk=options.import_workers_disk)
        self.initialized_job_order = initialized_job_order
        self.tool = tool
        self.options = options
        self.runtime_context = runtime_context

    def run(self, file_store: AbstractFileStore) -> Any:
        """
        Import the workflow inputs and then create and run the workflow.

        :return: Promise of workflow outputs
        """
        import_workflow_inputs(
            file_store.jobStore, self.options, self.initialized_job_order, self.tool
        )
        # Chain a setup job that will build and run the actual workflow
        # now that all files are imported.
        setup = CWLSetupJob(self.initialized_job_order, self.tool, self.runtime_context)
        self.addChild(setup)
        return setup.rv()
|
||
|
||
def import_workflow_inputs(
    jobstore: AbstractJobStore,
    options: Namespace,
    initialized_job_order: CWLObjectType,
    tool: Process,
) -> None:
    """
    Import all workflow-input and tool-associated files into the job store.

    Mutates initialized_job_order and tool in place: file references are
    replaced with imported file IDs, and unprocessed optional secondary
    files are dropped so required-but-missing files error out later.

    :param jobstore: job store to import files into.
    :param options: parsed command-line options (basedir, reference_inputs
        and bypass_file_store are consulted).
    :param initialized_job_order: CWL job order whose input files to import.
    :param tool: CWL tool whose associated files (binaries, etc.) to import.
    """
    fileindex: Dict[str, str] = {}
    existing: Dict[str, str] = {}
    # Define something we can call to import a file and get its file
    # ID.
    # We cast this because import_file is overloaded depending on if we
    # pass a shared file name or not, and we know the way we call it we
    # always get a FileID out.
    file_import_function = cast(
        Callable[[str], FileID],
        functools.partial(jobstore.import_file, symlink=True),
    )

    # Import all the input files, some of which may be missing optional
    # files.
    logger.info("Importing input files...")
    fs_access = ToilFsAccess(options.basedir)
    import_files(
        file_import_function,
        fs_access,
        fileindex,
        existing,
        initialized_job_order,
        mark_broken=True,
        skip_remote=options.reference_inputs,
        bypass_file_store=options.bypass_file_store,
        log_level=logging.INFO,
    )
    # Import all the files associated with tools (binaries, etc.).
    # Not sure why you would have an optional secondary file here, but
    # the spec probably needs us to support them.
    logger.info("Importing tool-associated files...")
    visitSteps(
        tool,
        functools.partial(
            import_files,
            file_import_function,
            fs_access,
            fileindex,
            existing,
            mark_broken=True,
            skip_remote=options.reference_inputs,
            bypass_file_store=options.bypass_file_store,
            log_level=logging.INFO,
        ),
    )

    # We always expect to have processed all files that exist.
    # Loop through all the parameters for the workflow overall, and drop
    # any files that aren't either imported (for when we use the file
    # store) or available on disk (for when we don't). This will
    # properly make them cause an error later if they were required.
    # Only the values are needed, so skip binding the unused param names.
    for param_value in initialized_job_order.values():
        rm_unprocessed_secondary_files(param_value)
|
||
|
||
def visitSteps( | ||
cmdline_tool: Process, | ||
op: Callable[[CommentedMap], None], | ||
|
@@ -3624,6 +3763,11 @@ def main(args: Optional[List[str]] = None, stdout: TextIO = sys.stdout) -> int: | |
|
||
options = get_options(args) | ||
|
||
# Take care of incompatible arguments related to file imports | ||
if options.run_imports_on_workers is True and options.import_workers_disk is None: | ||
logger.error("Commandline arguments --runImportsOnWorkers and --importWorkersDisk must both be set to run file imports on workers.") | ||
return 1 | ||
|
||
# Do cwltool setup | ||
cwltool.main.setup_schema(args=options, custom_schema_callback=None) | ||
tmpdir_prefix = options.tmpdir_prefix = options.tmpdir_prefix or DEFAULT_TMPDIR_PREFIX | ||
|
@@ -3683,9 +3827,6 @@ def main(args: Optional[List[str]] = None, stdout: TextIO = sys.stdout) -> int: | |
tmp_outdir_prefix = os.path.abspath( | ||
options.tmp_outdir_prefix or DEFAULT_TMPDIR_PREFIX | ||
) | ||
|
||
fileindex: Dict[str, str] = {} | ||
existing: Dict[str, str] = {} | ||
conf_file = getattr(options, "beta_dependency_resolvers_configuration", None) | ||
use_conda_dependencies = getattr(options, "beta_conda_dependencies", None) | ||
job_script_provider = None | ||
|
@@ -3895,73 +4036,21 @@ def main(args: Optional[List[str]] = None, stdout: TextIO = sys.stdout) -> int: | |
discover_secondaryFiles=True, | ||
) | ||
|
||
# Define something we can call to import a file and get its file | ||
# ID. | ||
# We cast this because import_file is overloaded depending on if we | ||
# pass a shared file name or not, and we know the way we call it we | ||
# always get a FileID out. | ||
file_import_function = cast( | ||
Callable[[str], FileID], | ||
functools.partial(toil.import_file, symlink=True), | ||
) | ||
|
||
# Import all the input files, some of which may be missing optional | ||
# files. | ||
logger.info("Importing input files...") | ||
fs_access = ToilFsAccess(options.basedir) | ||
import_files( | ||
file_import_function, | ||
fs_access, | ||
fileindex, | ||
existing, | ||
initialized_job_order, | ||
mark_broken=True, | ||
skip_remote=options.reference_inputs, | ||
bypass_file_store=options.bypass_file_store, | ||
log_level=logging.INFO, | ||
) | ||
# Import all the files associated with tools (binaries, etc.). | ||
# Not sure why you would have an optional secondary file here, but | ||
# the spec probably needs us to support them. | ||
logger.info("Importing tool-associated files...") | ||
visitSteps( | ||
tool, | ||
functools.partial( | ||
import_files, | ||
file_import_function, | ||
fs_access, | ||
fileindex, | ||
existing, | ||
mark_broken=True, | ||
skip_remote=options.reference_inputs, | ||
bypass_file_store=options.bypass_file_store, | ||
log_level=logging.INFO, | ||
), | ||
) | ||
|
||
# We always expect to have processed all files that exist | ||
for param_name, param_value in initialized_job_order.items(): | ||
# Loop through all the parameters for the workflow overall. | ||
# Drop any files that aren't either imported (for when we use | ||
# the file store) or available on disk (for when we don't). | ||
# This will properly make them cause an error later if they | ||
# were required. | ||
rm_unprocessed_secondary_files(param_value) | ||
|
||
logger.info("Creating root job") | ||
logger.debug("Root tool: %s", tool) | ||
tool = remove_pickle_problems(tool) | ||
try: | ||
wf1, _ = makeJob( | ||
wf1 = makeRootJob( | ||
tool=tool, | ||
jobobj={}, | ||
runtime_context=runtime_context, | ||
parent_name=None, # toplevel, no name needed | ||
conditional=None, | ||
initialized_job_order=initialized_job_order, | ||
options=options, | ||
toil=toil | ||
) | ||
except CWL_UNSUPPORTED_REQUIREMENT_EXCEPTION as err: | ||
logging.error(err) | ||
return CWL_UNSUPPORTED_REQUIREMENT_EXIT_CODE | ||
wf1.cwljob = initialized_job_order | ||
logger.info("Starting workflow") | ||
outobj = toil.start(wf1) | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,6 +2,7 @@ | |
from argparse import ArgumentParser | ||
|
||
from configargparse import SUPPRESS | ||
from toil.lib.conversions import human2bytes | ||
|
||
from toil.version import baseVersion | ||
|
||
|
@@ -281,6 +282,21 @@ def add_cwl_options(parser: ArgumentParser, suppress: bool = True) -> None: | |
help=suppress_help or "Disable file streaming for files that have 'streamable' flag True", | ||
dest="disable_streaming", | ||
) | ||
parser.add_argument( | ||
"--runImportsOnWorkers", "--run-imports-on-workers", | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we add a test for this? |
||
action="store_true", | ||
default=False, | ||
help=suppress_help or "Run the file imports on a worker instead of the leader. This is useful if the leader is not optimized for high network performance." | ||
"If set to true, the argument --importWorkersDisk must also be set.", | ||
dest="run_imports_on_workers" | ||
) | ||
|
||
parser.add_argument("--importWorkersDisk", "--import-workers-disk", | ||
help=suppress_help or "Specify the amount of disk space an import worker will use. If file streaming for input files is not available," | ||
"this should be set to the size of the largest input file. This must be set in conjunction with the argument runImportsOnWorkers.", | ||
dest="import_workers_disk", | ||
type=lambda x: human2bytes(str(x)), | ||
default=None) | ||
|
||
provgroup = parser.add_argument_group( | ||
"Options for recording provenance information of the execution" | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unused import?