diff --git a/cwltool/argparser.py b/cwltool/argparser.py
index 5439d2bd2..90629d5a1 100644
--- a/cwltool/argparser.py
+++ b/cwltool/argparser.py
@@ -245,6 +245,13 @@ def arg_parser() -> argparse.ArgumentParser:
         help="Record user account info as part of provenance.",
         dest="user_provenance",
     )
+    provgroup.add_argument(
+        "--no-data",
+        default=False,
+        action="store_true",
+        help="Disable storing the workflow's input and output data files "
+        "in the provenance data folder.",
+        dest="no_data",
+    )
     provgroup.add_argument(
         "--disable-user-provenance",
         default=False,
@@ -378,6 +385,7 @@ def arg_parser() -> argparse.ArgumentParser:
 
     volumegroup = parser.add_mutually_exclusive_group()
     volumegroup.add_argument("--verbose", action="store_true", help="Default logging")
+    volumegroup.add_argument("--no-warnings", action="store_true", help="Only print errors.")
    volumegroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
     volumegroup.add_argument("--debug", action="store_true", help="Print even more logging")
 
diff --git a/cwltool/cwlprov/provenance_profile.py b/cwltool/cwlprov/provenance_profile.py
index ad019f3e5..bd1c2fc02 100644
--- a/cwltool/cwlprov/provenance_profile.py
+++ b/cwltool/cwlprov/provenance_profile.py
@@ -20,10 +20,12 @@
     cast,
 )
 
-from prov.identifier import Identifier, QualifiedName
+from prov.identifier import Identifier, Namespace, QualifiedName
 from prov.model import PROV, PROV_LABEL, PROV_TYPE, PROV_VALUE, ProvDocument, ProvEntity
 from schema_salad.sourceline import SourceLine
 
+import cwltool.workflow
+
 from ..errors import WorkflowException
 from ..job import CommandLineJob, JobBase
 from ..loghandler import _logger
@@ -55,7 +57,7 @@
 from .ro import ResearchObject
 
 
-def copy_job_order(job: Union[Process, JobsType], job_order_object: CWLObjectType) -> CWLObjectType:
+def copy_job_order(
+    job: Union[Process, JobsType], job_order_object: CWLObjectType, process: Process
+) -> CWLObjectType:
     """Create copy of job object for provenance."""
     if not isinstance(job, WorkflowJob):
         # direct command line tool execution
@@ -63,12 +65,41 @@ def copy_job_order(job: Union[Process, JobsType], job_order_object: CWLObjectTyp
     customised_job: CWLObjectType = {}  # new job object for RO
     debug = _logger.isEnabledFor(logging.DEBUG)
 
+    # Inspect the process object first: capture any loadListing settings from
+    # the CWL description so that, for example, the contents of large database
+    # folders can be excluded from the provenance record.
+    load_listing = {}
+    for entry in process.inputs_record_schema["fields"]:
+        if (
+            entry["type"] == "org.w3id.cwl.cwl.Directory"
+            and "loadListing" in entry
+            and entry["loadListing"]
+        ):
+            load_listing[entry["name"]] = entry["loadListing"]
+
     for each, i in enumerate(job.tool["inputs"]):
         with SourceLine(job.tool["inputs"], each, WorkflowException, debug):
             iid = shortname(i["id"])
+            # If iid has a "no_listing" loadListing entry, drop the expanded
+            # listing from the provenance copy below.
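+            # Hypothetical example: load_listing == {"dir_input": "no_listing"}
+            # would reset the listing of the "dir_input" directory below.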
             if iid in job_order_object:
-                customised_job[iid] = copy.deepcopy(job_order_object[iid])
-                # add the input element in dictionary for provenance
+                if iid in load_listing:
+                    if load_listing[iid] == "no_listing":
+                        _logger.warning("Skipping listing of %s", iid)
+                        job_order_object[iid]["loadListing"] = "no_listing"
+                        job_order_object[iid]["listing"] = []
+                        customised_job[iid] = job_order_object[iid]
+                    else:
+                        # Normal deep copy
+                        customised_job[iid] = copy.deepcopy(job_order_object[iid])
+                        # TODO: Handle the other listing options here?
+                else:
+                    # add the input element in dictionary for provenance
+                    customised_job[iid] = copy.deepcopy(job_order_object[iid])
             elif "default" in i:
                 customised_job[iid] = copy.deepcopy(i["default"])
                 # add the default elements in the dictionary for provenance
@@ -236,31 +267,42 @@ def evaluate(
         if not hasattr(process, "steps"):
             # record provenance of independent commandline tool executions
             self.prospective_prov(job)
-            customised_job = copy_job_order(job, job_order_object)
+            customised_job = copy_job_order(job, job_order_object, process)
             self.used_artefacts(customised_job, self.workflow_run_uri)
             create_job(research_obj, customised_job)
         elif hasattr(job, "workflow"):
             # record provenance of workflow executions
             self.prospective_prov(job)
-            customised_job = copy_job_order(job, job_order_object)
-            self.used_artefacts(customised_job, self.workflow_run_uri)
+            customised_job = copy_job_order(job, job_order_object, process)
+            self.used_artefacts(
+                customised_job, self.workflow_run_uri, schema=process.inputs_record_schema
+            )
 
     def record_process_start(
         self, process: Process, job: JobsType, process_run_id: Optional[str] = None
     ) -> Optional[str]:
         if not hasattr(process, "steps"):
             process_run_id = self.workflow_run_uri
-        elif not hasattr(job, "workflow"):
+        elif not hasattr(job, "workflow") and isinstance(process, cwltool.workflow.Workflow):
             # commandline tool execution as part of workflow
             name = ""
             if isinstance(job, (CommandLineJob, JobBase, WorkflowJob)):
                 name = job.name
             process_name = urllib.parse.quote(name, safe=":/,#")
-            process_run_id = self.start_process(process_name, datetime.datetime.now())
+            # The matching step is not necessarily the first one, so iterate
+            # and compare against process_name to find the correct step.id.
+            step = None
+            for candidate in process.steps:
+                if candidate.id.endswith("#" + process_name):
+                    step = candidate
+                    break
+            if step is None:
+                raise WorkflowException(
+                    "No workflow step found matching process name %s" % process_name
+                )
+
+            process_run_id = self.start_process(step.id, process_name, datetime.datetime.now())
         return process_run_id
 
     def start_process(
         self,
+        step_id: str,  # the ID of the step involved
         process_name: str,
         when: datetime.datetime,
         process_run_id: Optional[str] = None,
@@ -269,12 +311,32 @@ def start_process(
         if process_run_id is None:
             process_run_id = uuid.uuid4().urn
         prov_label = "Run of workflow/packed.cwl#main/" + process_name
-        self.document.activity(
-            process_run_id,
-            None,
-            None,
-            {PROV_TYPE: WFPROV["ProcessRun"], PROV_LABEL: prov_label},
-        )
+        # Include the step URI so that linking this activity to the
+        # --print-rdf output becomes possible.
+        file_path = None
+        workflow_step = None
+        # steps may hold more than one element, so verify the step name
+        # (including the "#" separator) before using it.
+        if step_id.endswith("#" + process_name):
+            # Perhaps there is another way to create these URIs?
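+            # Hypothetical example: step_id could be
+            #   file:///home/user/packed.cwl#main/ls
+            # which ends with "#" + process_name for process_name "main/ls".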
+            # The URI shape was modelled on the --print-rdf output.
+            workflow_ns = Namespace("Workflow", "https://w3id.org/cwl/cwl#Workflow/")
+            workflow_step = workflow_ns["steps"]
+            # A URI without a namespace prefix: use an empty namespace.
+            file_ns = Namespace("", "")
+            # The entire file://....#step path
+            file_path = file_ns[step_id]
+
+        # Record the step URI on the activity when it was found.
+        attributes = {PROV_TYPE: WFPROV["ProcessRun"], PROV_LABEL: prov_label}
+        if workflow_step is not None:
+            attributes[workflow_step] = file_path
+        self.document.activity(
+            process_run_id,
+            None,
+            None,
+            attributes,
+        )
+
         self.document.wasAssociatedWith(
             process_run_id, self.engine_uuid, str("wf:main/" + process_name)
         )
@@ -287,11 +349,15 @@ def record_process_end(
         process_run_id: str,
         outputs: Union[CWLObjectType, MutableSequence[CWLObjectType], None],
         when: datetime.datetime,
+        load_listing: str = "invalid_listing",
     ) -> None:
-        self.generate_output_prov(outputs, process_run_id, process_name)
+        self.generate_output_prov(outputs, process_run_id, process_name, load_listing)
         self.document.wasEndedBy(process_run_id, None, self.workflow_run_uri, when)
 
-    def declare_file(self, value: CWLObjectType) -> Tuple[ProvEntity, ProvEntity, str]:
+    def declare_file(
+        self, value: CWLObjectType, load_listing: str = "invalid_listing"
+    ) -> Tuple[ProvEntity, ProvEntity, str]:
+        _logger.debug("declare_file: loadListing is %s", load_listing)
         if value["class"] != "File":
             raise ValueError("Must have class:File: %s" % value)
         # Need to determine file hash aka RO filename
@@ -345,9 +411,11 @@ def declare_file(self, value: CWLObjectType) -> Tuple[ProvEntity, ProvEntity, st
         for sec in cast(MutableSequence[CWLObjectType], value.get("secondaryFiles", [])):
             # TODO: Record these in a specializationOf entity with UUID?
             if sec["class"] == "File":
-                (sec_entity, _, _) = self.declare_file(sec)
+                (sec_entity, _, _) = self.declare_file(sec, load_listing)
             elif sec["class"] == "Directory":
-                sec_entity = self.declare_directory(sec)
+                sec_entity = self.declare_directory(sec, load_listing)
             else:
                 raise ValueError(f"Got unexpected secondaryFiles value: {sec}")
             # We don't know how/when/where the secondary file was generated,
@@ -362,7 +430,9 @@ def declare_file(self, value: CWLObjectType) -> Tuple[ProvEntity, ProvEntity, st
 
         return file_entity, entity, checksum
 
-    def declare_directory(self, value: CWLObjectType) -> ProvEntity:
+    def declare_directory(
+        self, value: CWLObjectType, load_listing: str = "invalid_listing"
+    ) -> ProvEntity:
         """Register any nested files/directories."""
         # FIXME: Calculate a hash-like identifier for directory
         # so we get same value if it's the same filenames/hashes
@@ -408,12 +478,23 @@ def declare_directory(self, value: CWLObjectType) -> ProvEntity:
         # a later call to this method will sort that
         is_empty = True
 
         if "listing" not in value:
-            get_listing(self.fsaccess, value)
+            if load_listing == "no_listing":
+                pass
+            elif load_listing == "deep_listing":
+                get_listing(self.fsaccess, value)
+            elif load_listing == "shallow_listing":
+                get_listing(self.fsaccess, value, False)
+            else:
+                raise ValueError("Invalid loadListing value: %s" % load_listing)
+
         for entry in cast(MutableSequence[CWLObjectType], value.get("listing", [])):
             is_empty = False
             # Declare child-artifacts
-            entity = self.declare_artefact(entry)
+            entity = self.declare_artefact(entry, load_listing)
             self.document.membership(coll, entity)
             # Membership relation aka our ORE Proxy
             m_id = uuid.uuid4().urn
@@ -481,7 +562,7 @@ def declare_string(self, value: str) -> Tuple[ProvEntity, str]:
         )
         return entity, checksum
 
-    def declare_artefact(self, value: Any) -> ProvEntity:
+    def declare_artefact(self, value: Any, load_listing: str = "invalid_listing") -> ProvEntity:
         """Create data artefact entities for all file objects."""
         if value is None:
             # FIXME: If this can happen in CWL, we'll
@@ -523,12 +604,13 @@
 
         # Base case - we found a File we need to update
         if value.get("class") == "File":
-            (entity, _, _) = self.declare_file(value)
+            (entity, _, _) = self.declare_file(value, load_listing)
             value["@id"] = entity.identifier.uri
             return entity
 
         if value.get("class") == "Directory":
-            entity = self.declare_directory(value)
+            entity = self.declare_directory(value, load_listing)
             value["@id"] = entity.identifier.uri
             return entity
         coll_id = value.setdefault("@id", uuid.uuid4().urn)
@@ -570,7 +652,7 @@
             members = []
             for each_input_obj in iter(value):
                 # Recurse and register any nested objects
-                e = self.declare_artefact(each_input_obj)
+                e = self.declare_artefact(each_input_obj, load_listing)
                 members.append(e)
 
             # If we reached this, then we were allowed to iterate
@@ -604,11 +686,16 @@
         job_order: Union[CWLObjectType, List[CWLObjectType]],
         process_run_id: str,
         name: Optional[str] = None,
+        schema: Any = None,
+        load_listing: Optional[str] = None,
     ) -> None:
         """Add used() for each data artefact."""
         if isinstance(job_order, list):
             for entry in job_order:
-                self.used_artefacts(entry, process_run_id, name)
+                self.used_artefacts(entry, process_run_id, name, schema, load_listing)
         else:
             # FIXME: Use workflow name in packed.cwl, "main" is wrong for nested workflows
             base = "main"
@@ -616,8 +703,19 @@
                 base += "/" + name
             for key, value in job_order.items():
                 prov_role = self.wf_ns[f"{base}/{key}"]
+                if not load_listing:
+                    load_listing = "deep_listing"
+                    if schema is not None:
+                        for field in schema["fields"]:
+                            if field["name"] == key:
+                                if "loadListing" in field:
+                                    load_listing = field["loadListing"]
+                                break
+                        else:
+                            # TODO: find a way to reproduce this case in a test
+                            _logger.warning("No loadListing info in object")
+                            load_listing = "no_listing"
                 try:
-                    entity = self.declare_artefact(value)
+                    entity = self.declare_artefact(value, load_listing)
                     self.document.used(
                         process_run_id,
                         entity,
@@ -633,11 +731,12 @@
         final_output: Union[CWLObjectType, MutableSequence[CWLObjectType], None],
         process_run_id: Optional[str],
         name: Optional[str],
+        load_listing: str = "invalid_listing",
     ) -> None:
         """Call wasGeneratedBy() for each output,copy the files into the RO."""
         if isinstance(final_output, MutableSequence):
             for entry in final_output:
-                self.generate_output_prov(entry, process_run_id, name)
+                self.generate_output_prov(entry, process_run_id, name, load_listing)
         elif final_output is not None:
             # Timestamp should be created at the earliest
             timestamp = datetime.datetime.now()
@@ -646,7 +745,7 @@ def generate_output_prov(
             # entity (UUID) and document it as generated in
             # a role corresponding to the output
             for output, value in final_output.items():
-                entity = self.declare_artefact(value)
+                entity = self.declare_artefact(value, load_listing)
                 if name is not None:
                     name = urllib.parse.quote(str(name), safe=":/,#")
                 # FIXME: Probably not "main" in nested workflows
diff --git a/cwltool/cwlprov/ro.py b/cwltool/cwlprov/ro.py
index 3bcc9fddf..62b8cd61c 100644
--- a/cwltool/cwlprov/ro.py
+++ b/cwltool/cwlprov/ro.py
@@ -66,6 +66,7 @@ def __init__(
         temp_prefix_ro: str = "tmp",
         orcid: str = "",
         full_name: str = "",
+        no_data: bool = False,
     ) -> None:
         """Initialize the ResearchObject."""
         self.temp_prefix = temp_prefix_ro
@@ -92,6 +93,9 @@ def __init__(
         self._initialize()
         _logger.debug("[provenance] Temporary research object: %s", self.folder)
 
+        # Whether to skip storing data files (--no-data)
+        self.no_data = no_data
+
     def self_check(self) -> None:
         """Raise ValueError if this RO is closed."""
         if self.closed:
@@ -179,14 +183,18 @@ def add_tagfile(self, path: str, timestamp: Optional[datetime.datetime] = None)
             # adding checksums after closing.
             # Below probably OK for now as metadata files
             # are not too large..?
-
-            checksums[SHA1] = checksum_copy(tag_file, hasher=hashlib.sha1)
-
-            tag_file.seek(0)
-            checksums[SHA256] = checksum_copy(tag_file, hasher=hashlib.sha256)
-
-            tag_file.seek(0)
-            checksums[SHA512] = checksum_copy(tag_file, hasher=hashlib.sha512)
+            if self.no_data:
+                checksums[SHA1] = checksum_only(tag_file, hasher=hashlib.sha1)
+                tag_file.seek(0)
+                checksums[SHA256] = checksum_only(tag_file, hasher=hashlib.sha256)
+                tag_file.seek(0)
+                checksums[SHA512] = checksum_only(tag_file, hasher=hashlib.sha512)
+            else:
+                checksums[SHA1] = checksum_copy(tag_file, hasher=hashlib.sha1)
+                tag_file.seek(0)
+                checksums[SHA256] = checksum_copy(tag_file, hasher=hashlib.sha256)
+                tag_file.seek(0)
+                checksums[SHA512] = checksum_copy(tag_file, hasher=hashlib.sha512)
 
         rel_path = posix_path(os.path.relpath(path, self.folder))
         self.tagfiles.add(rel_path)
@@ -468,12 +476,16 @@ def add_data_file(
         timestamp: Optional[datetime.datetime] = None,
         content_type: Optional[str] = None,
     ) -> str:
         """Copy inputs to data/ folder."""
         self.self_check()
         tmp_dir, tmp_prefix = os.path.split(self.temp_prefix)
         with tempfile.NamedTemporaryFile(prefix=tmp_prefix, dir=tmp_dir, delete=False) as tmp:
-            checksum = checksum_copy(from_fp, tmp)
-
+            if self.no_data:
+                checksum = checksum_only(from_fp)
+            else:
+                checksum = checksum_copy(from_fp, tmp)
         # Calculate hash-based file path
         folder = os.path.join(self.folder, DATA, checksum[0:2])
         path = os.path.join(folder, checksum)
@@ -484,6 +496,7 @@ def add_data_file(
             os.rename(tmp.name, path)
 
         # Relative posix path
+        # TODO: can this be skipped when no_data is set?
         rel_path = posix_path(os.path.relpath(path, self.folder))
 
         # Register in bagit checksum
@@ -493,6 +506,14 @@ def add_data_file(
             _logger.warning("[provenance] Unknown hash method %s for bagit manifest", Hasher)
             # Inefficient, bagit support need to checksum again
             self._add_to_bagit(rel_path)
+        if "dir" in self.relativised_input_object:
+            _logger.debug(
+                "[provenance] Directory: %s", self.relativised_input_object["dir"]["basename"]
+            )
+        else:
+            _logger.debug("[provenance] File: %s", str(from_fp))
         _logger.debug("[provenance] Added data file %s", path)
         if timestamp is not None:
             createdOn, createdBy = self._self_made(timestamp)
@@ -557,7 +578,10 @@ def _add_to_bagit(self, rel_path: str, **checksums: str) -> None:
         checksums = dict(checksums)
         with open(lpath, "rb") as file_path:
             # FIXME: Need sha-256 / sha-512 as well for Research Object BagIt profile?
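+            # Note: no dst_file is passed below, so checksum_only and
+            # checksum_copy yield the same digest; checksum_only simply
+            # skips the copy machinery.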
-            checksums[SHA1] = checksum_copy(file_path, hasher=hashlib.sha1)
+            if self.no_data:
+                checksums[SHA1] = checksum_only(file_path, hasher=hashlib.sha1)
+            else:
+                checksums[SHA1] = checksum_copy(file_path, hasher=hashlib.sha1)
 
         self.add_to_manifest(rel_path, checksums)
 
@@ -612,3 +636,96 @@ def _relativise_files(
             for obj in structure:
                 # Recurse and rewrite any nested File objects
                 self._relativise_files(cast(CWLOutputType, obj))
+
+    def close(self, save_to: Optional[str] = None) -> None:
+        """Close the Research Object, optionally saving to the specified folder.
+
+        Closing will remove any temporary files used by this research object.
+        After calling this method, this ResearchObject instance can no longer
+        be used, except for no-op calls to .close().
+
+        The 'save_to' folder should not exist - if it does, it will be deleted.
+
+        It is safe to call this function multiple times without the
+        'save_to' argument, e.g. within a try..finally block to
+        ensure the temporary files of this Research Object are removed.
+        """
+        if save_to is None:
+            if not self.closed:
+                _logger.debug("[provenance] Deleting temporary %s", self.folder)
+                shutil.rmtree(self.folder, ignore_errors=True)
+        else:
+            save_to = os.path.abspath(save_to)
+            _logger.info("[provenance] Finalizing Research Object")
+            self._finalize()  # write manifest etc.
+            # TODO: Write as archive (.zip or .tar) based on extension?
+
+            if os.path.isdir(save_to):
+                _logger.info("[provenance] Deleting existing %s", save_to)
+                shutil.rmtree(save_to)
+            shutil.move(self.folder, save_to)
+            _logger.info("[provenance] Research Object saved to %s", save_to)
+            self.folder = save_to
+        self.closed = True
+
+
+def checksum_copy(
+    src_file: IO[Any],
+    dst_file: Optional[IO[Any]] = None,
+    hasher=Hasher,  # type: Callable[[], hashlib._Hash]
+    buffersize: int = 1024 * 1024,
+) -> str:
+    """Compute checksums while copying a file."""
+    # TODO: Use hashlib.new(Hasher_str) instead?
+    checksum = hasher()
+    contents = src_file.read(buffersize)
+    if dst_file and hasattr(dst_file, "name") and hasattr(src_file, "name"):
+        temp_location = os.path.join(os.path.dirname(dst_file.name), str(uuid.uuid4()))
+        try:
+            os.rename(dst_file.name, temp_location)
+            os.link(src_file.name, dst_file.name)
+            dst_file = None
+            os.unlink(temp_location)
+        except OSError:
+            pass
+        if os.path.exists(temp_location):
+            os.rename(temp_location, dst_file.name)  # type: ignore
+
+    return content_processor(contents, src_file, dst_file, checksum, buffersize)
+
+
+def content_processor(
+    contents: Any,
+    src_file: IO[Any],
+    dst_file: Optional[IO[Any]],
+    checksum: "hashlib._Hash",
+    buffersize: int,
+) -> str:
+    """Calculate the checksum based on the content."""
+    while contents != b"":
+        if dst_file is not None:
+            dst_file.write(contents)
+        checksum.update(contents)
+        contents = src_file.read(buffersize)
+    if dst_file is not None:
+        dst_file.flush()
+    return checksum.hexdigest().lower()
+
+
+def checksum_only(
+    src_file: IO[Any],
+    dst_file: Optional[IO[Any]] = None,
+    hasher=Hasher,  # type: Callable[[], hashlib._Hash]
+    buffersize: int = 1024 * 1024,
+) -> str:
+    """Calculate the checksum only; do not copy the data files."""
+    if dst_file is not None:
+        _logger.error("Destination file should be None but it is %s", dst_file)
+    # TODO: Use hashlib.new(Hasher_str) instead?
+    checksum = hasher()
+    contents = src_file.read(buffersize)
+    # TODO: Refactor the duplicated read/hash setup shared with checksum_copy?
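+    # Hypothetical usage: checksum_only(open("inputs/data.bin", "rb")) returns
+    # the hex digest without writing a copy of the data anywhere.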
+    return content_processor(contents, src_file, dst_file, checksum, buffersize)
diff --git a/cwltool/job.py b/cwltool/job.py
index 9d257e083..05e14900c 100644
--- a/cwltool/job.py
+++ b/cwltool/job.py
@@ -285,7 +285,10 @@ def _execute(
                 and isinstance(job_order, (list, dict))
             ):
                 runtimeContext.prov_obj.used_artefacts(
-                    job_order, runtimeContext.process_run_id, str(self.name)
+                    job_order,
+                    runtimeContext.process_run_id,
+                    str(self.name),
+                    load_listing=self.builder.loadListing,
                 )
             else:
                 _logger.warning(
@@ -411,6 +414,7 @@ def stderr_stdout_log_path(
                     runtimeContext.process_run_id,
                     outputs,
                     datetime.datetime.now(),
+                    builder.loadListing,  # TODO: fix this
                 )
             if processStatus != "success":
                 _logger.warning("[job %s] completed %s", self.name, processStatus)
diff --git a/cwltool/loghandler.py b/cwltool/loghandler.py
index c1f451991..7ef71206c 100644
--- a/cwltool/loghandler.py
+++ b/cwltool/loghandler.py
@@ -11,6 +11,7 @@
 
 def configure_logging(
     stderr_handler: logging.Handler,
+    no_warnings: bool,
     quiet: bool,
     debug: bool,
     enable_color: bool,
@@ -21,6 +22,8 @@ def configure_logging(
     rdflib_logger = logging.getLogger("rdflib.term")
     rdflib_logger.addHandler(stderr_handler)
     rdflib_logger.setLevel(logging.ERROR)
+    if no_warnings:
+        stderr_handler.setLevel(logging.ERROR)
     if quiet:
         # Silence STDERR, not an eventual provenance log file
         stderr_handler.setLevel(logging.WARN)
diff --git a/cwltool/main.py b/cwltool/main.py
index 965f863a0..dadb87bfa 100755
--- a/cwltool/main.py
+++ b/cwltool/main.py
@@ -11,10 +11,10 @@
 import signal
 import subprocess  # nosec
 import sys
+import tempfile
 import time
 import urllib
 import warnings
-from codecs import getwriter
 from typing import (
     IO,
     Any,
@@ -693,7 +693,9 @@ def setup_provenance(
         temp_prefix_ro=args.tmpdir_prefix,
         orcid=args.orcid,
         full_name=args.cwl_full_name,
+        no_data=args.no_data,
     )
+    runtimeContext.research_obj = ro
     log_file_io = open_log_file_for_activity(ro, ro.engine_uuid)
     prov_log_handler = logging.StreamHandler(log_file_io)
@@ -1013,6 +1015,7 @@ def main(
 
             configure_logging(
                 stderr_handler,
+                args.no_warnings,
                 args.quiet,
                 runtimeContext.debug,
                 args.enable_color,
@@ -1138,12 +1141,27 @@ def main(
                 print(f"{args.workflow} is valid CWL.", file=stdout)
                 return 0
 
-            if args.print_rdf:
+            if args.print_rdf or args.provenance:
+                output = stdout
+                if args.provenance:
+                    # Write the workflow RDF to a Turtle file in a temporary
+                    # directory; the provenance folder does not exist yet, and
+                    # creating it at this point would cause issues later on.
+                    temp_workflow_dir = tempfile.TemporaryDirectory()
+                    workflow_provenance = os.path.join(temp_workflow_dir.name, "workflow.ttl")
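+                    # Redirect the RDF away from stdout into the Turtle file.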
+                    output = open(workflow_provenance, "w")
+                    _logger.info("Writing workflow RDF to %s", workflow_provenance)
                 print(
                     printrdf(tool, loadingContext.loader.ctx, args.rdf_serializer),
-                    file=stdout,
+                    file=output,
                 )
-                return 0
+                # Close the output file again
+                if args.provenance:
+                    output.close()
+                # Only --print-rdf exits this way
+                if args.print_rdf:
+                    return 0
 
             if args.print_dot:
                 printdot(tool, loadingContext.loader.ctx, stdout)
diff --git a/cwltool/utils.py b/cwltool/utils.py
index 25d6c8873..a81caf8e5 100644
--- a/cwltool/utils.py
+++ b/cwltool/utils.py
@@ -61,6 +61,7 @@
 
 processes_to_kill: Deque["subprocess.Popen[str]"] = collections.deque()
 
+
 CWLOutputAtomType = Union[
     None,
     bool,
@@ -95,7 +96,8 @@
 ScatterOutputCallbackType = Callable[[Optional[ScatterDestinationsType], str], None]
 SinkType = Union[CWLOutputType, CWLObjectType]
 DirectoryType = TypedDict(
-    "DirectoryType", {"class": str, "listing": List[CWLObjectType], "basename": str}
+    "DirectoryType",
+    {"class": str, "listing": List[CWLObjectType], "basename": str, "loadListing": str},
 )
 JSONAtomType = Union[Dict[str, Any], List[Any], str, int, float, bool, None]
 JSONType = Union[Dict[str, JSONAtomType], List[JSONAtomType], str, int, float, bool, None]
diff --git a/cwltool/workflow.py b/cwltool/workflow.py
index 8546ca72e..d19c403e5 100644
--- a/cwltool/workflow.py
+++ b/cwltool/workflow.py
@@ -446,6 +446,7 @@ def job(
                     self.embedded_tool.parent_wf = self.prov_obj
                     process_name = self.tool["id"].split("#")[1]
                     self.prov_obj.start_process(
+                        self.id,
                         process_name,
                         datetime.datetime.now(),
                         self.embedded_tool.provenance_object.workflow_run_uri,
diff --git a/mypy-stubs/prov/model.pyi b/mypy-stubs/prov/model.pyi
index ee2688a4d..cc768a6b4 100644
--- a/mypy-stubs/prov/model.pyi
+++ b/mypy-stubs/prov/model.pyi
@@ -3,7 +3,6 @@
 from typing import IO, Any, Dict, Iterable, List, Set, Tuple
 
 from _typeshed import Incomplete
 from prov.constants import *
-
 # from prov import Error as Error, serializers as serializers
 from prov.identifier import Identifier, Namespace, QualifiedName
diff --git a/mypy-stubs/rdflib/graph.pyi b/mypy-stubs/rdflib/graph.pyi
index d3e6f2f54..c71cea037 100644
--- a/mypy-stubs/rdflib/graph.pyi
+++ b/mypy-stubs/rdflib/graph.pyi
@@ -106,7 +106,6 @@ class Graph(Node):
     ) -> Any: ...
     def namespaces(self) -> Iterator[Tuple[Any, Any]]: ...
     def absolutize(self, uri: Any, defrag: int = ...) -> Any: ...
-
     # no destination and non-None positional encoding
     @overload
     def serialize(
@@ -117,7 +116,6 @@ def serialize(
         base: Optional[str],
         encoding: str,
         **args: Any,
     ) -> bytes: ...
-
     # no destination and non-None keyword encoding
     @overload
     def serialize(
@@ -129,7 +127,6 @@ def serialize(
         *,
         encoding: str,
         **args: Any,
     ) -> bytes: ...
-
     # no destination and None encoding
     @overload
     def serialize(
@@ -140,7 +137,6 @@ def serialize(
         base: Optional[str] = ...,
         encoding: None = ...,
         **args: Any,
     ) -> str: ...
-
     # non-None destination
     @overload
     def serialize(
@@ -151,7 +147,6 @@ def serialize(
        base: Optional[str] = ...,
         encoding: Optional[str] = ...,
         **args: Any,
     ) -> "Graph": ...
-
     # fallback
     @overload
     def serialize(
diff --git a/tests/test_js_sandbox.py b/tests/test_js_sandbox.py
index 9739c77a7..98b4cc115 100644
--- a/tests/test_js_sandbox.py
+++ b/tests/test_js_sandbox.py
@@ -22,7 +22,7 @@
     ("v7.7.3\n", True),
 ]
 
-configure_logging(_logger.handlers[-1], False, True, True, True)
+configure_logging(_logger.handlers[-1], False, False, True, True, True)
 
 _logger.setLevel(logging.DEBUG)
 
diff --git a/tests/test_load_tool.py b/tests/test_load_tool.py
index 3d0cba161..f291d2985 100644
--- a/tests/test_load_tool.py
+++ b/tests/test_load_tool.py
@@ -15,7 +15,7 @@
 
 from .util import get_data
 
-configure_logging(_logger.handlers[-1], False, True, True, True)
+configure_logging(_logger.handlers[-1], False, False, True, True, True)
 
 _logger.setLevel(logging.DEBUG)
 
diff --git a/tests/test_provenance.py b/tests/test_provenance.py
index 83eb61c22..10e590ab7 100644
--- a/tests/test_provenance.py
+++ b/tests/test_provenance.py
@@ -47,6 +47,20 @@ def cwltool(tmp_path: Path, *args: Any) -> Path:
     return prov_folder
 
 
+def cwltool_no_data(tmp_path: Path, *args: Any) -> Path:
+    prov_folder = tmp_path / "provenance"
+    prov_folder.mkdir()
+    new_args = ["--enable-ext", "--no-data", "--provenance", str(prov_folder)]
+    new_args.extend(args)
+    # Run within a temporary directory to not pollute git checkout
+    tmp_dir = tmp_path / "cwltool-run"
+    tmp_dir.mkdir()
+    with working_directory(tmp_dir):
+        status = main(new_args)
+        assert status == 0, f"Failed: cwltool.main({args})"
+    return prov_folder
+
+
 @needs_docker
 def test_hello_workflow(tmp_path: Path) -> None:
     check_provenance(
@@ -207,6 +221,99 @@ def test_directory_workflow(tmp_path: Path) -> None:
             assert p.is_file(), f"Could not find {letter} as {p}"
 
 
+@needs_docker
+def test_directory_workflow_no_listing(tmp_path: Path) -> None:
+    """Check that deep_listing files are stored and no_listing files are not."""
+    dir2 = tmp_path / "dir_deep_listing"
+    dir2.mkdir()
+    sha1 = {
+        # Expected hashes of ASCII letters (no linefeed)
+        # as returned from:
+        # for x in a b c ; do echo -n $x | sha1sum ; done
+        "a": "86f7e437faa5a7fce15d1ddcb9eaeaea377667b8",
+        "b": "e9d71f5ee7c92d6dc9e92ffdad17b8bd49418f98",
+        "c": "84a516841ba77a5b4648de2cd0dfcb30ea46dbb4",
+    }
+    for x in "abc":
+        # Make test files with predictable hashes
+        with open(dir2 / x, "w", encoding="ascii") as f:
+            f.write(x)
+
+    dir3 = tmp_path / "dir_no_listing"
+    dir3.mkdir()
+    sha1.update(
+        {
+            # Expected hashes of ASCII letters (no linefeed)
+            # as returned from:
+            # for x in d e f ; do echo -n $x | sha1sum ; done
+            "d": "3c363836cf4e16666669a25da280a1865c2d2874",
+            "e": "58e6b3a414a1e090dfc6029add0f3555ccba127f",
+            "f": "4a0a19218e082a343a1b17e5333409af9d98f0f5",
+        }
+    )
+    for x in "def":
+        # Make test files with predictable hashes
+        with open(dir3 / x, "w", encoding="ascii") as f:
+            f.write(x)
+
+    dir4 = tmp_path / "dir_no_info"
+    dir4.mkdir()
+    sha1.update(
+        {
+            # Expected hashes of ASCII letters (no linefeed)
+            # as returned from:
+            # for x in g h i ; do echo -n $x | sha1sum ; done
+            "g": "54fd1711209fb1c0781092374132c66e79e2241b",
+            "h": "27d5482eebd075de44389774fce28c69f45c8a75",
+            "i": "042dc4512fa3d391c5170cf3aa61e6a638f84342",
+        }
+    )
+    for x in "ghi":
+        # Make test files with predictable hashes
+        with open(dir4 / x, "w", encoding="ascii") as f:
+            f.write(x)
+
+    folder = cwltool(
+        tmp_path,
+        get_data("tests/wf/directory_no_listing.cwl"),
+        "--dir",
+        str(dir2),
+        "--ignore",
+        str(dir3),
+        "--ignore_no_info",
+        str(dir4),
+    )
+    # TODO: also check the inverse, as there should be no data in there.
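+    # A hypothetical inverse check could assert absence instead, e.g.:
+    #   assert not (folder / "data" / "3c" / sha1["d"]).exists()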
+    # check_provenance(folder, directory=True)
+
+    # Output should include ls stdout of filenames a b c on each line
+    file_list = (
+        folder
+        / "data"
+        / "84"
+        / "84a516841ba77a5b4648de2cd0dfcb30ea46dbb4"
+        # 84a51684... is the sha1 of the letter "c" (see the table above);
+        # the ls stdout itself would hash to:
+        # echo -e "a\nb\nc" | sha1sum
+        # 3ca69e8d6c234a469d16ac28a4a658c92267c423  -
+    )
+    # The file should be empty and, in the future, should not exist at all
+    # assert os.path.getsize(file_list.absolute()) == 0
+    # To be discarded once the file really no longer exists:
+    assert file_list.is_file()
+
+    # Input files should be captured by hash value,
+    # even if they were inside a class: Directory
+    for letter, letter_hash in sha1.items():
+        prefix = letter_hash[:2]  # first 2 letters
+        p = folder / "data" / prefix / letter_hash
+        # The file should be empty and, in the future, should not exist at all
+        # assert os.path.getsize(p.absolute()) == 0
+        # To be discarded once the file really no longer exists:
+        if letter not in ["d", "e", "f", "g", "h", "i"]:
+            print(f"Analysing file {letter}")
+            assert p.is_file(), f"Could not find {letter} as {p}"
+
+
 @needs_docker
 def test_no_data_files(tmp_path: Path) -> None:
     folder = cwltool(
diff --git a/tests/wf/directory_no_listing.cwl b/tests/wf/directory_no_listing.cwl
new file mode 100755
index 000000000..a8f32fca5
--- /dev/null
+++ b/tests/wf/directory_no_listing.cwl
@@ -0,0 +1,80 @@
+#!/usr/bin/env cwl-runner
+cwlVersion: v1.2
+class: Workflow
+
+doc: >
+  Inspect the provided directory and return the filenames.
+  Generate a new directory and return it (including content).
+
+hints:
+  - class: DockerRequirement
+    dockerPull: docker.io/debian:stable-slim
+
+inputs:
+  dir:
+    type: Directory
+    loadListing: deep_listing
+  ignore:
+    type: Directory
+    loadListing: no_listing
+  ignore_no_info:
+    type: Directory
+
+steps:
+  ls:
+    in:
+      dir: dir
+      ignore: ignore
+    out:
+      [listing]
+    run:
+      class: CommandLineTool
+      baseCommand: ls
+      inputs:
+        dir:
+          type: Directory
+          inputBinding:
+            position: 1
+        ignore:
+          type: Directory
+          inputBinding:
+            position: 2
+      outputs:
+        listing:
+          type: stdout
+
+  generate:
+    in: []
+    out:
+      [dir1]
+    run:
+      class: CommandLineTool
+      requirements:
+        ShellCommandRequirement: {}
+        LoadListingRequirement:
+          loadListing: deep_listing
+
+      arguments:
+        - shellQuote: false
+          valueFrom: >
+            pwd;
+            mkdir -p dir1/a/b;
+            echo -n a > dir1/a.txt;
+            echo -n b > dir1/a/b.txt;
+            echo -n c > dir1/a/b/c.txt;
+      inputs: []
+      outputs:
+        dir1:
+          type: Directory
+          outputBinding:
+            glob: "dir1"
+
+outputs:
+  output_1:
+    type: File
+    outputSource: ls/listing
+  output_2:
+    type: Directory
+    outputSource: generate/dir1
+
diff --git a/tests/wf/directory_no_listing_nested.cwl b/tests/wf/directory_no_listing_nested.cwl
new file mode 100755
index 000000000..25e25008d
--- /dev/null
+++ b/tests/wf/directory_no_listing_nested.cwl
@@ -0,0 +1,81 @@
+#!/usr/bin/env cwl-runner
+cwlVersion: v1.2
+class: Workflow
+
+doc: >
+  Inspect the provided directory and return the filenames.
+  Generate a new directory and return it (including content).
+
+hints:
+  - class: DockerRequirement
+    dockerPull: docker.io/debian:stable-slim
+
+inputs:
+  dir:
+    type: Directory?
+    loadListing: no_listing
+  ignore:
+    type: Directory?
+    loadListing: no_listing
+  ignore_no_info:
+    type: Directory?
+    loadListing: no_listing
+
+steps:
+  ls:
+    in:
+      dir: dir
+      ignore: ignore
+    out:
+      [listing]
+    run:
+      class: CommandLineTool
+      baseCommand: ls
+      inputs:
+        dir:
+          type: Directory
+          inputBinding:
+            position: 1
+        ignore:
+          type: Directory
+          inputBinding:
+            position: 2
+      outputs:
+        listing:
+          type: stdout
+
+  generate:
+    in: []
+    out:
+      [dir1]
+    run:
+      class: CommandLineTool
+      requirements:
+        ShellCommandRequirement: {}
+        LoadListingRequirement:
+          loadListing: deep_listing
+
+      arguments:
+        - shellQuote: false
+          valueFrom: >
+            pwd;
+            mkdir -p dir1/a/b;
+            echo -n a > dir1/a.txt;
+            echo -n b > dir1/a/b.txt;
+            echo -n c > dir1/a/b/c.txt;
+      inputs: []
+      outputs:
+        dir1:
+          type: Directory
+          outputBinding:
+            glob: "dir1"
+
+outputs:
+  output_1:
+    type: File
+    outputSource: ls/listing
+  output_2:
+    type: Directory
+    outputSource: generate/dir1
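+
+# Hypothetical invocation (all inputs are optional):
+#   cwltool --provenance ro/ directory_no_listing_nested.cwl --dir mydir/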