diff --git a/include/telemetry/aggFile.hpp b/include/telemetry/aggFile.hpp new file mode 100644 index 0000000..4f59f9f --- /dev/null +++ b/include/telemetry/aggFile.hpp @@ -0,0 +1,69 @@ +/** + * @file + * @author Pavel Siska + * @brief Aggregated telemetry file + * + * SPDX-License-Identifier: BSD-3-Clause + */ + +#pragma once + +#include "aggMethod.hpp" +#include "content.hpp" +#include "file.hpp" +#include "node.hpp" + +#include +#include +#include +#include + +namespace telemetry { + +/** + * @brief Class representing an aggregated file + * + * AggregatedFile is a subclass of File and is responsible for aggregating telemetry data from + * multiple files. It reads data from matched files based on a regex pattern and applies aggregation + * operations defined by aggregation methods. + */ +class AggregatedFile : public File { +public: + ~AggregatedFile() override = default; + + // Object cannot be copied or moved as it would break references from directories. + AggregatedFile(const AggregatedFile& other) = delete; + AggregatedFile& operator=(const AggregatedFile& other) = delete; + AggregatedFile(AggregatedFile&& other) = delete; + AggregatedFile& operator=(AggregatedFile&& other) = delete; + + /** + * @brief Execute read operation over all matched files and aggregate them. + * + * This function reads data from all matched files based on the regex pattern and aggregates it + * using aggregation methods specified during object creation. + * + * @return Aggregated content. + * @throw NodeException if an error occurs during aggregation. + */ + Content read(); + +private: + // Allow directory to call AggregatedFile constructor + friend class Directory; + // Can be created only from a directory. Must be always created as a shared_ptr. + AggregatedFile( + const std::shared_ptr& parent, + std::string_view name, + std::string aggFilesPattern, + const std::vector& ops); + + FileOps getOps(); + + const std::string m_filesRegexPattern; + + std::vector m_paths; + std::vector> m_aggMethods; +}; + +} // namespace telemetry diff --git a/include/telemetry/directory.hpp b/include/telemetry/directory.hpp index 7796bd7..75f819e 100644 --- a/include/telemetry/directory.hpp +++ b/include/telemetry/directory.hpp @@ -8,6 +8,7 @@ #pragma once +#include "aggFile.hpp" #include "file.hpp" #include "node.hpp" @@ -64,6 +65,35 @@ class Directory : public Node { */ [[nodiscard]] std::shared_ptr addFile(std::string_view name, FileOps ops); + /** + * @brief Add an aggregated file to the directory + * + * This function adds a new aggregated file to the directory, which aggregates data from + * multiple files matching a specified pattern using the provided aggregation operations. + * + * An aggregated file combines data from multiple source files into a single cohesive dataset + * according to the specified aggregation operations. These operations define how data from + * individual files should be combined, such as computing averages, sums, or joining values. + * + * The aggregation file pattern specifies a regular expression pattern used to match files + * within the directory. Only files whose names match this pattern will be included in the + * aggregation process. + * + * Syntax of regex is default std::regex() + * + * If an entry with the same name already exists in the directory, an exception is thrown. + * + * @param name Name of the aggregated file + * @param aggFilesPattern Regular expression pattern used to match files for aggregation + * @param aggOps Vector of aggregation operations to be applied to the data + * @return Shared pointer to the newly created aggregated file + * @throw NodeException If an entry with the same name already exists in the directory + */ + [[nodiscard]] std::shared_ptr addAggFile( + std::string_view name, + const std::string& aggFilesPattern, + const std::vector& aggOps); + /** * @brief List all available entries of the directory. * @return All available entries. diff --git a/include/telemetry/file.hpp b/include/telemetry/file.hpp index 79f304f..b587dce 100644 --- a/include/telemetry/file.hpp +++ b/include/telemetry/file.hpp @@ -85,7 +85,10 @@ class File : public Node { // Allow directory to call File constructor friend class Directory; - // Can be created only from a directory. Must be always created as a shared_ptr. + +protected: + // Can be created only from a directory or a derived class. + // Must be always created as a shared_ptr. File(const std::shared_ptr& parent, std::string_view name, FileOps ops); }; diff --git a/include/telemetry/node.hpp b/include/telemetry/node.hpp index af8d680..5589dfc 100644 --- a/include/telemetry/node.hpp +++ b/include/telemetry/node.hpp @@ -63,10 +63,12 @@ class Node : public std::enable_shared_from_this { /** @brief Get full path from the root to this node (including this node name). */ std::string getFullPath(); +protected: + std::shared_ptr m_parent; + private: std::mutex m_mutex; std::string m_name; - std::shared_ptr m_parent; void checkName(std::string_view name); [[noreturn]] void throwNodeException(std::string_view err); diff --git a/src/telemetry/CMakeLists.txt b/src/telemetry/CMakeLists.txt index 3bc0b36..af0288b 100644 --- a/src/telemetry/CMakeLists.txt +++ b/src/telemetry/CMakeLists.txt @@ -5,6 +5,7 @@ list(APPEND TELEMETRY_SOURCE_FILES directory.cpp holder.cpp utility.cpp + aggFile.cpp aggregator/aggMethod.cpp aggregator/aggSum.cpp aggregator/aggAvg.cpp diff --git a/src/telemetry/aggFile.cpp b/src/telemetry/aggFile.cpp new file mode 100644 index 0000000..22dcf0b --- /dev/null +++ b/src/telemetry/aggFile.cpp @@ -0,0 +1,161 @@ +/** + * @file + * @author Pavel Siska + * @brief Telemetry file aggregator + * + * SPDX-License-Identifier: BSD-3-Clause + */ + +#include "aggregator/aggMethodFactory.hpp" + +#include +#include +#include +#include + +#include +#include + +namespace telemetry { + +template +static std::vector> +getMatchesInDirectory(const std::regex& regex, const std::shared_ptr& directory) +{ + std::vector> matches; + + const auto& entries = directory->listEntries(); + for (const auto& entry : entries) { + if (!std::regex_match(entry, regex)) { + continue; + } + auto node = directory->getEntry(entry); + if (const auto& derivedNode = std::dynamic_pointer_cast(node)) { + matches.push_back(derivedNode); + } + } + + return matches; +} + +static std::vector> +getFilesMatchingPattern(const std::string& regexPath, std::shared_ptr parentDir) +{ + std::vector> matchingFiles; + + auto pathSegments = utils::parsePath(regexPath); + if (pathSegments.empty()) { + return matchingFiles; + } + + const std::string topLevelName = pathSegments.back(); + pathSegments.pop_back(); + + std::vector> matchedDirs = {std::move(parentDir)}; + + for (const auto& subDir : pathSegments) { + std::vector> matchesInCurrentDir; + const std::regex dirRegex(subDir); + for (const auto& dir : matchedDirs) { + const auto matchedSubDirs = getMatchesInDirectory(dirRegex, dir); + matchesInCurrentDir.insert( + matchesInCurrentDir.end(), + matchedSubDirs.begin(), + matchedSubDirs.end()); + } + matchedDirs = matchesInCurrentDir; + } + + const std::regex fileRegex(topLevelName); + for (const auto& dir : matchedDirs) { + const auto filesInDir = getMatchesInDirectory(fileRegex, dir); + matchingFiles.insert(matchingFiles.end(), filesInDir.begin(), filesInDir.end()); + } + + return matchingFiles; +} + +static void mergeContent(Content& content, const Content& newContent) +{ + if (std::holds_alternative(content) && std::holds_alternative(newContent)) { + auto& dict = std::get(content); + const auto& newDict = std::get(newContent); + dict.insert(newDict.begin(), newDict.end()); + return; + } + content = newContent; +} + +static void validateAggOperations(const std::vector& ops) +{ + const bool hasDictFieldName + = std::any_of(ops.begin(), ops.end(), [](const AggOperation& aggOp) { + return !aggOp.dictFieldName.empty(); + }); + + const bool hasNoDictFieldName + = std::any_of(ops.begin(), ops.end(), [](const AggOperation& aggOp) { + return aggOp.dictFieldName.empty(); + }); + + if (hasDictFieldName && hasNoDictFieldName) { + throw NodeException( + "Inconsistent AggOperation configurations: Some operations have 'dictFieldName' " + "specified while others don't."); + } + + if (hasDictFieldName && ops.size() > 1) { + throw NodeException( + "Invalid AggOperation configuration: When 'dictFieldName' is specified, only one " + "operation is allowed."); + } +} + +Content AggregatedFile::read() +{ + Content content; + + const auto files = getFilesMatchingPattern( + m_filesRegexPattern, + std::dynamic_pointer_cast(m_parent)); + + std::vector fileContents; + fileContents.reserve(files.size()); + for (const auto& file : files) { + fileContents.emplace_back(file->read()); + } + + for (const auto& aggMethod : m_aggMethods) { + const Content methodResult = aggMethod->aggregate(fileContents); + mergeContent(content, methodResult); + } + + return content; +} + +FileOps AggregatedFile::getOps() +{ + FileOps ops = {}; + ops.read = [this]() { return read(); }; + return ops; +} + +AggregatedFile::AggregatedFile( + const std::shared_ptr& parent, + std::string_view name, + std::string aggFilesPattern, + const std::vector& ops) + : File(parent, name, getOps()) + , m_filesRegexPattern(std::move(aggFilesPattern)) +{ + validateAggOperations(ops); + + for (const auto& aggOp : ops) { + m_aggMethods.push_back(AggMethodFactory::createAggMethod( + aggOp.method, + aggOp.dictFieldName, + aggOp.dictResultName)); + } +} + +} // namespace telemetry diff --git a/src/telemetry/directory.cpp b/src/telemetry/directory.cpp index b769cca..9825f4e 100644 --- a/src/telemetry/directory.cpp +++ b/src/telemetry/directory.cpp @@ -61,6 +61,25 @@ std::shared_ptr Directory::addFile(std::string_view name, FileOps ops) return newFile; } +std::shared_ptr Directory::addAggFile( + std::string_view name, + const std::string& aggFilesPattern, + const std::vector& aggOps) +{ + const std::lock_guard lock(getMutex()); + const std::shared_ptr entry = getEntryLocked(name); + + if (entry != nullptr) { + throwEntryAlreadyExists(name); + } + + auto newFile = std::shared_ptr( + new AggregatedFile(shared_from_this(), name, aggFilesPattern, aggOps)); + + addEntryLocked(newFile); + return newFile; +} + std::vector Directory::listEntries() { std::vector result; diff --git a/src/telemetry/node.cpp b/src/telemetry/node.cpp index 06f4c2c..d516bd9 100644 --- a/src/telemetry/node.cpp +++ b/src/telemetry/node.cpp @@ -14,8 +14,8 @@ namespace telemetry { Node::Node(std::shared_ptr parent, std::string_view name) - : m_name(name) - , m_parent(std::move(parent)) + : m_parent(std::move(parent)) + , m_name(name) { if (m_parent == nullptr) { throwNodeException("parent cannot be nullptr");