Implementation of store RDF export of the workflow in CWL Prov RO-Bundle #1709

Draft
wants to merge 55 commits into
base: main
Changes from 16 commits
55 commits
d4851f6
Implementation of store RDF export of the workflow in CWL Prov RO-Bun…
jjkoehorst Aug 16, 2022
00ce190
formatting changes according to make format
jjkoehorst Aug 16, 2022
562d13c
formatting corrections
jjkoehorst Aug 16, 2022
e063735
remove need for type ignore
mr-c Aug 16, 2022
e0bb0e0
hard change to checksum_only
jjkoehorst Aug 16, 2022
16335b7
Added sha checksum to file_entity, need to look into what predicate s…
jjkoehorst Aug 16, 2022
59703ac
Merge branch 'cwlprov-cwl-rdf' of github.com:jjkoehorst/cwltool into …
jjkoehorst Aug 16, 2022
40c6705
formatting cleanup
jjkoehorst Aug 17, 2022
87c304b
--no-data argument added
jjkoehorst Aug 17, 2022
049dcd7
added no_data variable to some functions as i was unable to access th…
jjkoehorst Aug 17, 2022
21ecba9
test provenance --no-data added and a TODO check for check_bagit if w…
jjkoehorst Aug 17, 2022
879d5ce
Global no-data option for now to test the same environment with or wi…
jjkoehorst Aug 17, 2022
723c643
NO_DATA global variable added to know if there should be no data for …
jjkoehorst Aug 18, 2022
540a5a8
formatting
jjkoehorst Aug 18, 2022
211348a
cleaning logger and no_data access implementation
jjkoehorst Aug 18, 2022
bd61e43
Merge branch 'cwlprov-cwl-rdf'
jjkoehorst Aug 24, 2022
ad90be6
cleaning up imports
jjkoehorst Aug 25, 2022
76abff0
make remove_unused_imports, cleaning up all kinds of imports
jjkoehorst Sep 5, 2022
33d1551
some empty line formatting
jjkoehorst Sep 5, 2022
81b48de
Merge branch 'main' into cwlprov-cwl-rdf
jjkoehorst Sep 5, 2022
bc56733
if not none instead of !=
jjkoehorst Sep 5, 2022
e5b498d
make cleanup sync
jjkoehorst Sep 6, 2022
3666b65
docstrings added
jjkoehorst Sep 6, 2022
f58e90e
Default NO_DATA set to false
jjkoehorst Sep 6, 2022
4a6906b
move NO_DATA to utils
jjkoehorst Sep 6, 2022
08e18b0
remove global NO_DATA
mr-c Sep 6, 2022
33f706b
missed two NO_DATA's
jjkoehorst Sep 6, 2022
406ae69
Merge branch 'cwlprov-cwl-rdf' of github.com:jjkoehorst/cwltool into …
mr-c Sep 6, 2022
b288fb4
added return type str: to the checksum content processor
jjkoehorst Sep 6, 2022
cb28e1a
Merge branch 'cwlprov-cwl-rdf' of github.com:jjkoehorst/cwltool into …
mr-c Sep 6, 2022
1dbcdad
fix type
mr-c Sep 6, 2022
ab71278
restore regular prov tests
mr-c Sep 6, 2022
112f4f0
Duplicated a test case and the cwltool function to allow for --no-dat…
jjkoehorst Sep 6, 2022
cd0a4af
formatting
jjkoehorst Sep 6, 2022
50dac83
nolisting workflow and test added
jjkoehorst Sep 7, 2022
d3048af
with copy files but excluding a specific folder test
jjkoehorst Sep 7, 2022
ac532d4
working on load listing recognition for files and provenance
jjkoehorst Sep 8, 2022
fb5a65a
expanded the test case, server testing showed a loadListing option no…
jjkoehorst Sep 8, 2022
373b600
issue with load listing field
jjkoehorst Sep 8, 2022
eb93204
unused import removal
jjkoehorst Sep 8, 2022
a4b26af
show file name with debugger
jjkoehorst Sep 9, 2022
95c2c63
from_fp does not always carry name
jjkoehorst Sep 9, 2022
401918e
testing to print stacktrace to identify path to print file
jjkoehorst Sep 20, 2022
6fe74f3
check listing value
jjkoehorst Sep 20, 2022
7f370bb
change default to invalid_listing
jjkoehorst Sep 20, 2022
26fec21
debugging in progress
jjkoehorst Sep 20, 2022
d01a0df
trace in debug
jjkoehorst Oct 12, 2022
c15156b
stack trace only at debug level
jjkoehorst Oct 12, 2022
315e78f
stacktrace disabled
jjkoehorst Oct 12, 2022
8158340
Merge branch 'main' into cwlprov-cwl-rdf
jjkoehorst Aug 16, 2023
cad4896
formatting
jjkoehorst Aug 16, 2023
b930842
sort imports
jjkoehorst Aug 16, 2023
aa0054e
No warnings test
jjkoehorst Aug 17, 2023
87946a3
missed one attribute
jjkoehorst Aug 17, 2023
420dd1c
work in progress to fix the main merge
jjkoehorst Aug 17, 2023
7 changes: 7 additions & 0 deletions cwltool/argparser.py
@@ -249,6 +249,13 @@ def arg_parser() -> argparse.ArgumentParser:
help="Record user account info as part of provenance.",
dest="user_provenance",
)
provgroup.add_argument(
"--no-data",
default=False,
action="store_true",
help="Disable storage of the workflow's input and output data files in the provenance data folder.",
dest="no_data",
)
provgroup.add_argument(
"--disable-user-provenance",
default=False,
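To make the effect of the new flag concrete, here is a minimal, self-contained sketch of a `store_true` option registered on an argument group. It is not the cwltool parser: `build_parser`, the `example` program name, and the help strings are assumptions for illustration; only the `--no-data` option name and its `no_data` destination come from the diff above.

```python
import argparse


def build_parser() -> argparse.ArgumentParser:
    """Build a tiny parser with a provenance option group (hypothetical helper)."""
    parser = argparse.ArgumentParser(prog="example")
    provgroup = parser.add_argument_group("Options for recording provenance")
    provgroup.add_argument(
        "--provenance",
        dest="provenance",
        default=None,
        help="Folder in which to save the provenance research object.",
    )
    provgroup.add_argument(
        "--no-data",
        dest="no_data",
        default=False,
        action="store_true",  # the flag takes no value; its presence means True
        help="Do not store input and output data files in the provenance folder.",
    )
    return parser


# With the flag the attribute is True; without it the default (False) applies.
with_flag = build_parser().parse_args(["--provenance", "ro", "--no-data"])
without_flag = build_parser().parse_args(["--provenance", "ro"])
print(with_flag.no_data, without_flag.no_data)
```

Because the value lands on the parsed namespace, it can be passed down explicitly as a function argument instead of being read from a module-level global.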
42 changes: 38 additions & 4 deletions cwltool/main.py
@@ -8,9 +8,12 @@
import io
import logging
import os
import parser
import shutil
import signal
import subprocess # nosec
import sys
import tempfile
import time
import urllib
import warnings
@@ -710,6 +713,7 @@ def setup_provenance(
orcid=args.orcid,
full_name=args.cwl_full_name,
)

runtimeContext.research_obj = ro
log_file_io = ro.open_log_file_for_activity(ro.engine_uuid)
prov_log_handler = logging.StreamHandler(log_file_io)
@@ -969,6 +973,9 @@ def print_targets(
)


NO_DATA = None


def main(
argsl: Optional[List[str]] = None,
args: Optional[argparse.Namespace] = None,
@@ -1045,6 +1052,11 @@ def main(
return 0
_logger.info(versionfunc())

# TODO How can we access args.no_data from other places in a nice way?...
_logger.debug("No data status %s", args.no_data)
global NO_DATA
NO_DATA = args.no_data

if args.print_supported_versions:
print("\n".join(supported_cwl_versions(args.enable_dev)), file=stdout)
return 0
@@ -1172,12 +1184,27 @@ def main(
print(f"{args.workflow} is valid CWL.", file=stdout)
return 0

if args.print_rdf:
if args.print_rdf or args.provenance:
output = stdout
if args.provenance:
# Write workflow to temp directory
temp_workflow_dir = tempfile.TemporaryDirectory()
os.makedirs(temp_workflow_dir.name, exist_ok=True)
workflow_provenance = temp_workflow_dir.name + "/workflow.ttl"
# Sets up a turtle file for the workflow information (not yet in the provenance folder as it does
# not exist and creating it will give issues).
output = open(workflow_provenance, "w")
_logger.info("Writing workflow rdf to %s", workflow_provenance)
print(
printrdf(tool, loadingContext.loader.ctx, args.rdf_serializer),
file=stdout,
file=output,
)
return 0
# close the output
if args.provenance:
output.close()
# Only print_rdf exits this way
if args.print_rdf:
return 0

if args.print_dot:
printdot(tool, loadingContext.loader.ctx, stdout)
@@ -1433,7 +1460,7 @@ def loc_to_path(obj: CWLObjectType) -> None:
research_obj = runtimeContext.research_obj
if loadingContext.loader is not None:
research_obj.generate_snapshot(
prov_deps(workflowobj, loadingContext.loader, uri)
prov_deps(workflowobj, loadingContext.loader, uri), args.no_data
)
else:
_logger.warning(
@@ -1455,6 +1482,13 @@ def loc_to_path(obj: CWLObjectType) -> None:
# public API for logging.StreamHandler
prov_log_handler.close()
research_obj.close(args.provenance)
# Copy workflow.ttl to args.provenance
if os.path.isfile(workflow_provenance):
shutil.copy(
workflow_provenance, args.provenance + "/workflow/workflow.ttl"
)
# Remove temp directory
shutil.rmtree(temp_workflow_dir.name)

_logger.removeHandler(stderr_handler)
_logger.addHandler(defaultStreamHandler)
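The main.py change stages `workflow.ttl` in a temporary directory and copies it into the provenance folder once that folder exists. A minimal sketch of that staging pattern follows, assuming a hypothetical helper `stage_then_publish`; it uses `tempfile.TemporaryDirectory` as a context manager so the staging directory is removed automatically, which is a variation on the explicit `shutil.rmtree` call in the diff rather than a copy of it.

```python
import shutil
import tempfile
from pathlib import Path


def stage_then_publish(content: str, final_dir: Path) -> Path:
    """Write content to a staged file, then copy it to final_dir/workflow/workflow.ttl."""
    with tempfile.TemporaryDirectory() as staging:
        staged = Path(staging) / "workflow.ttl"
        staged.write_text(content, encoding="utf-8")
        # In a real run the workflow would execute here and create final_dir.
        target_dir = final_dir / "workflow"
        target_dir.mkdir(parents=True, exist_ok=True)
        target = target_dir / "workflow.ttl"
        shutil.copy(staged, target)
    # Leaving the "with" block deletes the staging directory and its contents.
    return target
```

The context manager guarantees cleanup even if the copy raises, which avoids leaving stray temporary directories behind after a failed run.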
77 changes: 63 additions & 14 deletions cwltool/provenance.py
@@ -36,6 +36,7 @@
from schema_salad.utils import json_dumps
from typing_extensions import TYPE_CHECKING, TypedDict

import cwltool
from .loghandler import _logger
from .provenance_constants import (
ACCOUNT_UUID,
@@ -411,7 +412,10 @@ def write_bag_file(
return bag_file

def add_tagfile(
self, path: str, timestamp: Optional[datetime.datetime] = None
self,
path: str,
no_data: bool = False,
timestamp: Optional[datetime.datetime] = None,
) -> None:
"""Add tag files to our research object."""
self.self_check()
@@ -426,14 +430,18 @@ def add_tagfile(
# adding checksums after closing.
# Below probably OK for now as metadata files
# are not too large..?

checksums[SHA1] = checksum_copy(tag_file, hasher=hashlib.sha1)

tag_file.seek(0)
checksums[SHA256] = checksum_copy(tag_file, hasher=hashlib.sha256)

tag_file.seek(0)
checksums[SHA512] = checksum_copy(tag_file, hasher=hashlib.sha512)
if cwltool.main.NO_DATA:
checksums[SHA1] = checksum_only(tag_file, hasher=hashlib.sha1)
tag_file.seek(0)
checksums[SHA256] = checksum_only(tag_file, hasher=hashlib.sha256)
tag_file.seek(0)
checksums[SHA512] = checksum_only(tag_file, hasher=hashlib.sha512)
else:
checksums[SHA1] = checksum_copy(tag_file, hasher=hashlib.sha1)
tag_file.seek(0)
checksums[SHA256] = checksum_copy(tag_file, hasher=hashlib.sha256)
tag_file.seek(0)
checksums[SHA512] = checksum_copy(tag_file, hasher=hashlib.sha512)

rel_path = posix_path(os.path.relpath(path, self.folder))
self.tagfiles.add(rel_path)
@@ -738,7 +746,7 @@ def _write_bag_info(self) -> None:
info_file.write("Payload-Oxum: %d.%d\n" % (total_size, num_files))
_logger.debug("[provenance] Generated bagit metadata: %s", self.folder)

def generate_snapshot(self, prov_dep: CWLObjectType) -> None:
def generate_snapshot(self, prov_dep: CWLObjectType, no_data: bool) -> None:
"""Copy all of the CWL files to the snapshot/ directory."""
self.self_check()
for key, value in prov_dep.items():
@@ -762,13 +770,13 @@ def generate_snapshot(self, prov_dep: CWLObjectType) -> None:
timestamp = datetime.datetime.fromtimestamp(
os.path.getmtime(filepath)
)
self.add_tagfile(path, timestamp)
self.add_tagfile(path, no_data, timestamp)
except PermissionError:
pass # FIXME: avoids duplicate snapshotting; need better solution
elif key in ("secondaryFiles", "listing"):
for files in cast(MutableSequence[CWLObjectType], value):
if isinstance(files, MutableMapping):
self.generate_snapshot(files)
self.generate_snapshot(files, no_data)
else:
pass

@@ -793,13 +801,18 @@ def add_data_file(
timestamp: Optional[datetime.datetime] = None,
content_type: Optional[str] = None,
) -> str:
# TODO only when --no-data is not used!
"""Copy inputs to data/ folder."""
self.self_check()
tmp_dir, tmp_prefix = os.path.split(self.temp_prefix)
with tempfile.NamedTemporaryFile(
prefix=tmp_prefix, dir=tmp_dir, delete=False
) as tmp:
checksum = checksum_copy(from_fp, tmp)
# TODO this should depend on the arguments
if cwltool.main.NO_DATA:
checksum = checksum_only(from_fp)
else:
checksum = checksum_copy(from_fp, tmp)

# Calculate hash-based file path
folder = os.path.join(self.folder, DATA, checksum[0:2])
@@ -811,6 +824,7 @@ def add_data_file(
os.rename(tmp.name, path)

# Relative posix path
# TODO only when no-data is False?...
rel_path = posix_path(os.path.relpath(path, self.folder))

# Register in bagit checksum
@@ -887,7 +901,10 @@ def _add_to_bagit(self, rel_path: str, **checksums: str) -> None:
checksums = dict(checksums)
with open(lpath, "rb") as file_path:
# FIXME: Need sha-256 / sha-512 as well for Research Object BagIt profile?
checksums[SHA1] = checksum_copy(file_path, hasher=hashlib.sha1)
if cwltool.main.NO_DATA:
checksums[SHA1] = checksum_only(file_path, hasher=hashlib.sha1)
else:
checksums[SHA1] = checksum_copy(file_path, hasher=hashlib.sha1)

self.add_to_manifest(rel_path, checksums)

@@ -1034,6 +1051,11 @@ def checksum_copy(
pass
if os.path.exists(temp_location):
os.rename(temp_location, dst_file.name) # type: ignore

return content_processor(contents, src_file, dst_file, checksum, buffersize)


def content_processor(contents, src_file, dst_file, checksum, buffersize):
while contents != b"":
if dst_file is not None:
dst_file.write(contents)
@@ -1042,3 +1064,30 @@ def checksum_copy(
if dst_file is not None:
dst_file.flush()
return checksum.hexdigest().lower()


def checksum_only(
src_file: IO[Any],
dst_file: Optional[IO[Any]] = None,
hasher=Hasher, # type: Callable[[], hashlib._Hash]
buffersize: int = 1024 * 1024,
) -> str:
"""Compute the checksum of a file without copying it."""
if dst_file is not None:
_logger.error("Destination file should be None but it is %s", dst_file)

# TODO: Use hashlib.new(Hasher_str) instead?
checksum = hasher()
contents = src_file.read(buffersize)

# TODO Could be a function for both checksum_only and checksum_copy?
return content_processor(contents, src_file, dst_file, checksum, buffersize)
# while contents != b"":
# if dst_file is not None:
# dst_file.write(contents)
# checksum.update(contents)
# contents = src_file.read(buffersize)
# if dst_file is not None:
# dst_file.flush()
# return checksum.hexdigest().lower()
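The provenance.py change splits hashing from copying: `checksum_copy` hashes while writing to a destination, and `checksum_only` hashes without one. As a rough sketch of how a single function can cover both cases, the example below treats the destination as optional. The name `checksum_stream` is hypothetical and is not part of cwltool; the read loop mirrors the `content_processor` body shown in the diff.

```python
import hashlib
import io
from typing import IO, Any, Callable, Optional


def checksum_stream(
    src_file: IO[bytes],
    dst_file: Optional[IO[bytes]] = None,
    hasher: Callable[[], Any] = hashlib.sha1,
    buffersize: int = 1024 * 1024,
) -> str:
    """Hash src_file in chunks; also copy each chunk to dst_file when one is given."""
    checksum = hasher()
    contents = src_file.read(buffersize)
    while contents != b"":
        if dst_file is not None:
            dst_file.write(contents)
        checksum.update(contents)
        contents = src_file.read(buffersize)
    if dst_file is not None:
        dst_file.flush()
    return checksum.hexdigest().lower()


data = b"hello provenance"
copy_target = io.BytesIO()
digest_with_copy = checksum_stream(io.BytesIO(data), copy_target)
digest_only = checksum_stream(io.BytesIO(data))  # no destination: hash only
```

Both calls return the same digest; only the first one leaves a copy of the bytes behind, which is the behavioural difference `--no-data` is meant to select.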
64 changes: 55 additions & 9 deletions cwltool/provenance_profile.py
@@ -19,7 +19,7 @@
cast,
)

from prov.identifier import Identifier, QualifiedName
from prov.identifier import Identifier, Namespace, QualifiedName
from prov.model import (
PROV,
PROV_LABEL,
@@ -32,6 +32,8 @@
from schema_salad.sourceline import SourceLine
from typing_extensions import TYPE_CHECKING

import cwltool.workflow

from .errors import WorkflowException
from .job import CommandLineJob, JobBase
from .loghandler import _logger
@@ -266,17 +268,30 @@ def record_process_start(
) -> Optional[str]:
if not hasattr(process, "steps"):
process_run_id = self.workflow_run_uri
elif not hasattr(job, "workflow"):
elif not hasattr(job, "workflow") and isinstance(
process, cwltool.workflow.Workflow
):
# commandline tool execution as part of workflow
name = ""
if isinstance(job, (CommandLineJob, JobBase, WorkflowJob)):
name = job.name
process_name = urllib.parse.quote(name, safe=":/,#")
process_run_id = self.start_process(process_name, datetime.datetime.now())
# The matching step is not always the first one, so look it up by process_name.
# next() returns None when no step.id matches, so a missing step is detected reliably.
step = next(
(s for s in process.steps if s.id.endswith("#" + process_name)),
None,
)
if step is None:
raise WorkflowException(f"No step matching {process_name!r} found in the workflow.")

process_run_id = self.start_process(
step.id, process_name, datetime.datetime.now()
)
return process_run_id

def start_process(
self,
step_id: str, # The ID of the step involved
process_name: str,
when: datetime.datetime,
process_run_id: Optional[str] = None,
@@ -285,12 +300,32 @@ def start_process(
if process_run_id is None:
process_run_id = uuid.uuid4().urn
prov_label = "Run of workflow/packed.cwl#main/" + process_name
self.document.activity(
process_run_id,
None,
None,
{PROV_TYPE: WFPROV["ProcessRun"], PROV_LABEL: prov_label},
)
# TESTING to include the Steps URI so linking to --print-rdf becomes possible
FILE_PATH = None
WORKFLOW_STEP = None
# Not sure if steps is always 1 element so a step name check including the # is performed
if step_id.endswith("#" + process_name):
# Temp import maybe there is another way to create the URI's ?
# Looked at --print-rdf for a possible URI
WORKFLOW = Namespace("Workflow", "https://w3id.org/cwl/cwl#Workflow/")
WORKFLOW_STEP = WORKFLOW["steps"]
# Was not sure how to create a URI without a namespace
FILE = Namespace("", "")
# The entire file://....#step path
FILE_PATH = FILE[step_id]

# Added the WORKFLOW_STEP and FILE_PATH to the object
self.document.activity(
process_run_id,
None,
None,
{
PROV_TYPE: WFPROV["ProcessRun"],
PROV_LABEL: prov_label,
WORKFLOW_STEP: FILE_PATH,
},
)

self.document.wasAssociatedWith(
process_run_id, self.engine_uuid, str("wf:main/" + process_name)
)
@@ -367,6 +402,17 @@ def declare_file(self, value: CWLObjectType) -> Tuple[ProvEntity, ProvEntity, st
file_entity.add_attributes(
{CWLPROV["nameext"]: cast(str, value["nameext"])}
)

if "size" in value:
file_entity.add_attributes({CWLPROV["size"]: cast(int, value["size"])})

# TODO check is there a URI for a checksum? and a base uri for checksum?
# e.g. <item> <http://checksum.com> <sha1:checksum_of_the_file>
if "checksum" in value:
file_entity.add_attributes(
{CWLPROV["checksum"]: cast(str, value["checksum"])}
)

self.document.specializationOf(file_entity, entity)

# Check for secondaries
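The step lookup in `record_process_start` matches a step by checking whether its id ends with `#` plus the process name. One detail worth illustrating: in Python, a `for` loop that exits without hitting `break` leaves the loop variable bound to the last element, so a later `is None` check on that variable cannot detect a missing match. The sketch below shows a lookup that returns `None` on a miss. `Step` and `find_step` are hypothetical stand-ins, and the ids are made-up examples.

```python
from dataclasses import dataclass
from typing import Optional, Sequence


@dataclass
class Step:
    """Hypothetical stand-in for a workflow step; only the id matters here."""

    id: str


def find_step(steps: Sequence[Step], process_name: str) -> Optional[Step]:
    """Return the first step whose id ends with '#<process_name>', else None."""
    suffix = "#" + process_name
    return next((step for step in steps if step.id.endswith(suffix)), None)


steps = [Step("file:///wf.cwl#main/sort"), Step("file:///wf.cwl#main/count")]
found = find_step(steps, "main/count")
missing = find_step(steps, "main/absent")
```

Returning `None` explicitly lets the caller decide whether a missing step is an error, rather than silently proceeding with the wrong step.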
1 change: 1 addition & 0 deletions cwltool/workflow.py
@@ -442,6 +442,7 @@ def job(
self.embedded_tool.parent_wf = self.prov_obj
process_name = self.tool["id"].split("#")[1]
self.prov_obj.start_process(
self.id,
process_name,
datetime.datetime.now(),
self.embedded_tool.provenance_object.workflow_run_uri,
1 change: 0 additions & 1 deletion mypy-stubs/prov/model.pyi
@@ -3,7 +3,6 @@ from typing import IO, Any, Dict, Iterable, List, Set, Tuple

from _typeshed import Incomplete
from prov.constants import *

# from prov import Error as Error, serializers as serializers
from prov.identifier import Identifier, Namespace, QualifiedName
