Skip to content

Commit

Permalink
Fixed Threading issue by implementing Eventqueue.
Browse files Browse the repository at this point in the history
  • Loading branch information
mdornaus committed Apr 25, 2023
1 parent 3c08b1a commit 3f86fe2
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 26 deletions.
105 changes: 79 additions & 26 deletions DashboardClient/DashboardClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,80 @@ namespace Umati
std::shared_ptr<OpcUaTypeReader> pTypeReader)
: m_pDashboardDataClient(pDashboardDataClient), m_pPublisher(pPublisher), m_pTypeReader(pTypeReader)
{
this->startEventThread();
}

DashboardClient::~DashboardClient() {
this->stopEventThread();
}

void DashboardClient::startEventThread() {
if (m_eventThreadRunning)
{
LOG(INFO) << "EventThread Running";
return;
}

auto func = [this]() {
int cnt = 0;
while (this->m_eventThreadRunning) {
if(!m_eventqueue.empty()) {
IDashboardDataClient::StructureChangeEvent sce = m_eventqueue.front();
m_eventqueue.pop();
this->m_dynamicNodes.push_back(sce.refreshNode);
if(sce.nodeAdded || sce.referenceAdded) {
this->updateAddDataSet(sce.refreshNode);
this->Publish();
}
if(sce.nodeDeleted || sce.referenceDeleted) {
this -> updateDeleteDataSet(sce.refreshNode);
if(sce.nodeDeleted == true) {
auto search = browsedSimpleNodes.find(sce.refreshNode);
if(search != browsedSimpleNodes.end()) {
std::shared_ptr<const ModelOpcUa::SimpleNode> simpleNode = search->second;
this->deleteAndUnsubscribeNode(*simpleNode);
}
}
this -> Publish();
}
}
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
};
m_eventThreadRunning = true;
m_eventThread = std::thread(func);
}
void DashboardClient::stopEventThread() {
m_eventThreadRunning = false;
if (m_eventThread.joinable())
{
m_eventThread.join();
}
}
void DashboardClient::reloadDataSet(ModelOpcUa::NodeId_t nodeId) {
auto search = browsedSimpleNodes.find(nodeId);
if(search == browsedSimpleNodes.end()){
return;
}
std::shared_ptr<const ModelOpcUa::SimpleNode> simpleNode = search->second;
auto childNodes = simpleNode->ChildNodes;
if(childNodes.empty()) {
return;
}
auto child = childNodes.front();
std::shared_ptr<const ModelOpcUa::PlaceholderNode> placeholderNode = std::dynamic_pointer_cast<const ModelOpcUa::PlaceholderNode>(child);
if(placeholderNode == nullptr) {
return;
}
std::shared_ptr<ModelOpcUa::PlaceholderNode> placeholderNodeUnconst = std::const_pointer_cast<ModelOpcUa::PlaceholderNode>(placeholderNode);
for(auto instance : placeholderNode->getInstances()) {
placeholderNodeUnconst->removeInstance(instance);
deleteAndUnsubscribeNode(instance);
}
auto browseResults = m_pDashboardDataClient->Browse(nodeId, child->ReferenceType, child->SpecifiedTypeNodeId);
this->updateAddDataSet(nodeId);
}

void DashboardClient::updateDeleteDataSet(ModelOpcUa::NodeId_t refreshNodeId) {
auto search = browsedSimpleNodes.find(refreshNodeId);
if( search != browsedSimpleNodes.end()) {
Expand All @@ -43,9 +116,11 @@ namespace Umati
for(auto& el : instances) {
bool found = false;
ModelOpcUa::NodeId_t nodeId = el.pNode->NodeId;
if(std::any_of(browseResults.begin(), browseResults.end(),[nodeId](ModelOpcUa::BrowseResult_t br){ return br.NodeId == nodeId;})) {
found = true;
break;
for(ModelOpcUa::BrowseResult_t browseResult : browseResults) {
if(browseResult.NodeId == nodeId) {
found = true;
break;
}
}
if(!found) {
missingElements.push_back(el);
Expand All @@ -59,8 +134,6 @@ namespace Umati
}
}
}
//Hotfix clear subscription cache.
m_subscribedValues.clear();
}
void DashboardClient::deleteAndUnsubscribeNode(ModelOpcUa::PlaceholderElement placeHolderElement) {
std::shared_ptr<const ModelOpcUa::SimpleNode> element = placeHolderElement.pNode;
Expand Down Expand Up @@ -107,27 +180,7 @@ namespace Umati
}
void DashboardClient::subscribeEvents() {
auto ecbf = [this](IDashboardDataClient::StructureChangeEvent sce) {
if(sce.nodeAdded || sce.referenceAdded) {
std::thread t([this, sce](){
this->updateAddDataSet(sce.refreshNode);
this->Publish();
});
t.detach();
}
if(sce.nodeDeleted || sce.referenceDeleted) {
std::thread t([this, sce](){
this -> updateDeleteDataSet(sce.refreshNode);
if(sce.nodeDeleted == true) {
auto search = browsedSimpleNodes.find(sce.refreshNode);
if(search != browsedSimpleNodes.end()) {
std::shared_ptr<const ModelOpcUa::SimpleNode> simpleNode = search->second;
this->deleteAndUnsubscribeNode(*simpleNode);
}
}
this -> Publish();
});
t.detach();
}
this->m_eventqueue.push(sce);
};
m_pDashboardDataClient->SubscribeEvent(ecbf);
}
Expand Down
14 changes: 14 additions & 0 deletions DashboardClient/DashboardClient.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
#include <map>
#include <set>
#include <mutex>
#include <thread>
#include <queue>

namespace Umati {

namespace Dashboard {
Expand All @@ -42,6 +45,8 @@ namespace Umati {
std::shared_ptr<IPublisher> pPublisher,
std::shared_ptr<OpcUaTypeReader> pTypeReader);

~DashboardClient();

void addDataSet(
const ModelOpcUa::NodeId_t &startNodeId,
const std::shared_ptr<ModelOpcUa::StructureNode> &pTypeDefinition,
Expand All @@ -57,6 +62,7 @@ namespace Umati {
void updateAddDataSet(ModelOpcUa::NodeId_t nodeId);
void updateDeleteDataSet(ModelOpcUa::NodeId_t nodeId);
void deleteAndUnsubscribeNode(const ModelOpcUa::SimpleNode nodeId);
void reloadDataSet(ModelOpcUa::NodeId_t nodeId);


protected:
Expand Down Expand Up @@ -103,6 +109,14 @@ namespace Umati {
std::list<std::shared_ptr<DataSetStorage_t>> m_dataSets;
std::map<std::string, LastMessage_t> m_latestMessages;

bool m_eventThreadRunning;
std::thread m_eventThread;
std::queue<IDashboardDataClient::StructureChangeEvent> m_eventqueue;
std::list<ModelOpcUa::NodeId_t> m_dynamicNodes;

void startEventThread();
void stopEventThread();

bool isMandatoryOrOptionalVariable(const std::shared_ptr<const ModelOpcUa::SimpleNode> &pNode);

void handleSubscribeChildNodes(const std::shared_ptr<const ModelOpcUa::SimpleNode> &pNode,
Expand Down
1 change: 1 addition & 0 deletions OpcUaClient/OpcUaInterface.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,7 @@ namespace Umati {
stc.referenceAdded = (verb & UA_MODELCHANGESTRUCTUREVERBMASK_REFERENCEADDED) == UA_MODELCHANGESTRUCTUREVERBMASK_REFERENCEADDED;
stc.referenceDeleted = (verb & UA_MODELCHANGESTRUCTUREVERBMASK_REFERENCEDELETED) == UA_MODELCHANGESTRUCTUREVERBMASK_REFERENCEDELETED;
stc.dataTypeChanged = (verb & UA_MODELCHANGESTRUCTUREVERBMASK_DATATYPECHANGED) == UA_MODELCHANGESTRUCTUREVERBMASK_DATATYPECHANGED;
stc.refreshNode = nodeId;
pWrapper->eventcallback(stc);
}
} else {
Expand Down

0 comments on commit 3f86fe2

Please sign in to comment.