Skip to content

Commit

Permalink
Multiple tags write via Server-side RPC for OPC UA extension (#151)
Browse files Browse the repository at this point in the history
* Implemented multiple tags write via RPC for OPC UA extension.

* Fix merge conflicts. Fix license header.
  • Loading branch information
dvk2018 authored and mp-loki committed Dec 26, 2018
1 parent 41b25e4 commit 50f22c9
Show file tree
Hide file tree
Showing 6 changed files with 443 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.thingsboard.gateway.extensions.opc.conf.mapping.DeviceMapping;
import org.thingsboard.gateway.extensions.common.conf.mapping.KVMapping;
import org.thingsboard.gateway.extensions.opc.conf.mapping.TimeseriesMapping;
import org.thingsboard.gateway.extensions.opc.scan.OpcUaNode;
import org.thingsboard.server.common.data.kv.*;

import java.util.*;
Expand All @@ -33,7 +34,7 @@
@Data
public class OpcUaDevice {

private final NodeId nodeId;
private final OpcUaNode opcNode;
private final DeviceMapping mapping;
private final Map<String, NodeId> tagKeysMap = new HashMap<>();
private final Map<NodeId, String> tagIdsMap = new HashMap<>();
Expand Down Expand Up @@ -68,12 +69,14 @@ private NodeId registerTag(Map.Entry<String, NodeId> kv) {
return tagKeysMap.put(kv.getKey(), kv.getValue());
}

public void calculateDeviceName(Map<String, String> deviceNameTagValues) {
public String calculateDeviceName(Map<String, String> deviceNameTagValues) {
String deviceNameTmp = mapping.getDeviceNamePattern();
for (Map.Entry<String, String> kv : deviceNameTagValues.entrySet()) {
deviceNameTmp = deviceNameTmp.replace(escape(kv.getKey()), kv.getValue());
}
this.deviceName = deviceNameTmp;

return this.deviceName;
}

public void updateTag(NodeId tagId, DataValue dataValue) {
Expand Down Expand Up @@ -117,6 +120,10 @@ public List<TsKvEntry> getAffectedTimeseries(NodeId tagId, DataValue dataValue)
}
}

public NodeId getTagNodeId(String tag) {
return tagKeysMap.get(tag);
}

private List<KvEntry> getKvEntries(List<? extends KVMapping> mappings) {
List<KvEntry> result = new ArrayList<>();
for (KVMapping mapping : mappings) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/**
* Copyright © 2017 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.gateway.extensions.opc;

public interface OpcUaDeviceAware {
OpcUaDevice getDevice(String deviceName);
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,16 @@
import org.eclipse.milo.opcua.stack.core.AttributeId;
import org.eclipse.milo.opcua.stack.core.Identifiers;
import org.eclipse.milo.opcua.stack.core.security.SecurityPolicy;
import org.eclipse.milo.opcua.stack.core.types.builtin.DataValue;
import org.eclipse.milo.opcua.stack.core.types.builtin.LocalizedText;
import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
import org.eclipse.milo.opcua.stack.core.types.builtin.QualifiedName;
import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.UInteger;
import org.eclipse.milo.opcua.stack.core.types.builtin.*;
import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.*;
import org.eclipse.milo.opcua.stack.core.types.enumerated.*;
import org.eclipse.milo.opcua.stack.core.types.structured.*;
import org.thingsboard.gateway.extensions.opc.conf.OpcUaServerConfiguration;
import org.thingsboard.gateway.extensions.opc.conf.mapping.DeviceMapping;
import org.thingsboard.gateway.extensions.opc.rpc.RpcProcessor;
import org.thingsboard.gateway.extensions.opc.scan.OpcUaNode;
import org.thingsboard.gateway.extensions.opc.util.OpcUaUtils;
import org.thingsboard.gateway.service.data.RpcCommandSubscription;
import org.thingsboard.gateway.service.gateway.GatewayService;
import org.thingsboard.gateway.util.CertificateInfo;
import org.thingsboard.gateway.util.ConfigurationTools;
Expand All @@ -60,7 +60,7 @@
* Created by ashvayka on 16.01.17.
*/
@Slf4j
public class OpcUaServerMonitor {
public class OpcUaServerMonitor implements OpcUaDeviceAware {

private final GatewayService gateway;
private final OpcUaServerConfiguration configuration;
Expand All @@ -69,17 +69,21 @@ public class OpcUaServerMonitor {
private UaSubscription subscription;
private Map<NodeId, OpcUaDevice> devices;
private Map<NodeId, List<OpcUaDevice>> devicesByTags;
private Map<String, OpcUaDevice> devicesByName;
private Map<Pattern, DeviceMapping> mappings;
private ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();

private final AtomicLong clientHandles = new AtomicLong(1L);

private RpcProcessor rpcProcessor;

public OpcUaServerMonitor(GatewayService gateway, OpcUaServerConfiguration configuration) {
this.gateway = gateway;
this.configuration = configuration;
this.devices = new HashMap<>();
this.devicesByTags = new HashMap<>();
this.mappings = configuration.getMapping().stream().collect(Collectors.toMap(m -> Pattern.compile(m.getDeviceNodePattern()), Function.identity()));
this.devicesByName = new HashMap<>();
}

public void connect(Boolean isRemote) {
Expand All @@ -101,6 +105,7 @@ public void connect(Boolean isRemote) {
client.connect().get();

subscription = client.getSubscriptionManager().createSubscription(1000.0).get();
rpcProcessor = new RpcProcessor(gateway, client, this);

scanForDevices();
} catch (Exception e) {
Expand Down Expand Up @@ -161,6 +166,11 @@ public void scanForDevices() {
}, configuration.getScanPeriodInSeconds(), TimeUnit.SECONDS);
}

@Override
public OpcUaDevice getDevice(String deviceName) {
return devicesByName.get(deviceName);
}

private void scanForDevices(OpcUaNode node) {
log.trace("Scanning node: {}", node);
List<DeviceMapping> matchedMappings = mappings.entrySet().stream()
Expand All @@ -176,7 +186,7 @@ private void scanForDevices(OpcUaNode node) {
});

try {
BrowseResult browseResult = client.browse(getBrowseDescription(node.getNodeId())).get();
BrowseResult browseResult = client.browse(OpcUaUtils.getBrowseDescription(node.getNodeId())).get();
List<ReferenceDescription> references = toList(browseResult.getReferences());

for (ReferenceDescription rd : references) {
Expand All @@ -201,14 +211,14 @@ private void scanDevice(OpcUaNode node, DeviceMapping m) throws Exception {
log.debug("Scanning device node: {}", node);
Set<String> tags = m.getAllTags();
log.debug("Scanning node hierarchy for tags: {}", tags);
Map<String, NodeId> tagMap = lookupTags(node.getNodeId(), node.getName(), tags);
Map<String, NodeId> tagMap = OpcUaUtils.lookupTags(client, node.getNodeId(), node.getName(), tags);
log.debug("Scanned {} tags out of {}", tagMap.size(), tags.size());

OpcUaDevice device;
if (devices.containsKey(node.getNodeId())) {
device = devices.get(node.getNodeId());
} else {
device = new OpcUaDevice(node.getNodeId(), m);
device = new OpcUaDevice(node, m);
devices.put(node.getNodeId(), device);

Map<String, NodeId> deviceNameTags = new HashMap<>();
Expand All @@ -221,8 +231,12 @@ private void scanDevice(OpcUaNode node, DeviceMapping m) throws Exception {
deviceNameTags.put(tag, tagNode);
}
}
device.calculateDeviceName(readTags(deviceNameTags));
gateway.onDeviceConnect(device.getDeviceName(), null);

String deviceName = device.calculateDeviceName(readTags(deviceNameTags));
devicesByName.put(deviceName, device);

gateway.onDeviceConnect(deviceName, null);
gateway.subscribe(new RpcCommandSubscription(deviceName, rpcProcessor));
}

device.updateScanTs();
Expand Down Expand Up @@ -313,50 +327,4 @@ private Map<String, String> readTags(Map<String, NodeId> tags) throws ExecutionE
}
return result;
}

private Map<String, NodeId> lookupTags(NodeId nodeId, String deviceNodeName, Set<String> tags) {
Map<String, NodeId> values = new HashMap<>();
try {
BrowseResult browseResult = client.browse(getBrowseDescription(nodeId)).get();
List<ReferenceDescription> references = toList(browseResult.getReferences());

for (ReferenceDescription rd : references) {
NodeId childId;
if (rd.getNodeId().isLocal()) {
childId = rd.getNodeId().local().get();
} else {
log.trace("Ignoring remote node: {}", rd.getNodeId());
continue;
}

String browseName = rd.getBrowseName().getName();
String name;
String childIdStr = childId.getIdentifier().toString();
if (childIdStr.contains(deviceNodeName)) {
name = childIdStr.substring(childIdStr.indexOf(deviceNodeName) + deviceNodeName.length() + 1, childIdStr.length());
} else {
name = rd.getBrowseName().getName();
}
if (tags.contains(name)) {
values.put(name, childId);
}
// recursively browse to children
values.putAll(lookupTags(childId, deviceNodeName, tags));
}
} catch (InterruptedException | ExecutionException e) {
log.error("Browsing nodeId={} failed: {}", nodeId, e.getMessage(), e);
}
return values;
}

private BrowseDescription getBrowseDescription(NodeId nodeId) {
return new BrowseDescription(
nodeId,
BrowseDirection.Forward,
Identifiers.References,
true,
uint(NodeClass.Object.getValue() | NodeClass.Variable.getValue()),
uint(BrowseResultMask.All.getValue())
);
}
}
Loading

0 comments on commit 50f22c9

Please sign in to comment.