diff --git a/docs/running/cliOptions.rst b/docs/running/cliOptions.rst
index c5f678c1b3..5a55f61493 100644
--- a/docs/running/cliOptions.rst
+++ b/docs/running/cliOptions.rst
@@ -499,7 +499,8 @@ systems have issues!).
                         gzipped on the leader.
   --writeMessages FILEPATH
                         File to send messages from the leader's message bus to.
-  --realTimeLogging     Enable real-time logging from workers to leader.
+  --realTimeLogging BOOL
+                        Enable real-time logging from workers to leader.
 
 **Miscellaneous Options**
 
diff --git a/src/toil/cwl/cwltoil.py b/src/toil/cwl/cwltoil.py
index 4d46d46089..473fc374e7 100644
--- a/src/toil/cwl/cwltoil.py
+++ b/src/toil/cwl/cwltoil.py
@@ -33,7 +33,6 @@
 import sys
 import textwrap
 import uuid
-from contextlib import contextmanager
 from tempfile import NamedTemporaryFile, TemporaryFile, gettempdir
 from threading import Thread
 from typing import (IO,
@@ -46,13 +45,12 @@
                     MutableMapping,
                     MutableSequence,
                     Optional,
-                    Sequence,
                     TextIO,
                     Tuple,
                     Type,
                     TypeVar,
                     Union,
-                    cast, Generator)
+                    cast)
 from urllib.parse import quote, unquote, urlparse, urlsplit
 
 import cwl_utils.errors
@@ -2404,7 +2402,7 @@ def __init__(
             subjob_name="_wrapper",
             local=True,
         )
-        self.cwltool = remove_pickle_problems(tool)
+        self.cwltool = tool
         self.cwljob = cwljob
         self.runtime_context = runtime_context
         self.conditional = conditional
@@ -2441,7 +2439,7 @@ def __init__(
         conditional: Union[Conditional, None] = None,
     ):
         """Store the context for later execution."""
-        self.cwltool = remove_pickle_problems(tool)
+        self.cwltool = tool
         self.conditional = conditional or Conditional()
 
         if runtime_context.builder:
@@ -2790,6 +2788,32 @@ def get_container_engine(runtime_context: cwltool.context.RuntimeContext) -> str
         return "singularity"
     return "docker"
 
+def makeRootJob(
+    tool: Process,
+    jobobj: CWLObjectType,
+    runtime_context: cwltool.context.RuntimeContext,
+    initialized_job_order: CWLObjectType,
+    options: Namespace,
+    toil: Toil
+) -> CWLNamedJob:
+    """
+    Create the Toil root Job object for the CWL tool. Like makeJob(), but this also handles the file import logic.
+
+    makeJob() may create a subgraph of two jobs; unlike makeJob(), this function returns only the root of that subgraph.
+
+    :return: the root job: either the workflow's root job itself, or an import job that imports the files and then runs the workflow.
+    """
+
+    if options.run_imports_on_workers:
+        import_job = CWLImportJob(initialized_job_order, tool, runtime_context, options)
+        return import_job
+    else:
+        import_workflow_inputs(toil._jobStore, options, initialized_job_order=initialized_job_order, tool=tool)
+        rootJob, followOn = makeJob(tool, jobobj, runtime_context, None, None)  # toplevel, no name needed
+        rootJob.cwljob = initialized_job_order
+        return rootJob
+
 
 def makeJob(
     tool: Process,
@@ -2805,6 +2829,9 @@
     """
     Create the correct Toil Job object for the CWL tool.
 
+    Actually creates what might be a subgraph of two jobs, the second of which may be the follow-on of the first.
+    If only one job is created, it is returned twice.
+
     Types: workflow, job, or job wrapper for dynamic resource requirements.
 
     :return: "wfjob, followOn" if the input tool is a workflow, and "job, job" otherwise
@@ -3111,7 +3138,11 @@ def hasChild(self, c: Job) -> Any:
 
 
 def remove_pickle_problems(obj: ProcessType) -> ProcessType:
-    """Doc_loader does not pickle correctly, causing Toil errors, remove from objects."""
+    """
+    Doc_loader does not pickle correctly, causing Toil errors; remove it from objects.
+
+    See GitHub issue: https://github.com/mypyc/mypyc/issues/804
+    """
     if hasattr(obj, "doc_loader"):
         obj.doc_loader = None
     if isinstance(obj, cwltool.workflow.WorkflowStep):
@@ -3143,7 +3174,6 @@ def __init__(
         self.cwlwf = cwlwf
         self.cwljob = cwljob
         self.runtime_context = runtime_context
-        self.cwlwf = remove_pickle_problems(self.cwlwf)
         self.conditional = conditional or Conditional()
 
     def run(
@@ -3332,6 +3362,111 @@ def run(
         return UnresolvedDict(outobj)
 
 
+class CWLSetupJob(CWLNamedJob):
+    """
+    Job that takes a CWL tool and a job order with all files imported, and makes the job to run the workflow as its child.
+    """
+    def __init__(self, initialized_job_order: Promised[CWLObjectType], tool: Promised[Process], runtime_context: cwltool.context.RuntimeContext):
+        super().__init__()
+        self.initialized_job_order = initialized_job_order
+        self.tool = tool
+        self.runtime_context = runtime_context
+
+    def run(self, file_store: AbstractFileStore) -> Any:
+        """
+        :return: Returns a CWL object that represents the output of the workflow.
+        """
+        initialized_job_order = unwrap(self.initialized_job_order)
+        tool = unwrap(self.tool)
+        root_job, _ = makeJob(tool, initialized_job_order, self.runtime_context, None, None)
+        self.addChild(root_job)
+
+        root_job.cwljob = initialized_job_order
+
+        return root_job.rv()
+
+
+class CWLImportJob(CWLNamedJob):
+    """
+    Job to do file imports on a worker instead of the leader. Assumes all local and cloud files are accessible.
+
+    This class is only used when runImportsOnWorkers is enabled.
+    """
+    def __init__(self, initialized_job_order: CWLObjectType, tool: Process, runtime_context: cwltool.context.RuntimeContext, options: Namespace):
+        super().__init__(local=False, disk=options.import_workers_disk)
+        self.initialized_job_order = initialized_job_order
+        self.tool = tool
+        self.options = options
+        self.runtime_context = runtime_context
+
+    def run(self, file_store: AbstractFileStore) -> Any:
+        """
+        Import the workflow inputs and then create and run the workflow.
+
+        :return: Promise of workflow outputs
+        """
+        import_workflow_inputs(file_store.jobStore, self.options, self.initialized_job_order, self.tool)
+        setup_job = CWLSetupJob(self.initialized_job_order, self.tool, self.runtime_context)
+        self.addChild(setup_job)
+        return setup_job.rv()
+
+
+def import_workflow_inputs(jobstore: AbstractJobStore, options: Namespace, initialized_job_order: CWLObjectType, tool: Process) -> None:
+    fileindex: Dict[str, str] = {}
+    existing: Dict[str, str] = {}
+    # Define something we can call to import a file and get its file
+    # ID.
+    # We cast this because import_file is overloaded depending on if we
+    # pass a shared file name or not, and we know the way we call it we
+    # always get a FileID out.
+    file_import_function = cast(
+        Callable[[str], FileID],
+        functools.partial(jobstore.import_file, symlink=True),
+    )
+
+    # Import all the input files, some of which may be missing optional
+    # files.
+    logger.info("Importing input files...")
+    fs_access = ToilFsAccess(options.basedir)
+    import_files(
+        file_import_function,
+        fs_access,
+        fileindex,
+        existing,
+        initialized_job_order,
+        mark_broken=True,
+        skip_remote=options.reference_inputs,
+        bypass_file_store=options.bypass_file_store,
+        log_level=logging.INFO,
+    )
+    # Import all the files associated with tools (binaries, etc.).
+    # Not sure why you would have an optional secondary file here, but
+    # the spec probably needs us to support them.
+ logger.info("Importing tool-associated files...") + visitSteps( + tool, + functools.partial( + import_files, + file_import_function, + fs_access, + fileindex, + existing, + mark_broken=True, + skip_remote=options.reference_inputs, + bypass_file_store=options.bypass_file_store, + log_level=logging.INFO, + ), + ) + + # We always expect to have processed all files that exist + for param_name, param_value in initialized_job_order.items(): + # Loop through all the parameters for the workflow overall. + # Drop any files that aren't either imported (for when we use + # the file store) or available on disk (for when we don't). + # This will properly make them cause an error later if they + # were required. + rm_unprocessed_secondary_files(param_value) + + def visitSteps( cmdline_tool: Process, op: Callable[[CommentedMap], None], @@ -3624,6 +3759,11 @@ def main(args: Optional[List[str]] = None, stdout: TextIO = sys.stdout) -> int: options = get_options(args) + # Take care of incompatible arguments related to file imports + if options.run_imports_on_workers is True and options.import_workers_disk is None: + logger.error("Commandline arguments --runImportsOnWorkers and --importWorkersDisk must both be set to run file imports on workers.") + return 1 + # Do cwltool setup cwltool.main.setup_schema(args=options, custom_schema_callback=None) tmpdir_prefix = options.tmpdir_prefix = options.tmpdir_prefix or DEFAULT_TMPDIR_PREFIX @@ -3683,9 +3823,6 @@ def main(args: Optional[List[str]] = None, stdout: TextIO = sys.stdout) -> int: tmp_outdir_prefix = os.path.abspath( options.tmp_outdir_prefix or DEFAULT_TMPDIR_PREFIX ) - - fileindex: Dict[str, str] = {} - existing: Dict[str, str] = {} conf_file = getattr(options, "beta_dependency_resolvers_configuration", None) use_conda_dependencies = getattr(options, "beta_conda_dependencies", None) job_script_provider = None @@ -3895,73 +4032,21 @@ def main(args: Optional[List[str]] = None, stdout: TextIO = sys.stdout) -> int: discover_secondaryFiles=True, ) - # Define something we can call to import a file and get its file - # ID. - # We cast this because import_file is overloaded depending on if we - # pass a shared file name or not, and we know the way we call it we - # always get a FileID out. - file_import_function = cast( - Callable[[str], FileID], - functools.partial(toil.import_file, symlink=True), - ) - - # Import all the input files, some of which may be missing optional - # files. - logger.info("Importing input files...") - fs_access = ToilFsAccess(options.basedir) - import_files( - file_import_function, - fs_access, - fileindex, - existing, - initialized_job_order, - mark_broken=True, - skip_remote=options.reference_inputs, - bypass_file_store=options.bypass_file_store, - log_level=logging.INFO, - ) - # Import all the files associated with tools (binaries, etc.). - # Not sure why you would have an optional secondary file here, but - # the spec probably needs us to support them. - logger.info("Importing tool-associated files...") - visitSteps( - tool, - functools.partial( - import_files, - file_import_function, - fs_access, - fileindex, - existing, - mark_broken=True, - skip_remote=options.reference_inputs, - bypass_file_store=options.bypass_file_store, - log_level=logging.INFO, - ), - ) - - # We always expect to have processed all files that exist - for param_name, param_value in initialized_job_order.items(): - # Loop through all the parameters for the workflow overall. 
-            # Drop any files that aren't either imported (for when we use
-            # the file store) or available on disk (for when we don't).
-            # This will properly make them cause an error later if they
-            # were required.
-            rm_unprocessed_secondary_files(param_value)
-
         logger.info("Creating root job")
         logger.debug("Root tool: %s", tool)
+        tool = remove_pickle_problems(tool)
         try:
-            wf1, _ = makeJob(
+            wf1 = makeRootJob(
                 tool=tool,
                 jobobj={},
                 runtime_context=runtime_context,
-                parent_name=None,  # toplevel, no name needed
-                conditional=None,
+                initialized_job_order=initialized_job_order,
+                options=options,
+                toil=toil
             )
         except CWL_UNSUPPORTED_REQUIREMENT_EXCEPTION as err:
             logging.error(err)
             return CWL_UNSUPPORTED_REQUIREMENT_EXIT_CODE
-        wf1.cwljob = initialized_job_order
 
         logger.info("Starting workflow")
         outobj = toil.start(wf1)
diff --git a/src/toil/options/common.py b/src/toil/options/common.py
index b4f6e25de1..2e4fd5eab2 100644
--- a/src/toil/options/common.py
+++ b/src/toil/options/common.py
@@ -628,7 +628,7 @@ def __call__(self, parser: Any, namespace: Any, values: Any, option_string: Any
     log_options.add_argument("--writeMessages", dest="write_messages", default=None,
                              type=lambda x: None if x is None else os.path.abspath(x), metavar="PATH",
                              help="File to send messages from the leader's message bus to.")
-    log_options.add_argument("--realTimeLogging", dest="realTimeLogging", type=strtobool, default=False,
+    log_options.add_argument("--realTimeLogging", dest="realTimeLogging", type=strtobool, default=False, metavar="BOOL",
                              help="Enable real-time logging from workers to leader")
 
     # Misc options
diff --git a/src/toil/options/cwl.py b/src/toil/options/cwl.py
index 0db2c80889..a606db34e5 100644
--- a/src/toil/options/cwl.py
+++ b/src/toil/options/cwl.py
@@ -2,6 +2,7 @@
 from argparse import ArgumentParser
 
 from configargparse import SUPPRESS
+from toil.lib.conversions import human2bytes
 from toil.version import baseVersion
 
 
@@ -281,6 +282,21 @@ def add_cwl_options(parser: ArgumentParser, suppress: bool = True) -> None:
         help=suppress_help or "Disable file streaming for files that have 'streamable' flag True",
         dest="disable_streaming",
     )
+    parser.add_argument(
+        "--runImportsOnWorkers", "--run-imports-on-workers",
+        action="store_true",
+        default=False,
+        help=suppress_help or "Run the file imports on a worker instead of the leader. This is useful if the leader is not optimized for high network performance. "
+        "If set to true, the argument --importWorkersDisk must also be set.",
+        dest="run_imports_on_workers"
+    )
+
+    parser.add_argument("--importWorkersDisk", "--import-workers-disk",
+                        help=suppress_help or "Specify the amount of disk space an import worker will use. If file streaming for input files is not available, "
+                        "this should be set to the size of the largest input file. This must be set in conjunction with the argument --runImportsOnWorkers.",
+                        dest="import_workers_disk",
+                        type=lambda x: human2bytes(str(x)),
+                        default=None)
 
     provgroup = parser.add_argument_group(
         "Options for recording provenance information of the execution"
diff --git a/src/toil/test/cwl/cwlTest.py b/src/toil/test/cwl/cwlTest.py
index 06279357ad..787965d6b2 100644
--- a/src/toil/test/cwl/cwlTest.py
+++ b/src/toil/test/cwl/cwlTest.py
@@ -1651,3 +1651,42 @@ def test_download_structure(tmp_path: Path) -> None:
         ],
         any_order=True,
     )
+
+def test_import_on_workers() -> None:
+    args = ["src/toil/test/cwl/download.cwl",
+            "src/toil/test/cwl/download_file.json",
+            "--runImportsOnWorkers",
+            "--importWorkersDisk=10MiB",
+            "--realTimeLogging=True",
+            "--logLevel=INFO", "--logColors=False"]
+    from toil.cwl import cwltoil
+
+    detector = ImportWorkersMessageHandler()
+
+    # Attach a log message detector to the root logger
+    logging.getLogger().addHandler(detector)
+
+    cwltoil.main(args)
+
+    assert detector.detected is True
+
+
+# StreamHandler is generic, _typeshed doesn't exist at runtime, do a bit of typing trickery, see https://github.com/python/typeshed/issues/5680
+if TYPE_CHECKING:
+    from _typeshed import SupportsWrite
+    _stream_handler = logging.StreamHandler[SupportsWrite[str]]
+else:
+    _stream_handler = logging.StreamHandler
+
+
+class ImportWorkersMessageHandler(_stream_handler):
+    """
+    Detect the import workers log message and set a flag.
+    """
+
+    def __init__(self) -> None:
+        self.detected = False  # Have we seen the message we want?
+
+        super().__init__(sys.stderr)
+
+    def emit(self, record: logging.LogRecord) -> None:
+        # Use getMessage() so records whose msg contains a literal '%' don't crash the handler
+        if record.getMessage().startswith("Issued job 'CWLImportJob' CWLImportJob"):
+            self.detected = True
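
Usage note: a minimal sketch of how the two new options fit together, mirroring test_import_on_workers above. The workflow and job order paths are placeholders, not files shipped with this change:

    from toil.cwl import cwltoil

    # Import the input files on a worker with 2 GiB of scratch disk instead of
    # on the leader. --importWorkersDisk accepts human2bytes-style sizes such
    # as "10MiB" or "2GiB". Both options must be given together; otherwise
    # main() logs an error and returns 1.
    exit_code = cwltoil.main([
        "workflow.cwl",   # placeholder CWL workflow
        "inputs.json",    # placeholder job order
        "--runImportsOnWorkers",
        "--importWorkersDisk=2GiB",
    ])
    assert exit_code == 0

With these flags, the leader schedules a CWLImportJob (hence the "Issued job 'CWLImportJob'" message the test watches for), which imports the files on a worker and then chains to a CWLSetupJob that builds and runs the actual workflow graph.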