diff --git a/h5rdmtoolbox/wrapper/rdf.py b/h5rdmtoolbox/wrapper/rdf.py index 2e3e8959..f18b6631 100644 --- a/h5rdmtoolbox/wrapper/rdf.py +++ b/h5rdmtoolbox/wrapper/rdf.py @@ -36,9 +36,6 @@ def set_predicate(attr: h5py.AttributeManager, attr_name: str, value: str) -> No except pydantic.ValidationError as e: raise RDFError(f'Invalid IRI: "{value}" for attr name "{attr_name}". ' f'Expecting a valid URL. This was validated with pydantic. Pydantic error: {e}') - # warnings.warn(f'Invalid IRI: "{value}" for attr name "{attr_name}". ' - # f'Expecting a valid URL. This was validated with pydantic. Pydantic error: {e}', - # UserWarning) iri_name_data = attr.get(RDF_PREDICATE_ATTR_NAME, None) if iri_name_data is None: @@ -47,6 +44,20 @@ def set_predicate(attr: h5py.AttributeManager, attr_name: str, value: str) -> No attr[RDF_PREDICATE_ATTR_NAME] = iri_name_data +def set_object(attr: h5py.AttributeManager, attr_name: str, data: str) -> None: + """Set the class of an attribute""" + try: + HttpUrl(data) + except pydantic.ValidationError as e: + raise RDFError(f'Invalid IRI: "{data}" for attr name "{attr_name}". ' + f'Expecting a valid URL. This was validated with pydantic. Pydantic error: {e}') + iri_data_data = attr.get(RDF_OBJECT_ATTR_NAME, None) + if iri_data_data is None: + iri_data_data = {} + iri_data_data.update({attr_name: data}) + attr[RDF_OBJECT_ATTR_NAME] = iri_data_data + + def del_iri_entry(attr: h5py.AttributeManager, attr_name: str) -> None: """Delete the attribute name from name and data iri dicts""" iri_name_data = attr.get(RDF_PREDICATE_ATTR_NAME, None) @@ -61,15 +72,6 @@ def del_iri_entry(attr: h5py.AttributeManager, attr_name: str) -> None: attr[RDF_OBJECT_ATTR_NAME] = iri_data_data -def set_object(attr: h5py.AttributeManager, attr_name: str, data: str) -> None: - """Set the class of an attribute""" - iri_data_data = attr.get(RDF_OBJECT_ATTR_NAME, None) - if iri_data_data is None: - iri_data_data = {} - iri_data_data.update({attr_name: data}) - attr[RDF_OBJECT_ATTR_NAME] = iri_data_data - - def append(attr: h5py.AttributeManager, attr_name: str, data: Union[str, List[str]], @@ -144,6 +146,72 @@ def __setitem__(self, key, value): raise KeyError(f'key must be "{RDF_PREDICATE_ATTR_NAME}" or "{RDF_OBJECT_ATTR_NAME}"') +class _RDFPO(abc.ABC): + """Abstract class for predicate (P) and object (O)""" + IRI_ATTR_NAME = None + + def __init__(self, attr): + self._attr = attr + + # def __new__(cls, attr): + # instance = super().__new__(cls, '') + # instance._attr = attr + # return instance + + @abc.abstractmethod + def __setiri__(self, key, value): + """Set IRI to an attribute""" + + def get(self, item, default=None): + attrs = self._attr.get(self.IRI_ATTR_NAME, None) + if attrs is None: + return default + return attrs.get(item, default) + + def __getitem__(self, item) -> Union[str, None]: + iri_attr_dict = self._attr.get(self.IRI_ATTR_NAME, None) + if iri_attr_dict is None: + return None + return iri_attr_dict.get(item, None) + + def __setitem__(self, key, value: str): + if key not in self._attr: + raise KeyError(f'No attribute "{key}" found. Cannot assign an IRI to a non-existing attribute.') + self.__setiri__(key, str(value)) + + def keys(self): + """Return all attribute names assigned to the IRIs""" + return self._attr.get(self.IRI_ATTR_NAME, {}).keys() + + def values(self): + """Return all IRIs assigned to the attributes""" + return self._attr.get(self.IRI_ATTR_NAME, {}).values() + + def items(self): + """Return all attribute names and IRIs""" + return self._attr.get(self.IRI_ATTR_NAME, {}).items() + + def __iter__(self): + return iter(self.keys()) + + +class RDF_Predicate(_RDFPO): + """IRI class attribute manager""" + + IRI_ATTR_NAME = RDF_PREDICATE_ATTR_NAME + + def __setiri__(self, key, value): + set_predicate(self._attr, key, value) + + +class RDF_OBJECT(_RDFPO): + """IRI data attribute manager for objects""" + IRI_ATTR_NAME = RDF_OBJECT_ATTR_NAME + + def __setiri__(self, key, value): + set_object(self._attr, key, value) + + class RDFManager: """IRI attribute manager""" @@ -161,6 +229,7 @@ def subject(self) -> Union[str, None]: return s def add_subject(self, subject: Union[str, List[str]]): + """Add a subject to the group or dataset. If the subject already exists, it will not be added again.""" if isinstance(subject, list): data = [str(i) for i in subject] else: @@ -177,15 +246,26 @@ def add_subject(self, subject: Union[str, List[str]]): self._attr[RDF_SUBJECT_ATTR_NAME] = list(set(iri_sbj_data)) @subject.setter - def subject(self, iri_type: Union[str, List[str]]): + def subject(self, rdf_type: Union[str, List[str]]): """Sets the subject of the group or dataset. Will overwrite existing subjects. If you want to add (append), use add_subject() instead.""" - if isinstance(iri_type, list): - data = [str(i) for i in iri_type] + if isinstance(rdf_type, list): + rdf_type = [str(i) for i in rdf_type] + for iri in rdf_type: + try: + HttpUrl(iri) + except pydantic.ValidationError as e: + raise RDFError(f'Invalid IRI: "{iri}" for subject "{self._attr._parent.name}". ' + f'Expecting a valid URL. This was validated with pydantic. Pydantic error: {e}') else: - data = str(iri_type) + rdf_type = str(rdf_type) + try: + HttpUrl(rdf_type) + except pydantic.ValidationError as e: + raise RDFError(f'Invalid IRI: "{rdf_type}" for subject "{self._attr._parent.name}". ' + f'Expecting a valid URL. This was validated with pydantic. Pydantic error: {e}') - self._attr[RDF_SUBJECT_ATTR_NAME] = data + self._attr[RDF_SUBJECT_ATTR_NAME] = rdf_type def append_subject(self, subject: str): """Append the subject""" @@ -203,7 +283,8 @@ def append_subject(self, subject: str): self._attr[RDF_SUBJECT_ATTR_NAME] = [curr_subjects, subject] @property - def predicate(self): + def predicate(self) -> RDF_Predicate: + """Return the RDF predicate manager""" return RDF_Predicate(self._attr) @predicate.setter @@ -225,11 +306,6 @@ def __eq__(self, other: str): def __contains__(self, item): return item in self._attr.get(RDF_SUBJECT_ATTR_NAME, list()) - def set_subject(self, iri): - """Assign iri to an HDF5 object (group or dataset)""" - if iri is not None: - self._attr[RDF_SUBJECT_ATTR_NAME] = str(iri) - def get(self, attr_name: str) -> IRIDict: return self.__getitem__(attr_name) @@ -244,69 +320,3 @@ def __getitem__(self, item) -> IRIDict: def __delitem__(self, attr_name: str): del_iri_entry(self._attr, attr_name) - - -class _RDFPO(abc.ABC): - """Abstract class for predicate (P) and object (O)""" - IRI_ATTR_NAME = None - - def __init__(self, attr): - self._attr = attr - - # def __new__(cls, attr): - # instance = super().__new__(cls, '') - # instance._attr = attr - # return instance - - @abc.abstractmethod - def __setiri__(self, key, value): - """Set IRI to an attribute""" - - def get(self, item, default=None): - attrs = self._attr.get(self.IRI_ATTR_NAME, None) - if attrs is None: - return default - return attrs.get(item, default) - - def __getitem__(self, item) -> Union[str, None]: - iri_attr_dict = self._attr.get(self.IRI_ATTR_NAME, None) - if iri_attr_dict is None: - return None - return iri_attr_dict.get(item, None) - - def __setitem__(self, key, value: str): - if key not in self._attr: - raise KeyError(f'No attribute "{key}" found. Cannot assign an IRI to a non-existing attribute.') - self.__setiri__(key, str(value)) - - def keys(self): - """Return all attribute names assigned to the IRIs""" - return self._attr.get(self.IRI_ATTR_NAME, {}).keys() - - def values(self): - """Return all IRIs assigned to the attributes""" - return self._attr.get(self.IRI_ATTR_NAME, {}).values() - - def items(self): - """Return all attribute names and IRIs""" - return self._attr.get(self.IRI_ATTR_NAME, {}).items() - - def __iter__(self): - return iter(self.keys()) - - -class RDF_Predicate(_RDFPO): - """IRI class attribute manager""" - - IRI_ATTR_NAME = RDF_PREDICATE_ATTR_NAME - - def __setiri__(self, key, value): - set_predicate(self._attr, key, value) - - -class RDF_OBJECT(_RDFPO): - """IRI data attribute manager for objects""" - IRI_ATTR_NAME = RDF_OBJECT_ATTR_NAME - - def __setiri__(self, key, value): - set_object(self._attr, key, value) diff --git a/tests/wrapper/test_jsonld.py b/tests/wrapper/test_jsonld.py index 3a541446..e1e5d4e0 100644 --- a/tests/wrapper/test_jsonld.py +++ b/tests/wrapper/test_jsonld.py @@ -1,14 +1,15 @@ import json +import ontolutils import pathlib import rdflib import unittest +from ontolutils import M4I +from ontolutils import namespaces, urirefs, Thing import h5rdmtoolbox as h5tbx -import ontolutils from h5rdmtoolbox import __version__ from h5rdmtoolbox.wrapper import jsonld from h5rdmtoolbox.wrapper import rdf -from ontolutils import namespaces, urirefs, Thing logger = h5tbx.logger @@ -36,7 +37,6 @@ def test_dump_hdf_to_json(self): attrs={'standard_name': 'x_velocity', 'standard_name_non_iri': 'x_velocity', 'unit': 'm/s'}) - from ontolutils import M4I ds.rdf.subject = str(M4I.NumericalVariable) ds.rdf.predicate['standard_name'] = 'https://matthiasprobst.github.io/ssno#standard_name' ds.rdf.object['standard_name'] = 'https://matthiasprobst.github.io/pivmeta#x_velocity' @@ -91,10 +91,15 @@ def test_jsonld_dumps(self): grp = h5.create_group('grp') grp.attrs['test', sn_iri] = 'test' sub_grp = grp.create_group('Fan') - sub_grp.create_dataset('D3', data=300) + ds = sub_grp.create_dataset('D3', data=300) sub_grp['D3'].attrs['units', 'http://w3id.org/nfdi4ing/metadata4ing#hasUnits'] = 'mm' sub_grp['D3'].rdf['units'].object = 'https://qudt.org/vocab/unit/MilliM' sub_grp['D3'].attrs['standard_name', sn_iri] = 'blade_diameter3' + ds.rdf.subject = 'https://w3id.org/nfdi4ing/metadata4ing#NumericalVariable' + self.assertEqual(ds.rdf.subject, 'https://w3id.org/nfdi4ing/metadata4ing#NumericalVariable') + from h5rdmtoolbox.wrapper.rdf import RDF_SUBJECT_ATTR_NAME + self.assertEqual(ds.attrs[RDF_SUBJECT_ATTR_NAME], + 'https://w3id.org/nfdi4ing/metadata4ing#NumericalVariable') h5.dumps() from pprint import pprint out_dict = h5tbx.jsonld.dumpd(h5.hdf_filename, @@ -104,7 +109,7 @@ def test_jsonld_dumps(self): pprint(out_dict) found_m4iNumericalVariable = False for g in out_dict['@graph']: - if g['@type'] == 'm4i:NumericalVariable': + if 'https://w3id.org/nfdi4ing/metadata4ing#NumericalVariable' in g['@type']: self.assertDictEqual(g['m4i:hasUnits'], {'@id': 'https://qudt.org/vocab/unit/MilliM'}) self.assertEqual(g['ssno:standard_name'], 'blade_diameter3') found_m4iNumericalVariable = True diff --git a/tests/wrapper/test_iri.py b/tests/wrapper/test_rdf.py similarity index 59% rename from tests/wrapper/test_iri.py rename to tests/wrapper/test_rdf.py index da6d4f38..6dc3fa11 100644 --- a/tests/wrapper/test_iri.py +++ b/tests/wrapper/test_rdf.py @@ -1,17 +1,44 @@ import unittest +from ontolutils.namespacelib import M4I, OBO +from rdflib import FOAF import h5rdmtoolbox as h5tbx from h5rdmtoolbox import use +from h5rdmtoolbox.wrapper.rdf import RDFError from h5rdmtoolbox.wrapper.rdf import RDF_PREDICATE_ATTR_NAME -from ontolutils.namespacelib import M4I, OBO -class TestIRI(unittest.TestCase): +class TestRDF(unittest.TestCase): def setUp(self) -> None: """setup""" use(None) + def test_rdf_error(self): + with h5tbx.File() as h5: + with self.assertRaises(RDFError): + h5.attrs['title', 'hasTitle'] = 'test' + h5.attrs['title', FOAF.title] = 'test' + self.assertIsInstance(h5.rdf['title'].predicate, str) + self.assertEqual(h5.rdf['title'].predicate, str(FOAF.title)) + h5.attrs['title'] = 'test' + with self.assertRaises(RDFError): + h5.rdf['title'].object = 'first object' + + with self.assertRaises(RDFError): + h5.rdf.subject = 'invalid URI' + + with self.assertRaises(RDFError): + h5.rdf.subject = ['invalid URI', 'invalid URI 2'] + + with self.assertRaises(RDFError): + h5.rdf.subject = ['invalid URI', 'https://example.org/validURI'] + h5.rdf.subject = 'https://example.org/validURI' + self.assertEqual(h5.rdf.subject, 'https://example.org/validURI') + + h5.rdf.subject = ['https://example.org/validURI', 'https://example.org/validURI2'] + self.assertEqual(h5.rdf.subject, ['https://example.org/validURI', 'https://example.org/validURI2']) + def test_group_predicate(self): with h5tbx.File() as h5: grp = h5.create_group('has_contact') @@ -35,46 +62,58 @@ def test_group_predicate(self): def test_none_value(self): with h5tbx.File() as h5: - h5.attrs['title', 'hasTitle'] = 'test' - self.assertEqual(h5.rdf['title'].predicate, 'hasTitle') + h5.attrs['title', 'https://example.org/hasTitle'] = 'test' + self.assertEqual(h5.rdf['title'].predicate, 'https://example.org/hasTitle') + # self.assertEqual(h5.rdf['title'].predicate, 'https://example.org/hasTitle') with h5tbx.File() as h5: h5.attrs['title'] = 'test' self.assertEqual(h5.rdf['title'].predicate, None) h5.attrs['title', None] = 'test2' self.assertEqual(len(h5.attrs.get(RDF_PREDICATE_ATTR_NAME, {})), 0) - h5.attrs['title', 'hasTitle'] = 'test2' + h5.attrs['title', 'https://example.org/hasTitle'] = 'test2' self.assertEqual(len(h5.attrs.get(RDF_PREDICATE_ATTR_NAME, None)), 1) - self.assertEqual(h5.rdf['title'].predicate, 'hasTitle') + self.assertEqual(h5.rdf['title'].predicate, 'https://example.org/hasTitle') def test_multiple_subjects_or_objects(self): with h5tbx.File() as h5: - h5.attrs['title', 'hasTitle'] = 'test' + h5.attrs['title', 'https://example.org/hasTitle'] = 'test' - h5.rdf['title'].object = 'first object' + h5.rdf['title'].object = 'https://example.org/object' self.assertEqual(h5.attrs['title'], 'test') - self.assertEqual(h5.rdf['title'].object, 'first object') - h5.rdf['title'].object = 'overwritten object' + self.assertEqual(h5.rdf['title'].object, 'https://example.org/object') + + h5.rdf['title'].object = 'https://example.org/object2' + self.assertEqual(h5.rdf['title'].object, 'https://example.org/object2') + self.assertEqual(h5.attrs['title'], 'test') - self.assertEqual(h5.rdf['title'].object, 'overwritten object') + self.assertEqual(h5.rdf['title'].object, 'https://example.org/object2') - h5.rdf['title'].object = ['one', 'two'] + h5.rdf['title'].object = ['https://example.org/objectURI1', 'https://example.org/objectURI2'] self.assertEqual(h5.attrs['title'], 'test') - self.assertEqual(h5.rdf['title'].object, ['one', 'two']) - h5.rdf['title'].append_object('three') - self.assertEqual(h5.rdf['title'].object, ['one', 'two', 'three']) - h5['/'].rdf.subject = 'is group' - self.assertEqual(h5.rdf.subject, 'is group') + self.assertEqual(h5.rdf['title'].object, + ['https://example.org/objectURI1', 'https://example.org/objectURI2']) + h5.rdf['title'].append_object('https://example.org/objectURI3') + self.assertListEqual(h5.rdf['title'].object, + ['https://example.org/objectURI1', + 'https://example.org/objectURI2', + 'https://example.org/objectURI3']) + + h5['/'].rdf.subject = 'https://example.org/is group' + self.assertEqual(h5.rdf.subject, 'https://example.org/is group') - h5['/'].rdf.subject = 'is root group' - self.assertEqual(h5.rdf.subject, 'is root group') + h5['/'].rdf.subject = 'https://example.org/is root group' + self.assertEqual(h5.rdf.subject, 'https://example.org/is root group') - h5['/'].rdf.append_subject('is group') - self.assertEqual(h5.rdf.subject, ['is root group', 'is group']) + h5['/'].rdf.append_subject('https://example.org/is group') + self.assertEqual(h5.rdf.subject, ['https://example.org/is root group', + 'https://example.org/is group']) - h5['/'].rdf.subject = ['root', 'group'] - self.assertEqual(h5.rdf.subject, ['root', 'group']) + h5['/'].rdf.subject = ['https://example.org/is root group 1', + 'https://example.org/is group 2'] + self.assertEqual(h5.rdf.subject, ['https://example.org/is root group 1', + 'https://example.org/is group 2']) def test_set_single_PSO(self): """IRI can be assigned to attributes. A protected attribute IRI is created for each dataset or groups"""