Skip to content

Commit

Permalink
renamend test_iri.py and check RDF input
Browse files Browse the repository at this point in the history
Http is allowed only
  • Loading branch information
matthiasprobst committed Mar 25, 2024
1 parent 59e6496 commit dfeb53f
Show file tree
Hide file tree
Showing 3 changed files with 171 additions and 117 deletions.
188 changes: 99 additions & 89 deletions h5rdmtoolbox/wrapper/rdf.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,6 @@ def set_predicate(attr: h5py.AttributeManager, attr_name: str, value: str) -> No
except pydantic.ValidationError as e:
raise RDFError(f'Invalid IRI: "{value}" for attr name "{attr_name}". '
f'Expecting a valid URL. This was validated with pydantic. Pydantic error: {e}')
# warnings.warn(f'Invalid IRI: "{value}" for attr name "{attr_name}". '
# f'Expecting a valid URL. This was validated with pydantic. Pydantic error: {e}',
# UserWarning)

iri_name_data = attr.get(RDF_PREDICATE_ATTR_NAME, None)
if iri_name_data is None:
Expand All @@ -47,6 +44,20 @@ def set_predicate(attr: h5py.AttributeManager, attr_name: str, value: str) -> No
attr[RDF_PREDICATE_ATTR_NAME] = iri_name_data


def set_object(attr: h5py.AttributeManager, attr_name: str, data: str) -> None:
"""Set the class of an attribute"""
try:
HttpUrl(data)
except pydantic.ValidationError as e:
raise RDFError(f'Invalid IRI: "{data}" for attr name "{attr_name}". '
f'Expecting a valid URL. This was validated with pydantic. Pydantic error: {e}')
iri_data_data = attr.get(RDF_OBJECT_ATTR_NAME, None)
if iri_data_data is None:
iri_data_data = {}
iri_data_data.update({attr_name: data})
attr[RDF_OBJECT_ATTR_NAME] = iri_data_data


def del_iri_entry(attr: h5py.AttributeManager, attr_name: str) -> None:
"""Delete the attribute name from name and data iri dicts"""
iri_name_data = attr.get(RDF_PREDICATE_ATTR_NAME, None)
Expand All @@ -61,15 +72,6 @@ def del_iri_entry(attr: h5py.AttributeManager, attr_name: str) -> None:
attr[RDF_OBJECT_ATTR_NAME] = iri_data_data


def set_object(attr: h5py.AttributeManager, attr_name: str, data: str) -> None:
"""Set the class of an attribute"""
iri_data_data = attr.get(RDF_OBJECT_ATTR_NAME, None)
if iri_data_data is None:
iri_data_data = {}
iri_data_data.update({attr_name: data})
attr[RDF_OBJECT_ATTR_NAME] = iri_data_data


def append(attr: h5py.AttributeManager,
attr_name: str,
data: Union[str, List[str]],
Expand Down Expand Up @@ -144,6 +146,72 @@ def __setitem__(self, key, value):
raise KeyError(f'key must be "{RDF_PREDICATE_ATTR_NAME}" or "{RDF_OBJECT_ATTR_NAME}"')


class _RDFPO(abc.ABC):
"""Abstract class for predicate (P) and object (O)"""
IRI_ATTR_NAME = None

def __init__(self, attr):
self._attr = attr

# def __new__(cls, attr):
# instance = super().__new__(cls, '')
# instance._attr = attr
# return instance

@abc.abstractmethod
def __setiri__(self, key, value):
"""Set IRI to an attribute"""

def get(self, item, default=None):
attrs = self._attr.get(self.IRI_ATTR_NAME, None)
if attrs is None:
return default
return attrs.get(item, default)

def __getitem__(self, item) -> Union[str, None]:
iri_attr_dict = self._attr.get(self.IRI_ATTR_NAME, None)
if iri_attr_dict is None:
return None
return iri_attr_dict.get(item, None)

def __setitem__(self, key, value: str):
if key not in self._attr:
raise KeyError(f'No attribute "{key}" found. Cannot assign an IRI to a non-existing attribute.')
self.__setiri__(key, str(value))

def keys(self):
"""Return all attribute names assigned to the IRIs"""
return self._attr.get(self.IRI_ATTR_NAME, {}).keys()

def values(self):
"""Return all IRIs assigned to the attributes"""
return self._attr.get(self.IRI_ATTR_NAME, {}).values()

def items(self):
"""Return all attribute names and IRIs"""
return self._attr.get(self.IRI_ATTR_NAME, {}).items()

def __iter__(self):
return iter(self.keys())


class RDF_Predicate(_RDFPO):
"""IRI class attribute manager"""

IRI_ATTR_NAME = RDF_PREDICATE_ATTR_NAME

def __setiri__(self, key, value):
set_predicate(self._attr, key, value)


class RDF_OBJECT(_RDFPO):
"""IRI data attribute manager for objects"""
IRI_ATTR_NAME = RDF_OBJECT_ATTR_NAME

def __setiri__(self, key, value):
set_object(self._attr, key, value)


class RDFManager:
"""IRI attribute manager"""

Expand All @@ -161,6 +229,7 @@ def subject(self) -> Union[str, None]:
return s

def add_subject(self, subject: Union[str, List[str]]):
"""Add a subject to the group or dataset. If the subject already exists, it will not be added again."""
if isinstance(subject, list):
data = [str(i) for i in subject]
else:
Expand All @@ -177,15 +246,26 @@ def add_subject(self, subject: Union[str, List[str]]):
self._attr[RDF_SUBJECT_ATTR_NAME] = list(set(iri_sbj_data))

@subject.setter
def subject(self, iri_type: Union[str, List[str]]):
def subject(self, rdf_type: Union[str, List[str]]):
"""Sets the subject of the group or dataset. Will overwrite existing subjects.
If you want to add (append), use add_subject() instead."""
if isinstance(iri_type, list):
data = [str(i) for i in iri_type]
if isinstance(rdf_type, list):
rdf_type = [str(i) for i in rdf_type]
for iri in rdf_type:
try:
HttpUrl(iri)
except pydantic.ValidationError as e:
raise RDFError(f'Invalid IRI: "{iri}" for subject "{self._attr._parent.name}". '
f'Expecting a valid URL. This was validated with pydantic. Pydantic error: {e}')
else:
data = str(iri_type)
rdf_type = str(rdf_type)
try:
HttpUrl(rdf_type)
except pydantic.ValidationError as e:
raise RDFError(f'Invalid IRI: "{rdf_type}" for subject "{self._attr._parent.name}". '
f'Expecting a valid URL. This was validated with pydantic. Pydantic error: {e}')

self._attr[RDF_SUBJECT_ATTR_NAME] = data
self._attr[RDF_SUBJECT_ATTR_NAME] = rdf_type

def append_subject(self, subject: str):
"""Append the subject"""
Expand All @@ -203,7 +283,8 @@ def append_subject(self, subject: str):
self._attr[RDF_SUBJECT_ATTR_NAME] = [curr_subjects, subject]

@property
def predicate(self):
def predicate(self) -> RDF_Predicate:
"""Return the RDF predicate manager"""
return RDF_Predicate(self._attr)

@predicate.setter
Expand All @@ -225,11 +306,6 @@ def __eq__(self, other: str):
def __contains__(self, item):
return item in self._attr.get(RDF_SUBJECT_ATTR_NAME, list())

def set_subject(self, iri):
"""Assign iri to an HDF5 object (group or dataset)"""
if iri is not None:
self._attr[RDF_SUBJECT_ATTR_NAME] = str(iri)

def get(self, attr_name: str) -> IRIDict:
return self.__getitem__(attr_name)

Expand All @@ -244,69 +320,3 @@ def __getitem__(self, item) -> IRIDict:

def __delitem__(self, attr_name: str):
del_iri_entry(self._attr, attr_name)


class _RDFPO(abc.ABC):
"""Abstract class for predicate (P) and object (O)"""
IRI_ATTR_NAME = None

def __init__(self, attr):
self._attr = attr

# def __new__(cls, attr):
# instance = super().__new__(cls, '')
# instance._attr = attr
# return instance

@abc.abstractmethod
def __setiri__(self, key, value):
"""Set IRI to an attribute"""

def get(self, item, default=None):
attrs = self._attr.get(self.IRI_ATTR_NAME, None)
if attrs is None:
return default
return attrs.get(item, default)

def __getitem__(self, item) -> Union[str, None]:
iri_attr_dict = self._attr.get(self.IRI_ATTR_NAME, None)
if iri_attr_dict is None:
return None
return iri_attr_dict.get(item, None)

def __setitem__(self, key, value: str):
if key not in self._attr:
raise KeyError(f'No attribute "{key}" found. Cannot assign an IRI to a non-existing attribute.')
self.__setiri__(key, str(value))

def keys(self):
"""Return all attribute names assigned to the IRIs"""
return self._attr.get(self.IRI_ATTR_NAME, {}).keys()

def values(self):
"""Return all IRIs assigned to the attributes"""
return self._attr.get(self.IRI_ATTR_NAME, {}).values()

def items(self):
"""Return all attribute names and IRIs"""
return self._attr.get(self.IRI_ATTR_NAME, {}).items()

def __iter__(self):
return iter(self.keys())


class RDF_Predicate(_RDFPO):
"""IRI class attribute manager"""

IRI_ATTR_NAME = RDF_PREDICATE_ATTR_NAME

def __setiri__(self, key, value):
set_predicate(self._attr, key, value)


class RDF_OBJECT(_RDFPO):
"""IRI data attribute manager for objects"""
IRI_ATTR_NAME = RDF_OBJECT_ATTR_NAME

def __setiri__(self, key, value):
set_object(self._attr, key, value)
15 changes: 10 additions & 5 deletions tests/wrapper/test_jsonld.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
import json
import ontolutils
import pathlib
import rdflib
import unittest
from ontolutils import M4I
from ontolutils import namespaces, urirefs, Thing

import h5rdmtoolbox as h5tbx
import ontolutils
from h5rdmtoolbox import __version__
from h5rdmtoolbox.wrapper import jsonld
from h5rdmtoolbox.wrapper import rdf
from ontolutils import namespaces, urirefs, Thing

logger = h5tbx.logger

Expand Down Expand Up @@ -36,7 +37,6 @@ def test_dump_hdf_to_json(self):
attrs={'standard_name': 'x_velocity',
'standard_name_non_iri': 'x_velocity',
'unit': 'm/s'})
from ontolutils import M4I
ds.rdf.subject = str(M4I.NumericalVariable)
ds.rdf.predicate['standard_name'] = 'https://matthiasprobst.github.io/ssno#standard_name'
ds.rdf.object['standard_name'] = 'https://matthiasprobst.github.io/pivmeta#x_velocity'
Expand Down Expand Up @@ -91,10 +91,15 @@ def test_jsonld_dumps(self):
grp = h5.create_group('grp')
grp.attrs['test', sn_iri] = 'test'
sub_grp = grp.create_group('Fan')
sub_grp.create_dataset('D3', data=300)
ds = sub_grp.create_dataset('D3', data=300)
sub_grp['D3'].attrs['units', 'http://w3id.org/nfdi4ing/metadata4ing#hasUnits'] = 'mm'
sub_grp['D3'].rdf['units'].object = 'https://qudt.org/vocab/unit/MilliM'
sub_grp['D3'].attrs['standard_name', sn_iri] = 'blade_diameter3'
ds.rdf.subject = 'https://w3id.org/nfdi4ing/metadata4ing#NumericalVariable'
self.assertEqual(ds.rdf.subject, 'https://w3id.org/nfdi4ing/metadata4ing#NumericalVariable')
from h5rdmtoolbox.wrapper.rdf import RDF_SUBJECT_ATTR_NAME
self.assertEqual(ds.attrs[RDF_SUBJECT_ATTR_NAME],
'https://w3id.org/nfdi4ing/metadata4ing#NumericalVariable')
h5.dumps()
from pprint import pprint
out_dict = h5tbx.jsonld.dumpd(h5.hdf_filename,
Expand All @@ -104,7 +109,7 @@ def test_jsonld_dumps(self):
pprint(out_dict)
found_m4iNumericalVariable = False
for g in out_dict['@graph']:
if g['@type'] == 'm4i:NumericalVariable':
if 'https://w3id.org/nfdi4ing/metadata4ing#NumericalVariable' in g['@type']:
self.assertDictEqual(g['m4i:hasUnits'], {'@id': 'https://qudt.org/vocab/unit/MilliM'})
self.assertEqual(g['ssno:standard_name'], 'blade_diameter3')
found_m4iNumericalVariable = True
Expand Down
Loading

0 comments on commit dfeb53f

Please sign in to comment.