Skip to content

Commit

Permalink
Refactor NamespaceSet
Browse files Browse the repository at this point in the history
- Refactored `NamespaceSet.add()` as too big
  - Extracted some methods, in particular `_execute_item_del_hook`. I used the method also in other places
- As we check different constraints for uniqueness in the namespace, I defined ATTRIBUTES_CONSTRAINT_IDS. The dict will be used when throwing exception. The solution with the dict is temporary, we need other solution here.
  • Loading branch information
zrgt committed Nov 8, 2023
1 parent 5eb895e commit 4515de1
Showing 1 changed file with 73 additions and 47 deletions.
120 changes: 73 additions & 47 deletions basyx/aas/model/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -1765,6 +1765,14 @@ def remove_object_by_semantic_id(self, semantic_id: Reference) -> None:

ATTRIBUTE_TYPES = Union[NameType, Reference, QualifierType]

# TODO: Find a better solution for providing constraint ids
ATTRIBUTES_CONSTRAINT_IDS = {
"id_short": 117, # Referable,
"type": 21, # Qualifier,
"name": 77, # Extension,
"semantic_id": 134, # model.OperationVariable
}


class NamespaceSet(MutableSet[_NSO], Generic[_NSO]):
"""
Expand Down Expand Up @@ -1864,58 +1872,81 @@ def __len__(self) -> int:
def __iter__(self) -> Iterator[_NSO]:
return iter(next(iter(self._backend.values()))[0].values())

def add(self, value: _NSO):
if value.parent is not None and value.parent is not self.parent:
raise ValueError("Object has already a parent, but it must not be part of two namespaces.")
def add(self, element: _NSO):
if element.parent is not None and element.parent is not self.parent:
raise ValueError("Object has already a parent; it cannot belong to two namespaces.")
# TODO remove from current parent instead (allow moving)?
if self._item_id_set_hook is not None:
self._item_id_set_hook(value)

self._execute_item_id_set_hook(element)
self._validate_namespace_constraints(element)
self._execute_item_add_hook(element)

element.parent = self.parent
for key_attr_name, (backend, case_sensitive) in self._backend.items():
backend[self._get_attribute(element, key_attr_name, case_sensitive)] = element

def _validate_namespace_constraints(self, element: _NSO):
for set_ in self.parent.namespace_element_sets:
for attr_name, (backend, case_sensitive) in set_._backend.items():
if hasattr(value, attr_name):
attr_value = self._get_attribute(value, attr_name, case_sensitive)
if attr_value is None:
raise AASConstraintViolation(117, f"{value!r} has attribute {attr_name}=None, which is not "
f"allowed within a {self.parent.__class__.__name__}!")
if attr_value in backend:
raise AASConstraintViolation(22, "Object with attribute (name='{}', value='{}') is already "
"present in {}"
.format(attr_name, str(getattr(value, attr_name)),
"this set of objects"
if set_ is self else "another set in the same namespace"))
for key_attr_name, (backend_dict, case_sensitive) in set_._backend.items():
if hasattr(element, key_attr_name):
key_attr_value = self._get_attribute(element, key_attr_name, case_sensitive)
self._check_attr_is_not_none(element, key_attr_name, key_attr_value)
self._check_value_is_not_in_backend(key_attr_name, key_attr_value, backend_dict, set_)

def _check_attr_is_not_none(self, element: _NSO, attr_name: str, attr):
if attr is None:
if attr_name == "id_short":
raise AASConstraintViolation(117, f"{element!r} has attribute {attr_name}=None, "
f"which is not allowed within a {self.parent.__class__.__name__}!")
else:
raise ValueError(f"{element!r} has attribute {attr_name}=None, which is not allowed!")

def _check_value_is_not_in_backend(self, attr_name: str, attr, backend_dict: Dict, set_: "NamespaceSet"):
if attr in backend_dict:
if set_ is self:
text = f"Object with attribute (name='{attr_name}', value='{attr}') " \
f"is already present in this set of objects"
else:
text = f"Object with attribute (name='{attr_name}', value='{attr}') " \
f"is already present in another set in the same namespace"
raise AASConstraintViolation(ATTRIBUTES_CONSTRAINT_IDS.get(attr_name, 0), text)

def _execute_item_id_set_hook(self, element: _NSO):
if self._item_id_set_hook is not None:
self._item_id_set_hook(element)

def _execute_item_add_hook(self, element: _NSO):
if self._item_add_hook is not None:
try:
self._item_add_hook(value, self.__iter__())
except Exception:
if self._item_id_del_hook is not None:
self._item_id_del_hook(value)
self._item_add_hook(element, self.__iter__())
except Exception as e:
self._execute_item_del_hook(element)
raise
value.parent = self.parent
for attr_name, (backend, case_sensitive) in self._backend.items():
backend[self._get_attribute(value, attr_name, case_sensitive)] = value

def _execute_item_del_hook(self, element: _NSO):
# parent needs to be unset first, otherwise generated id_shorts cannot be unset
# see SubmodelElementList
if hasattr(element, "parent"):
element.parent = None
if self._item_id_del_hook is not None:
self._item_id_del_hook(element)

def remove_by_id(self, attribute_name: str, identifier: ATTRIBUTE_TYPES) -> None:
item = self.get_object_by_attribute(attribute_name, identifier)
self.remove(item)

def remove(self, item: _NSO) -> None:
item_found = False
for attr_name, (backend, case_sensitive) in self._backend.items():
item_in_dict = backend[self._get_attribute(item, attr_name, case_sensitive)]
if item_in_dict is item:
for key_attr_name, (backend_dict, case_sensitive) in self._backend.items():
key_attr_value = self._get_attribute(item, key_attr_name, case_sensitive)
if backend_dict[key_attr_value] is item:
# item has to be removed from backend before _item_del_hook() is called,
# as the hook may unset the id_short, as in SubmodelElementLists
del backend_dict[key_attr_value]
item_found = True
break
if not item_found:
raise KeyError("Object not found in NamespaceDict")
# parent needs to be unset first, otherwise generated id_shorts cannot be unset
# see SubmodelElementList
item.parent = None
# item has to be removed from backend before _item_del_hook() is called, as the hook may unset the id_short,
# as in SubmodelElementLists
for attr_name, (backend, case_sensitive) in self._backend.items():
del backend[self._get_attribute(item, attr_name, case_sensitive)]
if self._item_id_del_hook is not None:
self._item_id_del_hook(item)
self._execute_item_del_hook(item)

def discard(self, x: _NSO) -> None:
if x not in self:
Expand All @@ -1924,19 +1955,14 @@ def discard(self, x: _NSO) -> None:

def pop(self) -> _NSO:
_, value = next(iter(self._backend.values()))[0].popitem()
if self._item_id_del_hook is not None:
self._item_id_del_hook(value)
self._execute_item_del_hook(value)
value.parent = None
return value

def clear(self) -> None:
for attr_name, (backend, case_sensitive) in self._backend.items():
for value in backend.values():
# parent needs to be unset first, otherwise generated id_shorts cannot be unset
# see SubmodelElementList
value.parent = None
if self._item_id_del_hook is not None:
self._item_id_del_hook(value)
self._execute_item_del_hook(value)
for attr_name, (backend, case_sensitive) in self._backend.items():
backend.clear()

Expand Down Expand Up @@ -2047,9 +2073,9 @@ def __init__(self, parent: Union[UniqueIdShortNamespace, UniqueSemanticIdNamespa
def __iter__(self) -> Iterator[_NSO]:
return iter(self._order)

def add(self, value: _NSO):
super().add(value)
self._order.append(value)
def add(self, element: _NSO):
super().add(element)
self._order.append(element)

def remove(self, item: Union[Tuple[str, ATTRIBUTE_TYPES], _NSO]):
if isinstance(item, tuple):
Expand Down

0 comments on commit 4515de1

Please sign in to comment.