Skip to content

Commit

Permalink
telemetry - add MIN and MAX aggregation method
Browse files Browse the repository at this point in the history
  • Loading branch information
SiskaPavel committed Oct 3, 2024
1 parent 905c14b commit 926a4a7
Show file tree
Hide file tree
Showing 5 changed files with 237 additions and 1 deletion.
4 changes: 3 additions & 1 deletion include/telemetry/aggMethod.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@ namespace telemetry {
* Supported methods and types:
* - @p AVG: Scalar(WithUnit) value, [uint64_t, int64_t, double] -> result double
* - @p SUM: Scalar(WithUnit) value, [uint64_t, int64_t, double]
* - @p MIN: Scalar(WithUnit) value, [uint64_t, int64_t, double]
* - @p MAX: Scalar(WithUnit) value, [uint64_t, int64_t, double]
* - @p JOIN: Scalar value (array included), [bool, uint64_t, int64_t, double, string,
* std::monostate()]
*/
enum class AggMethodType { AVG, SUM, JOIN };
enum class AggMethodType { AVG, SUM, MIN, MAX, JOIN };

/**
* @brief Structure representing an aggregation operation
Expand Down
1 change: 1 addition & 0 deletions src/telemetry/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ list(APPEND TELEMETRY_SOURCE_FILES
aggregator/aggMethod.cpp
aggregator/aggSum.cpp
aggregator/aggAvg.cpp
aggregator/aggMinMax.cpp
aggregator/aggJoin.cpp
aggregator/aggMethodFactory.cpp
)
Expand Down
3 changes: 3 additions & 0 deletions src/telemetry/aggregator/aggMethodFactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

#include "aggAvg.hpp"
#include "aggJoin.hpp"
#include "aggMinMax.hpp"
#include "aggSum.hpp"

#include <telemetry/node.hpp>
Expand All @@ -25,6 +26,8 @@ std::unique_ptr<AggMethod> AggMethodFactory::createAggMethod(
aggMethod = std::make_unique<AggMethodSum>();
} else if (aggMethodType == AggMethodType::AVG) {
aggMethod = std::make_unique<AggMethodAvg>();
} else if (aggMethodType == AggMethodType::MIN || aggMethodType == AggMethodType::MAX) {
aggMethod = std::make_unique<AggMethodMinMax>(aggMethodType);
} else if (aggMethodType == AggMethodType::JOIN) {
aggMethod = std::make_unique<AggMethodJoin>();
} else {
Expand Down
179 changes: 179 additions & 0 deletions src/telemetry/aggregator/aggMinMax.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
/**
* @file
* @author Pavel Siska <siska@cesnet.cz>
* @brief Implementaion of the MIN aggregation method for telemetry data.
*
* @note SPDX-License-Identifier: BSD-3-Clause
*/

#include "aggMinMax.hpp"

#include "aggCommon.hpp"

#include <telemetry/node.hpp>

namespace telemetry {

using ResultType = std::variant<Scalar, ScalarWithUnit>;

static void findMin(const Scalar& value, Scalar& result)
{
if (std::holds_alternative<std::monostate>(result)) {
result = value;
return;
}

if (std::holds_alternative<uint64_t>(value)) {
if (std::get<uint64_t>(value) < std::get<uint64_t>(result)) {
result = value;
}
} else if (std::holds_alternative<int64_t>(value)) {
if (std::get<int64_t>(value) < std::get<int64_t>(result)) {
result = value;
}
} else if (std::holds_alternative<double>(value)) {
if (std::get<double>(value) < std::get<double>(result)) {
result = value;
}
} else {
throw TelemetryException("Invalid scalar alternative type for min operation.");
}
}

static void findMax(const Scalar& value, Scalar& result)
{
if (std::holds_alternative<std::monostate>(result)) {
result = value;
return;
}

if (std::holds_alternative<uint64_t>(value)) {
if (std::get<uint64_t>(value) > std::get<uint64_t>(result)) {
result = value;
}
} else if (std::holds_alternative<int64_t>(value)) {
if (std::get<int64_t>(value) > std::get<int64_t>(result)) {
result = value;
}
} else if (std::holds_alternative<double>(value)) {
if (std::get<double>(value) > std::get<double>(result)) {
result = value;
}
} else {
throw TelemetryException("Invalid scalar alternative type for max operation.");
}
}

static Scalar
aggregateScalar(std::vector<AggContent>& values, const AggMethodMinMax::AggMethod& aggMethod)
{
Scalar result = std::monostate();

if (values.empty()) {
return result;
}

if (!std::holds_alternative<Scalar>(values.front())) {
throw TelemetryException("Unexpected variant alternative.");
}

for (const auto& value : values) {
const auto& scalar = std::get<Scalar>(value);
aggMethod(scalar, result);
}

return result;
}

static ScalarWithUnit aggregateScalarWithUnit(
std::vector<AggContent>& values,
const AggMethodMinMax::AggMethod& aggMethod)
{
Scalar result = std::monostate();

if (values.empty()) {
return {};
}

if (!std::holds_alternative<ScalarWithUnit>(values.front())) {
throw TelemetryException("Unexpected variant alternative.");
}

for (const auto& value : values) {
[[maybe_unused]] const auto& [scalar, _] = std::get<ScalarWithUnit>(value);
aggMethod(scalar, result);
}

[[maybe_unused]] const auto& [_, unit] = std::get<ScalarWithUnit>(values.front());

return {result, unit};
}

static ResultType aggregateGatheredValues(
std::vector<AggContent>& values,
const AggMethodMinMax::AggMethod& aggMethod)
{
if (std::holds_alternative<Scalar>(values.front())) {
return aggregateScalar(values, aggMethod);
}

if (std::holds_alternative<ScalarWithUnit>(values.front())) {
return aggregateScalarWithUnit(values, aggMethod);
}

throw TelemetryException("Unexpected variant alternative.");
}

static Content createDictContent(const std::string& dictKey, const ResultType& result)
{
Dict dict;

auto visitor = [&](const auto& arg) -> DictValue { return arg; };
dict[dictKey] = std::visit(visitor, result);

return dict;
}

static Content createContent(const std::string& dictKey, const ResultType& result)
{
if (!dictKey.empty()) {
return createDictContent(dictKey, result);
}

auto visitor = [&](const auto& arg) -> Content { return arg; };
return std::visit(visitor, result);
}

AggMethodMinMax::AggMethodMinMax(const AggMethodType& method)
{
if (method == AggMethodType::MIN) {
m_agregateFunction = findMin;
} else if (method == AggMethodType::MAX) {
m_agregateFunction = findMax;
} else {
throw TelemetryException("Invalid aggregation method.");
}
}

Content AggMethodMinMax::aggregate(const std::vector<Content>& contents)
{
std::vector<AggContent> values;

for (const auto& content : contents) {
const auto& aggContent = getAggContent(content);
values.emplace_back(aggContent);
}

if (!hasOneOfThisAlternative<ScalarWithUnit, Scalar>(values)) {
throw TelemetryException("The contents data does not contain the same variant alternative");
}

if (!hasValidScalarType<uint64_t, int64_t, double, std::monostate>(values)) {
throw TelemetryException("Invalid scalar variant alternative");
}

const auto& result = aggregateGatheredValues(values, m_agregateFunction);
return createContent(getDictResultName(), result);
}

} // namespace telemetry
51 changes: 51 additions & 0 deletions src/telemetry/aggregator/aggMinMax.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/**
* @file
* @author Pavel Siska <siska@cesnet.cz>
* @brief Interface of the MIN and MAX aggregation method for telemetry data.
*
* SPDX-License-Identifier: BSD-3-Clause
*/

#pragma once

#include <telemetry/aggMethod.hpp>
#include <telemetry/content.hpp>

#include <functional>
#include <variant>
#include <vector>

namespace telemetry {

/**
* @brief Implementation of the SUM aggregation method.
*/
class AggMethodMinMax : public AggMethod {
public:
/**
* @brief Construct a new AggMethodMinMax object.
*
* @param method The aggregation method to use (MIN or MAX).
* @throws TelemetryException if the aggregation method is invalid.
*/
explicit AggMethodMinMax(const AggMethodType& method);

/**
* @brief Aggregate telemetry data using the MIN and MAX method.
*
* @param contents The vector of telemetry content to aggregate.
* @return The aggregated content.
* @throws TelemetryException if the aggregation encounters an error.
*/
Content aggregate(const std::vector<Content>& contents) override;

/**
* @brief Get the result type of the aggregation
*/
using AggMethod = std::function<void(const Scalar&, Scalar&)>;

private:
AggMethod m_agregateFunction;
};

} // namespace telemetry

0 comments on commit 926a4a7

Please sign in to comment.