Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Propagate input annotations to primary.cwlprov files #1678

Draft
wants to merge 19 commits into
base: main
Choose a base branch
from
Draft
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 57 additions & 0 deletions cwltool/provenance_profile.py
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,25 @@ def record_process_end(
self.generate_output_prov(outputs, process_run_id, process_name)
self.document.wasEndedBy(process_run_id, None, self.workflow_run_uri, when)



# def _add_nested_annotations(dataset, e: ProvEntity) -> ProvEntity:
# for annotation in dataset:
# if isinstance(dataset[annotation], (str, bool, int, float)): # check if these are all allowed types
# e.add_attributes({annotation: dataset[annotation]})
# else:
# nested_id = uuid.uuid4().urn
# # e.add_attributes({annotation: nested_id})
# nested_entity = self.document.entity(nested_id)
# e.add_attributes({annotation: nested_entity.identifier})
# nested_entity = _add_nested_annotations(dataset[annotation], nested_entity)
# return e

# def _propagate_input_annotations(entity):
# entity.add_attributes( {PROV_TYPE: SCHEMA["Dataset"]})
# entity = _add_nested_annotations(value[SCHEMA["Dataset"].uri], entity)
# return entity

def declare_file(self, value: CWLObjectType) -> Tuple[ProvEntity, ProvEntity, str]:
if value["class"] != "File":
raise ValueError("Must have class:File: %s" % value)
Expand Down Expand Up @@ -350,6 +369,29 @@ def declare_file(self, value: CWLObjectType) -> Tuple[ProvEntity, ProvEntity, st
file_entity.add_attributes({CWLPROV["nameext"]: value["nameext"]})
self.document.specializationOf(file_entity, entity)



def _add_nested_annotations(dataset, e: ProvEntity) -> ProvEntity:
for annotation in dataset:
if isinstance(dataset[annotation], (str, bool, int, float)): # check if these are all allowed types
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isinstance(dataset[annotation], list), isinstance(dataset[annotation], MutableMapping)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MutableSequence = list

e.add_attributes({annotation: dataset[annotation]})
else:
nested_id = uuid.uuid4().urn
# e.add_attributes({annotation: nested_id})
nested_entity = self.document.entity(nested_id)
e.add_attributes({annotation: nested_entity.identifier})
nested_entity = _add_nested_annotations(dataset[annotation], nested_entity)
return e

# Transfer input data annotations to provenance:
if SCHEMA["Dataset"].uri in value: # TODO: modify so both http:// and https:// are recognized
entity.add_attributes( {PROV_TYPE: SCHEMA["Dataset"]})
entity = _add_nested_annotations(value[SCHEMA["Dataset"].uri], entity)

# Transfer format annotations to provenance:
if "format" in value:
entity.add_attributes({SCHEMA["encodingFormat"]: value["format"]})

# Check for secondaries
for sec in cast(
MutableSequence[CWLObjectType], value.get("secondaryFiles", [])
Expand Down Expand Up @@ -395,6 +437,7 @@ def declare_directory(self, value: CWLObjectType) -> ProvEntity:
(PROV_TYPE, RO["Folder"]),
],
)

# ORE description of ro:Folder, saved separately
coll_b = dir_bundle.entity(
dir_id,
Expand Down Expand Up @@ -455,6 +498,20 @@ def declare_directory(self, value: CWLObjectType) -> ProvEntity:
coll.add_attributes(coll_attribs)
coll_b.add_attributes(coll_b_attribs)

# Propagate input data annotations
if SCHEMA["Dataset"].uri in value:
# coll_annotations = [ (PROV_TYPE, SCHEMA["Dataset"]) ]
coll.add_attributes([ (PROV_TYPE, SCHEMA["Dataset"]) ])

dataset = value[SCHEMA["Dataset"].uri]

for annotation in dataset:
if isinstance(dataset[annotation], (str, bool, int, float)): # check if these are all allowed types
coll.add_attributes({annotation: dataset[annotation]})

if "format" in value:
coll.add_attributes({SCHEMA["encodingFormat"]: value["format"]})

# Also Save ORE Folder as annotation metadata
ore_doc = ProvDocument()
ore_doc.add_namespace(ORE)
Expand Down