From c4518b5ea4bc3f98bb1b65625c34060ce00f6322 Mon Sep 17 00:00:00 2001 From: Matthias Dornaus Date: Tue, 25 Apr 2023 13:34:36 +0200 Subject: [PATCH] Fixed Threading issue by implementing Eventqueue. --- DashboardClient/DashboardClient.cpp | 105 +++++++++++++++++++++------- DashboardClient/DashboardClient.hpp | 14 ++++ OpcUaClient/OpcUaInterface.hpp | 1 + 3 files changed, 94 insertions(+), 26 deletions(-) diff --git a/DashboardClient/DashboardClient.cpp b/DashboardClient/DashboardClient.cpp index be00f087..05a97ff9 100644 --- a/DashboardClient/DashboardClient.cpp +++ b/DashboardClient/DashboardClient.cpp @@ -26,7 +26,80 @@ namespace Umati std::shared_ptr pTypeReader) : m_pDashboardDataClient(pDashboardDataClient), m_pPublisher(pPublisher), m_pTypeReader(pTypeReader) { + this->startEventThread(); } + + DashboardClient::~DashboardClient() { + this->stopEventThread(); + } + + void DashboardClient::startEventThread() { + if (m_eventThreadRunning) + { + LOG(INFO) << "EventThread Running"; + return; + } + + auto func = [this]() { + int cnt = 0; + while (this->m_eventThreadRunning) { + if(!m_eventqueue.empty()) { + IDashboardDataClient::StructureChangeEvent sce = m_eventqueue.front(); + m_eventqueue.pop(); + this->m_dynamicNodes.push_back(sce.refreshNode); + if(sce.nodeAdded || sce.referenceAdded) { + this->updateAddDataSet(sce.refreshNode); + this->Publish(); + } + if(sce.nodeDeleted || sce.referenceDeleted) { + this -> updateDeleteDataSet(sce.refreshNode); + if(sce.nodeDeleted == true) { + auto search = browsedSimpleNodes.find(sce.refreshNode); + if(search != browsedSimpleNodes.end()) { + std::shared_ptr simpleNode = search->second; + this->deleteAndUnsubscribeNode(*simpleNode); + } + } + this -> Publish(); + } + } + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + }; + m_eventThreadRunning = true; + m_eventThread = std::thread(func); + } + void DashboardClient::stopEventThread() { + m_eventThreadRunning = false; + if (m_eventThread.joinable()) + { + m_eventThread.join(); + } + } + void DashboardClient::reloadDataSet(ModelOpcUa::NodeId_t nodeId) { + auto search = browsedSimpleNodes.find(nodeId); + if(search == browsedSimpleNodes.end()){ + return; + } + std::shared_ptr simpleNode = search->second; + auto childNodes = simpleNode->ChildNodes; + if(childNodes.empty()) { + return; + } + auto child = childNodes.front(); + std::shared_ptr placeholderNode = std::dynamic_pointer_cast(child); + if(placeholderNode == nullptr) { + return; + } + std::shared_ptr placeholderNodeUnconst = std::const_pointer_cast(placeholderNode); + for(auto instance : placeholderNode->getInstances()) { + placeholderNodeUnconst->removeInstance(instance); + deleteAndUnsubscribeNode(instance); + } + auto browseResults = m_pDashboardDataClient->Browse(nodeId, child->ReferenceType, child->SpecifiedTypeNodeId); + this->updateAddDataSet(nodeId); + } + void DashboardClient::updateDeleteDataSet(ModelOpcUa::NodeId_t refreshNodeId) { auto search = browsedSimpleNodes.find(refreshNodeId); if( search != browsedSimpleNodes.end()) { @@ -43,9 +116,11 @@ namespace Umati for(auto& el : instances) { bool found = false; ModelOpcUa::NodeId_t nodeId = el.pNode->NodeId; - if(std::any_of(browseResults.begin(), browseResults.end(),[nodeId](ModelOpcUa::BrowseResult_t br){ return br.NodeId == nodeId;})) { - found = true; - break; + for(ModelOpcUa::BrowseResult_t browseResult : browseResults) { + if(browseResult.NodeId == nodeId) { + found = true; + break; + } } if(!found) { missingElements.push_back(el); @@ -59,8 +134,6 @@ namespace Umati } } } - //Hotfix clear subscription cache. - m_subscribedValues.clear(); } void DashboardClient::deleteAndUnsubscribeNode(ModelOpcUa::PlaceholderElement placeHolderElement) { std::shared_ptr element = placeHolderElement.pNode; @@ -107,27 +180,7 @@ namespace Umati } void DashboardClient::subscribeEvents() { auto ecbf = [this](IDashboardDataClient::StructureChangeEvent sce) { - if(sce.nodeAdded || sce.referenceAdded) { - std::thread t([this, sce](){ - this->updateAddDataSet(sce.refreshNode); - this->Publish(); - }); - t.detach(); - } - if(sce.nodeDeleted || sce.referenceDeleted) { - std::thread t([this, sce](){ - this -> updateDeleteDataSet(sce.refreshNode); - if(sce.nodeDeleted == true) { - auto search = browsedSimpleNodes.find(sce.refreshNode); - if(search != browsedSimpleNodes.end()) { - std::shared_ptr simpleNode = search->second; - this->deleteAndUnsubscribeNode(*simpleNode); - } - } - this -> Publish(); - }); - t.detach(); - } + this->m_eventqueue.push(sce); }; m_pDashboardDataClient->SubscribeEvent(ecbf); } diff --git a/DashboardClient/DashboardClient.hpp b/DashboardClient/DashboardClient.hpp index e524264d..211d8145 100644 --- a/DashboardClient/DashboardClient.hpp +++ b/DashboardClient/DashboardClient.hpp @@ -16,6 +16,9 @@ #include #include #include +#include +#include + namespace Umati { namespace Dashboard { @@ -42,6 +45,8 @@ namespace Umati { std::shared_ptr pPublisher, std::shared_ptr pTypeReader); + ~DashboardClient(); + void addDataSet( const ModelOpcUa::NodeId_t &startNodeId, const std::shared_ptr &pTypeDefinition, @@ -57,6 +62,7 @@ namespace Umati { void updateAddDataSet(ModelOpcUa::NodeId_t nodeId); void updateDeleteDataSet(ModelOpcUa::NodeId_t nodeId); void deleteAndUnsubscribeNode(const ModelOpcUa::SimpleNode nodeId); + void reloadDataSet(ModelOpcUa::NodeId_t nodeId); protected: @@ -103,6 +109,14 @@ namespace Umati { std::list> m_dataSets; std::map m_latestMessages; + bool m_eventThreadRunning; + std::thread m_eventThread; + std::queue m_eventqueue; + std::list m_dynamicNodes; + + void startEventThread(); + void stopEventThread(); + bool isMandatoryOrOptionalVariable(const std::shared_ptr &pNode); void handleSubscribeChildNodes(const std::shared_ptr &pNode, diff --git a/OpcUaClient/OpcUaInterface.hpp b/OpcUaClient/OpcUaInterface.hpp index e100a328..60222cdc 100644 --- a/OpcUaClient/OpcUaInterface.hpp +++ b/OpcUaClient/OpcUaInterface.hpp @@ -267,6 +267,7 @@ namespace Umati { stc.referenceAdded = (verb & UA_MODELCHANGESTRUCTUREVERBMASK_REFERENCEADDED) == UA_MODELCHANGESTRUCTUREVERBMASK_REFERENCEADDED; stc.referenceDeleted = (verb & UA_MODELCHANGESTRUCTUREVERBMASK_REFERENCEDELETED) == UA_MODELCHANGESTRUCTUREVERBMASK_REFERENCEDELETED; stc.dataTypeChanged = (verb & UA_MODELCHANGESTRUCTUREVERBMASK_DATATYPECHANGED) == UA_MODELCHANGESTRUCTUREVERBMASK_DATATYPECHANGED; + stc.refreshNode = nodeId; pWrapper->eventcallback(stc); } } else {