From 17e93e67cc774042387efc84e33543c1da5e8268 Mon Sep 17 00:00:00 2001 From: Pavel Siska Date: Thu, 3 Oct 2024 22:30:28 +0200 Subject: [PATCH 1/2] telemetry - add MIN and MAX aggregation method --- include/telemetry/aggMethod.hpp | 4 +- src/telemetry/CMakeLists.txt | 1 + src/telemetry/aggregator/aggMethodFactory.cpp | 3 + src/telemetry/aggregator/aggMinMax.cpp | 179 ++++++++++++++++++ src/telemetry/aggregator/aggMinMax.hpp | 51 +++++ 5 files changed, 237 insertions(+), 1 deletion(-) create mode 100644 src/telemetry/aggregator/aggMinMax.cpp create mode 100644 src/telemetry/aggregator/aggMinMax.hpp diff --git a/include/telemetry/aggMethod.hpp b/include/telemetry/aggMethod.hpp index 2fa5ad1..2f41871 100644 --- a/include/telemetry/aggMethod.hpp +++ b/include/telemetry/aggMethod.hpp @@ -21,10 +21,12 @@ namespace telemetry { * Supported methods and types: * - @p AVG: Scalar(WithUnit) value, [uint64_t, int64_t, double] -> result double * - @p SUM: Scalar(WithUnit) value, [uint64_t, int64_t, double] + * - @p MIN: Scalar(WithUnit) value, [uint64_t, int64_t, double] + * - @p MAX: Scalar(WithUnit) value, [uint64_t, int64_t, double] * - @p JOIN: Scalar value (array included), [bool, uint64_t, int64_t, double, string, * std::monostate()] */ -enum class AggMethodType { AVG, SUM, JOIN }; +enum class AggMethodType { AVG, SUM, MIN, MAX, JOIN }; /** * @brief Structure representing an aggregation operation diff --git a/src/telemetry/CMakeLists.txt b/src/telemetry/CMakeLists.txt index 3c26727..6d6995d 100644 --- a/src/telemetry/CMakeLists.txt +++ b/src/telemetry/CMakeLists.txt @@ -9,6 +9,7 @@ list(APPEND TELEMETRY_SOURCE_FILES aggregator/aggMethod.cpp aggregator/aggSum.cpp aggregator/aggAvg.cpp + aggregator/aggMinMax.cpp aggregator/aggJoin.cpp aggregator/aggMethodFactory.cpp ) diff --git a/src/telemetry/aggregator/aggMethodFactory.cpp b/src/telemetry/aggregator/aggMethodFactory.cpp index a4ed08a..9fd0458 100644 --- a/src/telemetry/aggregator/aggMethodFactory.cpp +++ b/src/telemetry/aggregator/aggMethodFactory.cpp @@ -8,6 +8,7 @@ #include "aggAvg.hpp" #include "aggJoin.hpp" +#include "aggMinMax.hpp" #include "aggSum.hpp" #include @@ -25,6 +26,8 @@ std::unique_ptr AggMethodFactory::createAggMethod( aggMethod = std::make_unique(); } else if (aggMethodType == AggMethodType::AVG) { aggMethod = std::make_unique(); + } else if (aggMethodType == AggMethodType::MIN || aggMethodType == AggMethodType::MAX) { + aggMethod = std::make_unique(aggMethodType); } else if (aggMethodType == AggMethodType::JOIN) { aggMethod = std::make_unique(); } else { diff --git a/src/telemetry/aggregator/aggMinMax.cpp b/src/telemetry/aggregator/aggMinMax.cpp new file mode 100644 index 0000000..09f1cb3 --- /dev/null +++ b/src/telemetry/aggregator/aggMinMax.cpp @@ -0,0 +1,179 @@ +/** + * @file + * @author Pavel Siska + * @brief Implementaion of the MIN aggregation method for telemetry data. + * + * @note SPDX-License-Identifier: BSD-3-Clause + */ + +#include "aggMinMax.hpp" + +#include "aggCommon.hpp" + +#include + +namespace telemetry { + +using ResultType = std::variant; + +static void findMin(const Scalar& value, Scalar& result) +{ + if (std::holds_alternative(result)) { + result = value; + return; + } + + if (std::holds_alternative(value)) { + if (std::get(value) < std::get(result)) { + result = value; + } + } else if (std::holds_alternative(value)) { + if (std::get(value) < std::get(result)) { + result = value; + } + } else if (std::holds_alternative(value)) { + if (std::get(value) < std::get(result)) { + result = value; + } + } else { + throw TelemetryException("Invalid scalar alternative type for min operation."); + } +} + +static void findMax(const Scalar& value, Scalar& result) +{ + if (std::holds_alternative(result)) { + result = value; + return; + } + + if (std::holds_alternative(value)) { + if (std::get(value) > std::get(result)) { + result = value; + } + } else if (std::holds_alternative(value)) { + if (std::get(value) > std::get(result)) { + result = value; + } + } else if (std::holds_alternative(value)) { + if (std::get(value) > std::get(result)) { + result = value; + } + } else { + throw TelemetryException("Invalid scalar alternative type for max operation."); + } +} + +static Scalar +aggregateScalar(std::vector& values, const AggMethodMinMax::AggMethod& aggMethod) +{ + Scalar result = std::monostate(); + + if (values.empty()) { + return result; + } + + if (!std::holds_alternative(values.front())) { + throw TelemetryException("Unexpected variant alternative."); + } + + for (const auto& value : values) { + const auto& scalar = std::get(value); + aggMethod(scalar, result); + } + + return result; +} + +static ScalarWithUnit aggregateScalarWithUnit( + std::vector& values, + const AggMethodMinMax::AggMethod& aggMethod) +{ + Scalar result = std::monostate(); + + if (values.empty()) { + return {}; + } + + if (!std::holds_alternative(values.front())) { + throw TelemetryException("Unexpected variant alternative."); + } + + for (const auto& value : values) { + [[maybe_unused]] const auto& [scalar, _] = std::get(value); + aggMethod(scalar, result); + } + + [[maybe_unused]] const auto& [_, unit] = std::get(values.front()); + + return {result, unit}; +} + +static ResultType aggregateGatheredValues( + std::vector& values, + const AggMethodMinMax::AggMethod& aggMethod) +{ + if (std::holds_alternative(values.front())) { + return aggregateScalar(values, aggMethod); + } + + if (std::holds_alternative(values.front())) { + return aggregateScalarWithUnit(values, aggMethod); + } + + throw TelemetryException("Unexpected variant alternative."); +} + +static Content createDictContent(const std::string& dictKey, const ResultType& result) +{ + Dict dict; + + auto visitor = [&](const auto& arg) -> DictValue { return arg; }; + dict[dictKey] = std::visit(visitor, result); + + return dict; +} + +static Content createContent(const std::string& dictKey, const ResultType& result) +{ + if (!dictKey.empty()) { + return createDictContent(dictKey, result); + } + + auto visitor = [&](const auto& arg) -> Content { return arg; }; + return std::visit(visitor, result); +} + +AggMethodMinMax::AggMethodMinMax(const AggMethodType& method) +{ + if (method == AggMethodType::MIN) { + m_agregateFunction = findMin; + } else if (method == AggMethodType::MAX) { + m_agregateFunction = findMax; + } else { + throw TelemetryException("Invalid aggregation method."); + } +} + +Content AggMethodMinMax::aggregate(const std::vector& contents) +{ + std::vector values; + + for (const auto& content : contents) { + const auto& aggContent = getAggContent(content); + values.emplace_back(aggContent); + } + + if (!hasOneOfThisAlternative(values)) { + throw TelemetryException("The contents data does not contain the same variant alternative"); + } + + if (!hasValidScalarType(values)) { + throw TelemetryException("Invalid scalar variant alternative"); + } + + const auto& result = aggregateGatheredValues(values, m_agregateFunction); + return createContent(getDictResultName(), result); +} + +} // namespace telemetry diff --git a/src/telemetry/aggregator/aggMinMax.hpp b/src/telemetry/aggregator/aggMinMax.hpp new file mode 100644 index 0000000..e91413e --- /dev/null +++ b/src/telemetry/aggregator/aggMinMax.hpp @@ -0,0 +1,51 @@ +/** + * @file + * @author Pavel Siska + * @brief Interface of the MIN and MAX aggregation method for telemetry data. + * + * SPDX-License-Identifier: BSD-3-Clause + */ + +#pragma once + +#include +#include + +#include +#include +#include + +namespace telemetry { + +/** + * @brief Implementation of the SUM aggregation method. + */ +class AggMethodMinMax : public AggMethod { +public: + /** + * @brief Construct a new AggMethodMinMax object. + * + * @param method The aggregation method to use (MIN or MAX). + * @throws TelemetryException if the aggregation method is invalid. + */ + explicit AggMethodMinMax(const AggMethodType& method); + + /** + * @brief Aggregate telemetry data using the MIN and MAX method. + * + * @param contents The vector of telemetry content to aggregate. + * @return The aggregated content. + * @throws TelemetryException if the aggregation encounters an error. + */ + Content aggregate(const std::vector& contents) override; + + /** + * @brief Get the result type of the aggregation + */ + using AggMethod = std::function; + +private: + AggMethod m_agregateFunction; +}; + +} // namespace telemetry From a11eef0ff6d6ef305c88dadd960e5e327e2c259e Mon Sep 17 00:00:00 2001 From: Pavel Siska Date: Thu, 3 Oct 2024 23:24:39 +0200 Subject: [PATCH 2/2] test - introduce unit tests for telemetry::AggMethodMinMax --- src/telemetry/aggregator/aggMinMax.cpp | 4 + .../aggregator/tests/testAggMinMax.cpp | 292 ++++++++++++++++++ 2 files changed, 296 insertions(+) create mode 100644 src/telemetry/aggregator/tests/testAggMinMax.cpp diff --git a/src/telemetry/aggregator/aggMinMax.cpp b/src/telemetry/aggregator/aggMinMax.cpp index 09f1cb3..e6bc19e 100644 --- a/src/telemetry/aggregator/aggMinMax.cpp +++ b/src/telemetry/aggregator/aggMinMax.cpp @@ -177,3 +177,7 @@ Content AggMethodMinMax::aggregate(const std::vector& contents) } } // namespace telemetry + +#ifdef TELEMETRY_ENABLE_TESTS +#include "tests/testAggMinMax.cpp" +#endif diff --git a/src/telemetry/aggregator/tests/testAggMinMax.cpp b/src/telemetry/aggregator/tests/testAggMinMax.cpp new file mode 100644 index 0000000..6559554 --- /dev/null +++ b/src/telemetry/aggregator/tests/testAggMinMax.cpp @@ -0,0 +1,292 @@ +/** + * @file + * @author Pavel Siska + * @brief Unit tests of Telemetry::AggMethodMinMax + * + * SPDX-License-Identifier: BSD-3-Clause + */ + +#include + +#include + +namespace telemetry { + +/** + * @test Test find min of scalar values + */ +TEST(AggMinMaxTest, TestFindMin) +{ + // Test find min of uint64_t scalar + { + Scalar result = uint64_t(10); + findMin(uint64_t(1), result); + EXPECT_EQ(uint64_t(1), std::get(result)); + } + + // Test find min of int64_t scalar + { + Scalar result = int64_t(10); + findMin(int64_t(5), result); + EXPECT_EQ(int64_t(5), std::get(result)); + } + + // Test find min of double scalar + { + Scalar result = double(1.0); + findMin(double(5.0), result); + EXPECT_EQ(double(1.0), std::get(result)); + } + + // Test find min of scalar types with different types (expect failure) + { + Scalar result = uint64_t(5); + EXPECT_THROW(findMin(int64_t(5), result), std::exception); + } +} + +/** + * @test Test find max of scalar values + */ +TEST(AggMinMaxTest, TestFindMax) +{ + // Test find max of uint64_t scalar + { + Scalar result = uint64_t(1); + findMax(uint64_t(10), result); + EXPECT_EQ(uint64_t(10), std::get(result)); + } + + // Test find max of int64_t scalar + { + Scalar result = int64_t(1); + findMax(int64_t(5), result); + EXPECT_EQ(int64_t(5), std::get(result)); + } + + // Test find max of double scalar + { + Scalar result = double(1.0); + findMax(double(5.0), result); + EXPECT_EQ(double(5.0), std::get(result)); + } + + // Test find max of scalar types with different types (expect failure) + { + Scalar result = uint64_t(5); + EXPECT_THROW(findMax(int64_t(5), result), std::exception); + } +} + +/** + * @test Test aggregation of scalar values + */ +TEST(AggMinMaxTest, TestAggregateScalar) +{ + const AggMethodMinMax::AggMethod minMethod = findMin; + const AggMethodMinMax::AggMethod maxMethod = findMax; + + // Test aggregation of scalar values (min) + { + std::vector values = {Scalar {5.0}, Scalar {10.0}, Scalar {15.0}}; + Scalar result = aggregateScalar(values, minMethod); + EXPECT_EQ(Scalar {5.0}, result); + } + + // Test aggregation of scalar values (max) + { + std::vector values = {Scalar {5.0}, Scalar {15.0}, Scalar {10.0}}; + Scalar result = aggregateScalar(values, maxMethod); + EXPECT_EQ(Scalar {15.0}, result); + } + + // Test aggregation of ScalarWithUnit values (expect failure) + { + std::vector values = {ScalarWithUnit {5.0, "unit"}}; + EXPECT_THROW(aggregateScalar(values, minMethod), TelemetryException); + } + + // Test aggregation of empty vector + { + std::vector values = {}; + Scalar result = aggregateScalar(values, maxMethod); + EXPECT_TRUE(std::holds_alternative(result)); + } +} + +/** + * @test Test aggregation of scalar values with units + */ +TEST(AggMinMaxTest, TestAggregateScalarWithUnit) +{ + const AggMethodMinMax::AggMethod minMethod = findMin; + const AggMethodMinMax::AggMethod maxMethod = findMax; + + // Test aggregation of scalar values with unit (min) + { + std::vector values + = {ScalarWithUnit {5.0, "unit"}, + ScalarWithUnit {-10.0, "unit"}, + ScalarWithUnit {15.0, "unit"}}; + const auto& [scalar, unit] = aggregateScalarWithUnit(values, minMethod); + EXPECT_EQ(std::get(scalar), -10.0); + EXPECT_EQ(unit, "unit"); + } + + // Test aggregation of scalar values with unit (max) + { + std::vector values + = {ScalarWithUnit {5.0, "unit"}, + ScalarWithUnit {10.0, "unit"}, + ScalarWithUnit {150.0, "unit"}}; + const auto& [scalar, unit] = aggregateScalarWithUnit(values, maxMethod); + EXPECT_EQ(std::get(scalar), 150.0); + EXPECT_EQ(unit, "unit"); + } + + // Test aggregation of Scalar values (expect failure) + { + std::vector values = {Scalar {5.0}}; + EXPECT_THROW(aggregateScalarWithUnit(values, maxMethod), TelemetryException); + } + + // Test aggregation of empty vector + { + std::vector values = {}; + const auto& [scalar, unit] = aggregateScalarWithUnit(values, maxMethod); + EXPECT_TRUE(std::holds_alternative(scalar)); + EXPECT_EQ(unit, ""); + } +} + +/** + * @test Test creation of dictionary content + */ +TEST(AggMinMaxTest, TestCreateDictContent) +{ + ResultType result = Scalar {uint64_t(30)}; + Content content = createDictContent("min", result); + + EXPECT_TRUE(std::holds_alternative(content)); + + Dict& contentDict = std::get(content); + EXPECT_EQ(1, contentDict.size()); + + auto iter = contentDict.cbegin(); + { + const auto& [key, value] = *(iter++); + EXPECT_EQ("min", key); + EXPECT_TRUE(std::holds_alternative(value)); + const auto& scalar = std::get(value); + EXPECT_TRUE(std::holds_alternative(scalar)); + EXPECT_EQ(uint64_t(30), std::get(scalar)); + } +} + +/** + * @test Test aggregation method for sum + */ +TEST(AggMinMaxTest, TestAggregate) +{ + // Test aggregation of scalar values (min) + { + AggMethodMinMax aggMethodMin(AggMethodType::MIN); + std::vector contents = {Scalar {5.0}, Scalar {100.0}, Scalar {-105.0}}; + Content content = aggMethodMin.aggregate(contents); + EXPECT_TRUE(std::holds_alternative(content)); + Scalar& scalar = std::get(content); + EXPECT_TRUE(std::holds_alternative(scalar)); + double result = std::get(scalar); + EXPECT_EQ(-105.0, result); + } + + // Test aggregation of scalar values (max) + { + AggMethodMinMax aggMethodMax(AggMethodType::MAX); + std::vector contents = {Scalar {5.0}, Scalar {100.0}, Scalar {-105.0}}; + Content content = aggMethodMax.aggregate(contents); + EXPECT_TRUE(std::holds_alternative(content)); + Scalar& scalar = std::get(content); + EXPECT_TRUE(std::holds_alternative(scalar)); + double result = std::get(scalar); + EXPECT_EQ(100.0, result); + } + + // Test aggregation of ScalarWithUnit values (min) + { + AggMethodMinMax aggMethodMin(AggMethodType::MIN); + std::vector contents = {ScalarWithUnit {5.0, "unit"}}; + Content content = aggMethodMin.aggregate(contents); + EXPECT_TRUE(std::holds_alternative(content)); + const auto& [scalar, unit] = std::get(content); + EXPECT_EQ(5.0, std::get(scalar)); + EXPECT_EQ("unit", unit); + } + + // Test aggregation of mixed types (expect failure) + { + AggMethodMinMax aggMethodMin(AggMethodType::MIN); + std::vector contents = {ScalarWithUnit {5.0, "unit"}, Scalar {5.0}}; + EXPECT_THROW(aggMethodMin.aggregate(contents), TelemetryException); + } + + // Test aggregation of incompatible types (expect failure) + { + AggMethodMinMax aggMethodMin(AggMethodType::MIN); + std::vector contents = {Scalar {true}, Scalar {5.0}}; + EXPECT_THROW(aggMethodMin.aggregate(contents), TelemetryException); + } + + // Test aggregation of incompatible scalar types (expect failure) + { + AggMethodMinMax aggMethodMin(AggMethodType::MIN); + std::vector contents = {Scalar {uint64_t(20)}, Scalar {5.0}}; + EXPECT_THROW(aggMethodMin.aggregate(contents), TelemetryException); + } + + // Test aggregation of scalar and uint64_t types + { + AggMethodMinMax aggMethodMax(AggMethodType::MAX); + std::vector contents = {Scalar {uint64_t(20)}, uint64_t {5}}; + Content content = aggMethodMax.aggregate(contents); + EXPECT_TRUE(std::holds_alternative(content)); + const auto& scalar = std::get(content); + EXPECT_EQ(20, std::get(scalar)); + } + + // Test aggregation of dictionary values (min) + { + AggMethodMinMax aggMethodMin(AggMethodType::MIN); + aggMethodMin.setDictField("packets", "packetsSum"); + std::vector contents + = {Dict({{"packets", Scalar {uint64_t(1)}}}), + Dict({{"packets", Scalar {uint64_t(5)}}})}; + Content content = aggMethodMin.aggregate(contents); + EXPECT_TRUE(std::holds_alternative(content)); + + const Dict& dict = std::get(content); + EXPECT_EQ(1, dict.size()); + + const Scalar& scalarValueSum = std::get(dict.at("packetsSum")); + EXPECT_EQ(uint64_t(1), std::get(scalarValueSum)); + } + + // Test aggregation of dictionary values (max) + { + AggMethodMinMax aggMethodMax(AggMethodType::MAX); + aggMethodMax.setDictField("packets", "packetsSum"); + std::vector contents + = {Dict({{"packets", Scalar {uint64_t(1)}}}), + Dict({{"packets", Scalar {uint64_t(5)}}})}; + Content content = aggMethodMax.aggregate(contents); + EXPECT_TRUE(std::holds_alternative(content)); + + const Dict& dict = std::get(content); + EXPECT_EQ(1, dict.size()); + + const Scalar& scalarValueSum = std::get(dict.at("packetsSum")); + EXPECT_EQ(uint64_t(5), std::get(scalarValueSum)); + } +} + +} // namespace telemetry