From defe3b7415537274eacada9906f4802d6769acad Mon Sep 17 00:00:00 2001 From: Igor Garmaev <56840636+zrgt@users.noreply.github.com> Date: Tue, 14 Nov 2023 13:37:00 +0100 Subject: [PATCH] model.base: Improve `NamespaceSet` (#155) - Refactor `NamespaceSet.add()` as too big - Extract some methods, in particular `_execute_item_del_hook` to avoid code duplication - As we check different constraints for uniqueness in the namespace, I defined `ATTRIBUTES_CONSTRAINT_IDS`. The dict will be used when throwing exception. The solution with the dict is temporary, we need another solution here. - Use correct constraint ids for each NamespaceSet in tests, use 000 if not constraint id is suitable --- basyx/aas/model/base.py | 121 ++++++++++++++++++++++++---------------- test/model/test_base.py | 6 +- 2 files changed, 77 insertions(+), 50 deletions(-) diff --git a/basyx/aas/model/base.py b/basyx/aas/model/base.py index 5677fb19b..9afc63cfb 100644 --- a/basyx/aas/model/base.py +++ b/basyx/aas/model/base.py @@ -1769,6 +1769,14 @@ def remove_object_by_semantic_id(self, semantic_id: Reference) -> None: ATTRIBUTE_TYPES = Union[NameType, Reference, QualifierType] +# TODO: Find a better solution for providing constraint ids +ATTRIBUTES_CONSTRAINT_IDS = { + "id_short": 22, # Referable, + "type": 21, # Qualifier, + "name": 77, # Extension, + # "id_short": 134, # model.OperationVariable +} + class NamespaceSet(MutableSet[_NSO], Generic[_NSO]): """ @@ -1868,35 +1876,65 @@ def __len__(self) -> int: def __iter__(self) -> Iterator[_NSO]: return iter(next(iter(self._backend.values()))[0].values()) - def add(self, value: _NSO): - if value.parent is not None and value.parent is not self.parent: - raise ValueError("Object has already a parent, but it must not be part of two namespaces.") + def add(self, element: _NSO): + if element.parent is not None and element.parent is not self.parent: + raise ValueError("Object has already a parent; it cannot belong to two namespaces.") # TODO remove from current parent instead (allow moving)? - if self._item_id_set_hook is not None: - self._item_id_set_hook(value) + + self._execute_item_id_set_hook(element) + self._validate_namespace_constraints(element) + self._execute_item_add_hook(element) + + element.parent = self.parent + for key_attr_name, (backend, case_sensitive) in self._backend.items(): + backend[self._get_attribute(element, key_attr_name, case_sensitive)] = element + + def _validate_namespace_constraints(self, element: _NSO): for set_ in self.parent.namespace_element_sets: - for attr_name, (backend, case_sensitive) in set_._backend.items(): - if hasattr(value, attr_name): - attr_value = self._get_attribute(value, attr_name, case_sensitive) - if attr_value is None: - raise AASConstraintViolation(117, f"{value!r} has attribute {attr_name}=None, which is not " - f"allowed within a {self.parent.__class__.__name__}!") - if attr_value in backend: - raise AASConstraintViolation(22, "Object with attribute (name='{}', value='{}') is already " - "present in {}" - .format(attr_name, str(getattr(value, attr_name)), - "this set of objects" - if set_ is self else "another set in the same namespace")) + for key_attr_name, (backend_dict, case_sensitive) in set_._backend.items(): + if hasattr(element, key_attr_name): + key_attr_value = self._get_attribute(element, key_attr_name, case_sensitive) + self._check_attr_is_not_none(element, key_attr_name, key_attr_value) + self._check_value_is_not_in_backend(element, key_attr_name, key_attr_value, backend_dict, set_) + + def _check_attr_is_not_none(self, element: _NSO, attr_name: str, attr): + if attr is None: + if attr_name == "id_short": + raise AASConstraintViolation(117, f"{element!r} has attribute {attr_name}=None, " + f"which is not allowed within a {self.parent.__class__.__name__}!") + else: + raise ValueError(f"{element!r} has attribute {attr_name}=None, which is not allowed!") + + def _check_value_is_not_in_backend(self, element: _NSO, attr_name: str, attr, + backend_dict: Dict[ATTRIBUTE_TYPES, _NSO], set_: "NamespaceSet"): + if attr in backend_dict: + if set_ is self: + text = f"Object with attribute (name='{attr_name}', value='{getattr(element, attr_name)}') " \ + f"is already present in this set of objects" + else: + text = f"Object with attribute (name='{attr_name}', value='{getattr(element, attr_name)}') " \ + f"is already present in another set in the same namespace" + raise AASConstraintViolation(ATTRIBUTES_CONSTRAINT_IDS.get(attr_name, 0), text) + + def _execute_item_id_set_hook(self, element: _NSO): + if self._item_id_set_hook is not None: + self._item_id_set_hook(element) + + def _execute_item_add_hook(self, element: _NSO): if self._item_add_hook is not None: try: - self._item_add_hook(value, self.__iter__()) - except Exception: - if self._item_id_del_hook is not None: - self._item_id_del_hook(value) + self._item_add_hook(element, self.__iter__()) + except Exception as e: + self._execute_item_del_hook(element) raise - value.parent = self.parent - for attr_name, (backend, case_sensitive) in self._backend.items(): - backend[self._get_attribute(value, attr_name, case_sensitive)] = value + + def _execute_item_del_hook(self, element: _NSO): + # parent needs to be unset first, otherwise generated id_shorts cannot be unset + # see SubmodelElementList + if hasattr(element, "parent"): + element.parent = None + if self._item_id_del_hook is not None: + self._item_id_del_hook(element) def remove_by_id(self, attribute_name: str, identifier: ATTRIBUTE_TYPES) -> None: item = self.get_object_by_attribute(attribute_name, identifier) @@ -1904,22 +1942,16 @@ def remove_by_id(self, attribute_name: str, identifier: ATTRIBUTE_TYPES) -> None def remove(self, item: _NSO) -> None: item_found = False - for attr_name, (backend, case_sensitive) in self._backend.items(): - item_in_dict = backend[self._get_attribute(item, attr_name, case_sensitive)] - if item_in_dict is item: + for key_attr_name, (backend_dict, case_sensitive) in self._backend.items(): + key_attr_value = self._get_attribute(item, key_attr_name, case_sensitive) + if backend_dict[key_attr_value] is item: + # item has to be removed from backend before _item_del_hook() is called, + # as the hook may unset the id_short, as in SubmodelElementLists + del backend_dict[key_attr_value] item_found = True - break if not item_found: raise KeyError("Object not found in NamespaceDict") - # parent needs to be unset first, otherwise generated id_shorts cannot be unset - # see SubmodelElementList - item.parent = None - # item has to be removed from backend before _item_del_hook() is called, as the hook may unset the id_short, - # as in SubmodelElementLists - for attr_name, (backend, case_sensitive) in self._backend.items(): - del backend[self._get_attribute(item, attr_name, case_sensitive)] - if self._item_id_del_hook is not None: - self._item_id_del_hook(item) + self._execute_item_del_hook(item) def discard(self, x: _NSO) -> None: if x not in self: @@ -1928,19 +1960,14 @@ def discard(self, x: _NSO) -> None: def pop(self) -> _NSO: _, value = next(iter(self._backend.values()))[0].popitem() - if self._item_id_del_hook is not None: - self._item_id_del_hook(value) + self._execute_item_del_hook(value) value.parent = None return value def clear(self) -> None: for attr_name, (backend, case_sensitive) in self._backend.items(): for value in backend.values(): - # parent needs to be unset first, otherwise generated id_shorts cannot be unset - # see SubmodelElementList - value.parent = None - if self._item_id_del_hook is not None: - self._item_id_del_hook(value) + self._execute_item_del_hook(value) for attr_name, (backend, case_sensitive) in self._backend.items(): backend.clear() @@ -2051,9 +2078,9 @@ def __init__(self, parent: Union[UniqueIdShortNamespace, UniqueSemanticIdNamespa def __iter__(self) -> Iterator[_NSO]: return iter(self._order) - def add(self, value: _NSO): - super().add(value) - self._order.append(value) + def add(self, element: _NSO): + super().add(element) + self._order.append(element) def remove(self, item: Union[Tuple[str, ATTRIBUTE_TYPES], _NSO]): if isinstance(item, tuple): diff --git a/test/model/test_base.py b/test/model/test_base.py index aa40e0199..3fa8a64a3 100644 --- a/test/model/test_base.py +++ b/test/model/test_base.py @@ -374,7 +374,7 @@ def test_NamespaceSet(self) -> None: self.assertEqual( "Object with attribute (name='semantic_id', value='ExternalReference(key=(Key(" "type=GLOBAL_REFERENCE, value=http://acplt.org/Test1),))') is already present in this set of objects " - "(Constraint AASd-022)", + "(Constraint AASd-000)", str(cm.exception)) self.namespace.set2.add(self.prop5) self.namespace.set2.add(self.prop6) @@ -389,7 +389,7 @@ def test_NamespaceSet(self) -> None: self.assertEqual( "Object with attribute (name='semantic_id', value='" "ExternalReference(key=(Key(type=GLOBAL_REFERENCE, value=http://acplt.org/Test1),))')" - " is already present in another set in the same namespace (Constraint AASd-022)", + " is already present in another set in the same namespace (Constraint AASd-000)", str(cm.exception)) self.assertIs(self.prop1, self.namespace.set1.get("id_short", "Prop1")) @@ -456,7 +456,7 @@ def test_NamespaceSet(self) -> None: with self.assertRaises(model.AASConstraintViolation) as cm: self.namespace3.set1.add(self.qualifier1alt) self.assertEqual("Object with attribute (name='type', value='type1') is already present in this set " - "of objects (Constraint AASd-022)", + "of objects (Constraint AASd-021)", str(cm.exception)) def test_namespaceset_hooks(self) -> None: