Skip to content

Commit

Permalink
telemetry - introduce telemetry aggFile
Browse files Browse the repository at this point in the history
  • Loading branch information
SiskaPavel committed Apr 15, 2024
1 parent a806de4 commit 791a70c
Show file tree
Hide file tree
Showing 8 changed files with 289 additions and 4 deletions.
69 changes: 69 additions & 0 deletions include/telemetry/aggFile.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/**
* @file
* @author Pavel Siska <siska@cesnet.cz>
* @brief Aggregated telemetry file
*
* SPDX-License-Identifier: BSD-3-Clause
*/

#pragma once

#include "aggMethod.hpp"
#include "content.hpp"
#include "file.hpp"
#include "node.hpp"

#include <memory>
#include <string>
#include <string_view>
#include <vector>

namespace telemetry {

/**
 * @brief Telemetry file whose content is computed from other files.
 *
 * An AggregatedFile is a File that owns no data of its own. Each read looks up the files
 * that currently match the stored pattern, reads them, and combines the results using the
 * aggregation methods configured when the object was created.
 */
class AggregatedFile : public File {
public:
	// Directories keep references to their entries, so the object must stay where it was
	// created: neither copying nor moving is permitted.
	AggregatedFile(const AggregatedFile&) = delete;
	AggregatedFile(AggregatedFile&&) = delete;
	AggregatedFile& operator=(const AggregatedFile&) = delete;
	AggregatedFile& operator=(AggregatedFile&&) = delete;

	~AggregatedFile() override = default;

	/**
	 * @brief Read every matched file and aggregate the results.
	 *
	 * The set of matched files is resolved from the pattern at the time of the call, then
	 * each configured aggregation method is applied to the collected contents.
	 *
	 * @return Aggregated content.
	 * @throw NodeException if an error occurs during aggregation.
	 */
	Content read();

private:
	// Construction is reserved for Directory (see Directory::addAggFile).
	friend class Directory;

	/**
	 * @brief Create an aggregated file. The instance must always be owned by a shared_ptr.
	 *
	 * @param parent          Node the file is attached to
	 * @param name            Name of the file within the parent
	 * @param aggFilesPattern Pattern selecting the files to aggregate
	 * @param ops             Aggregation operations to apply on read
	 */
	AggregatedFile(
		const std::shared_ptr<Node>& parent,
		std::string_view name,
		std::string aggFilesPattern,
		const std::vector<AggOperation>& ops);

	/** @brief Build the file operations that route reads to read(). */
	FileOps getOps();

	/** Pattern used to look up the files to aggregate. */
	const std::string m_filesRegexPattern;

	// NOTE(review): not referenced by the visible implementation — confirm whether it is
	// still needed.
	std::vector<std::string> m_paths;

	/** One aggregation method per configured operation. */
	std::vector<std::unique_ptr<AggMethod>> m_aggMethods;
};

} // namespace telemetry
30 changes: 30 additions & 0 deletions include/telemetry/directory.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

#pragma once

#include "aggFile.hpp"
#include "file.hpp"
#include "node.hpp"

Expand Down Expand Up @@ -64,6 +65,35 @@ class Directory : public Node {
*/
[[nodiscard]] std::shared_ptr<File> addFile(std::string_view name, FileOps ops);

/**
* @brief Add an aggregated file to the directory
*
* This function adds a new aggregated file to the directory, which aggregates data from
* multiple files matching a specified pattern using the provided aggregation operations.
*
* An aggregated file combines data from multiple source files into a single cohesive dataset
* according to the specified aggregation operations. These operations define how data from
* individual files should be combined, such as computing averages, sums, or joining values.
*
* The aggregation file pattern is a path that is split into segments (presumably on '/' —
* confirm against utils::parsePath). Every segment is a separate regular expression that must
* match a whole entry name: all segments except the last select sub-directories, starting from
* this directory, and the last segment selects the files inside the selected directories.
* A pattern consisting of a single segment therefore matches files directly in this directory.
*
* The pattern is evaluated on every read of the aggregated file, so entries added or removed
* after this call are taken into account.
*
* Syntax of each regex is the default grammar of std::regex (ECMAScript). The expressions are
* not compiled by this call.
* NOTE(review): an invalid expression presumably surfaces as std::regex_error when the
* aggregated file is read — confirm whether that is translated to NodeException.
*
* If an entry with the same name already exists in the directory, an exception is thrown.
*
* @param name Name of the aggregated file
* @param aggFilesPattern Path of regular expressions used to match files for aggregation
* @param aggOps Vector of aggregation operations to be applied to the data. Either no
*        operation specifies 'dictFieldName', or exactly one operation is given and it does.
* @return Shared pointer to the newly created aggregated file
* @throw NodeException If an entry with the same name already exists in the directory, or if
*        @p aggOps mixes operations with and without 'dictFieldName', or contains more than
*        one operation while 'dictFieldName' is specified
*/
[[nodiscard]] std::shared_ptr<AggregatedFile> addAggFile(
std::string_view name,
const std::string& aggFilesPattern,
const std::vector<AggOperation>& aggOps);

/**
* @brief List all available entries of the directory.
* @return All available entries.
Expand Down
5 changes: 4 additions & 1 deletion include/telemetry/file.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,10 @@ class File : public Node {

// Allow directory to call File constructor
friend class Directory;
// Can be created only from a directory. Must be always created as a shared_ptr.

protected:
// Can be created only from a directory or a derived class.
// Must be always created as a shared_ptr.
File(const std::shared_ptr<Node>& parent, std::string_view name, FileOps ops);
};

Expand Down
4 changes: 3 additions & 1 deletion include/telemetry/node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,12 @@ class Node : public std::enable_shared_from_this<Node> {
/** @brief Get full path from the root to this node (including this node name). */
std::string getFullPath();

protected:
std::shared_ptr<Node> m_parent;

private:
std::mutex m_mutex;
std::string m_name;
std::shared_ptr<Node> m_parent;

void checkName(std::string_view name);
[[noreturn]] void throwNodeException(std::string_view err);
Expand Down
1 change: 1 addition & 0 deletions src/telemetry/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ list(APPEND TELEMETRY_SOURCE_FILES
directory.cpp
holder.cpp
utility.cpp
aggFile.cpp
aggregator/aggMethod.cpp
aggregator/aggSum.cpp
aggregator/aggAvg.cpp
Expand Down
161 changes: 161 additions & 0 deletions src/telemetry/aggFile.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
/**
* @file
* @author Pavel Siska <siska@cesnet.cz>
* @brief Telemetry file aggregator
*
* SPDX-License-Identifier: BSD-3-Clause
*/

#include "aggregator/aggMethodFactory.hpp"

#include <telemetry/aggFile.hpp>
#include <telemetry/directory.hpp>
#include <telemetry/file.hpp>
#include <telemetry/utility.hpp>

#include <algorithm>
#include <iomanip>
#include <memory>
#include <regex>
#include <string>
#include <utility>
#include <variant>
#include <vector>
namespace telemetry {

/**
 * @brief Collect the entries of a directory that match a regex and have the requested type.
 *
 * @tparam T        Node type to keep (entries of any other type are ignored)
 * @param regex     Expression that must match the whole entry name
 * @param directory Directory whose entries are inspected
 * @return Matching entries converted to shared_ptr<T>, in the order listEntries() reports them
 */
template <typename T>
static std::vector<std::shared_ptr<T>>
getMatchesInDirectory(const std::regex& regex, const std::shared_ptr<Directory>& directory)
{
	std::vector<std::shared_ptr<T>> result;

	for (const auto& entryName : directory->listEntries()) {
		if (std::regex_match(entryName, regex)) {
			// The cast yields nullptr for entries that are not a T (or no longer exist).
			if (auto typedEntry = std::dynamic_pointer_cast<T>(directory->getEntry(entryName));
				typedEntry != nullptr) {
				result.push_back(typedEntry);
			}
		}
	}

	return result;
}

static std::vector<std::shared_ptr<File>>
getFilesMatchingPattern(const std::string& regexPath, std::shared_ptr<Directory> parentDir)
{
std::vector<std::shared_ptr<File>> matchingFiles;

auto pathSegments = utils::parsePath(regexPath);
if (pathSegments.empty()) {
return matchingFiles;
}

const std::string topLevelName = pathSegments.back();
pathSegments.pop_back();

std::vector<std::shared_ptr<Directory>> matchedDirs = {std::move(parentDir)};

for (const auto& subDir : pathSegments) {
std::vector<std::shared_ptr<Directory>> matchesInCurrentDir;
const std::regex dirRegex(subDir);
for (const auto& dir : matchedDirs) {
const auto matchedSubDirs = getMatchesInDirectory<Directory>(dirRegex, dir);
matchesInCurrentDir.insert(
matchesInCurrentDir.end(),
matchedSubDirs.begin(),
matchedSubDirs.end());
}
matchedDirs = matchesInCurrentDir;
}

const std::regex fileRegex(topLevelName);
for (const auto& dir : matchedDirs) {
const auto filesInDir = getMatchesInDirectory<File>(fileRegex, dir);
matchingFiles.insert(matchingFiles.end(), filesInDir.begin(), filesInDir.end());
}

return matchingFiles;
}

/**
 * @brief Fold the result of one aggregation method into the accumulated content.
 *
 * When both sides hold a Dict, the entries of @p newContent are inserted into @p content
 * (keys that are already present keep their current value). In every other case
 * @p content is replaced by @p newContent.
 *
 * @param content    Accumulated content, updated in place
 * @param newContent Content to merge in
 */
static void mergeContent(Content& content, const Content& newContent)
{
	auto* const accumulated = std::get_if<Dict>(&content);
	const auto* const incoming = std::get_if<Dict>(&newContent);

	if (accumulated == nullptr || incoming == nullptr) {
		content = newContent;
		return;
	}

	accumulated->insert(incoming->begin(), incoming->end());
}

/**
 * @brief Check that a set of aggregation operations is consistent.
 *
 * Either no operation specifies 'dictFieldName', or there is exactly one operation and it
 * specifies it.
 *
 * @param ops Operations to validate
 * @throw NodeException if the operations mix both forms, or if 'dictFieldName' is used
 *        together with more than one operation
 */
static void validateAggOperations(const std::vector<AggOperation>& ops)
{
	const auto specifiesField
		= [](const AggOperation& aggOp) { return !aggOp.dictFieldName.empty(); };

	const bool anySpecified = std::any_of(ops.begin(), ops.end(), specifiesField);
	const bool allSpecified = std::all_of(ops.begin(), ops.end(), specifiesField);

	if (anySpecified && !allSpecified) {
		throw NodeException(
			"Inconsistent AggOperation configurations: Some operations have 'dictFieldName' "
			"specified while others don't.");
	}

	if (anySpecified && ops.size() > 1) {
		throw NodeException(
			"Invalid AggOperation configuration: When 'dictFieldName' is specified, only one "
			"operation is allowed.");
	}
}

/**
 * @brief Read all files matching the pattern and aggregate their content.
 *
 * The matching files are looked up in the parent directory on every call. Each of them is
 * read, the collected contents are passed to every configured aggregation method, and the
 * individual results are merged into a single Content.
 *
 * The aggregated file never reads itself, even if its own name matches the pattern.
 *
 * @return Aggregated content.
 * @throw NodeException if an error occurs during aggregation.
 */
Content AggregatedFile::read()
{
	const auto files = getFilesMatchingPattern(
		m_filesRegexPattern,
		std::dynamic_pointer_cast<Directory>(m_parent));

	std::vector<Content> fileContents;
	fileContents.reserve(files.size());
	for (const auto& file : files) {
		// This file is an entry of the searched directory too. Reading it here would call
		// this function again and never terminate, so it is left out of its own input.
		if (file.get() == this) {
			continue;
		}
		fileContents.emplace_back(file->read());
	}

	Content content;
	for (const auto& aggMethod : m_aggMethods) {
		const Content methodResult = aggMethod->aggregate(fileContents);
		mergeContent(content, methodResult);
	}

	return content;
}

/**
 * @brief Build file operations whose read callback forwards to AggregatedFile::read().
 *
 * This is a free function on purpose: it is evaluated in the constructor's initializer list
 * before the File base class is initialized, where calling a member function of the object
 * under construction is undefined behaviour. Only the pointer is stored here; it is not
 * dereferenced until the callback is invoked.
 *
 * @param aggFile File the read callback is bound to; must outlive the returned operations
 * @return Operations with only the read callback set
 */
static FileOps makeAggregatedFileOps(AggregatedFile* aggFile)
{
	FileOps ops = {};
	ops.read = [aggFile]() { return aggFile->read(); };
	return ops;
}

/** @brief Build the file operations that route reads to read(). */
FileOps AggregatedFile::getOps()
{
	return makeAggregatedFileOps(this);
}

/**
 * @brief Create an aggregated file.
 *
 * @param parent          Node the file is attached to
 * @param name            Name of the file within the parent
 * @param aggFilesPattern Pattern selecting the files to aggregate
 * @param ops             Aggregation operations; one aggregation method is created per entry
 * @throw NodeException if @p ops is not a valid combination of operations
 */
AggregatedFile::AggregatedFile(
	const std::shared_ptr<Node>& parent,
	std::string_view name,
	std::string aggFilesPattern,
	const std::vector<AggOperation>& ops)
	: File(parent, name, makeAggregatedFileOps(this))
	, m_filesRegexPattern(std::move(aggFilesPattern))
{
	validateAggOperations(ops);

	m_aggMethods.reserve(ops.size());
	for (const auto& aggOp : ops) {
		m_aggMethods.push_back(AggMethodFactory::createAggMethod(
			aggOp.method,
			aggOp.dictFieldName,
			aggOp.dictResultName));
	}
}

} // namespace telemetry
19 changes: 19 additions & 0 deletions src/telemetry/directory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,25 @@ std::shared_ptr<File> Directory::addFile(std::string_view name, FileOps ops)
return newFile;
}

// Create an aggregated file under this directory and register it as a new entry.
// The directory mutex is held for the whole operation, so the name check and the
// registration happen under the same lock.
std::shared_ptr<AggregatedFile> Directory::addAggFile(
	std::string_view name,
	const std::string& aggFilesPattern,
	const std::vector<AggOperation>& aggOps)
{
	const std::lock_guard lock(getMutex());

	// Names are unique within a directory.
	if (getEntryLocked(name) != nullptr) {
		throwEntryAlreadyExists(name);
	}

	// The constructor is private (Directory is a friend), hence the explicit new.
	std::shared_ptr<AggregatedFile> aggFile(
		new AggregatedFile(shared_from_this(), name, aggFilesPattern, aggOps));
	addEntryLocked(aggFile);

	return aggFile;
}

std::vector<std::string> Directory::listEntries()
{
std::vector<std::string> result;
Expand Down
4 changes: 2 additions & 2 deletions src/telemetry/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
namespace telemetry {

Node::Node(std::shared_ptr<Node> parent, std::string_view name)
: m_name(name)
, m_parent(std::move(parent))
: m_parent(std::move(parent))
, m_name(name)
{
if (m_parent == nullptr) {
throwNodeException("parent cannot be nullptr");
Expand Down

0 comments on commit 791a70c

Please sign in to comment.