diff --git a/src/toil/provisioners/node.py b/src/toil/provisioners/node.py
index 9b4f304430..dc025370c1 100644
--- a/src/toil/provisioners/node.py
+++ b/src/toil/provisioners/node.py
@@ -13,7 +13,7 @@
 # limitations under the License.
 import datetime
 import logging
-import pipes
+from shlex import quote
 import socket
 import subprocess
 import time
@@ -279,7 +279,7 @@ def coreSSH(self, *args, **kwargs):
            commandTokens += ['docker', 'exec', '-i', ttyFlag, 'toil_leader']
        logger.debug('Node %s: %s', self.effectiveIP, ' '.join(args))
-        args = list(map(pipes.quote, args))
+        args = list(map(quote, args))
        commandTokens += args
        logger.debug('Full command %s', ' '.join(commandTokens))
        process = subprocess.Popen(commandTokens, **kwargs)
diff --git a/src/toil/test/wdl/md5sum/md5sum.1.0.wdl b/src/toil/test/wdl/md5sum/md5sum.1.0.wdl
index f5fc41698d..3d0fbea047 100644
--- a/src/toil/test/wdl/md5sum/md5sum.1.0.wdl
+++ b/src/toil/test/wdl/md5sum/md5sum.1.0.wdl
@@ -4,16 +4,17 @@ task md5 {
   input {
     File inputFile
   }

-  command {
-    /bin/my_md5sum ${inputFile}
-  }
+  command <<<
+    set -euf -o pipefail
+    md5sum ~{inputFile} | awk '{print $1}' > md5sum.txt
+  >>>

   output {
     File value = "md5sum.txt"
   }

   runtime {
-    docker: "quay.io/briandoconnor/dockstore-tool-md5sum:1.0.4"
+    docker: "ubuntu:22.04"
     cpu: 1
     memory: "512 MB"
     disks: "local-disk 10 HDD"
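A note on the node.py change above: pipes.quote has been an undocumented alias for shlex.quote since Python 3.3, and the pipes module is removed in Python 3.13, so the swap is behavior-preserving. A minimal sketch of the escaping applied to the SSH arguments (the argument values here are invented for illustration):

```python
from shlex import quote

# quote() single-quotes anything the shell would otherwise interpret,
# matching the old pipes.quote behavior.
args = ["echo", "hello world", "$HOME", "it's"]
print(" ".join(map(quote, args)))
# prints: echo 'hello world' '$HOME' 'it'"'"'s'
```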
diff --git a/src/toil/wdl/wdltoil.py b/src/toil/wdl/wdltoil.py
index 158398dd5f..d2a2aedeea 100755
--- a/src/toil/wdl/wdltoil.py
+++ b/src/toil/wdl/wdltoil.py
@@ -78,7 +78,7 @@ from toil.jobStores.abstractJobStore import (AbstractJobStore,
                                              UnimplementedURLException,
                                              InvalidImportExportUrlException,
                                              LocatorException)
 from toil.lib.accelerators import get_individual_local_accelerators
-from toil.lib.conversions import convert_units, human2bytes
+from toil.lib.conversions import convert_units, human2bytes, VALID_PREFIXES
 from toil.lib.io import mkdtemp
 from toil.lib.memoize import memoize
 from toil.lib.misc import get_user_name
@@ -90,6 +90,11 @@
 logger = logging.getLogger(__name__)

+class InsufficientMountDiskSpace(Exception):
+    """Raised when the requested mount points need more disk space than is available."""
+    def __init__(self, mount_targets: List[str], desired_bytes: int, available_bytes: int) -> None:
+        super().__init__("Not enough available disk space for the target mount points %s. Needed %d bytes, but only %d bytes are available."
+                         % (", ".join(mount_targets), desired_bytes, available_bytes))
+
 @contextmanager
 def wdl_error_reporter(task: str, exit: bool = False, log: Callable[[str], None] = logger.critical) -> Generator[None, None, None]:
     """
@@ -109,7 +114,8 @@ def wdl_error_reporter(task: str, exit: bool = False, log: Callable[[str], None]
             LocatorException,
             InvalidImportExportUrlException,
             UnimplementedURLException,
-            JobTooBigError
+            JobTooBigError,
+            InsufficientMountDiskSpace
     ) as e:
         # Don't expose tracebacks to the user for exceptions that may be expected
         log("Could not " + task + " because:")
@@ -1915,43 +1921,87 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]:
                 memory_spec = human2bytes(memory_spec)
             runtime_memory = memory_spec

+        mount_spec: Dict[Optional[str], int] = dict()
         if runtime_bindings.has_binding('disks'):
             # Miniwdl doesn't have this, but we need to be able to parse things like:
             # local-disk 5 SSD
             # which would mean we need 5 GB space. Cromwell docs for this are at https://cromwell.readthedocs.io/en/stable/RuntimeAttributes/#disks
             # We ignore all disk types, and complain if the mount point is not `local-disk`.
-            disks_spec: str = runtime_bindings.resolve('disks').value
-            all_specs = disks_spec.split(',')
-            # Sum up the gigabytes in each disk specification
-            total_gb = 0
+            disks_spec: Union[List[WDL.Value.String], str] = runtime_bindings.resolve('disks').value
+            if isinstance(disks_spec, list):
+                # The spec says to use the first entry; the parser gives us an array of WDL string objects
+                all_specs = [part.value for part in disks_spec]
+            else:
+                all_specs = disks_spec.split(',')
+            # Sum up the space in each disk specification
+            total_bytes: float = 0
             for spec in all_specs:
                 # Split up each spec as space-separated. We assume no fields
                 # are empty, and we want to allow people to use spaces after
                 # their commas when separating the list, like in Cromwell's
                 # examples, so we strip whitespace.
                 spec_parts = spec.strip().split(' ')
-                if len(spec_parts) != 3:
-                    # TODO: Add a WDL line to this error
-                    raise ValueError(f"Could not parse disks = {disks_spec} because {spec} does not have 3 space-separated parts")
-                if spec_parts[0] != 'local-disk':
-                    # TODO: Add a WDL line to this error
-                    raise NotImplementedError(f"Could not provide disks = {disks_spec} because only the local-disks mount point is implemented")
-                try:
-                    total_gb += int(spec_parts[1])
-                except:
-                    # TODO: Add a WDL line to this error
-                    raise ValueError(f"Could not parse disks = {disks_spec} because {spec_parts[1]} is not an integer")
-                # TODO: we always ignore the disk type and assume we have the right one.
-                # TODO: Cromwell rounds LOCAL disks up to the nearest 375 GB. I
-                # can't imagine that ever being standardized; just leave it
-                # alone so that the workflow doesn't rely on this weird and
-                # likely-to-change Cromwell detail.
-                if spec_parts[2] == 'LOCAL':
-                    logger.warning('Not rounding LOCAL disk to the nearest 375 GB; workflow execution will differ from Cromwell!')
-            total_bytes: float = convert_units(total_gb, 'GB')
-            runtime_disk = int(total_bytes)
+                # First check that this is a format we support. Both the WDL spec and
+                # Cromwell allow at most a 3-piece specification, so if there are more
+                # than 3 pieces, raise an error.
+                if len(spec_parts) > 3:
+                    raise ValueError(f"Could not parse disks = {disks_spec} because {spec} has more than 3 space-separated parts")
+                part_size = None
+                # Default to GiB as the unit, as per the WDL spec
+                part_suffix: str = "GiB"
+                # Default to the execution directory
+                specified_mount_point = None
+                # First get the size. The unit should always be a nonnumerical string, so take the last numerical value.
+                for i, part in enumerate(spec_parts):
+                    if part.replace(".", "", 1).isdigit():
+                        part_size = int(float(part))
+                        spec_parts.pop(i)
+                        break
+                # A unit (or Cromwell disk type) specification is only allowed at the end
+                if len(spec_parts) > 0 and (spec_parts[-1].lower() in VALID_PREFIXES or spec_parts[-1] in ("LOCAL", "HDD", "SSD")):
+                    part_suffix = spec_parts[-1]
+                    spec_parts.pop(-1)
+                # The last remaining element, if it exists, is the mount point
+                if len(spec_parts) > 0:
+                    specified_mount_point = spec_parts[0]

+                if part_size is None:
+                    # Disk spec did not include a size
+                    raise ValueError(f"Could not parse disks = {disks_spec} because {spec} does not specify a disk size")

+                if part_suffix == "LOCAL":
+                    # TODO: Cromwell rounds LOCAL disks up to the nearest 375 GB. I
+                    # can't imagine that ever being standardized; just leave it
+                    # alone so that the workflow doesn't rely on this weird and
+                    # likely-to-change Cromwell detail.
+                    logger.warning('Not rounding LOCAL disk to the nearest 375 GB; workflow execution will differ from Cromwell!')
+                    # Cromwell sizes are in GB regardless of disk type
+                    part_suffix = "GB"
+                elif part_suffix in ("HDD", "SSD"):
+                    # For Cromwell compatibility, assume these disk types mean GB units.
+                    # We don't actually differentiate between HDD and SSD.
+                    part_suffix = "GB"

+                per_part_size = convert_units(part_size, part_suffix)
+                total_bytes += per_part_size
+                if mount_spec.get(specified_mount_point) is not None:
+                    if specified_mount_point is not None:
+                        # All explicit mount points must be unique
+                        raise ValueError(f"Could not parse disks = {disks_spec} because the mount point {specified_mount_point} is specified multiple times")
+                    else:
+                        # The mount point may only be omitted once
+                        raise ValueError(f"Could not parse disks = {disks_spec} because the mount point is omitted more than once")
+                # TODO: we always ignore the disk type and assume we have the right one.
+                if specified_mount_point != "local-disk":
+                    # An omitted mount point defaults to the task's execution directory, which None represents here
+                    mount_spec[specified_mount_point] = int(per_part_size)
+                else:
+                    # Treat local-disk as an omitted mount point rather than mounting a directory
+                    # named "local-disk". This isn't in the spec, but is carried over from Cromwell.
+                    mount_spec[None] = int(per_part_size)
+            runtime_disk = int(total_bytes)

         if not runtime_bindings.has_binding("gpu") and self._task.effective_wdl_version in ('1.0', 'draft-2'):
             # For old WDL versions, guess whether the task wants GPUs if not specified.
             use_gpus = (runtime_bindings.has_binding('gpuCount') or
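To make the accepted disks formats concrete, here is an illustrative sketch of inputs and the (mount point, size) results the loop above derives. It is not part of the patch, and the byte counts assume toil.lib.conversions.convert_units treats GB as decimal and GiB as binary:

```python
# Hypothetical specs mapped to the (mount point, bytes) pairs the parser derives.
examples = {
    "local-disk 5 SSD": (None, 5 * 10**9),         # Cromwell style; SSD/HDD sizes are in GB
    "10": (None, 10 * 2**30),                      # a bare size defaults to GiB
    "/mnt/data 20 GB": ("/mnt/data", 20 * 10**9),  # explicit mount point and unit
    "30 LOCAL": (None, 30 * 10**9),                # LOCAL is warned about but not rounded
}
for spec, (mount_point, size_bytes) in examples.items():
    print(f"disks = {spec!r} -> mount at {mount_point!r}, reserve {size_bytes} bytes")
```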
@@ -1985,7 +2035,9 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]:
             runtime_accelerators = [accelerator_requirement]

         # Schedule to get resources. Pass along the bindings from evaluating all the inputs and decls, and the runtime, with files virtualized.
-        run_job = WDLTaskJob(self._task, virtualize_files(bindings, standard_library), virtualize_files(runtime_bindings, standard_library), self._task_id, self._namespace, self._task_path, cores=runtime_cores or self.cores, memory=runtime_memory or self.memory, disk=runtime_disk or self.disk, accelerators=runtime_accelerators or self.accelerators, wdl_options=self._wdl_options)
+        run_job = WDLTaskJob(self._task, virtualize_files(bindings, standard_library), virtualize_files(runtime_bindings, standard_library), self._task_id, self._namespace,
+                             self._task_path, mount_spec, cores=runtime_cores or self.cores, memory=runtime_memory or self.memory, disk=runtime_disk or self.disk,
+                             accelerators=runtime_accelerators or self.accelerators, wdl_options=self._wdl_options)
         # Run that as a child
         self.addChild(run_job)
@@ -2008,7 +2060,8 @@ class WDLTaskJob(WDLBaseJob):
     All bindings are in terms of task-internal names.
     """

-    def __init__(self, task: WDL.Tree.Task, task_internal_bindings: Promised[WDLBindings], runtime_bindings: Promised[WDLBindings], task_id: List[str], namespace: str, task_path: str, **kwargs: Any) -> None:
+    def __init__(self, task: WDL.Tree.Task, task_internal_bindings: Promised[WDLBindings], runtime_bindings: Promised[WDLBindings], task_id: List[str], namespace: str,
+                 task_path: str, mount_spec: Dict[Optional[str], int], **kwargs: Any) -> None:
         """
         Make a new job to run a task.
@@ -2032,6 +2085,7 @@ def __init__(self, task: WDL.Tree.Task, task_internal_bindings: Promised[WDLBind
         self._task_id = task_id
         self._namespace = namespace
         self._task_path = task_path
+        self._mount_spec = mount_spec

    ###
    # Runtime code injection system
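For reference, the mount_spec threaded through WDLTaskJob here is keyed by mount point target, with None standing for the omitted mount point (the task's execution directory). A hypothetical example for disks = "local-disk 5 SSD, /mnt/outputs 8 GiB":

```python
from typing import Dict, Optional

# Values are sizes in bytes; keys are container mount targets.
mount_spec: Dict[Optional[str], int] = {
    None: 5 * 10**9,            # "local-disk" collapses to the omitted mount point
    "/mnt/outputs": 8 * 2**30,  # explicit mount point, binary GiB
}
```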
@@ -2211,6 +2265,58 @@ def can_mount_proc(self) -> bool:
         """
         return "KUBERNETES_SERVICE_HOST" not in os.environ

+    def ensure_mount_point(self, file_store: AbstractFileStore, mount_spec: Dict[Optional[str], int]) -> Dict[str, str]:
+        """
+        Ensure the mount point sources are available.
+
+        Will check if the mount point source has the requested amount of space available.
+
+        Note: We depend on Toil's job scheduling backend to error when the sum of multiple mount points' disk requests is greater than the total available.
+        For example, if a task has two mount points that request 100 GB each but only 100 GB is available, the df check may pass,
+        but Toil should fail to schedule the job internally.
+
+        :param mount_spec: Mount specification from the disks attribute in the WDL task. A dict mapping each mount point target to its size in bytes
+        :param file_store: File store to create a tmp directory for the mount point sources
+        :return: Dict mapping mount point target to mount point source
+        """
+        logger.debug("Detected mount specifications, creating mount points.")
+        mount_src_mapping: Dict[str, str] = {}
+        # Create one tmpdir to encapsulate all mount point sources; each mount point gets its own subdirectory
+        tmpdir = file_store.getLocalTempDir()

+        # The POSIX standard doesn't specify how to escape spaces in mount points and file system names.
+        # The only defect of this regex is if the target mount point has the same format as a df output line.
+        # It is likely reliable enough to trust that the user has not created a mount with a df-output-like name.
+        regex_df = re.compile(r".+ \d+ +\d+ +(\d+) +\d+% +.+")
+        total_mount_size = sum(mount_spec.values())
+        try:
+            # Use arguments from the df POSIX standard
+            df_line = subprocess.check_output(["df", "-k", "-P", tmpdir], encoding="utf-8").split("\n")[1]
+            m = re.match(regex_df, df_line)
+            if m is None:
+                logger.debug("Output of df may be malformed: %s", df_line)
+                logger.warning("Unable to check disk requirements as the output of the 'df' command is malformed. Will assume storage is always available.")
+            else:
+                # The block size will always be 1024 because of the -k flag
+                available_space = int(m[1]) * 1024
+                if available_space < total_mount_size:
+                    # We do not have enough space available for these mount points.
+                    # An omitted mount point is the task's execution directory, so show that to the user instead.
+                    raise InsufficientMountDiskSpace([mount_point if mount_point is not None else "/mnt/miniwdl_task_container/work" for mount_point in mount_spec.keys()],
+                                                     total_mount_size, available_space)
+        except subprocess.CalledProcessError as e:
+            # If df somehow isn't available
+            logger.debug("Unable to call df. stdout: %s stderr: %s", e.stdout, e.stderr)
+            logger.warning("Unable to check disk requirements as the call to the 'df' command failed. Will assume storage is always available.")
+        for mount_target in mount_spec.keys():
+            # Create a new subdirectory for each mount point
+            source_location = os.path.join(tmpdir, str(uuid.uuid4()))
+            os.mkdir(source_location)
+            if mount_target is not None:
+                # A mount target of None represents an omitted mount point, which defaults to the task's
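The df check above in isolation, for readers who want to see what is being parsed. This is a hedged sketch; the sample output line is invented, and only the POSIX-mandated df -k -P format is assumed:

```python
import re
import subprocess

# POSIX `df -k -P` prints a header and then one line per filesystem, e.g.:
#   Filesystem     1024-blocks      Used Available Capacity Mounted on
#   /dev/sda1        498876416 201234567 297641849      41% /
regex_df = re.compile(r".+ \d+ +\d+ +(\d+) +\d+% +.+")
df_line = subprocess.check_output(["df", "-k", "-P", "/tmp"], encoding="utf-8").split("\n")[1]
m = regex_df.match(df_line)
if m is not None:
    # The capture group is the "Available" column; -k fixes the block size at 1024
    available_bytes = int(m.group(1)) * 1024
    print(f"{available_bytes} bytes free on the filesystem holding /tmp")
```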
+                # work directory; MiniWDL's internals will mount the task's work directory by themselves
+                mount_src_mapping[mount_target] = source_location
+        return mount_src_mapping

     @report_wdl_errors("run task command", exit=True)
     def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]:
         """
@@ -2224,6 +2330,9 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]:
         # We process nonexistent files in WDLTaskWrapperJob as those must be run locally, so don't try to devirtualize them
         standard_library = ToilWDLStdLibBase(file_store, self._task_path, enforce_existence=False)

+        # Create mount points and get a mapping of target mount points to locations on disk
+        mount_mapping = self.ensure_mount_point(file_store, self._mount_spec)
+
         # Get the bindings from after the input section
         bindings = unwrap(self._task_internal_bindings)
         # And the bindings from evaluating the runtime section
@@ -2385,8 +2494,51 @@ def patched_run_invocation(*args: Any, **kwargs: Any) -> List[str]:
                 return command_line

             # Apply the patch
-            task_container._run_invocation = patched_run_invocation # type: ignore
+            task_container._run_invocation = patched_run_invocation  # type: ignore
+
+            singularity_original_prepare_mounts = task_container.prepare_mounts
+
+            def patch_prepare_mounts_singularity() -> List[Tuple[str, str, bool]]:
+                """
+                Mount the mount points specified in the disk requirements.
+
+                The Singularity and Docker patches are separate because the methods have different signatures.
+                """
+                # TODO: Support AWS EBS/Kubernetes persistent volumes. This logic likely only
+                # works for local clusters as we don't deal with the size of each mount point.
+                mounts: List[Tuple[str, str, bool]] = singularity_original_prepare_mounts()
+                for mount_point, source_location in mount_mapping.items():
+                    mounts.append((mount_point, source_location, True))
+                return mounts
+            task_container.prepare_mounts = patch_prepare_mounts_singularity  # type: ignore[method-assign]
+        elif isinstance(task_container, SwarmContainer):
+            docker_original_prepare_mounts = task_container.prepare_mounts
+
+            try:
+                # miniwdl depends on docker, so this should be available, but check just in case
+                import docker
+                # docker stubs are still WIP: https://github.com/docker/docker-py/issues/2796
+                from docker.types import Mount  # type: ignore[import-untyped]
+
+                def patch_prepare_mounts_docker(logger: logging.Logger) -> List[Mount]:
+                    """
+                    Same as the Singularity patch, but for Docker.
+                    """
+                    mounts: List[Mount] = docker_original_prepare_mounts(logger)
+                    for mount_point, source_location in mount_mapping.items():
+                        mounts.append(
+                            Mount(
+                                mount_point.rstrip("/").replace("{{", '{{"{{"}}'),
+                                source_location.rstrip("/").replace("{{", '{{"{{"}}'),
+                                type="bind",
+                            )
+                        )
+                    return mounts
+                task_container.prepare_mounts = patch_prepare_mounts_docker  # type: ignore[method-assign]
+            except ImportError:
+                logger.warning("Docker package not installed. Unable to add mount points.")
         # Show the runtime info to the container
         task_container.process_runtime(miniwdl_logger, {binding.name: binding.value for binding in devirtualize_files(runtime_bindings, standard_library)})
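For readers unfamiliar with the instance-level monkey-patching used in both branches above, here is the same pattern as a self-contained sketch; the Container class and extra_mounts mapping are stand-ins for illustration, not miniwdl's API:

```python
from typing import List, Tuple

class Container:
    def prepare_mounts(self) -> List[Tuple[str, str, bool]]:
        return [("/work", "/host/work", True)]

container = Container()
# Capture the original *bound* method so the wrapper still runs the real logic
original_prepare_mounts = container.prepare_mounts
extra_mounts = {"/mnt/data": "/host/tmp/mount-src"}

def patched_prepare_mounts() -> List[Tuple[str, str, bool]]:
    mounts = original_prepare_mounts()  # keep whatever would normally be mounted
    for target, source in extra_mounts.items():
        mounts.append((target, source, True))  # (container path, host path, writable)
    return mounts

# Rebinding on the instance affects only this one container object
container.prepare_mounts = patched_prepare_mounts  # type: ignore[method-assign]
assert len(container.prepare_mounts()) == 2
```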