Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Host tx ready enhancements #2

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/codeql-analysis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ jobs:
autoconf-archive \
uuid-dev \
libjansson-dev \
nlohmann-json3-dev \
python

- if: matrix.language == 'cpp'
Expand Down
31 changes: 0 additions & 31 deletions ThirdPartyLicenses.txt

This file was deleted.

2 changes: 1 addition & 1 deletion azure-pipelines.yml
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ stages:
steps:
- script: |
sudo apt-get update
sudo apt-get install -y make libtool m4 autoconf dh-exec debhelper cmake pkg-config \
sudo apt-get install -y make libtool m4 autoconf dh-exec debhelper cmake pkg-config nlohmann-json3-dev \
libhiredis-dev libnl-3-dev libnl-genl-3-dev libnl-route-3-dev libnl-nf-3-dev swig3.0 \
libpython2.7-dev libboost-dev libboost-serialization-dev uuid-dev libzmq5 libzmq3-dev
sudo apt-get install -y sudo
Expand Down
1 change: 1 addition & 0 deletions common/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ common_libswsscommon_la_SOURCES = \
common/profileprovider.cpp \
common/zmqclient.cpp \
common/zmqserver.cpp \
common/asyncdbupdater.cpp \
common/redis_table_waiter.cpp

common_libswsscommon_la_CXXFLAGS = $(DBGFLAGS) $(AM_CFLAGS) $(CFLAGS_COMMON) $(LIBNL_CFLAGS) $(CODE_COVERAGE_CXXFLAGS)
Expand Down
114 changes: 114 additions & 0 deletions common/asyncdbupdater.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
#include <string>
#include <deque>
#include <limits>
#include <hiredis/hiredis.h>
#include <pthread.h>
#include "asyncdbupdater.h"
#include "dbconnector.h"
#include "redisselect.h"
#include "redisapi.h"
#include "table.h"

using namespace std;

namespace swss {

AsyncDBUpdater::AsyncDBUpdater(DBConnector *db, const std::string &tableName)
: m_db(db)
, m_tableName(tableName)
{
m_runThread = true;
m_dbUpdateThread = std::make_shared<std::thread>(&AsyncDBUpdater::dbUpdateThread, this);

SWSS_LOG_DEBUG("AsyncDBUpdater ctor tableName: %s", tableName.c_str());
}

AsyncDBUpdater::~AsyncDBUpdater()
{
m_runThread = false;

// notify db update thread exit
m_dbUpdateDataNotifyCv.notify_all();
m_dbUpdateThread->join();
}

void AsyncDBUpdater::update(std::shared_ptr<KeyOpFieldsValuesTuple> pkco)
{
{
std::lock_guard<std::mutex> lock(m_dbUpdateDataQueueMutex);
m_dbUpdateDataQueue.push(pkco);
}

m_dbUpdateDataNotifyCv.notify_all();
}

void AsyncDBUpdater::dbUpdateThread()
{
SWSS_LOG_ENTER();
SWSS_LOG_NOTICE("dbUpdateThread begin");

// Different schedule policy has different min priority
pthread_attr_t attr;
int policy;
pthread_attr_getschedpolicy(&attr, &policy);
int min_priority = sched_get_priority_min(policy);
// Use min priority will block poll thread
pthread_setschedprio(pthread_self(), min_priority + 1);

// Follow same logic in ConsumerStateTable: every received data will write to 'table'.
DBConnector db(m_db->getDbName(), 0, true);
Table table(&db, m_tableName);
std::mutex cvMutex;
std::unique_lock<std::mutex> cvLock(cvMutex);

while (m_runThread)
{
size_t count;
count = queueSize();
if (count == 0)
{
// when queue is empty, wait notification, when data come, continue to check queue size again
m_dbUpdateDataNotifyCv.wait(cvLock);
continue;
}

for (size_t ie = 0; ie < count; ie++)
{
auto& kco = *(m_dbUpdateDataQueue.front());

if (kfvOp(kco) == SET_COMMAND)
{
auto& values = kfvFieldsValues(kco);

// Delete entry before Table::set(), because Table::set() does not remove the no longer existed fields from entry.
table.del(kfvKey(kco));
table.set(kfvKey(kco), values);
}
else if (kfvOp(kco) == DEL_COMMAND)
{
table.del(kfvKey(kco));
}
else
{
SWSS_LOG_ERROR("db: %s, table: %s receive unknown operation: %s", m_db->getDbName().c_str(), m_tableName.c_str(), kfvOp(kco).c_str());
}

{
std::lock_guard<std::mutex> lock(m_dbUpdateDataQueueMutex);
m_dbUpdateDataQueue.pop();
}
}
}

SWSS_LOG_DEBUG("AsyncDBUpdater dbUpdateThread end: %s", m_tableName.c_str());
}

size_t AsyncDBUpdater::queueSize()
{
// size() is not thread safe
std::lock_guard<std::mutex> lock(m_dbUpdateDataQueueMutex);

return m_dbUpdateDataQueue.size();
}

}
42 changes: 42 additions & 0 deletions common/asyncdbupdater.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
#pragma once

#include <string>
#include <deque>
#include <condition_variable>
#include "dbconnector.h"
#include "table.h"

#define MQ_SIZE 100
#define MQ_MAX_RETRY 10
#define MQ_POLL_TIMEOUT (1000)

namespace swss {

class AsyncDBUpdater
{
public:
AsyncDBUpdater(DBConnector *db, const std::string &tableName);
~AsyncDBUpdater();

void update(std::shared_ptr<KeyOpFieldsValuesTuple> pkco);

size_t queueSize();
private:
void dbUpdateThread();

volatile bool m_runThread;

std::shared_ptr<std::thread> m_dbUpdateThread;

std::mutex m_dbUpdateDataQueueMutex;

std::condition_variable m_dbUpdateDataNotifyCv;

std::queue<std::shared_ptr<KeyOpFieldsValuesTuple>> m_dbUpdateDataQueue;

DBConnector *m_db;

std::string m_tableName;
};

}
84 changes: 72 additions & 12 deletions common/binaryserializer.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@

#include "common/armhelper.h"

#include <string>

using namespace std;

namespace swss {
Expand All @@ -12,27 +14,35 @@ class BinarySerializer {
static size_t serializeBuffer(
const char* buffer,
const size_t size,
const std::string& key,
const std::vector<swss::FieldValueTuple>& values,
const std::string& command,
const std::string& dbName,
const std::string& tableName)
const std::string& tableName,
const std::vector<KeyOpFieldsValuesTuple>& kcos)
{
auto tmpSerializer = BinarySerializer(buffer, size);

// Set the first pair as DB name and table name.
tmpSerializer.setKeyAndValue(
dbName.c_str(), dbName.length(),
tableName.c_str(), tableName.length());
tmpSerializer.setKeyAndValue(
key.c_str(), key.length(),
command.c_str(), command.length());
for (auto& kvp : values)
for (auto& kco : kcos)
{
auto& field = fvField(kvp);
auto& value = fvValue(kvp);
auto& key = kfvKey(kco);
auto& fvs = kfvFieldsValues(kco);
std::string fvs_len = std::to_string(fvs.size());
// For each request, the first pair is the key and the number of attributes,
// followed by the attribute pairs.
// The operation is not set, when there is no attribute, it is a DEL request.
tmpSerializer.setKeyAndValue(
field.c_str(), field.length(),
value.c_str(), value.length());
key.c_str(), key.length(),
fvs_len.c_str(), fvs_len.length());
for (auto& fv : fvs)
{
auto& field = fvField(fv);
auto& value = fvValue(fv);
tmpSerializer.setKeyAndValue(
field.c_str(), field.length(),
value.c_str(), value.length());
}
}

return tmpSerializer.finalize();
Expand Down Expand Up @@ -88,6 +98,56 @@ class BinarySerializer {
}
}

static void deserializeBuffer(
const char* buffer,
const size_t size,
std::string& dbName,
std::string& tableName,
std::vector<std::shared_ptr<KeyOpFieldsValuesTuple>>& kcos)
{
std::vector<FieldValueTuple> values;
deserializeBuffer(buffer, size, values);
int fvs_size = -1;
KeyOpFieldsValuesTuple kco;
auto& key = kfvKey(kco);
auto& op = kfvOp(kco);
auto& fvs = kfvFieldsValues(kco);
for (auto& fv : values)
{
auto& field = fvField(fv);
auto& value = fvValue(fv);
// The first pair is the DB name and the table name.
if (fvs_size < 0)
{
dbName = field;
tableName = value;
fvs_size = 0;
continue;
}
// This is the beginning of a request.
// The first pair is the key and the number of attributes.
// If the attribute count is zero, it is a DEL request.
if (fvs_size == 0)
{
key = field;
fvs_size = std::stoi(value);
op = (fvs_size == 0) ? DEL_COMMAND : SET_COMMAND;
fvs.clear();
}
// This is an attribut pair.
else
{
fvs.push_back(fv);
--fvs_size;
}
// We got the last attribut pair. This is the end of a request.
if (fvs_size == 0)
{
kcos.push_back(std::make_shared<KeyOpFieldsValuesTuple>(kco));
}
}
}

private:
const char* m_buffer;
const size_t m_buffer_size;
Expand Down
2 changes: 1 addition & 1 deletion common/countertable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
#include "common/redisreply.h"
#include "common/rediscommand.h"
#include "common/redisapi.h"
#include "common/json.hpp"
#include <nlohmann/json.hpp>
#include "common/schema.h"
#include "common/countertable.h"

Expand Down
2 changes: 1 addition & 1 deletion common/dbconnector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
#include <errno.h>
#include <system_error>
#include <fstream>
#include "json.hpp"
#include <nlohmann/json.hpp>
#include "logger.h"

#include "common/dbconnector.h"
Expand Down
6 changes: 3 additions & 3 deletions common/events.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ EventSubscriber::init(bool use_cache, int recv_timeout,
RET_ON_ERR(rc == 0, "Fails to set option rc=%d", rc);
}
else {
for (const auto e: *subs_sources) {
for (const auto &e: *subs_sources) {
rc = zmq_setsockopt(sock, ZMQ_SUBSCRIBE, e.c_str(), e.size());
RET_ON_ERR(rc == 0, "Fails to set option rc=%d", rc);
}
Expand Down Expand Up @@ -400,14 +400,14 @@ EventSubscriber::prune_track()
map<time_t, vector<runtime_id_t> > lst;

/* Sort entries by last touched time */
for(const auto e: m_track) {
for(const auto &e: m_track) {
lst[e.second.epoch_secs].push_back(e.first);
}

/* By default it walks from lowest value / earliest timestamp */
map<time_t, vector<runtime_id_t> >::const_iterator itc = lst.begin();
for(; (itc != lst.end()) && (m_track.size() > MAX_PUBLISHERS_COUNT); ++itc) {
for (const auto r: itc->second) {
for (const auto &r: itc->second) {
m_track.erase(r);
}
}
Expand Down
2 changes: 1 addition & 1 deletion common/events_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
#include <errno.h>
#include <cxxabi.h>
#include "string.h"
#include "json.hpp"
#include <nlohmann/json.hpp>
#include "zmq.h"
#include <unordered_map>
#include <boost/serialization/vector.hpp>
Expand Down
2 changes: 1 addition & 1 deletion common/events_pi.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
#include <fstream>
#include <uuid/uuid.h>
#include "string.h"
#include "json.hpp"
#include <nlohmann/json.hpp>
#include "zmq.h"
#include <unordered_map>

Expand Down
2 changes: 1 addition & 1 deletion common/json.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
#include <limits>

#include "common/json.h"
#include "common/json.hpp"
#include <nlohmann/json.hpp>

using namespace std;

Expand Down
Loading