diff --git a/cwltool.code-workspace b/cwltool.code-workspace new file mode 100644 index 000000000..876a1499c --- /dev/null +++ b/cwltool.code-workspace @@ -0,0 +1,8 @@ +{ + "folders": [ + { + "path": "." + } + ], + "settings": {} +} \ No newline at end of file diff --git a/cwltool/argparser.py b/cwltool/argparser.py index 5439d2bd2..dbb8349b4 100644 --- a/cwltool/argparser.py +++ b/cwltool/argparser.py @@ -287,6 +287,24 @@ def arg_parser() -> argparse.ArgumentParser: type=str, ) + # TO DO: Not yet implemented + provgroup.add_argument( + "--no-data", # Maybe change to no-input and no-intermediate to ignore those kind of files?... + default=False, + action="store_true", + help="Disables the storage of input and output data files", + dest="no_data", + ) + + # TO DO: Not yet implemented + provgroup.add_argument( + "--no-input", # Maybe change to no-input and no-intermediate to ignore those kind of files?... + default=False, + action="store_true", + help="Disables the storage of input data files", + dest="no_input", + ) + printgroup = parser.add_mutually_exclusive_group() printgroup.add_argument( "--print-rdf", diff --git a/cwltool/builder.py b/cwltool/builder.py index ea2a708a6..3d781db9b 100644 --- a/cwltool/builder.py +++ b/cwltool/builder.py @@ -572,6 +572,10 @@ def addsf( datum = cast(CWLObjectType, datum) ll = schema.get("loadListing") or self.loadListing if ll and ll != "no_listing": + # Debug show + for k in datum: + _logger.debug("Datum: %s: %s" % (k, datum[k])) + _logger.debug("----------------------------------------") get_listing( self.fs_access, datum, diff --git a/cwltool/cwlprov/__init__.py b/cwltool/cwlprov/__init__.py index 98e8c911c..8ffcff4c3 100644 --- a/cwltool/cwlprov/__init__.py +++ b/cwltool/cwlprov/__init__.py @@ -6,10 +6,14 @@ import re import uuid from getpass import getuser -from typing import IO, Any, Callable, Dict, List, Optional, Tuple, Union +from typing import IO, Any, Dict, List, Optional, Tuple, Union from typing_extensions import TypedDict +from cwltool.cwlprov.provenance_constants import Hasher + +from ..loghandler import _logger + def _whoami() -> Tuple[str, str]: """Return the current operating system account as (username, fullname).""" @@ -135,7 +139,7 @@ def _valid_orcid(orcid: Optional[str]) -> str: def checksum_copy( src_file: IO[Any], dst_file: Optional[IO[Any]] = None, - hasher: Optional[Callable[[], "hashlib._Hash"]] = None, + hasher=Hasher, # type: Callable[[], hashlib._Hash] buffersize: int = 1024 * 1024, ) -> str: """Compute checksums while copying a file.""" @@ -158,6 +162,35 @@ def checksum_copy( pass if os.path.exists(temp_location): os.rename(temp_location, dst_file.name) # type: ignore + + return content_processor(contents, src_file, dst_file, checksum, buffersize) + + +def checksum_only( + src_file: IO[Any], + dst_file: Optional[IO[Any]] = None, + hasher=Hasher, # type: Callable[[], hashlib._Hash] + buffersize: int = 1024 * 1024, +) -> str: + """Calculate the checksum only, does not copy the data files.""" + if dst_file is not None: + _logger.error("Destination file should be None but it is %s", dst_file) + """Compute checksums while copying a file.""" + # TODO: Use hashlib.new(Hasher_str) instead? + checksum = hasher() + contents = src_file.read(buffersize) + # TODO Could be a function for both checksum_only and checksum_copy? + return content_processor(contents, src_file, dst_file, checksum, buffersize) + + +def content_processor( + contents: Any, + src_file: IO[Any], + dst_file: Optional[IO[Any]], + checksum: "hashlib._Hash", + buffersize: int, +) -> str: + """Calculate the checksum based on the content.""" while contents != b"": if dst_file is not None: dst_file.write(contents) diff --git a/cwltool/cwlprov/provenance_profile.py b/cwltool/cwlprov/provenance_profile.py index ad019f3e5..289b9e6d0 100644 --- a/cwltool/cwlprov/provenance_profile.py +++ b/cwltool/cwlprov/provenance_profile.py @@ -29,7 +29,7 @@ from ..loghandler import _logger from ..process import Process, shortname from ..stdfsaccess import StdFsAccess -from ..utils import CWLObjectType, JobsType, get_listing, posix_path, versionstring +from ..utils import CWLObjectType, JobsType, posix_path, versionstring from ..workflow_job import WorkflowJob from .provenance_constants import ( ACCOUNT_UUID, @@ -243,6 +243,7 @@ def evaluate( # record provenance of workflow executions self.prospective_prov(job) customised_job = copy_job_order(job, job_order_object) + # Note to self: Listing goes ok here self.used_artefacts(customised_job, self.workflow_run_uri) def record_process_start( @@ -287,6 +288,7 @@ def record_process_end( process_run_id: str, outputs: Union[CWLObjectType, MutableSequence[CWLObjectType], None], when: datetime.datetime, + # load_listing: None, ) -> None: self.generate_output_prov(outputs, process_run_id, process_name) self.document.wasEndedBy(process_run_id, None, self.workflow_run_uri, when) @@ -408,8 +410,8 @@ def declare_directory(self, value: CWLObjectType) -> ProvEntity: # a later call to this method will sort that is_empty = True - if "listing" not in value: - get_listing(self.fsaccess, value) + # if "listing" not in value: + # get_listing(self.fsaccess, value) for entry in cast(MutableSequence[CWLObjectType], value.get("listing", [])): is_empty = False # Declare child-artifacts @@ -604,6 +606,7 @@ def used_artefacts( job_order: Union[CWLObjectType, List[CWLObjectType]], process_run_id: str, name: Optional[str] = None, + load_listing=None, ) -> None: """Add used() for each data artefact.""" if isinstance(job_order, list): diff --git a/cwltool/cwlprov/ro.py b/cwltool/cwlprov/ro.py index 3bcc9fddf..47089a8af 100644 --- a/cwltool/cwlprov/ro.py +++ b/cwltool/cwlprov/ro.py @@ -34,7 +34,15 @@ posix_path, versionstring, ) -from . import Aggregate, Annotation, AuthoredBy, _valid_orcid, _whoami, checksum_copy +from . import ( + Aggregate, + Annotation, + AuthoredBy, + _valid_orcid, + _whoami, + checksum_copy, + checksum_only, +) from .provenance_constants import ( ACCOUNT_UUID, CWLPROV_VERSION, @@ -66,6 +74,8 @@ def __init__( temp_prefix_ro: str = "tmp", orcid: str = "", full_name: str = "", + no_data: bool = False, + no_input: bool = False, ) -> None: """Initialize the ResearchObject.""" self.temp_prefix = temp_prefix_ro @@ -88,6 +98,8 @@ def __init__( self.cwltool_version = f"cwltool {versionstring().split()[-1]}" self.has_manifest = False self.relativised_input_object: CWLObjectType = {} + self.no_data = no_data + self.no_input = no_input self._initialize() _logger.debug("[provenance] Temporary research object: %s", self.folder) @@ -180,13 +192,22 @@ def add_tagfile(self, path: str, timestamp: Optional[datetime.datetime] = None) # Below probably OK for now as metadata files # are not too large..? - checksums[SHA1] = checksum_copy(tag_file, hasher=hashlib.sha1) + if self.no_input: + _logger.debug("NO INPUT DATA TO BE CAPTURED!!!") + + checksums[SHA1] = checksum_only(tag_file, hasher=hashlib.sha1) + tag_file.seek(0) + checksums[SHA256] = checksum_only(tag_file, hasher=hashlib.sha256) + tag_file.seek(0) + checksums[SHA512] = checksum_only(tag_file, hasher=hashlib.sha512) + else: + checksums[SHA1] = checksum_copy(tag_file, hasher=hashlib.sha1) - tag_file.seek(0) - checksums[SHA256] = checksum_copy(tag_file, hasher=hashlib.sha256) + tag_file.seek(0) + checksums[SHA256] = checksum_copy(tag_file, hasher=hashlib.sha256) - tag_file.seek(0) - checksums[SHA512] = checksum_copy(tag_file, hasher=hashlib.sha512) + tag_file.seek(0) + checksums[SHA512] = checksum_copy(tag_file, hasher=hashlib.sha512) rel_path = posix_path(os.path.relpath(path, self.folder)) self.tagfiles.add(rel_path) @@ -469,10 +490,14 @@ def add_data_file( content_type: Optional[str] = None, ) -> str: """Copy inputs to data/ folder.""" + # TODO Skip if no-input or no-data is used...? self.self_check() tmp_dir, tmp_prefix = os.path.split(self.temp_prefix) with tempfile.NamedTemporaryFile(prefix=tmp_prefix, dir=tmp_dir, delete=False) as tmp: - checksum = checksum_copy(from_fp, tmp) + if self.no_data: + checksum = checksum_only(from_fp) + else: + checksum = checksum_copy(from_fp, tmp) # Calculate hash-based file path folder = os.path.join(self.folder, DATA, checksum[0:2]) @@ -493,7 +518,12 @@ def add_data_file( _logger.warning("[provenance] Unknown hash method %s for bagit manifest", Hasher) # Inefficient, bagit support need to checksum again self._add_to_bagit(rel_path) - _logger.debug("[provenance] Added data file %s", path) + if "dir" in self.relativised_input_object: + _logger.debug( + "[provenance] Directory :%s", self.relativised_input_object["dir"]["basename"] + ) + else: + _logger.debug("[provenance] Added data file %s", path) if timestamp is not None: createdOn, createdBy = self._self_made(timestamp) self._file_provenance[rel_path] = cast( @@ -557,7 +587,10 @@ def _add_to_bagit(self, rel_path: str, **checksums: str) -> None: checksums = dict(checksums) with open(lpath, "rb") as file_path: # FIXME: Need sha-256 / sha-512 as well for Research Object BagIt profile? - checksums[SHA1] = checksum_copy(file_path, hasher=hashlib.sha1) + if self.data_option: + checksums[SHA1] = checksum_only(file_path, hasher=hashlib.sha1) + else: + checksums[SHA1] = checksum_copy(file_path, hasher=hashlib.sha1) self.add_to_manifest(rel_path, checksums) diff --git a/cwltool/executors.py b/cwltool/executors.py index 31c9b052c..2585daad5 100644 --- a/cwltool/executors.py +++ b/cwltool/executors.py @@ -172,7 +172,9 @@ def check_for_abstract_op(tool: CWLObjectType) -> None: ): process_run_id: Optional[str] = None name = "primary" - process.parent_wf.generate_output_prov(self.final_output[0], process_run_id, name) + process.parent_wf.generate_output_prov( + self.final_output[0], process_run_id, name + ) # Note to self... # , "generate_output_prov") process.parent_wf.document.wasEndedBy( process.parent_wf.workflow_run_uri, None, diff --git a/cwltool/job.py b/cwltool/job.py index f109e48ae..30aa42c2a 100644 --- a/cwltool/job.py +++ b/cwltool/job.py @@ -285,7 +285,10 @@ def _execute( and isinstance(job_order, (list, dict)) ): runtimeContext.prov_obj.used_artefacts( - job_order, runtimeContext.process_run_id, str(self.name) + job_order, + runtimeContext.process_run_id, + str(self.name), + load_listing=self.builder.loadListing, ) else: _logger.warning( @@ -411,6 +414,7 @@ def stderr_stdout_log_path( runtimeContext.process_run_id, outputs, datetime.datetime.now(), + # builder.loadListing # TODO FIX THIS ) if processStatus != "success": _logger.warning("[job %s] completed %s", self.name, processStatus) diff --git a/cwltool/main.py b/cwltool/main.py index 965f863a0..999e9f743 100755 --- a/cwltool/main.py +++ b/cwltool/main.py @@ -11,6 +11,7 @@ import signal import subprocess # nosec import sys +import tempfile import time import urllib import warnings @@ -693,6 +694,8 @@ def setup_provenance( temp_prefix_ro=args.tmpdir_prefix, orcid=args.orcid, full_name=args.cwl_full_name, + no_data=args.no_data, + no_input=args.no_input, ) runtimeContext.research_obj = ro log_file_io = open_log_file_for_activity(ro, ro.engine_uuid) @@ -1138,12 +1141,28 @@ def main( print(f"{args.workflow} is valid CWL.", file=stdout) return 0 - if args.print_rdf: + if args.print_rdf or args.provenance: + output = stdout + if args.provenance: + # Write workflow to temp directory + temp_workflow_dir = tempfile.TemporaryDirectory() + os.makedirs(temp_workflow_dir.name, exist_ok=True) + workflow_provenance = temp_workflow_dir.name + "/workflow.ttl" + # Sets up a turtle file for the workflow information + # (not yet in the provenance folder as it does + # not exist and creating it will give issues). + output = open(workflow_provenance, "w") + _logger.info("Writing workflow rdf to %s", workflow_provenance) print( printrdf(tool, loadingContext.loader.ctx, args.rdf_serializer), - file=stdout, + file=output, ) - return 0 + # close the output + if args.provenance: + output.close() + # Only print_rdf exits this way + if args.print_rdf: + return 0 if args.print_dot: printdot(tool, loadingContext.loader.ctx, stdout) diff --git a/tests/test_provenance.py b/tests/test_provenance.py index 83eb61c22..0f6542e91 100644 --- a/tests/test_provenance.py +++ b/tests/test_provenance.py @@ -206,6 +206,9 @@ def test_directory_workflow(tmp_path: Path) -> None: p = folder / "data" / prefix / l_hash assert p.is_file(), f"Could not find {letter} as {p}" + # List content + list_files(tmp_path) + @needs_docker def test_no_data_files(tmp_path: Path) -> None: @@ -787,3 +790,138 @@ def test_research_object() -> None: def test_research_object_picklability(research_object: ResearchObject) -> None: """Research object may need to be pickled (for Toil).""" assert pickle.dumps(research_object) is not None + + +# Function to list filestructure +def list_files(startpath): + startpath = str(startpath) + print("Root: ", startpath) + for root, _dirs, files in os.walk(startpath): + level = root.replace(startpath, "").count(os.sep) + indent = " " * 4 * (level) + print("{}{}/".format(indent, os.path.basename(root))) + subindent = " " * 4 * (level + 1) + for f in files: + print("{}{}".format(subindent, f)) + + +@needs_docker +def test_directory_workflow_no_listing(tmp_path: Path) -> None: + """ + This test will check for 3 files that should be there and 3 files that should not be there. + @param tmp_path: + """ + # TODO no data is currently manually set + data_option = "--no-input" + + sha1 = { + # Expected hashes of ASCII letters (no linefeed) + # as returned from: + # for x in a b c ; do echo -n $x | sha1sum ; done + "a": "86f7e437faa5a7fce15d1ddcb9eaeaea377667b8", + "b": "e9d71f5ee7c92d6dc9e92ffdad17b8bd49418f98", + "c": "84a516841ba77a5b4648de2cd0dfcb30ea46dbb4", + # Expected hashes of ASCII letters (no linefeed) + # as returned from: + # for x in d e f ; do echo -n $x | sha1sum ; done + "d": "3c363836cf4e16666669a25da280a1865c2d2874", + "e": "58e6b3a414a1e090dfc6029add0f3555ccba127f", + "f": "4a0a19218e082a343a1b17e5333409af9d98f0f5", + # Expected hashes of ASCII letters (no linefeed) + # as returned from: + # for x in g h i ; do echo -n $x | sha1sum ; done + "g": "54fd1711209fb1c0781092374132c66e79e2241b", + "h": "27d5482eebd075de44389774fce28c69f45c8a75", + "i": "042dc4512fa3d391c5170cf3aa61e6a638f84342", + } + + dir2 = tmp_path / "dir_deep_listing" + dir2.mkdir() + for x in "abc": + # Make test files with predictable hashes + with open(dir2 / x, "w", encoding="ascii") as f: + f.write(x) + + dir3 = tmp_path / "dir_no_listing" + dir3.mkdir() + + for x in "def": + # Make test files with predictable hashes + with open(dir3 / x, "w", encoding="ascii") as f: + f.write(x) + # Temporarily generate 10.000 files to test performance + for i in range(10000): + with open(dir3 / f"{x}_{i}", "w", encoding="ascii") as f: + f.write(x) + print("Created 10.000 files in dir_no_listing") + # list_files(dir3) + + dir4 = tmp_path / "dir_no_info" + dir4.mkdir() + + for x in "ghi": + # Make test files with predictable hashes + with open(dir4 / x, "w", encoding="ascii") as f: + f.write(x) + + # Run the workflow + folder = cwltool( + tmp_path, + # CWL Arguments + "--debug", + # No data argument based on boolean + data_option, + # Workflow arguments + get_data("tests/wf/directory_no_listing.cwl"), + "--dir_deep_listing", + str(dir2), + "--dir_no_listing", + str(dir3), + "--dir_no_info", + str(dir4), + ) + + # Visualize the path structure + # list_files(tmp_path) + + # Output should include ls stdout of filenames a b c on each line + file_list = ( + folder + / "data" + / "84" + / "84a516841ba77a5b4648de2cd0dfcb30ea46dbb4" + # checksum as returned from: + # echo -e "a\nb\nc" | sha1sum + # 3ca69e8d6c234a469d16ac28a4a658c92267c423 - + ) + + assert file_list.is_file() + + # Input files should be captured by hash value, + # even if they were inside a class: Directory + for f, file_hash in sha1.items(): + prefix = file_hash[:2] # first 2 letters + p = folder / "data" / prefix / file_hash + # File should be empty and in the future not existing... + # assert os.path.getsize(p.absolute()) == 0 + # To be discared when file really does not exist anymore + if f in ["d", "e", "f", "g", "h", "i"]: + print(f"Analysing file {f}") + assert not p.is_file(), f"Could find {f} as {p}" + else: + print(f"Analysing file {f}") + assert p.is_file(), f"Could not find {f} as {p}" + + +def cwltool_no_data(tmp_path: Path, *args: Any) -> Path: + prov_folder = tmp_path / "provenance" + prov_folder.mkdir() + new_args = ["--enable-ext", "--no-data", "--provenance", str(prov_folder)] + new_args.extend(args) + # Run within a temporary directory to not pollute git checkout + tmp_dir = tmp_path / "cwltool-run" + tmp_dir.mkdir() + with working_directory(tmp_dir): + status = main(new_args) + assert status == 0, f"Failed: cwltool.main({args})" + return prov_folder diff --git a/tests/wf/directory_no_listing.cwl b/tests/wf/directory_no_listing.cwl new file mode 100644 index 000000000..f261361e1 --- /dev/null +++ b/tests/wf/directory_no_listing.cwl @@ -0,0 +1,79 @@ +#!/usr/bin/env cwl-runner +cwlVersion: v1.2 +class: Workflow + +doc: > + Inspect provided directory and return filenames. + Generate a new directory and return it (including content). + +hints: + - class: DockerRequirement + dockerPull: docker.io/debian:stable-slim + +inputs: + dir_deep_listing: + type: Directory + loadListing: deep_listing + dir_no_listing: + type: Directory + loadListing: no_listing + dir_no_info: + type: Directory + + +steps: + ls: + in: + dir: dir_deep_listing + ignore: dir_no_listing + out: + [listing] + run: + class: CommandLineTool + baseCommand: ls + inputs: + dir: + type: Directory + inputBinding: + position: 1 + ignore: + type: Directory + inputBinding: + position: 2 + outputs: + listing: + type: stdout + + generate: + in: [] + out: + [dir1] + run: + class: CommandLineTool + requirements: + ShellCommandRequirement: {} + LoadListingRequirement: + loadListing: deep_listing + + arguments: + - shellQuote: false + valueFrom: > + pwd; + mkdir -p dir1/a/b; + echo -n a > dir1/a.txt; + echo -n b > dir1/a/b.txt; + echo -n c > dir1/a/b/c.txt; + inputs: [] + outputs: + dir1: + type: Directory + outputBinding: + glob: "dir1" + +outputs: + output_1: + type: File + outputSource: ls/listing + output_2: + type: Directory + outputSource: generate/dir1