Support more methods of WDL task disk specification #5001

Open

stxue1 wants to merge 54 commits into master from issues/4995-disk-spec-wdl

Commits (54)
3971be6
better disk logic and add logic to mount specific points
stxue1 Jun 29, 2024
48990d0
cromwell compatibility
stxue1 Jun 29, 2024
e8d223c
Convert from wdl string to normal string
stxue1 Jun 29, 2024
fbe0eef
Merge branch 'master' into issues/4995-disk-spec-wdl
stxue1 Jun 29, 2024
8f7199a
floats
stxue1 Jul 1, 2024
166bf41
Merge branch 'issues/4995-disk-spec-wdl' of github.com:DataBiosphere/…
stxue1 Jul 1, 2024
d7719b9
Satisfy mypy
stxue1 Jul 1, 2024
0c131e0
Merge master into issues/4995-disk-spec-wdl
github-actions[bot] Jul 2, 2024
b4714ec
Merge master into issues/4995-disk-spec-wdl
github-actions[bot] Jul 10, 2024
4443728
Merge master into issues/4995-disk-spec-wdl
github-actions[bot] Jul 11, 2024
8da0d7d
Merge master into issues/4995-disk-spec-wdl
github-actions[bot] Jul 16, 2024
9379715
Merge master into issues/4995-disk-spec-wdl
github-actions[bot] Jul 17, 2024
85a4df9
Merge master into issues/4995-disk-spec-wdl
github-actions[bot] Jul 18, 2024
6aa73b1
Merge master into issues/4995-disk-spec-wdl
github-actions[bot] Jul 19, 2024
cc36f71
Merge master into issues/4995-disk-spec-wdl
github-actions[bot] Jul 19, 2024
36fd277
Merge master into issues/4995-disk-spec-wdl
github-actions[bot] Jul 19, 2024
c7bec56
Merge master into issues/4995-disk-spec-wdl
github-actions[bot] Jul 20, 2024
4b4e7f0
Merge master into issues/4995-disk-spec-wdl
github-actions[bot] Jul 25, 2024
6bf2a4e
Merge master into issues/4995-disk-spec-wdl
github-actions[bot] Jul 25, 2024
31b7e27
Merge master into issues/4995-disk-spec-wdl
github-actions[bot] Jul 25, 2024
98bba55
Merge master into issues/4995-disk-spec-wdl
github-actions[bot] Jul 25, 2024
25e0e51
Merge master into issues/4995-disk-spec-wdl
github-actions[bot] Jul 26, 2024
42fedf6
Merge master into issues/4995-disk-spec-wdl
github-actions[bot] Jul 29, 2024
d35d033
Merge master into issues/4995-disk-spec-wdl
github-actions[bot] Jul 30, 2024
9d75e4b
Merge master into issues/4995-disk-spec-wdl
github-actions[bot] Jul 31, 2024
f752535
Merge master into issues/4995-disk-spec-wdl
github-actions[bot] Aug 1, 2024
4ce33d5
Merge master into issues/4995-disk-spec-wdl
github-actions[bot] Aug 3, 2024
e9df2f9
Merge master into issues/4995-disk-spec-wdl
github-actions[bot] Aug 8, 2024
01b8102
Merge master into issues/4995-disk-spec-wdl
github-actions[bot] Aug 14, 2024
2955c4d
Merge master into issues/4995-disk-spec-wdl
github-actions[bot] Aug 15, 2024
a364601
Merge master into issues/4995-disk-spec-wdl
github-actions[bot] Aug 19, 2024
02d873f
Merge master into issues/4995-disk-spec-wdl
github-actions[bot] Aug 19, 2024
b65b315
Merge master into issues/4995-disk-spec-wdl
github-actions[bot] Aug 21, 2024
6f30676
Merge master into issues/4995-disk-spec-wdl
github-actions[bot] Aug 22, 2024
58196ce
Merge master into issues/4995-disk-spec-wdl
github-actions[bot] Aug 22, 2024
66d3e50
Merge master into issues/4995-disk-spec-wdl
github-actions[bot] Aug 22, 2024
ea19cb6
Follow new spec
stxue1 Aug 23, 2024
32c65dd
mypy
stxue1 Aug 23, 2024
63b4410
Merge branch 'issues/4995-disk-spec-wdl' of github.com:DataBiosphere/…
stxue1 Aug 23, 2024
a1a8651
Support cromwell disks attributes for backwards compatibility
stxue1 Aug 23, 2024
29ffd3f
Deal with pipes deprecation
stxue1 Aug 23, 2024
7068810
Update md5sum test to be compatible with newer docker/singularity ver…
stxue1 Aug 23, 2024
c090823
Merge master into issues/4995-disk-spec-wdl
github-actions[bot] Aug 27, 2024
14e2ee1
Merge branch 'master' of github.com:DataBiosphere/toil into issues/49…
stxue1 Aug 29, 2024
901c4c2
Merge master into issues/4995-disk-spec-wdl
github-actions[bot] Sep 3, 2024
aa58e2f
Merge master into issues/4995-disk-spec-wdl
github-actions[bot] Sep 10, 2024
8b15af6
Merge master into issues/4995-disk-spec-wdl
github-actions[bot] Sep 10, 2024
e04f5c1
Merge master into issues/4995-disk-spec-wdl
github-actions[bot] Sep 12, 2024
ae2f169
Merge master into issues/4995-disk-spec-wdl
github-actions[bot] Sep 16, 2024
eb56ef9
Merge master into issues/4995-disk-spec-wdl
github-actions[bot] Sep 16, 2024
a21fc3a
Merge master into issues/4995-disk-spec-wdl
github-actions[bot] Sep 16, 2024
1a098b4
Address comments
stxue1 Sep 17, 2024
ceccb07
Merge branch 'issues/4995-disk-spec-wdl' of github.com:DataBiosphere/…
stxue1 Sep 17, 2024
839e09b
Update src/toil/wdl/wdltoil.py
stxue1 Sep 17, 2024
4 changes: 2 additions & 2 deletions src/toil/provisioners/node.py
@@ -13,7 +13,7 @@
# limitations under the License.
import datetime
import logging
import pipes
from shlex import quote
import socket
import subprocess
import time
@@ -279,7 +279,7 @@ def coreSSH(self, *args, **kwargs):
commandTokens += ['docker', 'exec', '-i', ttyFlag, 'toil_leader']

logger.debug('Node %s: %s', self.effectiveIP, ' '.join(args))
args = list(map(pipes.quote, args))
args = list(map(quote, args))
commandTokens += args
logger.debug('Full command %s', ' '.join(commandTokens))
process = subprocess.Popen(commandTokens, **kwargs)
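For context: `pipes.quote` has always been an alias of `shlex.quote`, and the `pipes` module was deprecated by PEP 594 and removed in Python 3.13, so this hunk is a drop-in replacement. A minimal sketch of the quoting behavior, independent of this PR:

```python
from shlex import quote

# shlex.quote single-quotes any argument containing characters the shell
# would otherwise interpret, and leaves plain words untouched.
args = ["echo", "hello world", "$HOME"]
print(" ".join(map(quote, args)))
# -> echo 'hello world' '$HOME'
```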
9 changes: 5 additions & 4 deletions src/toil/test/wdl/md5sum/md5sum.1.0.wdl
@@ -4,16 +4,17 @@ task md5 {
input {
File inputFile
}
command {
/bin/my_md5sum ${inputFile}
}
command <<<
set -euf -o pipefail
md5sum ~{inputFile} | awk '{print $1}' > md5sum.txt
>>>

output {
File value = "md5sum.txt"
}

runtime {
docker: "quay.io/briandoconnor/dockstore-tool-md5sum:1.0.4"
docker: "ubuntu:22.04"
cpu: 1
memory: "512 MB"
disks: "local-disk 10 HDD"
210 changes: 181 additions & 29 deletions src/toil/wdl/wdltoil.py
@@ -78,7 +78,7 @@
from toil.jobStores.abstractJobStore import (AbstractJobStore, UnimplementedURLException,
InvalidImportExportUrlException, LocatorException)
from toil.lib.accelerators import get_individual_local_accelerators
from toil.lib.conversions import convert_units, human2bytes
from toil.lib.conversions import convert_units, human2bytes, VALID_PREFIXES
from toil.lib.io import mkdtemp
from toil.lib.memoize import memoize
from toil.lib.misc import get_user_name
@@ -90,6 +90,11 @@
logger = logging.getLogger(__name__)


class InsufficientMountDiskSpace(Exception):
def __init__(self, mount_targets: List[str], desired_bytes: int, available_bytes: int) -> None:
super().__init__("Not enough available disk space for the target mount points %s. Needed %d bytes but there is only %d available."
% (", ".join(mount_targets), desired_bytes, available_bytes))

@contextmanager
def wdl_error_reporter(task: str, exit: bool = False, log: Callable[[str], None] = logger.critical) -> Generator[None, None, None]:
"""
@@ -109,7 +114,8 @@ def wdl_error_reporter(task: str, exit: bool = False, log: Callable[[str], None]
LocatorException,
InvalidImportExportUrlException,
UnimplementedURLException,
JobTooBigError
JobTooBigError,
InsufficientMountDiskSpace
) as e:
# Don't expose tracebacks to the user for exceptions that may be expected
log("Could not " + task + " because:")
@@ -1915,43 +1921,87 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]:
memory_spec = human2bytes(memory_spec)
runtime_memory = memory_spec

mount_spec: Dict[Optional[str], int] = dict()
if runtime_bindings.has_binding('disks'):
# Miniwdl doesn't have this, but we need to be able to parse things like:
# local-disk 5 SSD
# which would mean we need 5 GB space. Cromwell docs for this are at https://cromwell.readthedocs.io/en/stable/RuntimeAttributes/#disks
# We ignore all disk types, and complain if the mount point is not `local-disk`.
disks_spec: str = runtime_bindings.resolve('disks').value
all_specs = disks_spec.split(',')
# Sum up the gigabytes in each disk specification
total_gb = 0
disks_spec: Union[List[WDL.Value.String], str] = runtime_bindings.resolve('disks').value
if isinstance(disks_spec, list):
# SPEC says to use the first one
# the parser gives an array of WDL string objects
all_specs = [part.value for part in disks_spec]
else:
all_specs = disks_spec.split(',')
# Sum up the space in each disk specification
total_bytes: float = 0
for spec in all_specs:
# Split up each spec as space-separated. We assume no fields
# are empty, and we want to allow people to use spaces after
# their commas when separating the list, like in Cromwell's
# examples, so we strip whitespace.
spec_parts = spec.strip().split(' ')
if len(spec_parts) != 3:
# TODO: Add a WDL line to this error
raise ValueError(f"Could not parse disks = {disks_spec} because {spec} does not have 3 space-separated parts")
if spec_parts[0] != 'local-disk':
# TODO: Add a WDL line to this error
raise NotImplementedError(f"Could not provide disks = {disks_spec} because only the local-disks mount point is implemented")
try:
total_gb += int(spec_parts[1])
except:
# TODO: Add a WDL line to this error
raise ValueError(f"Could not parse disks = {disks_spec} because {spec_parts[1]} is not an integer")
# TODO: we always ignore the disk type and assume we have the right one.
# TODO: Cromwell rounds LOCAL disks up to the nearest 375 GB. I
# can't imagine that ever being standardized; just leave it
# alone so that the workflow doesn't rely on this weird and
# likely-to-change Cromwell detail.
if spec_parts[2] == 'LOCAL':
logger.warning('Not rounding LOCAL disk to the nearest 375 GB; workflow execution will differ from Cromwell!')
total_bytes: float = convert_units(total_gb, 'GB')
runtime_disk = int(total_bytes)

# First check that this is a format we support. Both the WDL spec and Cromwell allow a max 3-piece specification
# So if there are more than 3 pieces, raise an error
if len(spec_parts) > 3:
raise RuntimeError(f"Could not parse disks = {disks_spec} because {all_specs} contains more than 3 parts")
part_size = None
# default to GiB as per spec
part_suffix: str = "GiB" # The WDL spec's default is 1 GiB
# default to the execution directory
specified_mount_point = None
# first get the size, since units should always be some nonnumerical string, get the last numerical value
for i, part in enumerate(spec_parts):
if part.replace(".", "", 1).isdigit():
part_size = int(float(part))
spec_parts.pop(i)
break
# unit specification is only allowed to be at the end
if spec_parts[-1].lower() in VALID_PREFIXES:
part_suffix = spec_parts[-1]
spec_parts.pop(-1)
# The last remaining element, if it exists, is the mount point
if len(spec_parts) > 0:
specified_mount_point = spec_parts[0]

if part_size is None:
# Disk spec did not include a size
raise ValueError(f"Could not parse disks = {disks_spec} because {spec} does not specify a disk size")


if part_suffix == "LOCAL":
# TODO: Cromwell rounds LOCAL disks up to the nearest 375 GB. I
# can't imagine that ever being standardized; just leave it
# alone so that the workflow doesn't rely on this weird and
# likely-to-change Cromwell detail.
logger.warning('Not rounding LOCAL disk to the nearest 375 GB; workflow execution will differ from Cromwell!')
Member commented:

We should also probably switch the default unit to GB here, since that is what the Cromwell syntax expects.

Contributor Author replied:

I think it's better to keep the default unit as GiB, since that is the WDL spec default: https://github.com/openwdl/wdl/blob/e43e042104b728df1f1ad6e6145945d2b32331a6/SPEC.md?plain=1#L5082

elif part_suffix in ("HDD", "SSD"):
# For cromwell compatibility, assume this means GB in units
# We don't actually differentiate between HDD and SSD
part_suffix = "GB"

per_part_size = convert_units(part_size, part_suffix)
total_bytes += per_part_size
Member commented:

The total space needed (including any local-disk or mount-point-less size) doesn't ever get copied to runtime_disk and so isn't actually used for scheduling.

if mount_spec.get(specified_mount_point) is not None:
if specified_mount_point is not None:
# raise an error as all mount points must be unique
raise ValueError(f"Could not parse disks = {disks_spec} because the mount point {specified_mount_point} is specified multiple times")
else:
if mount_spec.get(specified_mount_point) is not None:
raise ValueError(f"Could not parse disks = {disks_spec} because the mount point is omitted more than once")

# TODO: we always ignore the disk type and assume we have the right one.
if specified_mount_point != "local-disk":
# Don't mount local-disk. This isn't in the spec, but is carried over from cromwell
# When the mount point is omitted, default to the task's execution directory, which None will represent
mount_spec[specified_mount_point] = int(per_part_size)
else:
# local-disk is equivalent to an omitted mount point
mount_spec[None] = int(per_part_size)
runtime_disk = int(total_bytes)
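To make the accepted formats concrete, here is a minimal standalone sketch of the parsing rules implemented above: the size is the numeric field, the unit defaults to GiB per the WDL spec, Cromwell-style HDD/SSD disk types imply GB, and any remaining field is the mount point. `parse_disk_spec` and its inline `UNITS` set are hypothetical stand-ins; the real code checks suffixes against `toil.lib.conversions.VALID_PREFIXES`:

```python
from typing import Optional, Tuple

# Hypothetical suffix set; the PR checks toil.lib.conversions.VALID_PREFIXES.
UNITS = {"b", "kb", "mb", "gb", "tb", "kib", "mib", "gib", "tib", "hdd", "ssd", "local"}

def parse_disk_spec(spec: str) -> Tuple[Optional[str], int, str]:
    """Parse one entry of a WDL `disks` attribute into (mount point, size, unit)."""
    parts = spec.strip().split(" ")
    if len(parts) > 3:
        raise ValueError(f"{spec!r} has more than 3 space-separated parts")
    size = None
    unit = "GiB"  # the WDL spec's default unit
    mount_point = None
    for i, part in enumerate(parts):
        if part.replace(".", "", 1).isdigit():  # the size is the numeric field
            size = int(float(part))
            parts.pop(i)
            break
    if size is None:
        raise ValueError(f"{spec!r} does not specify a disk size")
    if parts and parts[-1].lower() in UNITS:  # a unit may only appear at the end
        unit = parts.pop(-1)
    if parts:  # whatever remains is the mount point
        mount_point = parts[0]
    if unit in ("HDD", "SSD"):
        unit = "GB"  # Cromwell compatibility: disk types imply GB sizes
    return mount_point, size, unit

assert parse_disk_spec("local-disk 5 SSD") == ("local-disk", 5, "GB")
assert parse_disk_spec("10") == (None, 10, "GiB")
assert parse_disk_spec("/mnt/outputs 20 GiB") == ("/mnt/outputs", 20, "GiB")
```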

if not runtime_bindings.has_binding("gpu") and self._task.effective_wdl_version in ('1.0', 'draft-2'):
# For old WDL versions, guess whether the task wants GPUs if not specified.
use_gpus = (runtime_bindings.has_binding('gpuCount') or
@@ -1985,7 +2035,9 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]:
runtime_accelerators = [accelerator_requirement]

# Schedule to get resources. Pass along the bindings from evaluating all the inputs and decls, and the runtime, with files virtualized.
run_job = WDLTaskJob(self._task, virtualize_files(bindings, standard_library), virtualize_files(runtime_bindings, standard_library), self._task_id, self._namespace, self._task_path, cores=runtime_cores or self.cores, memory=runtime_memory or self.memory, disk=runtime_disk or self.disk, accelerators=runtime_accelerators or self.accelerators, wdl_options=self._wdl_options)
run_job = WDLTaskJob(self._task, virtualize_files(bindings, standard_library), virtualize_files(runtime_bindings, standard_library), self._task_id, self._namespace,
self._task_path, mount_spec, cores=runtime_cores or self.cores, memory=runtime_memory or self.memory, disk=runtime_disk or self.disk,
accelerators=runtime_accelerators or self.accelerators, wdl_options=self._wdl_options)
# Run that as a child
self.addChild(run_job)

@@ -2008,7 +2060,8 @@ class WDLTaskJob(WDLBaseJob):
All bindings are in terms of task-internal names.
"""

def __init__(self, task: WDL.Tree.Task, task_internal_bindings: Promised[WDLBindings], runtime_bindings: Promised[WDLBindings], task_id: List[str], namespace: str, task_path: str, **kwargs: Any) -> None:
def __init__(self, task: WDL.Tree.Task, task_internal_bindings: Promised[WDLBindings], runtime_bindings: Promised[WDLBindings], task_id: List[str], namespace: str,
task_path: str, mount_spec: Dict[Optional[str], int], **kwargs: Any) -> None:
"""
Make a new job to run a task.

@@ -2032,6 +2085,7 @@ def __init__(self, task: WDL.Tree.Task, task_internal_bindings: Promised[WDLBind
self._task_id = task_id
self._namespace = namespace
self._task_path = task_path
self._mount_spec = mount_spec

###
# Runtime code injection system
@@ -2211,6 +2265,58 @@ def can_mount_proc(self) -> bool:
"""
return "KUBERNETES_SERVICE_HOST" not in os.environ

def ensure_mount_point(self, file_store: AbstractFileStore, mount_spec: Dict[Optional[str], int]) -> Dict[str, str]:
"""
Ensure the mount point sources are available.

Will check if the mount point source has the requested amount of space available.

Note: We depend on Toil's job scheduling backend to raise an error when the sum of multiple mount points' disk requests is greater than the total available.
For example, if a task has two mount points requesting 100 GB each but only 100 GB is available, the df check may pass,
but Toil should still fail to schedule the jobs internally.

:param mount_spec: Mount specification from the disks attribute in the WDL task. Is a dict where key is the mount point target and value is the size
:param file_store: File store to create a tmp directory for the mount point source
:return: Dict mapping mount point target to mount point source
"""
logger.debug("Detected mount specifications, creating mount points.")
mount_src_mapping = {}
# Create one tmpdir to encapsulate all mount point sources, each mount point will be associated with a subdirectory
tmpdir = file_store.getLocalTempDir()

# The POSIX standard doesn't specify how to escape spaces in mount points and filesystem names,
# so the one weakness of this regex is a target mount point whose name itself matches the df output format.
# It is likely reliable enough to trust that the user has not created a mount with a df-output-like name.
regex_df = re.compile(r".+ \d+ +\d+ +(\d+) +\d+% +.+")
total_mount_size = sum(mount_spec.values())
try:
# Use arguments from the df POSIX standard
df_line = subprocess.check_output(["df", "-k", "-P", tmpdir], encoding="utf-8").split("\n")[1]
m = re.match(regex_df, df_line)
if m is None:
logger.debug("Output of df may be malformed: %s", df_line)
logger.warning("Unable to check disk requirements as output of 'df' command is malformed. Will assume storage is always available.")
else:
# Block size will always be 1024
available_space = int(m[1]) * 1024
if available_space < total_mount_size:
# We do not have enough space available for this mount point
# An omitted mount point is the task's execution directory so show that to the user instead
raise InsufficientMountDiskSpace([mount_point if mount_point is not None else "/mnt/miniwdl_task_container/work" for mount_point in mount_spec.keys()],
total_mount_size, available_space)
except subprocess.CalledProcessError as e:
# If df somehow isn't available
logger.debug("Unable to call df. stdout: %s stderr: %s", e.stdout, e.stderr)
logger.warning("Unable to check disk requirements as call to 'df' command failed. Will assume storage is always available.")
for mount_target in mount_spec.keys():
# Create a new subdirectory for each mount point
source_location = os.path.join(tmpdir, str(uuid.uuid4()))
os.mkdir(source_location)
if mount_target is not None:
# None represents an omitted mount point, which will default to the task's work directory. MiniWDL's internals will mount the task's work directory by itself
mount_src_mapping[mount_target] = source_location
return mount_src_mapping
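As a quick sanity check of the `df` parsing above, here is a runnable distillation using `/tmp` as a stand-in for the temp directory; with `-k`, POSIX `df -P` output uses fixed 1024-byte blocks, and the regex captures the "Available" column:

```python
import re
import subprocess

# Same pattern as above: capture the "Available" column of `df -k -P`.
regex_df = re.compile(r".+ \d+ +\d+ +(\d+) +\d+% +.+")

df_line = subprocess.check_output(["df", "-k", "-P", "/tmp"], encoding="utf-8").split("\n")[1]
m = regex_df.match(df_line)
if m is not None:
    print("available bytes:", int(m[1]) * 1024)
```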

@report_wdl_errors("run task command", exit=True)
def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]:
"""
@@ -2224,6 +2330,9 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]:
# We process nonexistent files in WDLTaskWrapperJob as those must be run locally, so don't try to devirtualize them
standard_library = ToilWDLStdLibBase(file_store, self._task_path, enforce_existence=False)

# Create mount points and get a mapping of target mount points to locations on disk
mount_mapping = self.ensure_mount_point(file_store, self._mount_spec)

# Get the bindings from after the input section
bindings = unwrap(self._task_internal_bindings)
# And the bindings from evaluating the runtime section
@@ -2385,8 +2494,51 @@ def patched_run_invocation(*args: Any, **kwargs: Any) -> List[str]:
return command_line

# Apply the patch
task_container._run_invocation = patched_run_invocation  # type: ignore

singularity_original_prepare_mounts = task_container.prepare_mounts

def patch_prepare_mounts_singularity() -> List[Tuple[str, str, bool]]:
"""
Mount the mount points specified from the disk requirements.

The singularity and docker patch are separate as they have different function signatures
"""
# todo: support AWS EBS/Kubernetes persistent volumes
# this logic likely only works for local clusters as we don't deal with the size of each mount point
mounts: List[Tuple[str, str, bool]] = singularity_original_prepare_mounts()
for mount_point, source_location in mount_mapping.items():
mounts.append((mount_point, source_location, True))
return mounts
task_container.prepare_mounts = patch_prepare_mounts_singularity # type: ignore[method-assign]
elif isinstance(task_container, SwarmContainer):
docker_original_prepare_mounts = task_container.prepare_mounts

try:
# miniwdl depends on docker so this should be available but check just in case
import docker
# docker stubs are still WIP: https://github.com/docker/docker-py/issues/2796
from docker.types import Mount # type: ignore[import-untyped]

def patch_prepare_mounts_docker(logger: logging.Logger) -> List[Mount]:
"""
Same as the singularity patch but for docker
"""
mounts: List[Mount] = docker_original_prepare_mounts(logger)
for mount_point, source_location in mount_mapping.items():
mounts.append(
Mount(
mount_point.rstrip("/").replace("{{", '{{"{{"}}'),
source_location.rstrip("/").replace("{{", '{{"{{"}}'),
type="bind",
)
)
return mounts
task_container.prepare_mounts = patch_prepare_mounts_docker # type: ignore[method-assign]
except ImportError:
logger.warning("Docker package not installed. Unable to add mount points.")
# Show the runtime info to the container
task_container.process_runtime(miniwdl_logger, {binding.name: binding.value for binding in devirtualize_files(runtime_bindings, standard_library)})

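A general note on the patching pattern in this last hunk: both the Singularity and Docker patches save the bound `prepare_mounts` method, shadow it on the container instance, delegate to the saved original, and append the extra bind mounts. A minimal sketch of that pattern with a hypothetical container class (the real targets are miniwdl's `SingularityContainer.prepare_mounts` and `SwarmContainer.prepare_mounts`):

```python
from typing import List, Tuple

class FakeContainer:
    """Hypothetical stand-in for a miniwdl TaskContainer."""
    def prepare_mounts(self) -> List[Tuple[str, str, bool]]:
        # (container path, host path, writable)
        return [("/mnt/miniwdl_task_container/work", "/host/work", True)]

container = FakeContainer()
original_prepare_mounts = container.prepare_mounts  # keep the bound original

def patched_prepare_mounts() -> List[Tuple[str, str, bool]]:
    mounts = original_prepare_mounts()  # whatever miniwdl would mount anyway
    mounts.append(("/mnt/outputs", "/host/tmp/mount-src", True))  # extra bind mount
    return mounts

# Assigning to the instance shadows the class method for this container only.
container.prepare_mounts = patched_prepare_mounts  # type: ignore[method-assign]
print(container.prepare_mounts())
```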