Skip to content

Commit

Permalink
extract AID information when added to ObjectStore
Browse files Browse the repository at this point in the history
  • Loading branch information
shenchenruwo committed Aug 13, 2024
1 parent ffa8d84 commit 296cdcf
Showing 1 changed file with 202 additions and 3 deletions.
205 changes: 202 additions & 3 deletions basyx/aas/model/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import abc
import hashlib
from typing import List, Optional, TypeVar, MutableSet, Generic, \
Iterable, Dict, Iterator, Tuple, Union
Iterable, Dict, Iterator, Tuple, Union, Any

from .base import Referable, Identifiable, Identifier, \
UniqueIdShortNamespace, NameType, NamespaceSet, Qualifier, Extension
Expand Down Expand Up @@ -107,13 +107,213 @@ def __init__(self, objects: Iterable[_IT] = ()) -> None:
def get_identifiable(self, identifier: Identifier) -> _IT:
return self._backend[identifier]

def add(self, x: _IT) -> None:
def add(self, x: _IT, is_aimc: bool = False) -> None:
if x.id in self._backend and self._backend.get(x.id) is not x:
raise KeyError(
"Identifiable object with same id {} is already stored in this store"
.format(x.id))
self._backend[x.id] = x

# Check if the added object is AssetInterfacesMappingConfiguration
if is_aimc or (isinstance(x, model.Submodel) and self._is_aimc(x)):
self._add_source_from_AIMC(x)

def _is_aimc(self, submodel: model.Submodel) -> bool:
"""
Check if the submodel is an AssetInterfacesMappingConfiguration.
"""
# Check by idShort
if submodel.id_short == "AssetInterfacesMappingConfiguration":
return True

# Check by semanticId
if submodel.semantic_id:
if isinstance(submodel.semantic_id, model.ExternalReference):
# Check each key in the ExternalReference
for key in submodel.semantic_id.key:
if "assetinterfacesmappingconfiguration" in key.value.lower():
return True

return False

def _add_source_from_AIMC(self, aimc: model.Submodel) -> None:
"""
Process AssetInterfacesMappingConfiguration and update _mapping.
"""
# TODO: Add checking with semanticID
mapping_configurations = next((sme for sme in aimc.submodel_element if sme.id_short == "MappingConfigurations"),
None)
if mapping_configurations and isinstance(mapping_configurations, model.SubmodelElementList):
for config in mapping_configurations.value:
self._process_mapping_configuration(config)

def _process_mapping_configuration(self, config: model.SubmodelElementCollection) -> None:
"""
Process a single MappingConfiguration.
"""
interface_reference = next((sme for sme in config.value if sme.id_short == "InterfaceReference"), None)
if not interface_reference or not isinstance(interface_reference, model.ReferenceElement):
print("InterfaceReference not found or not of type ReferenceElement")
return

protocol = self._extract_protocol_from_interface_reference(interface_reference.value)
if not protocol:
print(f"Unable to determine protocol for interface reference: {interface_reference.value}")
return

# Extract AID parameters
aid_parameters = self._extract_aid_parameters(interface_reference.value, protocol)

mapping_relations = next((sme for sme in config.value if sme.id_short == "MappingSourceSinkRelations"), None)
if mapping_relations and isinstance(mapping_relations, model.SubmodelElementList):
for relation in mapping_relations.value:
if isinstance(relation, model.RelationshipElement):
self._process_relationship_element(relation, protocol, aid_parameters)
else:
print("MappingSourceSinkRelations not found or not of type SubmodelElementList")

def _process_relationship_element(self, relation: model.RelationshipElement, protocol: Protocol,
aid_parameters: Dict[str, Any]) -> None:
"""
Process a single RelationshipElement and update _mapping.
"""
if relation.second:
second_hash = self.generate_model_reference_hash(relation.second)
if second_hash not in self._mapping:
self._mapping[second_hash] = {}

# Combine AID parameters with the protocol
self._mapping[second_hash][protocol] = aid_parameters

def _extract_aid_parameters(self, aid_reference: model.ModelReference, protocol: Protocol) -> Dict[str, Any]:
"""
Extract parameters from the AID element referenced by aid_reference
:param aid_reference: ModelReference to the AID element
:param protocol: Protocol type
:return: Dictionary containing extracted parameters
"""
try:
aid_element = aid_reference.resolve(self)
except Exception as e:
print(f"Error resolving AID element for reference: {aid_reference}. Error: {e}")
return {}

if not isinstance(aid_element, model.SubmodelElementCollection):
print(f"AID element is not a SubmodelElementCollection: {type(aid_element)}")
return {}

if protocol == Protocol.HTTP:
return self._extract_http_parameters(aid_element)
elif protocol == Protocol.MQTT:
return self._extract_mqtt_parameters(aid_element)
elif protocol == Protocol.MODBUS:
return self._extract_modbus_parameters(aid_element)
else:
print(f"Unsupported protocol: {protocol}")
return {}

def _extract_http_parameters(self, aid_element: model.SubmodelElementCollection) -> Dict[str, Any]:
parameters = {}
for element in aid_element.value:
if element.id_short == "EndpointMetadata":
for endpoint_element in element.value:
if endpoint_element.id_short == "base":
parameters["base"] = endpoint_element.value
elif element.id_short == "InterfaceMetadata":
for interface_element in element.value:
if interface_element.id_short == "Properties":
for property_element in interface_element.value:
if isinstance(property_element, model.SubmodelElementCollection):
property_params = self._extract_property_parameters(property_element)
parameters[property_element.id_short] = property_params
return parameters

def _extract_mqtt_parameters(self, aid_element: model.SubmodelElementCollection) -> Dict[str, Any]:
parameters = {}
for element in aid_element.value:
if element.id_short == "EndpointMetadata":
for endpoint_element in element.value:
if endpoint_element.id_short == "base":
parameters["base"] = endpoint_element.value
elif element.id_short == "InterfaceMetadata":
for interface_element in element.value:
if interface_element.id_short == "Properties":
for property_element in interface_element.value:
if isinstance(property_element, model.SubmodelElementCollection):
property_params = self._extract_property_parameters(property_element)
parameters[property_element.id_short] = property_params
return parameters

def _extract_modbus_parameters(self, aid_element: model.SubmodelElementCollection) -> Dict[str, Any]:
parameters = {}
for element in aid_element.value:
if element.id_short == "EndpointMetadata":
for endpoint_element in element.value:
if endpoint_element.id_short == "base":
parameters["base"] = endpoint_element.value
elif element.id_short == "InterfaceMetadata":
for interface_element in element.value:
if interface_element.id_short == "Properties":
for property_element in interface_element.value:
if isinstance(property_element, model.SubmodelElementCollection):
property_params = self._extract_property_parameters(property_element)
parameters[property_element.id_short] = property_params
return parameters

def _extract_property_parameters(self, property_element: model.SubmodelElementCollection) -> Dict[str, Any]:
property_params = {}
for element in property_element.value:
if element.id_short in ["type", "valueType", "unit"]:
property_params[element.id_short] = element.value
elif element.id_short == "forms":
for form_element in element.value:
if form_element.id_short == "href":
property_params["href"] = form_element.value
elif form_element.id_short.startswith("modv_") or form_element.id_short.startswith(
"htv_") or form_element.id_short.startswith("mqv_"):
property_params[form_element.id_short] = form_element.value
return property_params

def _extract_protocol_from_interface_reference(self, interface_ref: model.ModelReference) -> Optional[Protocol]:
"""
Extract Protocol information from InterfaceReference.
"""
try:
interface_element = interface_ref.resolve(self)
except Exception as e:
print(f"Error resolving InterfaceReference: {e}")
return None

if not isinstance(interface_element, model.SubmodelElementCollection):
print(f"Unexpected type for interface element: {type(interface_element)}")
return None

# Check idShort
id_short = interface_element.id_short.lower()
if "http" in id_short:
return Protocol.HTTP
elif "mqtt" in id_short:
return Protocol.MQTT
elif "modbus" in id_short:
return Protocol.MODBUS

# Check Supplemental Semantic IDs
if interface_element._supplemental_semantic_ids:
for semantic_id in interface_element._supplemental_semantic_ids:
if isinstance(semantic_id, model.ExternalReference):
for key in semantic_id.key:
key_value = key.value.lower()
if "http" in key_value:
return Protocol.HTTP
elif "mqtt" in key_value:
return Protocol.MQTT
elif "modbus" in key_value:
return Protocol.MODBUS

print(f"Unable to determine protocol for interface: {interface_element.id_short}")
return None

def discard(self, x: _IT) -> None:
if self._backend.get(x.id) is x:
del self._backend[x.id]
Expand All @@ -133,7 +333,6 @@ def generate_model_reference_hash(self, model_ref: model.ModelReference) -> str:
def add_source(self, referable: model.Referable, protocol: Protocol, source: Union[str, dict]):
"""
Add a corresponding source to the mapping table. The input is extracted either from the AID or custom input.
TODO: add a new function to use the AIMC and AID as input
TODO: adapt the source dict to a source class
"""
model_ref = model.ModelReference.from_referable(referable)
Expand Down

0 comments on commit 296cdcf

Please sign in to comment.