This SparkplugB decoder is an MQTT Application Node deployed on the edge with the following functionality:

- subscribes to the topic `spBv1.0/#`
- decodes the received payload and extracts the metrics it contains
- enhances the decoded message by adding
  - *<group_id>*
  - *<edge_node_id>*
  - *<device_id>* *if provided*
- extracts the metric name to get the ISA-95 UNS namespace
- if a metric alias is used, maps the alias to the previously provided metric name (see the sketch after this list)
- collates all metrics specific to one metric name and creates a consolidated message payload
- publishes the consolidated message payload to the UNS namespace
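The alias mapping above requires caching the metric name that was announced together with an alias, so that later metrics carrying only the alias can be resolved. Below is a minimal sketch of such a cache, assuming a per-node lookup table; the class and method names (`SpBAliasCache`, `resolve`) are hypothetical, not the module's actual API.

```python
class SpBAliasCache:
    """Hypothetical sketch: caches alias -> metric name per edge node."""

    def __init__(self) -> None:
        # key: (group_id, edge_node_id, device_id); value: {alias: name}
        self._cache: dict[tuple, dict[int, str]] = {}

    def resolve(self, node_key: tuple, metric: dict) -> str | None:
        """Return the metric name, learning or resolving the alias."""
        name = metric.get("name")
        alias = metric.get("alias")
        if name is not None:
            if alias is not None:
                # remember the association for later alias-only metrics
                self._cache.setdefault(node_key, {})[alias] = name
            return name
        if alias is not None:
            # alias-only metric: look up the previously provided name
            return self._cache.get(node_key, {}).get(alias)
        return None  # non-compliant metric: neither name nor alias
```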
This is not a SCADA/IIoT host and will not publish any control messages to the broker.
With reference to the article *Using Sparkplug to Map ISA 95*, each SparkplugB metric contains a `name` attribute which holds the ISA-95 namespace for the message in addition to the tag, e.g.
```json
{
  "timestamp": 1486144502122,
  "metrics": [
    {
      "name": "Enterprise/Site/Area/Line/Cell/Tag1",
      "alias": 1,
      "timestamp": 1479123452194,
      "dataType": "String",
      "isHistorical": false,
      "value": "Test"
    },
    {
      "name": "Enterprise/Site/Area/Line/Tag2",
      "alias": 2,
      "timestamp": 1479123452196,
      "dataType": "String",
      "isHistorical": false,
      "value": "LineTest"
    }
  ],
  "seq": 2
}
```
When we parse the metric name we get:

- the topic name in the UNS, i.e. `Enterprise/Site/Area/Line/Cell`
- the name of the tag, i.e. `Tag1` and `Tag2`

The tag name will now be added as an attribute to the JSON object being published to the UNS.
The spB payload may contain multiple metrics which belong to different topics, hence we merge all tags belonging to the same UNS topic. The logic for merging is as follows:

- the name of the metric is split into the topic and the tag
- tags for the same topic are merged into one JSON object
- if a metric is historical, we create an array of tag values with the latest version at position 0
- if a metric name is duplicated and not historical, the newer tag value overrides the older tag value in the JSON object
- the overall timestamp for the JSON object is the max of the timestamps of all the tags
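A minimal sketch of this merge logic in Python (the helper name `merge_metrics` and the exact payload shape are illustrative assumptions, not the module's actual implementation):

```python
def merge_metrics(metrics: list[dict]) -> dict[str, dict]:
    """Hypothetical sketch: group SparkplugB metrics by UNS topic and
    merge their tags into one JSON-serialisable payload per topic."""
    merged: dict[str, dict] = {}
    for metric in metrics:  # assumes metrics are ordered oldest first
        # split "Enterprise/Site/Area/Line/Cell/Tag1" into topic and tag
        topic, _, tag = metric["name"].rpartition("/")
        payload = merged.setdefault(topic, {"timestamp": 0})
        if metric.get("isHistorical"):
            # historical values accumulate; the latest goes to index 0
            history = payload.get(tag)
            if not isinstance(history, list):
                history = [] if history is None else [history]
            history.insert(0, metric["value"])
            payload[tag] = history
        else:
            # non-historical duplicate: newer value overrides the older one
            payload[tag] = metric["value"]
        # overall timestamp is the max of all merged tag timestamps
        payload["timestamp"] = max(payload["timestamp"], metric["timestamp"])
    return merged
```

Applied to the example payload above, this yields one JSON object for `Enterprise/Site/Area/Line/Cell` containing `Tag1` and one for `Enterprise/Site/Area/Line` containing `Tag2`.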
This mapping is to be done only for the message types
- NDATA
- DDATA
- NCMD
- DCMD
IMPORTANT: The birth and death messages are not mapped to the UNS currently
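A SparkplugB topic has the form `spBv1.0/<group_id>/<message_type>/<edge_node_id>[/<device_id>]`, so the message type can be read straight from the topic. A minimal sketch of such a filter (the helper name `parse_spb_topic` is hypothetical):

```python
# message types that are mapped to the UNS; BIRTH/DEATH/STATE are skipped
MAPPED_MESSAGE_TYPES = {"NDATA", "DDATA", "NCMD", "DCMD"}


def parse_spb_topic(topic: str) -> dict | None:
    """Hypothetical sketch: return the topic parts if the message should
    be mapped to the UNS, or None for unmapped message types."""
    parts = topic.split("/")
    if len(parts) < 4 or parts[0] != "spBv1.0":
        return None  # not a valid SparkplugB topic
    group_id, message_type, edge_node_id = parts[1], parts[2], parts[3]
    if message_type not in MAPPED_MESSAGE_TYPES:
        return None  # e.g. NBIRTH, NDEATH, DBIRTH, DDEATH, STATE
    return {
        "group_id": group_id,
        "message_type": message_type,
        "edge_node_id": edge_node_id,
        "device_id": parts[4] if len(parts) > 4 else None,
    }
```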
This application has two configuration files:
- `settings.yaml`: contains the key configurations needed to connect with MQTT brokers

| key | sub key | description | default value |
| --- | ------- | ----------- | ------------- |
| mqtt | host | Hostname of the MQTT broker instance. Mandatory configuration | *None* |
| mqtt | port | Port of the MQTT broker (int) | 1883 |
| mqtt | topics | Array of topics to be subscribed to. Must be in the SpB namespace, i.e. spBv1.0/# | ["spBv1.0/#"] |
| mqtt | qos | QOS for the subscription. Valid values are 0, 1, 2 | 1 |
| mqtt | keep_alive | Maximum time interval in seconds between two control packets published by the client (int) | *60* |
| mqtt | reconnect_on_failure | Makes the client handle reconnection(s). Recommend keeping this True (True, False) | *True* |
| mqtt | version | The MQTT version to be used for connecting to the broker. Valid values are: 5 (for MQTTv5), 4 (for MQTTv311), 3 (for MQTTv31) | 5 |
| mqtt | clean_session | Boolean value to be specified only if the MQTT version is not 5 | *None* |
| mqtt | transport | Valid values are "websockets", "tcp" | "tcp" |
| mqtt | ignored_attributes | Map of topic & list of attributes which are to be ignored from persistence. Supports wildcards for topics and nesting via the "." notation for the attributes. See the example below | *None* |
| mqtt | timestamp_attribute | The attribute name which should contain the timestamp of the message's publishing | *"timestamp"* |
| sparkplugb | | *currently empty* | - |
| dynaconf_merge | | Mandatory param. Always keep value as true | - |

Example for `mqtt.ignored_attributes`:

```json
{
  "topic1": ["attr1", "attr2", "attr2.subAttr1"],
  "topic2/+": ["A", "A.B.C"],
  "topic3/#": ["x", "Y"]
}
```
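For illustration, a `settings.yaml` using the keys from the table above might look as follows (all values are placeholders to adapt to your broker; check the file layout against the provided templates):

```yaml
mqtt:
  host: "broker.example.com" # mandatory
  port: 1883
  topics: ["spBv1.0/#"]
  qos: 1
  keep_alive: 60
  reconnect_on_failure: true
  version: 5
  transport: "tcp"
  timestamp_attribute: "timestamp"
dynaconf_merge: true
```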
- `.secrets.yaml`: contains the usernames and passwords to connect to the MQTT cluster. This file is not checked into the repository for security reasons. However, there is a template file, `.secrets_template.yaml`, which should be edited and renamed to `.secrets.yaml`
| key | sub key | sub key | description | default value |
| --- | ------- | ------- | ----------- | ------------- |
| mqtt | username | | The user id needed to authenticate with the MQTT broker | *None* |
| mqtt | password | | The password needed to authenticate with the MQTT broker | *None* |
| mqtt | tls | | Provide a map of attributes needed for a TLS connection to the MQTT broker. See the attributes below | *None* |
| mqtt | tls | ca_certs | Fully qualified path to the CA cert file. Mandatory for an SSL connection | *None* |
| mqtt | tls | certfile | Fully qualified path to the cert file | *None* |
| mqtt | tls | keyfile | Fully qualified path to the keyfile for the cert | *None* |
| mqtt | tls | cert_reqs | Boolean. If not provided then ssl.CERT_NONE is used. If True then ssl.CERT_REQUIRED is used, else ssl.CERT_OPTIONAL is used | *None* |
| mqtt | tls | ciphers | Specify which encryption ciphers are allowed for this connection | *None* |
| mqtt | tls | keyfile_password | Password used to encrypt your certfile and keyfile | *None* |
| mqtt | tls | insecure_cert | Boolean. Skips hostname checking, required for self-signed certificates | *True* |
| dynaconf_merge | | | Mandatory param. Always keep value as true | - |
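For illustration, a filled-in `.secrets.yaml` might look like this (all values are placeholders; never commit this file):

```yaml
mqtt:
  username: "spb_user"
  password: "changeme"
  tls:
    ca_certs: "/etc/ssl/certs/ca.crt"
    insecure_cert: true
dynaconf_merge: true
```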
This sub-module can be independently set up as a dev environment in the folder `05_sparkplugb`. This has been tested on Unix (bash), Windows (PowerShell) and Mac (zsh).
```bash
python -m pip install --upgrade pip
pip install poetry
# Ensure that the poetry shell is activated
poetry shell
python -m pip install --upgrade pip poetry
poetry install
python ./src/uns_spb_mapper/uns_sparkplugb_listener.py
```
Setting up VSCode
While importing the folder into VSCode, remember to do the following steps the first time:

- Open a terminal in VSCode
- Activate the poetry shell and install the dependencies

  ```bash
  poetry shell
  python -m pip install --upgrade pip poetry
  poetry install
  ```

- Select the correct Python interpreter in VSCode (it should automatically detect the poetry virtual environment)
This application is executed by the following commands, with the current folder set to `05_sparkplugb`. Ensure that the configuration files are correctly updated for your MQTT broker and database instance.
```bash
# Ensure that the poetry shell is activated
poetry shell
poetry install
python ./src/uns_spb_mapper/uns_sparkplugb_listener.py
```
The set of tests for this module is executed by:
```bash
# run all tests excluding integration tests
poetry run pytest -m "not integrationtest" test/
# run all tests
poetry run pytest test/
```
The Docker container image for this module is built and stored by the Dockerize module and published to the GitHub Container Registry. The way to run the container is:
```bash
# docker pull ghcr.io/mkashwin/unifiednamespace/uns/spb_mapper:<tag>
# e.g.
docker pull ghcr.io/mkashwin/unifiednamespace/uns/spb_mapper:latest

# docker run --name <container name> -d -v <full path to conf>/:/app/conf uns/spb_mapper:<tag>
docker run --name spb_to_uns_mqtt -d -v $PWD/conf:/app/conf ghcr.io/mkashwin/unifiednamespace/uns/spb_mapper:latest
```
Note: Remember to update the following before executing:

- `<container name>` (optional): identifier for the container so you can work with the same container instance using `docker start <container name>` and `docker stop <container name>`
- `<full path to conf>` (mandatory): volume mounted to the container containing the configurations. See Key Configurations for what to provide. Give the complete path, not a relative path
- If you are running this image on the same host as the MQTT broker, pass the flag `--network host` along with `docker run` to ensure that `localhost` services on the host are correctly resolved
- The application assumes that the MQTT broker for SparkplugB and the UNS are one and the same, as it does not make sense to have separate brokers for them. This can be enhanced easily if there is a requirement; please create an issue on the GitHub project
- Need to understand how to handle the NDEATH, DDEATH and STATE message types. STATE messages are not Protobuf messages but rather JSON messages; they cannot be mapped to the UNS as they have no metrics
- Handle non-compliant messages: non-compliant messages will be logged as an error and ignored
- Need to understand how to handle the metric types DataSet and Template
- Need to understand how to handle metadata, properties, is_multi_part etc.