From 3971be6e52fa4c03055e6ebab20dbbbdcfae5e32 Mon Sep 17 00:00:00 2001 From: stxue1 Date: Fri, 28 Jun 2024 17:27:17 -0700 Subject: [PATCH 01/21] better disk logic and add logic to mount specific points --- src/toil/wdl/wdltoil.py | 119 ++++++++++++++++++++++++++++++++-------- 1 file changed, 96 insertions(+), 23 deletions(-) diff --git a/src/toil/wdl/wdltoil.py b/src/toil/wdl/wdltoil.py index 53d10c6ebf..782ddfe346 100755 --- a/src/toil/wdl/wdltoil.py +++ b/src/toil/wdl/wdltoil.py @@ -1745,40 +1745,67 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: memory_spec = human2bytes(memory_spec) runtime_memory = memory_spec + mount_spec: Dict[str, int] = dict() if runtime_bindings.has_binding('disks'): # Miniwdl doesn't have this, but we need to be able to parse things like: # local-disk 5 SSD # which would mean we need 5 GB space. Cromwell docs for this are at https://cromwell.readthedocs.io/en/stable/RuntimeAttributes/#disks # We ignore all disk types, and complain if the mount point is not `local-disk`. - disks_spec: str = runtime_bindings.resolve('disks').value - all_specs = disks_spec.split(',') + disks_spec: Union[List[str], str] = runtime_bindings.resolve('disks').value + if isinstance(disks_spec, list): + # SPEC says to use the first one + all_specs = disks_spec + else: + all_specs = disks_spec.split(',') # Sum up the gigabytes in each disk specification - total_gb = 0 + total_bytes: float = 0 for spec in all_specs: # Split up each spec as space-separated. We assume no fields # are empty, and we want to allow people to use spaces after # their commas when separating the list, like in Cromwell's # examples, so we strip whitespace. spec_parts = spec.strip().split(' ') - if len(spec_parts) != 3: - # TODO: Add a WDL line to this error - raise ValueError(f"Could not parse disks = {disks_spec} because {spec} does not have 3 space-separated parts") - if spec_parts[0] != 'local-disk': - # TODO: Add a WDL line to this error - raise NotImplementedError(f"Could not provide disks = {disks_spec} because only the local-disks mount point is implemented") - try: - total_gb += int(spec_parts[1]) - except: - # TODO: Add a WDL line to this error - raise ValueError(f"Could not parse disks = {disks_spec} because {spec_parts[1]} is not an integer") + part_size = None + # default to GiB as per spec + part_suffix: str = "GiB" + # default to the execution directory + part_mount_point: str = self._wdl_options.get("execution_dir") or os.getcwd() + for i, part in enumerate(spec_parts): + if part.isnumeric(): + part_size = int(part) + continue + if i == 0: + # mount point is always the first + part_mount_point = part + continue + if part_size is not None: + # suffix will always be after the size, if it exists + part_suffix = part + continue + + if part_size is None: + # Disk spec did not include a size + raise ValueError(f"Could not parse disks = {disks_spec} because {spec} does not specify a disk size") + + per_part_size = convert_units(part_size, part_suffix) + total_bytes += per_part_size + if mount_spec.get(part_mount_point) is not None: + # raise an error as all mount points must be unique + raise ValueError(f"Could not parse disks = {disks_spec} because the mount point {part_mount_point} is specified multiple times") + # TODO: we always ignore the disk type and assume we have the right one. - # TODO: Cromwell rounds LOCAL disks up to the nearest 375 GB. I - # can't imagine that ever being standardized; just leave it - # alone so that the workflow doesn't rely on this weird and - # likely-to-change Cromwell detail. - if spec_parts[2] == 'LOCAL': + mount_spec[part_mount_point] = int(per_part_size) + + if not os.path.exists(part_mount_point): + # this isn't a valid mount point + raise NotImplementedError(f"Cannot use mount point {part_mount_point} as it does not exist") + + if part_suffix == "LOCAL": + # TODO: Cromwell rounds LOCAL disks up to the nearest 375 GB. I + # can't imagine that ever being standardized; just leave it + # alone so that the workflow doesn't rely on this weird and + # likely-to-change Cromwell detail. logger.warning('Not rounding LOCAL disk to the nearest 375 GB; workflow execution will differ from Cromwell!') - total_bytes: float = convert_units(total_gb, 'GB') runtime_disk = int(total_bytes) # The gpu field is the WDL 1.1 standard, so this field will be the absolute truth on whether to use GPUs or not @@ -1804,7 +1831,7 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: runtime_accelerators = [accelerator_requirement] # Schedule to get resources. Pass along the bindings from evaluating all the inputs and decls, and the runtime, with files virtualized. - run_job = WDLTaskJob(self._task, virtualize_files(bindings, standard_library), virtualize_files(runtime_bindings, standard_library), self._task_id, self._namespace, self._task_path, cores=runtime_cores or self.cores, memory=runtime_memory or self.memory, disk=runtime_disk or self.disk, accelerators=runtime_accelerators or self.accelerators, wdl_options=self._wdl_options) + run_job = WDLTaskJob(self._task, virtualize_files(bindings, standard_library), virtualize_files(runtime_bindings, standard_library), self._task_id, self._namespace, self._task_path, mount_spec, cores=runtime_cores or self.cores, memory=runtime_memory or self.memory, disk=runtime_disk or self.disk, accelerators=runtime_accelerators or self.accelerators, wdl_options=self._wdl_options) # Run that as a child self.addChild(run_job) @@ -1827,7 +1854,7 @@ class WDLTaskJob(WDLBaseJob): All bindings are in terms of task-internal names. """ - def __init__(self, task: WDL.Tree.Task, task_internal_bindings: Promised[WDLBindings], runtime_bindings: Promised[WDLBindings], task_id: List[str], namespace: str, task_path: str, **kwargs: Any) -> None: + def __init__(self, task: WDL.Tree.Task, task_internal_bindings: Promised[WDLBindings], runtime_bindings: Promised[WDLBindings], task_id: List[str], namespace: str, task_path: str, mount_spec: Dict[str, int], **kwargs: Any) -> None: """ Make a new job to run a task. @@ -1851,6 +1878,7 @@ def __init__(self, task: WDL.Tree.Task, task_internal_bindings: Promised[WDLBind self._task_id = task_id self._namespace = namespace self._task_path = task_path + self._mount_spec = mount_spec ### # Runtime code injection system @@ -2202,8 +2230,53 @@ def patched_run_invocation(*args: Any, **kwargs: Any) -> List[str]: return command_line # Apply the patch - task_container._run_invocation = patched_run_invocation # type: ignore + task_container._run_invocation = patched_run_invocation # type: ignore + + singularity_original_prepare_mounts = task_container.prepare_mounts + + def patch_prepare_mounts_singularity() -> List[Tuple[str, str, bool]]: + """ + Mount the mount points specified from the disk requirements. + + The singularity and docker patch are separate as they have different function signatures + """ + # todo: support AWS EBS/Kubernetes persistent volumes + # this logic likely only works for local clusters as we don't deal with the size of each mount point + mounts: List[Tuple[str, str, bool]] = singularity_original_prepare_mounts() + # todo: support AWS EBS/Kubernetes persistent volumes + # this logic likely only works for local clusters as we don't deal with the size of each mount point + for mount_point, _ in self._mount_spec.items(): + abs_mount_point = os.path.abspath(mount_point) + mounts.append((abs_mount_point, abs_mount_point, True)) + return mounts + task_container.prepare_mounts = patch_prepare_mounts_singularity # type: ignore[method-assign] + elif isinstance(task_container, SwarmContainer): + docker_original_prepare_mounts = task_container.prepare_mounts + try: + # miniwdl depends on docker so this should be available but check just in case + import docker + # docker stubs are still WIP: https://github.com/docker/docker-py/issues/2796 + from docker.types import Mount # type: ignore[import-untyped] + + def patch_prepare_mounts_docker(logger: logging.Logger) -> List[Mount]: + """ + Same as the singularity patch but for docker + """ + mounts: List[Mount] = docker_original_prepare_mounts(logger) + for mount_point, _ in self._mount_spec.items(): + abs_mount_point = os.path.abspath(mount_point) + mounts.append( + Mount( + abs_mount_point.rstrip("/").replace("{{", '{{"{{"}}'), + abs_mount_point.rstrip("/").replace("{{", '{{"{{"}}'), + type="bind", + ) + ) + return mounts + task_container.prepare_mounts = patch_prepare_mounts_docker # type: ignore[method-assign] + except ImportError: + logger.warning("Docker package not installed. Unable to add mount points.") # Show the runtime info to the container task_container.process_runtime(miniwdl_logger, {binding.name: binding.value for binding in devirtualize_files(runtime_bindings, standard_library)}) From 48990d0b930c6e2562348ea7989aa32e162a5e1e Mon Sep 17 00:00:00 2001 From: stxue1 Date: Fri, 28 Jun 2024 17:32:55 -0700 Subject: [PATCH 02/21] cromwell compatibility --- src/toil/wdl/wdltoil.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/toil/wdl/wdltoil.py b/src/toil/wdl/wdltoil.py index 782ddfe346..d9ef19ef8a 100755 --- a/src/toil/wdl/wdltoil.py +++ b/src/toil/wdl/wdltoil.py @@ -1794,7 +1794,9 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: raise ValueError(f"Could not parse disks = {disks_spec} because the mount point {part_mount_point} is specified multiple times") # TODO: we always ignore the disk type and assume we have the right one. - mount_spec[part_mount_point] = int(per_part_size) + if part_mount_point != "local-disk": + # Don't mount local-disk. This isn't in the spec, but is carried over from cromwell + mount_spec[part_mount_point] = int(per_part_size) if not os.path.exists(part_mount_point): # this isn't a valid mount point From e8d223c7c67b57a757a5b0de10f75d275bf90f52 Mon Sep 17 00:00:00 2001 From: stxue1 Date: Fri, 28 Jun 2024 17:43:18 -0700 Subject: [PATCH 03/21] Convert from wdl string to normal string --- src/toil/wdl/wdltoil.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/toil/wdl/wdltoil.py b/src/toil/wdl/wdltoil.py index d9ef19ef8a..6496e89fdc 100755 --- a/src/toil/wdl/wdltoil.py +++ b/src/toil/wdl/wdltoil.py @@ -1754,7 +1754,8 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: disks_spec: Union[List[str], str] = runtime_bindings.resolve('disks').value if isinstance(disks_spec, list): # SPEC says to use the first one - all_specs = disks_spec + # the parser gives an array of WDL string objects + all_specs = [part.value for part in disks_spec] else: all_specs = disks_spec.split(',') # Sum up the gigabytes in each disk specification From 8f7199aa2d3c3426f567b2ef06ba0f0f6e3646e2 Mon Sep 17 00:00:00 2001 From: stxue1 Date: Sun, 30 Jun 2024 19:13:17 -0700 Subject: [PATCH 04/21] floats --- src/toil/wdl/wdltoil.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/toil/wdl/wdltoil.py b/src/toil/wdl/wdltoil.py index 6496e89fdc..41790dd667 100755 --- a/src/toil/wdl/wdltoil.py +++ b/src/toil/wdl/wdltoil.py @@ -1772,8 +1772,9 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: # default to the execution directory part_mount_point: str = self._wdl_options.get("execution_dir") or os.getcwd() for i, part in enumerate(spec_parts): - if part.isnumeric(): - part_size = int(part) + if part.replace(".", "", 1).isdigit(): + # round down floats + part_size = int(float(part)) continue if i == 0: # mount point is always the first From d7719b94afd21f0685d57eb15fed55991bdc6cc2 Mon Sep 17 00:00:00 2001 From: stxue1 Date: Mon, 1 Jul 2024 12:38:12 -0700 Subject: [PATCH 05/21] Satisfy mypy --- src/toil/wdl/wdltoil.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/toil/wdl/wdltoil.py b/src/toil/wdl/wdltoil.py index 41790dd667..ee940269e4 100755 --- a/src/toil/wdl/wdltoil.py +++ b/src/toil/wdl/wdltoil.py @@ -1751,7 +1751,7 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: # local-disk 5 SSD # which would mean we need 5 GB space. Cromwell docs for this are at https://cromwell.readthedocs.io/en/stable/RuntimeAttributes/#disks # We ignore all disk types, and complain if the mount point is not `local-disk`. - disks_spec: Union[List[str], str] = runtime_bindings.resolve('disks').value + disks_spec: Union[List[WDL.Value.String], str] = runtime_bindings.resolve('disks').value if isinstance(disks_spec, list): # SPEC says to use the first one # the parser gives an array of WDL string objects From ea19cb61ef732efb35e967f6dace8c4d7454a900 Mon Sep 17 00:00:00 2001 From: stxue1 Date: Fri, 23 Aug 2024 00:23:03 -0700 Subject: [PATCH 06/21] Follow new spec --- src/toil/wdl/wdltoil.py | 84 ++++++++++++++++++++++++++++++----------- 1 file changed, 62 insertions(+), 22 deletions(-) diff --git a/src/toil/wdl/wdltoil.py b/src/toil/wdl/wdltoil.py index 23e7eccf8b..4078693000 100755 --- a/src/toil/wdl/wdltoil.py +++ b/src/toil/wdl/wdltoil.py @@ -88,6 +88,11 @@ logger = logging.getLogger(__name__) +class InsufficientMountDiskSpace(Exception): + def __init__(self, mount_target: str, desired_bytes, available_bytes): + super().__init__("Not enough available disk space for the target mount point %s. Needed %d bytes but there is only %d available." + % (mount_target, desired_bytes, available_bytes)) + @contextmanager def wdl_error_reporter(task: str, exit: bool = False, log: Callable[[str], None] = logger.critical) -> Generator[None, None, None]: """ @@ -1928,7 +1933,7 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: # default to GiB as per spec part_suffix: str = "GiB" # default to the execution directory - part_mount_point: str = self._wdl_options.get("execution_dir") or os.getcwd() + specified_mount_point = None for i, part in enumerate(spec_parts): if part.replace(".", "", 1).isdigit(): # round down floats @@ -1936,7 +1941,7 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: continue if i == 0: # mount point is always the first - part_mount_point = part + specified_mount_point = part continue if part_size is not None: # suffix will always be after the size, if it exists @@ -1949,18 +1954,15 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: per_part_size = convert_units(part_size, part_suffix) total_bytes += per_part_size - if mount_spec.get(part_mount_point) is not None: - # raise an error as all mount points must be unique - raise ValueError(f"Could not parse disks = {disks_spec} because the mount point {part_mount_point} is specified multiple times") - - # TODO: we always ignore the disk type and assume we have the right one. - if part_mount_point != "local-disk": - # Don't mount local-disk. This isn't in the spec, but is carried over from cromwell - mount_spec[part_mount_point] = int(per_part_size) + if specified_mount_point is not None: + if mount_spec.get(specified_mount_point) is not None: + # raise an error as all mount points must be unique + raise ValueError(f"Could not parse disks = {disks_spec} because the mount point {specified_mount_point} is specified multiple times") - if not os.path.exists(part_mount_point): - # this isn't a valid mount point - raise NotImplementedError(f"Cannot use mount point {part_mount_point} as it does not exist") + # TODO: we always ignore the disk type and assume we have the right one. + if specified_mount_point != "local-disk": + # Don't mount local-disk. This isn't in the spec, but is carried over from cromwell + mount_spec[specified_mount_point] = int(per_part_size) if part_suffix == "LOCAL": # TODO: Cromwell rounds LOCAL disks up to the nearest 375 GB. I @@ -1968,8 +1970,6 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: # alone so that the workflow doesn't rely on this weird and # likely-to-change Cromwell detail. logger.warning('Not rounding LOCAL disk to the nearest 375 GB; workflow execution will differ from Cromwell!') - runtime_disk = int(total_bytes) - if not runtime_bindings.has_binding("gpu") and self._task.effective_wdl_version in ('1.0', 'draft-2'): # For old WDL versions, guess whether the task wants GPUs if not specified. @@ -2230,6 +2230,45 @@ def can_mount_proc(self) -> bool: """ return "KUBERNETES_SERVICE_HOST" not in os.environ + def ensure_mount_point(self, file_store: AbstractFileStore, mount_spec: Dict[str, int]) -> Dict[str, str]: + """ + Ensure the mount point sources are available. + + Will check if the mount point source has the requested amount of space available. + + Note: We are depending on Toil's job scheduling backend to error when the sum of multiple mount points disk requests is greater than the total available + For example, if a task has two mount points request 100 GB each but there is only 100 GB available, the df check may pass + but Toil should fail to schedule the jobs internally + + :param mount_spec: Mount specification from the disks attribute in the WDL task. Is a dict where key is the mount point target and value is the size + :param file_store: File store to create a tmp directory for the mount point source + :return: Dict mapping mount point target to mount point source + """ + logger.debug("Detected mount specifications, creating mount points.") + mount_src_mapping = {} + # Create one tmpdir to encapsulate all mount point sources, each mount point will be associated with a subdirectory + tmpdir = file_store.getLocalTempDir() + + # The POSIX standard doesn't specify how to escape spaces in mount points and file system names + # The only defect of this regex is if the target mount point is the same format as the df output + # It is likely reliable enough to trust the user has not created a mount with a df output-like name + regex_df = re.compile(r".+ \d+ +\d+ +(\d+) +\d+% +.+") + for mount_target, mount_size in mount_spec.items(): + # Use arguments from the df POSIX standard + df_line = subprocess.check_output(["df", "-k", "-P", tmpdir], encoding="utf-8").split("\n")[1] + m = re.match(regex_df, df_line) + # Block size will always be 1024 + available_space = int(m[1]) * 1024 + if available_space < mount_size: + # We do not have enough space available for this mount point + raise InsufficientMountDiskSpace(mount_target, mount_size, available_space) + + # Create a new subdirectory for each mount point + source_location = os.path.join(tmpdir, str(uuid.uuid4())) + os.mkdir(source_location) + mount_src_mapping[mount_target] = source_location + return mount_src_mapping + @report_wdl_errors("run task command", exit=True) def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: """ @@ -2238,6 +2277,9 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: super().run(file_store) logger.info("Running task command for %s (%s) called as %s", self._task.name, self._task_id, self._namespace) + # Create mount points and get a mapping of target mount points to locations on disk + mount_mapping = self.ensure_mount_point(file_store, self._mount_spec) + # Set up the WDL standard library # UUID to use for virtualizing files # We process nonexistent files in WDLTaskWrapperJob as those must be run locally, so don't try to devirtualize them @@ -2419,9 +2461,8 @@ def patch_prepare_mounts_singularity() -> List[Tuple[str, str, bool]]: mounts: List[Tuple[str, str, bool]] = singularity_original_prepare_mounts() # todo: support AWS EBS/Kubernetes persistent volumes # this logic likely only works for local clusters as we don't deal with the size of each mount point - for mount_point, _ in self._mount_spec.items(): - abs_mount_point = os.path.abspath(mount_point) - mounts.append((abs_mount_point, abs_mount_point, True)) + for mount_point, source_location in mount_mapping.items(): + mounts.append((mount_point, source_location, True)) return mounts task_container.prepare_mounts = patch_prepare_mounts_singularity # type: ignore[method-assign] elif isinstance(task_container, SwarmContainer): @@ -2438,12 +2479,11 @@ def patch_prepare_mounts_docker(logger: logging.Logger) -> List[Mount]: Same as the singularity patch but for docker """ mounts: List[Mount] = docker_original_prepare_mounts(logger) - for mount_point, _ in self._mount_spec.items(): - abs_mount_point = os.path.abspath(mount_point) + for mount_point, source_location in mount_mapping.items(): mounts.append( Mount( - abs_mount_point.rstrip("/").replace("{{", '{{"{{"}}'), - abs_mount_point.rstrip("/").replace("{{", '{{"{{"}}'), + mount_point.rstrip("/").replace("{{", '{{"{{"}}'), + source_location.rstrip("/").replace("{{", '{{"{{"}}'), type="bind", ) ) From 32c65dd416885ad42b542c224298c2c5478fab12 Mon Sep 17 00:00:00 2001 From: stxue1 Date: Fri, 23 Aug 2024 01:03:03 -0700 Subject: [PATCH 07/21] mypy --- src/toil/wdl/wdltoil.py | 34 ++++++++++++++++++++++------------ 1 file changed, 22 insertions(+), 12 deletions(-) diff --git a/src/toil/wdl/wdltoil.py b/src/toil/wdl/wdltoil.py index 4078693000..851821214c 100755 --- a/src/toil/wdl/wdltoil.py +++ b/src/toil/wdl/wdltoil.py @@ -89,7 +89,7 @@ class InsufficientMountDiskSpace(Exception): - def __init__(self, mount_target: str, desired_bytes, available_bytes): + def __init__(self, mount_target: str, desired_bytes: int, available_bytes: int) -> None: super().__init__("Not enough available disk space for the target mount point %s. Needed %d bytes but there is only %d available." % (mount_target, desired_bytes, available_bytes)) @@ -112,7 +112,8 @@ def wdl_error_reporter(task: str, exit: bool = False, log: Callable[[str], None] LocatorException, InvalidImportExportUrlException, UnimplementedURLException, - JobTooBigError + JobTooBigError, + InsufficientMountDiskSpace ) as e: # Don't expose tracebacks to the user for exceptions that may be expected log("Could not " + task + " because:") @@ -2253,16 +2254,25 @@ def ensure_mount_point(self, file_store: AbstractFileStore, mount_spec: Dict[str # The only defect of this regex is if the target mount point is the same format as the df output # It is likely reliable enough to trust the user has not created a mount with a df output-like name regex_df = re.compile(r".+ \d+ +\d+ +(\d+) +\d+% +.+") - for mount_target, mount_size in mount_spec.items(): - # Use arguments from the df POSIX standard - df_line = subprocess.check_output(["df", "-k", "-P", tmpdir], encoding="utf-8").split("\n")[1] - m = re.match(regex_df, df_line) - # Block size will always be 1024 - available_space = int(m[1]) * 1024 - if available_space < mount_size: - # We do not have enough space available for this mount point - raise InsufficientMountDiskSpace(mount_target, mount_size, available_space) - + try: + for mount_target, mount_size in mount_spec.items(): + # Use arguments from the df POSIX standard + df_line = subprocess.check_output(["df", "-k", "-P", tmpdir], encoding="utf-8").split("\n")[1] + m = re.match(regex_df, df_line) + if m is None: + logger.debug("Output of df may be malformed: %s", df_line) + logger.warning("Unable to check disk requirements as output of 'df' command is malformed. Will assume storage is always available.") + continue + # Block size will always be 1024 + available_space = int(m[1]) * 1024 + if available_space < mount_size: + # We do not have enough space available for this mount point + raise InsufficientMountDiskSpace(mount_target, mount_size, available_space) + except subprocess.CalledProcessError as e: + # If df somehow isn't available + logger.debug("Unable to call df. stdout: %s stderr: %s", e.stdout, e.stderr) + logger.warning("Unable to check disk requirements as call to 'df' command failed. Will assume storage is always available.") + for mount_target in mount_spec.keys(): # Create a new subdirectory for each mount point source_location = os.path.join(tmpdir, str(uuid.uuid4())) os.mkdir(source_location) From a1a8651def9b6c9746c3963b3789286283025cc9 Mon Sep 17 00:00:00 2001 From: stxue1 Date: Fri, 23 Aug 2024 12:57:51 -0700 Subject: [PATCH 08/21] Support cromwell disks attributes for backwards compatibility --- src/toil/wdl/wdltoil.py | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/src/toil/wdl/wdltoil.py b/src/toil/wdl/wdltoil.py index 851821214c..724279def5 100755 --- a/src/toil/wdl/wdltoil.py +++ b/src/toil/wdl/wdltoil.py @@ -1953,6 +1953,18 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: # Disk spec did not include a size raise ValueError(f"Could not parse disks = {disks_spec} because {spec} does not specify a disk size") + + if part_suffix == "LOCAL": + # TODO: Cromwell rounds LOCAL disks up to the nearest 375 GB. I + # can't imagine that ever being standardized; just leave it + # alone so that the workflow doesn't rely on this weird and + # likely-to-change Cromwell detail. + logger.warning('Not rounding LOCAL disk to the nearest 375 GB; workflow execution will differ from Cromwell!') + elif part_suffix in ("HDD", "SSD"): + # For cromwell compatibility, assume this means GB in units + # We don't actually differentiate between HDD and SSD + part_suffix = "GB" + per_part_size = convert_units(part_size, part_suffix) total_bytes += per_part_size if specified_mount_point is not None: @@ -1964,13 +1976,6 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: if specified_mount_point != "local-disk": # Don't mount local-disk. This isn't in the spec, but is carried over from cromwell mount_spec[specified_mount_point] = int(per_part_size) - - if part_suffix == "LOCAL": - # TODO: Cromwell rounds LOCAL disks up to the nearest 375 GB. I - # can't imagine that ever being standardized; just leave it - # alone so that the workflow doesn't rely on this weird and - # likely-to-change Cromwell detail. - logger.warning('Not rounding LOCAL disk to the nearest 375 GB; workflow execution will differ from Cromwell!') if not runtime_bindings.has_binding("gpu") and self._task.effective_wdl_version in ('1.0', 'draft-2'): # For old WDL versions, guess whether the task wants GPUs if not specified. From 29ffd3f14dada828427406aac8f2f4374ee57827 Mon Sep 17 00:00:00 2001 From: stxue1 Date: Fri, 23 Aug 2024 12:57:58 -0700 Subject: [PATCH 09/21] Deal with pipes deprecation --- src/toil/provisioners/node.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/toil/provisioners/node.py b/src/toil/provisioners/node.py index 9b4f304430..dc025370c1 100644 --- a/src/toil/provisioners/node.py +++ b/src/toil/provisioners/node.py @@ -13,7 +13,7 @@ # limitations under the License. import datetime import logging -import pipes +from shlex import quote import socket import subprocess import time @@ -279,7 +279,7 @@ def coreSSH(self, *args, **kwargs): commandTokens += ['docker', 'exec', '-i', ttyFlag, 'toil_leader'] logger.debug('Node %s: %s', self.effectiveIP, ' '.join(args)) - args = list(map(pipes.quote, args)) + args = list(map(quote, args)) commandTokens += args logger.debug('Full command %s', ' '.join(commandTokens)) process = subprocess.Popen(commandTokens, **kwargs) From 70688108af1745a64d8ed61548752bef422c20c2 Mon Sep 17 00:00:00 2001 From: stxue1 Date: Fri, 23 Aug 2024 12:58:36 -0700 Subject: [PATCH 10/21] Update md5sum test to be compatible with newer docker/singularity versions --- src/toil/test/wdl/md5sum/md5sum.1.0.wdl | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/toil/test/wdl/md5sum/md5sum.1.0.wdl b/src/toil/test/wdl/md5sum/md5sum.1.0.wdl index f5fc41698d..3d0fbea047 100644 --- a/src/toil/test/wdl/md5sum/md5sum.1.0.wdl +++ b/src/toil/test/wdl/md5sum/md5sum.1.0.wdl @@ -4,16 +4,17 @@ task md5 { input { File inputFile } - command { - /bin/my_md5sum ${inputFile} - } + command <<< + set -euf -o pipefail + md5sum ~{inputFile} | awk '{print $1}' > md5sum.txt + >>> output { File value = "md5sum.txt" } runtime { - docker: "quay.io/briandoconnor/dockstore-tool-md5sum:1.0.4" + docker: "ubuntu:22.04" cpu: 1 memory: "512 MB" disks: "local-disk 10 HDD" From 1a098b4ac0a6dfdafe63b7d8678c4298644e36d2 Mon Sep 17 00:00:00 2001 From: stxue1 Date: Tue, 17 Sep 2024 12:34:02 -0700 Subject: [PATCH 11/21] Address comments --- src/toil/wdl/wdltoil.py | 100 ++++++++++++++++++++++++---------------- 1 file changed, 60 insertions(+), 40 deletions(-) diff --git a/src/toil/wdl/wdltoil.py b/src/toil/wdl/wdltoil.py index 2a396811bf..ec0c4ed631 100755 --- a/src/toil/wdl/wdltoil.py +++ b/src/toil/wdl/wdltoil.py @@ -78,7 +78,7 @@ from toil.jobStores.abstractJobStore import (AbstractJobStore, UnimplementedURLException, InvalidImportExportUrlException, LocatorException) from toil.lib.accelerators import get_individual_local_accelerators -from toil.lib.conversions import convert_units, human2bytes +from toil.lib.conversions import convert_units, human2bytes, VALID_PREFIXES from toil.lib.io import mkdtemp from toil.lib.memoize import memoize from toil.lib.misc import get_user_name @@ -91,9 +91,9 @@ class InsufficientMountDiskSpace(Exception): - def __init__(self, mount_target: str, desired_bytes: int, available_bytes: int) -> None: - super().__init__("Not enough available disk space for the target mount point %s. Needed %d bytes but there is only %d available." - % (mount_target, desired_bytes, available_bytes)) + def __init__(self, mount_targets: List[str], desired_bytes: int, available_bytes: int) -> None: + super().__init__("Not enough available disk space for the target mount points %s. Needed %d bytes but there is only %d available." + % (", ".join(mount_targets), desired_bytes, available_bytes)) @contextmanager def wdl_error_reporter(task: str, exit: bool = False, log: Callable[[str], None] = logger.critical) -> Generator[None, None, None]: @@ -1921,7 +1921,7 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: memory_spec = human2bytes(memory_spec) runtime_memory = memory_spec - mount_spec: Dict[str, int] = dict() + mount_spec: Dict[Optional[str], int] = dict() if runtime_bindings.has_binding('disks'): # Miniwdl doesn't have this, but we need to be able to parse things like: # local-disk 5 SSD @@ -1942,24 +1942,29 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: # their commas when separating the list, like in Cromwell's # examples, so we strip whitespace. spec_parts = spec.strip().split(' ') + + # First check that this is a format we support. Both the WDL spec and Cromwell allow a max 3-piece specification + # So if there are more than 3 pieces, raise an error + if len(spec_parts) > 3: + raise RuntimeError(f"Could not parse disks = {disks_spec} because {all_specs} contains more than 3 parts") part_size = None # default to GiB as per spec - part_suffix: str = "GiB" + part_suffix: str = "GiB" # The WDL spec's default is 1 GiB # default to the execution directory specified_mount_point = None + # first get the size, since units should always be some nonnumerical string, get the last numerical value for i, part in enumerate(spec_parts): if part.replace(".", "", 1).isdigit(): - # round down floats part_size = int(float(part)) - continue - if i == 0: - # mount point is always the first - specified_mount_point = part - continue - if part_size is not None: - # suffix will always be after the size, if it exists - part_suffix = part - continue + spec_parts.pop(i) + break + # unit specification is only allowed to be at the end + if spec_parts[-1].lower() in VALID_PREFIXES: + part_suffix = spec_parts[-1] + spec_parts.pop(-1) + # The last remaining element, if it exists, is the mount point + if len(spec_parts) > 0: + specified_mount_point = spec_parts[0] if part_size is None: # Disk spec did not include a size @@ -1979,15 +1984,23 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: per_part_size = convert_units(part_size, part_suffix) total_bytes += per_part_size - if specified_mount_point is not None: - if mount_spec.get(specified_mount_point) is not None: + if mount_spec.get(specified_mount_point) is not None: + if specified_mount_point is not None: # raise an error as all mount points must be unique raise ValueError(f"Could not parse disks = {disks_spec} because the mount point {specified_mount_point} is specified multiple times") - - # TODO: we always ignore the disk type and assume we have the right one. - if specified_mount_point != "local-disk": - # Don't mount local-disk. This isn't in the spec, but is carried over from cromwell - mount_spec[specified_mount_point] = int(per_part_size) + else: + if mount_spec.get(specified_mount_point) is not None: + raise ValueError(f"Could not parse disks = {disks_spec} because the mount point is omitted more than once") + + # TODO: we always ignore the disk type and assume we have the right one. + if specified_mount_point != "local-disk": + # Don't mount local-disk. This isn't in the spec, but is carried over from cromwell + # When the mount point is omitted, default to the task's execution directory, which None will represent + mount_spec[specified_mount_point] = int(per_part_size) + else: + # local-disk is equivalent to an omitted mount point + mount_spec[None] = int(per_part_size) + runtime_disk = int(total_bytes) if not runtime_bindings.has_binding("gpu") and self._task.effective_wdl_version in ('1.0', 'draft-2'): # For old WDL versions, guess whether the task wants GPUs if not specified. @@ -2022,7 +2035,9 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: runtime_accelerators = [accelerator_requirement] # Schedule to get resources. Pass along the bindings from evaluating all the inputs and decls, and the runtime, with files virtualized. - run_job = WDLTaskJob(self._task, virtualize_files(bindings, standard_library), virtualize_files(runtime_bindings, standard_library), self._task_id, self._namespace, self._task_path, mount_spec, cores=runtime_cores or self.cores, memory=runtime_memory or self.memory, disk=runtime_disk or self.disk, accelerators=runtime_accelerators or self.accelerators, wdl_options=self._wdl_options) + run_job = WDLTaskJob(self._task, virtualize_files(bindings, standard_library), virtualize_files(runtime_bindings, standard_library), self._task_id, self._namespace, + self._task_path, mount_spec, cores=runtime_cores or self.cores, memory=runtime_memory or self.memory, disk=runtime_disk or self.disk, + accelerators=runtime_accelerators or self.accelerators, wdl_options=self._wdl_options) # Run that as a child self.addChild(run_job) @@ -2045,7 +2060,8 @@ class WDLTaskJob(WDLBaseJob): All bindings are in terms of task-internal names. """ - def __init__(self, task: WDL.Tree.Task, task_internal_bindings: Promised[WDLBindings], runtime_bindings: Promised[WDLBindings], task_id: List[str], namespace: str, task_path: str, mount_spec: Dict[str, int], **kwargs: Any) -> None: + def __init__(self, task: WDL.Tree.Task, task_internal_bindings: Promised[WDLBindings], runtime_bindings: Promised[WDLBindings], task_id: List[str], namespace: str, + task_path: str, mount_spec: Dict[Optional[str], int], **kwargs: Any) -> None: """ Make a new job to run a task. @@ -2248,7 +2264,7 @@ def can_mount_proc(self) -> bool: """ return "KUBERNETES_SERVICE_HOST" not in os.environ - def ensure_mount_point(self, file_store: AbstractFileStore, mount_spec: Dict[str, int]) -> Dict[str, str]: + def ensure_mount_point(self, file_store: AbstractFileStore, mount_spec: Dict[Optional[str], int]) -> Dict[str, str]: """ Ensure the mount point sources are available. @@ -2271,20 +2287,22 @@ def ensure_mount_point(self, file_store: AbstractFileStore, mount_spec: Dict[str # The only defect of this regex is if the target mount point is the same format as the df output # It is likely reliable enough to trust the user has not created a mount with a df output-like name regex_df = re.compile(r".+ \d+ +\d+ +(\d+) +\d+% +.+") + total_mount_size = sum(mount_spec.values()) try: - for mount_target, mount_size in mount_spec.items(): - # Use arguments from the df POSIX standard - df_line = subprocess.check_output(["df", "-k", "-P", tmpdir], encoding="utf-8").split("\n")[1] - m = re.match(regex_df, df_line) - if m is None: - logger.debug("Output of df may be malformed: %s", df_line) - logger.warning("Unable to check disk requirements as output of 'df' command is malformed. Will assume storage is always available.") - continue + # Use arguments from the df POSIX standard + df_line = subprocess.check_output(["df", "-k", "-P", tmpdir], encoding="utf-8").split("\n")[1] + m = re.match(regex_df, df_line) + if m is None: + logger.debug("Output of df may be malformed: %s", df_line) + logger.warning("Unable to check disk requirements as output of 'df' command is malformed. Will assume storage is always available.") + else: # Block size will always be 1024 available_space = int(m[1]) * 1024 - if available_space < mount_size: + if available_space < total_mount_size: # We do not have enough space available for this mount point - raise InsufficientMountDiskSpace(mount_target, mount_size, available_space) + # An omitted mount point is the task's execution directory so show that to the user instead + raise InsufficientMountDiskSpace([mount_point if mount_point is not None else "/mnt/miniwdl_task_container/work" for mount_point in mount_spec.keys()], + total_mount_size, available_space) except subprocess.CalledProcessError as e: # If df somehow isn't available logger.debug("Unable to call df. stdout: %s stderr: %s", e.stdout, e.stderr) @@ -2293,7 +2311,9 @@ def ensure_mount_point(self, file_store: AbstractFileStore, mount_spec: Dict[str # Create a new subdirectory for each mount point source_location = os.path.join(tmpdir, str(uuid.uuid4())) os.mkdir(source_location) - mount_src_mapping[mount_target] = source_location + if mount_target is not None: + # None represents an omitted mount point, which will default to the task's work directory. MiniWDL's internals will mount the task's work directory by itself + mount_src_mapping[mount_target] = source_location return mount_src_mapping @report_wdl_errors("run task command", exit=True) @@ -2304,14 +2324,14 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: super().run(file_store) logger.info("Running task command for %s (%s) called as %s", self._task.name, self._task_id, self._namespace) - # Create mount points and get a mapping of target mount points to locations on disk - mount_mapping = self.ensure_mount_point(file_store, self._mount_spec) - # Set up the WDL standard library # UUID to use for virtualizing files # We process nonexistent files in WDLTaskWrapperJob as those must be run locally, so don't try to devirtualize them standard_library = ToilWDLStdLibBase(file_store, self._task_path, enforce_existence=False) + # Create mount points and get a mapping of target mount points to locations on disk + mount_mapping = self.ensure_mount_point(file_store, self._mount_spec) + # Get the bindings from after the input section bindings = unwrap(self._task_internal_bindings) # And the bindings from evaluating the runtime section From 839e09b946a9025f469e8c1cb87b98cca2ccd8ef Mon Sep 17 00:00:00 2001 From: stxue1 <122345910+stxue1@users.noreply.github.com> Date: Tue, 17 Sep 2024 12:35:25 -0700 Subject: [PATCH 12/21] Update src/toil/wdl/wdltoil.py Co-authored-by: Adam Novak --- src/toil/wdl/wdltoil.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/toil/wdl/wdltoil.py b/src/toil/wdl/wdltoil.py index f0f1870a9e..d2a2aedeea 100755 --- a/src/toil/wdl/wdltoil.py +++ b/src/toil/wdl/wdltoil.py @@ -1934,7 +1934,7 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: all_specs = [part.value for part in disks_spec] else: all_specs = disks_spec.split(',') - # Sum up the gigabytes in each disk specification + # Sum up the space in each disk specification total_bytes: float = 0 for spec in all_specs: # Split up each spec as space-separated. We assume no fields From bf3ca2ab9d22ed867623308be013080329b09c96 Mon Sep 17 00:00:00 2001 From: stxue1 Date: Tue, 24 Sep 2024 12:39:53 -0700 Subject: [PATCH 13/21] Fix missing reverse iteration loop and make local-disk disambiguation clearer --- src/toil/wdl/wdltoil.py | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/src/toil/wdl/wdltoil.py b/src/toil/wdl/wdltoil.py index e908eb4761..8e2a25d02e 100755 --- a/src/toil/wdl/wdltoil.py +++ b/src/toil/wdl/wdltoil.py @@ -1943,7 +1943,7 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: # default to the execution directory specified_mount_point = None # first get the size, since units should always be some nonnumerical string, get the last numerical value - for i, part in enumerate(spec_parts): + for i, part in reversed(list(enumerate(spec_parts))): if part.replace(".", "", 1).isdigit(): part_size = int(float(part)) spec_parts.pop(i) @@ -1974,22 +1974,19 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: per_part_size = convert_units(part_size, part_suffix) total_bytes += per_part_size + if specified_mount_point == "local-disk": + # Don't mount local-disk. This isn't in the spec, but is carried over from cromwell + # When the mount point is omitted, default to the task's execution directory, which None will represent + specified_mount_point = None if mount_spec.get(specified_mount_point) is not None: if specified_mount_point is not None: # raise an error as all mount points must be unique raise ValueError(f"Could not parse disks = {disks_spec} because the mount point {specified_mount_point} is specified multiple times") else: - if mount_spec.get(specified_mount_point) is not None: - raise ValueError(f"Could not parse disks = {disks_spec} because the mount point is omitted more than once") + raise ValueError(f"Could not parse disks = {disks_spec} because the mount point is omitted more than once") # TODO: we always ignore the disk type and assume we have the right one. - if specified_mount_point != "local-disk": - # Don't mount local-disk. This isn't in the spec, but is carried over from cromwell - # When the mount point is omitted, default to the task's execution directory, which None will represent - mount_spec[specified_mount_point] = int(per_part_size) - else: - # local-disk is equivalent to an omitted mount point - mount_spec[None] = int(per_part_size) + mount_spec[specified_mount_point] = int(per_part_size) runtime_disk = int(total_bytes) if not runtime_bindings.has_binding("gpu") and self._task.effective_wdl_version in ('1.0', 'draft-2'): From 4130ab86685c150747b09ad04d2b8e5063b01cbf Mon Sep 17 00:00:00 2001 From: stxue1 Date: Tue, 24 Sep 2024 13:07:53 -0700 Subject: [PATCH 14/21] move out disk parse into a function --- src/toil/wdl/wdltoil.py | 103 ++++++++++++++++++++++------------------ 1 file changed, 57 insertions(+), 46 deletions(-) diff --git a/src/toil/wdl/wdltoil.py b/src/toil/wdl/wdltoil.py index 8e2a25d02e..ed46357cd7 100755 --- a/src/toil/wdl/wdltoil.py +++ b/src/toil/wdl/wdltoil.py @@ -568,6 +568,62 @@ def recursive_dependencies(root: WDL.Tree.WorkflowNode) -> Set[str]: # And produce the diff return needed - provided + +def parse_disks(spec: str, disks_spec: Union[List[WDL.Value.String], str]) -> Tuple[Optional[str], float, str]: + """ + + :param spec: Disks spec to parse + :param disks_spec: All disks spec as specified in the WDL file. Only used for better error messages. + :return: Specified mount point (None if omitted or local-disk), number of units, size of unit (ex GB) + """ + # Split up each spec as space-separated. We assume no fields + # are empty, and we want to allow people to use spaces after + # their commas when separating the list, like in Cromwell's + # examples, so we strip whitespace. + spec_parts = spec.strip().split(' ') + + # First check that this is a format we support. Both the WDL spec and Cromwell allow a max 3-piece specification + # So if there are more than 3 pieces, raise an error + if len(spec_parts) > 3: + raise RuntimeError(f"Could not parse disks = {disks_spec} because {spec_parts} contains more than 3 parts") + part_size = None + # default to GiB as per spec + part_suffix: str = "GiB" # The WDL spec's default is 1 GiB + # default to the execution directory + specified_mount_point = None + # first get the size, since units should always be some nonnumerical string, get the last numerical value + for i, part in reversed(list(enumerate(spec_parts))): + if part.replace(".", "", 1).isdigit(): + part_size = int(float(part)) + spec_parts.pop(i) + break + # unit specification is only allowed to be at the end + if spec_parts[-1].lower() in VALID_PREFIXES: + part_suffix = spec_parts[-1] + spec_parts.pop(-1) + # The last remaining element, if it exists, is the mount point + if len(spec_parts) > 0: + specified_mount_point = spec_parts[0] + + if part_size is None: + # Disk spec did not include a size + raise ValueError(f"Could not parse disks = {disks_spec} because {spec} does not specify a disk size") + + if part_suffix == "LOCAL": + # TODO: Cromwell rounds LOCAL disks up to the nearest 375 GB. I + # can't imagine that ever being standardized; just leave it + # alone so that the workflow doesn't rely on this weird and + # likely-to-change Cromwell detail. + logger.warning('Not rounding LOCAL disk to the nearest 375 GB; workflow execution will differ from Cromwell!') + elif part_suffix in ("HDD", "SSD"): + # For cromwell compatibility, assume this means GB in units + # We don't actually differentiate between HDD and SSD + part_suffix = "GB" + + per_part_size = convert_units(part_size, part_suffix) + return specified_mount_point, per_part_size, part_suffix + + # We define a URI scheme kind of like but not actually compatible with the one # we use for CWL. CWL brings along the file basename in its file type, but # WDL.Value.File doesn't. So we need to make sure we stash that somewhere in @@ -1927,52 +1983,7 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: # Sum up the space in each disk specification total_bytes: float = 0 for spec in all_specs: - # Split up each spec as space-separated. We assume no fields - # are empty, and we want to allow people to use spaces after - # their commas when separating the list, like in Cromwell's - # examples, so we strip whitespace. - spec_parts = spec.strip().split(' ') - - # First check that this is a format we support. Both the WDL spec and Cromwell allow a max 3-piece specification - # So if there are more than 3 pieces, raise an error - if len(spec_parts) > 3: - raise RuntimeError(f"Could not parse disks = {disks_spec} because {all_specs} contains more than 3 parts") - part_size = None - # default to GiB as per spec - part_suffix: str = "GiB" # The WDL spec's default is 1 GiB - # default to the execution directory - specified_mount_point = None - # first get the size, since units should always be some nonnumerical string, get the last numerical value - for i, part in reversed(list(enumerate(spec_parts))): - if part.replace(".", "", 1).isdigit(): - part_size = int(float(part)) - spec_parts.pop(i) - break - # unit specification is only allowed to be at the end - if spec_parts[-1].lower() in VALID_PREFIXES: - part_suffix = spec_parts[-1] - spec_parts.pop(-1) - # The last remaining element, if it exists, is the mount point - if len(spec_parts) > 0: - specified_mount_point = spec_parts[0] - - if part_size is None: - # Disk spec did not include a size - raise ValueError(f"Could not parse disks = {disks_spec} because {spec} does not specify a disk size") - - - if part_suffix == "LOCAL": - # TODO: Cromwell rounds LOCAL disks up to the nearest 375 GB. I - # can't imagine that ever being standardized; just leave it - # alone so that the workflow doesn't rely on this weird and - # likely-to-change Cromwell detail. - logger.warning('Not rounding LOCAL disk to the nearest 375 GB; workflow execution will differ from Cromwell!') - elif part_suffix in ("HDD", "SSD"): - # For cromwell compatibility, assume this means GB in units - # We don't actually differentiate between HDD and SSD - part_suffix = "GB" - - per_part_size = convert_units(part_size, part_suffix) + specified_mount_point, per_part_size, part_suffix = parse_disks(spec, disks_spec) total_bytes += per_part_size if specified_mount_point == "local-disk": # Don't mount local-disk. This isn't in the spec, but is carried over from cromwell From d194edca6763af94a37e4aff3b65b9dd46c35212 Mon Sep 17 00:00:00 2001 From: stxue1 Date: Tue, 24 Sep 2024 13:42:51 -0700 Subject: [PATCH 15/21] Fix issues with cromwell compatibility --- src/toil/wdl/wdltoil.py | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/src/toil/wdl/wdltoil.py b/src/toil/wdl/wdltoil.py index ed46357cd7..5d41edc530 100755 --- a/src/toil/wdl/wdltoil.py +++ b/src/toil/wdl/wdltoil.py @@ -598,27 +598,27 @@ def parse_disks(spec: str, disks_spec: Union[List[WDL.Value.String], str]) -> Tu spec_parts.pop(i) break # unit specification is only allowed to be at the end - if spec_parts[-1].lower() in VALID_PREFIXES: - part_suffix = spec_parts[-1] - spec_parts.pop(-1) - # The last remaining element, if it exists, is the mount point - if len(spec_parts) > 0: - specified_mount_point = spec_parts[0] - - if part_size is None: - # Disk spec did not include a size - raise ValueError(f"Could not parse disks = {disks_spec} because {spec} does not specify a disk size") - + unit_spec = spec_parts[-1] if part_suffix == "LOCAL": # TODO: Cromwell rounds LOCAL disks up to the nearest 375 GB. I # can't imagine that ever being standardized; just leave it # alone so that the workflow doesn't rely on this weird and # likely-to-change Cromwell detail. logger.warning('Not rounding LOCAL disk to the nearest 375 GB; workflow execution will differ from Cromwell!') - elif part_suffix in ("HDD", "SSD"): + elif unit_spec in ("HDD", "SSD"): # For cromwell compatibility, assume this means GB in units # We don't actually differentiate between HDD and SSD part_suffix = "GB" + if unit_spec.lower() in VALID_PREFIXES: + part_suffix = spec_parts[-1] + spec_parts.pop(-1) + # The last remaining element, if it exists, is the mount point + if len(spec_parts) > 0: + specified_mount_point = spec_parts[0] + + if part_size is None: + # Disk spec did not include a size + raise ValueError(f"Could not parse disks = {disks_spec} because {spec} does not specify a disk size") per_part_size = convert_units(part_size, part_suffix) return specified_mount_point, per_part_size, part_suffix From f2080a8978672e947eafedb868363bca2ea0ab75 Mon Sep 17 00:00:00 2001 From: stxue1 Date: Tue, 24 Sep 2024 13:57:22 -0700 Subject: [PATCH 16/21] Move local-disk into parse and dont convert_units in parse function --- src/toil/wdl/wdltoil.py | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/src/toil/wdl/wdltoil.py b/src/toil/wdl/wdltoil.py index 5d41edc530..1d9ffe25eb 100755 --- a/src/toil/wdl/wdltoil.py +++ b/src/toil/wdl/wdltoil.py @@ -616,12 +616,16 @@ def parse_disks(spec: str, disks_spec: Union[List[WDL.Value.String], str]) -> Tu if len(spec_parts) > 0: specified_mount_point = spec_parts[0] + if specified_mount_point == "local-disk": + # Don't mount local-disk. This isn't in the spec, but is carried over from cromwell + # When the mount point is omitted, default to the task's execution directory, which None will represent + specified_mount_point = None + if part_size is None: # Disk spec did not include a size raise ValueError(f"Could not parse disks = {disks_spec} because {spec} does not specify a disk size") - per_part_size = convert_units(part_size, part_suffix) - return specified_mount_point, per_part_size, part_suffix + return specified_mount_point, part_size, part_suffix # We define a URI scheme kind of like but not actually compatible with the one @@ -1983,12 +1987,9 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: # Sum up the space in each disk specification total_bytes: float = 0 for spec in all_specs: - specified_mount_point, per_part_size, part_suffix = parse_disks(spec, disks_spec) + specified_mount_point, part_size, part_suffix = parse_disks(spec, disks_spec) + per_part_size = convert_units(part_size, part_suffix) total_bytes += per_part_size - if specified_mount_point == "local-disk": - # Don't mount local-disk. This isn't in the spec, but is carried over from cromwell - # When the mount point is omitted, default to the task's execution directory, which None will represent - specified_mount_point = None if mount_spec.get(specified_mount_point) is not None: if specified_mount_point is not None: # raise an error as all mount points must be unique From f0dbe3f3747566e099e12d45b9bef26085f23337 Mon Sep 17 00:00:00 2001 From: stxue1 Date: Tue, 24 Sep 2024 13:59:24 -0700 Subject: [PATCH 17/21] Fix edge case where only size is requested --- src/toil/wdl/wdltoil.py | 31 ++++++++++++++++--------------- 1 file changed, 16 insertions(+), 15 deletions(-) diff --git a/src/toil/wdl/wdltoil.py b/src/toil/wdl/wdltoil.py index 1d9ffe25eb..23ed20c2e0 100755 --- a/src/toil/wdl/wdltoil.py +++ b/src/toil/wdl/wdltoil.py @@ -598,21 +598,22 @@ def parse_disks(spec: str, disks_spec: Union[List[WDL.Value.String], str]) -> Tu spec_parts.pop(i) break # unit specification is only allowed to be at the end - unit_spec = spec_parts[-1] - if part_suffix == "LOCAL": - # TODO: Cromwell rounds LOCAL disks up to the nearest 375 GB. I - # can't imagine that ever being standardized; just leave it - # alone so that the workflow doesn't rely on this weird and - # likely-to-change Cromwell detail. - logger.warning('Not rounding LOCAL disk to the nearest 375 GB; workflow execution will differ from Cromwell!') - elif unit_spec in ("HDD", "SSD"): - # For cromwell compatibility, assume this means GB in units - # We don't actually differentiate between HDD and SSD - part_suffix = "GB" - if unit_spec.lower() in VALID_PREFIXES: - part_suffix = spec_parts[-1] - spec_parts.pop(-1) - # The last remaining element, if it exists, is the mount point + if len(spec_parts) > 0: + unit_spec = spec_parts[-1] + if part_suffix == "LOCAL": + # TODO: Cromwell rounds LOCAL disks up to the nearest 375 GB. I + # can't imagine that ever being standardized; just leave it + # alone so that the workflow doesn't rely on this weird and + # likely-to-change Cromwell detail. + logger.warning('Not rounding LOCAL disk to the nearest 375 GB; workflow execution will differ from Cromwell!') + elif unit_spec in ("HDD", "SSD"): + # For cromwell compatibility, assume this means GB in units + # We don't actually differentiate between HDD and SSD + part_suffix = "GB" + if unit_spec.lower() in VALID_PREFIXES: + part_suffix = spec_parts[-1] + spec_parts.pop(-1) + # The last remaining element, if it exists, is the mount point if len(spec_parts) > 0: specified_mount_point = spec_parts[0] From e6a90829a40def78fbddcfaae8ab8b2f8ecef378 Mon Sep 17 00:00:00 2001 From: stxue1 Date: Tue, 24 Sep 2024 13:59:28 -0700 Subject: [PATCH 18/21] Add tests --- src/toil/test/wdl/wdltoil_test.py | 40 +++++++++++++++++++++++++++++-- 1 file changed, 38 insertions(+), 2 deletions(-) diff --git a/src/toil/test/wdl/wdltoil_test.py b/src/toil/test/wdl/wdltoil_test.py index 6de389b11c..06fc5fb5a2 100644 --- a/src/toil/test/wdl/wdltoil_test.py +++ b/src/toil/test/wdl/wdltoil_test.py @@ -6,7 +6,7 @@ import subprocess import unittest from uuid import uuid4 -from typing import Optional +from typing import Optional, Union from unittest.mock import patch from typing import Any, Dict, List, Set @@ -24,7 +24,7 @@ needs_wdl, slow, integrative) from toil.version import exactPython -from toil.wdl.wdltoil import WDLSectionJob, WDLWorkflowGraph, remove_common_leading_whitespace +from toil.wdl.wdltoil import WDLSectionJob, WDLWorkflowGraph, remove_common_leading_whitespace, parse_disks import WDL.Expr import WDL.Error @@ -660,6 +660,42 @@ def test_uri_packing(self): self.assertEqual(unpacked[3], file_basename) + def test_disk_parse(self): + """ + Test to make sure the disk parsing is correct + """ + # Test cromwell compatibility + spec = "local-disk 5 SSD" + specified_mount_point, part_size, part_suffix = parse_disks(spec, spec) + self.assertEqual(specified_mount_point, None) + self.assertEqual(part_size, 5) + self.assertEqual(part_suffix, "GB") + + # Test spec conformance + # https://github.com/openwdl/wdl/blob/e43e042104b728df1f1ad6e6145945d2b32331a6/SPEC.md?plain=1#L5072-L5082 + spec = "10" + specified_mount_point, part_size, part_suffix = parse_disks(spec, spec) + self.assertEqual(specified_mount_point, None) + self.assertEqual(part_size, 10) + self.assertEqual(part_suffix, "GiB") # WDL spec default + + spec = "1 MB" + specified_mount_point, part_size, part_suffix = parse_disks(spec, spec) + self.assertEqual(specified_mount_point, None) + self.assertEqual(part_size, 1) + self.assertEqual(part_suffix, "MB") + + spec = "MOUNT_POINT 3" + specified_mount_point, part_size, part_suffix = parse_disks(spec, spec) + self.assertEqual(specified_mount_point, "MOUNT_POINT") + self.assertEqual(part_size, 3) + self.assertEqual(part_suffix, "GiB") + + spec = "MOUNT_POINT 2 MB" + specified_mount_point, part_size, part_suffix = parse_disks(spec, spec) + self.assertEqual(specified_mount_point, "MOUNT_POINT") + self.assertEqual(part_size, 2) + self.assertEqual(part_suffix, "MB") if __name__ == "__main__": From b184386ab64f201b0859efed3ee24f334567452c Mon Sep 17 00:00:00 2001 From: stxue1 Date: Fri, 27 Sep 2024 16:40:50 -0700 Subject: [PATCH 19/21] Remove redef --- src/toil/wdl/wdltoil.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/toil/wdl/wdltoil.py b/src/toil/wdl/wdltoil.py index 744f4c575e..2e3998c6aa 100755 --- a/src/toil/wdl/wdltoil.py +++ b/src/toil/wdl/wdltoil.py @@ -3577,7 +3577,6 @@ def run(self, file_store: AbstractFileStore) -> WDLBindings: for subnode in node.body: stack.append(subnode) # Collect all bindings that are task outputs - output_bindings: WDL.Env.Bindings[WDL.Value.Base] = WDL.Env.Bindings() for binding in unwrap(self._bindings): if binding.name in output_set: # The bindings will already be namespaced with the task namespaces From 7f4b452c0d114b81f4f4f6a897116336004801cf Mon Sep 17 00:00:00 2001 From: stxue1 Date: Tue, 1 Oct 2024 16:42:17 -0700 Subject: [PATCH 20/21] Add docstring and remove dead comment --- src/toil/wdl/wdltoil.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/toil/wdl/wdltoil.py b/src/toil/wdl/wdltoil.py index 2e3998c6aa..bfa6243b24 100755 --- a/src/toil/wdl/wdltoil.py +++ b/src/toil/wdl/wdltoil.py @@ -571,7 +571,7 @@ def recursive_dependencies(root: WDL.Tree.WorkflowNode) -> Set[str]: def parse_disks(spec: str, disks_spec: Union[List[WDL.Value.String], str]) -> Tuple[Optional[str], float, str]: """ - + Parse a WDL disk spec into a disk mount specification. :param spec: Disks spec to parse :param disks_spec: All disks spec as specified in the WDL file. Only used for better error messages. :return: Specified mount point (None if omitted or local-disk), number of units, size of unit (ex GB) @@ -2504,8 +2504,6 @@ def patch_prepare_mounts_singularity() -> List[Tuple[str, str, bool]]: The singularity and docker patch are separate as they have different function signatures """ - # todo: support AWS EBS/Kubernetes persistent volumes - # this logic likely only works for local clusters as we don't deal with the size of each mount point mounts: List[Tuple[str, str, bool]] = singularity_original_prepare_mounts() # todo: support AWS EBS/Kubernetes persistent volumes # this logic likely only works for local clusters as we don't deal with the size of each mount point From f3a3a2fb40b6e0f397ed88bc3e76ba6215616d01 Mon Sep 17 00:00:00 2001 From: stxue1 Date: Wed, 2 Oct 2024 11:04:42 -0700 Subject: [PATCH 21/21] Add back dropped mount_spec argument --- src/toil/wdl/wdltoil.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/toil/wdl/wdltoil.py b/src/toil/wdl/wdltoil.py index 6e33e1a539..c8a50db6e0 100755 --- a/src/toil/wdl/wdltoil.py +++ b/src/toil/wdl/wdltoil.py @@ -2236,7 +2236,7 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: virtualize_files(bindings, standard_library, enforce_existence=False), virtualize_files(runtime_bindings, standard_library, enforce_existence=False), self._task_id, cores=runtime_cores or self.cores, memory=runtime_memory or self.memory, disk=runtime_disk or self.disk, - accelerators=runtime_accelerators or self.accelerators, wdl_options=task_wdl_options) + accelerators=runtime_accelerators or self.accelerators, wdl_options=task_wdl_options, mount_spec=mount_spec) # Run that as a child self.addChild(run_job) @@ -2259,7 +2259,8 @@ class WDLTaskJob(WDLBaseJob): All bindings are in terms of task-internal names. """ - def __init__(self, task: WDL.Tree.Task, task_internal_bindings: Promised[WDLBindings], runtime_bindings: Promised[WDLBindings], task_id: List[str], wdl_options: WDLContext, **kwargs: Any) -> None: + def __init__(self, task: WDL.Tree.Task, task_internal_bindings: Promised[WDLBindings], runtime_bindings: Promised[WDLBindings], task_id: List[str], + mount_spec: Dict[Optional[str], int], wdl_options: WDLContext, **kwargs: Any) -> None: """ Make a new job to run a task. @@ -2280,6 +2281,7 @@ def __init__(self, task: WDL.Tree.Task, task_internal_bindings: Promised[WDLBind self._task_internal_bindings = task_internal_bindings self._runtime_bindings = runtime_bindings self._task_id = task_id + self._mount_spec = mount_spec ### # Runtime code injection system