Skip to content

Commit

Permalink
Merge pull request #1404 from fledge-iot/2.5.0RC
Browse files Browse the repository at this point in the history
2.5.0RC
  • Loading branch information
Mohit04tomar committed Jul 1, 2024
2 parents ad48db9 + f032661 commit a81799b
Show file tree
Hide file tree
Showing 146 changed files with 5,562 additions and 1,076 deletions.
26 changes: 26 additions & 0 deletions C/common/config_category.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -590,6 +590,8 @@ string ConfigCategory::getItemAttribute(const string& itemName,
return m_items[i]->m_listSize;
case ITEM_TYPE_ATTR:
return m_items[i]->m_listItemType;
case LIST_NAME_ATTR:
return m_items[i]->m_listName;
default:
throw new ConfigItemAttributeNotFound();
}
Expand Down Expand Up @@ -664,6 +666,9 @@ bool ConfigCategory::setItemAttribute(const string& itemName,
case ITEM_TYPE_ATTR:
m_items[i]->m_listItemType = value;
return true;
case LIST_NAME_ATTR:
m_items[i]->m_listName = value;
return true;
default:
return false;
}
Expand Down Expand Up @@ -1321,6 +1326,17 @@ ConfigCategory::CategoryItem::CategoryItem(const string& name,
throw new runtime_error("ListSize configuration item property is not a string");
}
}
if (item.HasMember("listName"))
{
if (item["listName"].IsString())
{
m_listName = item["listName"].GetString();
}
else
{
throw new runtime_error("ListName configuration item property is not a string");
}
}

std::string m_typeUpperCase = m_type;
for (auto & c: m_typeUpperCase) c = toupper(c);
Expand Down Expand Up @@ -1607,6 +1623,7 @@ ConfigCategory::CategoryItem::CategoryItem(const CategoryItem& rhs)
m_bucketProperties = rhs.m_bucketProperties;
m_listSize = rhs.m_listSize;
m_listItemType = rhs.m_listItemType;
m_listName = rhs.m_listName;
}

/**
Expand Down Expand Up @@ -1727,6 +1744,10 @@ ostringstream convert;
{
convert << ", \"items\" : \"" << m_listItemType << "\"";
}
if (!m_listName.empty())
{
convert << ", \"listName\" : \"" << m_listName << "\"";
}
}
convert << " }";

Expand Down Expand Up @@ -1822,6 +1843,11 @@ ostringstream convert;
{
convert << ", \"items\" : \"" << m_listItemType << "\"";
}
if (!m_listName.empty())
{
convert << ", \"listName\" : \"" << m_listName << "\"";
}


if (m_itemType == StringItem ||
m_itemType == EnumerationItem ||
Expand Down
4 changes: 3 additions & 1 deletion C/common/include/config_category.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,8 @@ class ConfigCategory {
RULE_ATTR,
BUCKET_PROPERTIES_ATTR,
LIST_SIZE_ATTR,
ITEM_TYPE_ATTR
ITEM_TYPE_ATTR,
LIST_NAME_ATTR
};
std::string getItemAttribute(const std::string& itemName,
ItemAttribute itemAttribute) const;
Expand Down Expand Up @@ -192,6 +193,7 @@ class ConfigCategory {
std::string m_bucketProperties;
std::string m_listSize;
std::string m_listItemType;
std::string m_listName;
};
std::vector<CategoryItem *> m_items;
std::string m_name;
Expand Down
43 changes: 43 additions & 0 deletions C/common/include/readingset_circularbuffer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
#ifndef _READINGSETCIRCULARBUFFER_H
#define _READINGSETCIRCULARBUFFER_H
/*
* Fledge ReadingSet Circular Buffer.
*
* Copyright (c) 2024 Dianomic Systems
*
* Released under the Apache 2.0 Licence
*
* Author: Devki Nandan Ghildiyal
*/
#include <reading_set.h>
#include <mutex>
#include <vector>
#include <memory>

/**
* Reading set circular buffer class
*
* Reading set circular buffer is a data structure to hold ReadingSet
* passed to a plugin.
*/
class ReadingSetCircularBuffer {
public:
ReadingSetCircularBuffer(unsigned long maxBufferSize=10);
~ReadingSetCircularBuffer();

void insert(ReadingSet*);
void insert(ReadingSet&);
std::vector<std::shared_ptr<ReadingSet>> extract(bool isExtractSingleElement=true);

private:
std::mutex m_mutex;
unsigned long m_maxBufferSize;
unsigned long m_nextReadIndex;
void appendReadingSet(const std::vector<Reading *>& readings);
ReadingSetCircularBuffer (const ReadingSetCircularBuffer&) = delete;
ReadingSetCircularBuffer& operator=(const ReadingSetCircularBuffer&) = delete;
std::vector<std::shared_ptr<ReadingSet>> m_circularBuffer;
};

#endif

1 change: 1 addition & 0 deletions C/common/include/storage_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ class StorageClient {
std::vector<std::string> keyValues, const std::string& operation, const std::string& callbackUrl);
void registerManagement(ManagementClient *mgmnt) { m_management = mgmnt; };
bool createSchema(const std::string&);
bool deleteHttpClient();

private:
void handleUnexpectedResponse(const char *operation,
Expand Down
16 changes: 9 additions & 7 deletions C/common/json_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,25 +96,27 @@ string escaped = subject;
std::string JSONunescape(const std::string& input)
{
std::string output;
output.reserve(input.size());
size_t inputSize = input.size();
output.reserve(inputSize);

for (size_t i = 0; i < input.size(); ++i)
for (size_t i = 0; i < inputSize; ++i)
{
// skip leading or trailing "
if ((i == 0 || i == input.size() -1) && input[i] == '"')
if ((i == 0 || i == inputSize -1) && input[i] == '"')
{
continue;
}

// \\" -> \"
if (input[i] == '\\' && i + 2 < input.size() && input[i + 1] == '\\' && input[i + 2] == '"')
// \\\" -> \"
if (input[i] == '\\' && i + 3 < inputSize && input[i + 1] == '\\' && input[i + 2] == '\\' && input[i + 3] == '"')
{
output.push_back('\\');
output.push_back('"');
i += 2;
i += 3;
}
// \\" -> \"
// \" -> "
else if (input[i] == '\\' && i + 1 < input.size() && input[i + 1] == '"')
else if (input[i] == '\\' && i + 1 < inputSize && input[i + 1] == '"')
{
output.push_back('"');
++i;
Expand Down
133 changes: 133 additions & 0 deletions C/common/readingset_circularbuffer.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
/*
* Fledge ReadingSet Circular Buffer.
*
* Copyright (c) 2024 Dianomic Systems
*
* Released under the Apache 2.0 Licence
*
* Author: Devki Nandan Ghildiyal
*/
#include <readingset_circularbuffer.h>
#include <logger.h>


using namespace std;
using namespace rapidjson;

/**
* Construct an empty reading set circular buffer
*
* @param maxBufferSize Maximum size of the ReadingSet circular buffer. It should be atleast one.
*/
ReadingSetCircularBuffer::ReadingSetCircularBuffer(unsigned long maxBufferSize)
{
if ( maxBufferSize <= 0)
{
maxBufferSize = 1;
Logger::getLogger()->warn("Minimum size of ReadingSetCircularBuffer cannot be less than one, setting buffer size to 1");
}
m_maxBufferSize = maxBufferSize;
m_nextReadIndex = 0;
}

/**
* Destructor for a result set
*/
ReadingSetCircularBuffer::~ReadingSetCircularBuffer()
{
lock_guard<mutex> guard(m_mutex);
/* Delete the readings */
m_circularBuffer.clear();
}

/**
* Insert a ReadingSet into circular buffer
*
* @param readings Reference for ReadingSet to be inserted into circular buffer
*/
void ReadingSetCircularBuffer::insert(ReadingSet& readings)
{
appendReadingSet(readings.getAllReadings());
}

/**
* Insert a ReadingSet into circular buffer
*
* @param readings Pointer for ReadingSet to be inserted into circular buffer
*/
void ReadingSetCircularBuffer::insert(ReadingSet* readings)
{
appendReadingSet(readings->getAllReadings());
}

/**
* Internal implementation for inserting ReadingSet into the circular buffer
*
* @param readings appends ReadingSet into the circular buffer
*/
void ReadingSetCircularBuffer::appendReadingSet(const std::vector<Reading *>& readings)
{
lock_guard<mutex> guard(m_mutex);
bool isBufferFull = (m_circularBuffer.size() == m_maxBufferSize);

//Check if there is space available to insert a new ReadingSet
if (isBufferFull)
{
Logger::getLogger()->info("ReadingSetCircularBuffer buffer is full, removing first element");
// Make space for new ReadingSet and adjust m_nextReadIndex
m_circularBuffer.erase(m_circularBuffer.begin() + 0);
m_nextReadIndex--;
}

std::vector<Reading *> *newReadings = new std::vector<Reading *>;

// Iterate over all the readings in ReadingSet
for (auto const &reading : readings)
{
newReadings->emplace_back(new Reading(*reading));
}
// Insert ReadingSet into buffer
m_circularBuffer.push_back(std::make_shared<ReadingSet>(newReadings));

}

/**
* Fetch the vector of ReadingSet from circular buffer
*
* @param isExtractSingleElement True to extract single ReadingSet otherwise for extract entire buffer
* @return Return a vector of shared pointer to ReadingSet
*/
std::vector<std::shared_ptr<ReadingSet>> ReadingSetCircularBuffer::extract(bool isExtractSingleElement)
{

lock_guard<mutex> guard(m_mutex);
bool isNoDataToRead = m_circularBuffer.empty() || (m_nextReadIndex == m_circularBuffer.size());
std::vector<std::shared_ptr<ReadingSet>> bufferedItem;
// Check for empty buffer
if (isNoDataToRead)
{
Logger::getLogger()->info("There is no more data to read in ReadingSet circualr buffer");
return bufferedItem;
}

// Return single item from buffer
if (isExtractSingleElement)
{
bufferedItem.emplace_back(m_circularBuffer[m_nextReadIndex]);
m_nextReadIndex++;
return bufferedItem;
}

// Return Entire buffer
if(m_nextReadIndex == 0)
{
m_nextReadIndex = m_circularBuffer.size();
return m_circularBuffer;
}
// Send remaining items in the buffer
for (int i = m_nextReadIndex; i < m_circularBuffer.size(); i ++)
bufferedItem.emplace_back(m_circularBuffer[i]);

m_nextReadIndex = m_circularBuffer.size();
return bufferedItem;
}
2 changes: 1 addition & 1 deletion C/common/result_set.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ ResultSet::ResultSet(const std::string& json)
{
type = NUMBER_COLUMN;
}
else if (itr->value.IsNumber())
else if (itr->value.IsNumber())
{
type = INT_COLUMN;
}
Expand Down
26 changes: 25 additions & 1 deletion C/common/storage_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,30 @@ StorageClient::~StorageClient()
}
}


/**
* Delete HttpClient object for current thread
*/
bool StorageClient::deleteHttpClient()
{
std::thread::id thread_id = std::this_thread::get_id();

lock_guard<mutex> guard(sto_mtx_client_map);

if(m_client_map.find(thread_id) == m_client_map.end())
return false;

ostringstream ss;
ss << thread_id;
Logger::getLogger()->debug("Storage client deleting HttpClient object @ %p for thread %s", m_client_map[thread_id], ss.str().c_str());

delete m_client_map[thread_id];
m_client_map.erase(thread_id);

return true;
}


/**
* Creates a HttpClient object for each thread
* it stores/retrieves the reference to the HttpClient and the associated thread id in a map
Expand Down Expand Up @@ -316,7 +340,7 @@ ReadingSet *StorageClient::readingFetch(const unsigned long readingId, const uns
try {

char url[256];
snprintf(url, sizeof(url), "/storage/reading?id=%ld&count=%ld",
snprintf(url, sizeof(url), "/storage/reading?id=%lu&count=%lu",
readingId, count);

auto res = this->getHttpClient()->request("GET", url);
Expand Down
Loading

0 comments on commit a81799b

Please sign in to comment.