Skip to content

Commit

Permalink
Merge pull request #15 from CESNET/v0.3.0
Browse files Browse the repository at this point in the history
V0.3.0
  • Loading branch information
SiskaPavel authored Sep 30, 2024
2 parents 6717b36 + a929479 commit 0c9304a
Show file tree
Hide file tree
Showing 8 changed files with 141 additions and 12 deletions.
4 changes: 2 additions & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
cmake_minimum_required(VERSION 3.12)

set(VERSION_MAJOR 0)
set(VERSION_MINOR 2)
set(VERSION_PATCH 2)
set(VERSION_MINOR 3)
set(VERSION_PATCH 0)
set(VERSION ${VERSION_MAJOR}.${VERSION_MINOR}.${VERSION_PATCH})

project(telemetry VERSION ${VERSION})
Expand Down
4 changes: 3 additions & 1 deletion include/telemetry/aggFile.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,14 @@ class AggregatedFile : public File {
const std::shared_ptr<Node>& parent,
std::string_view name,
std::string aggFilesPattern,
const std::vector<AggOperation>& ops);
const std::vector<AggOperation>& ops,
std::shared_ptr<Directory> patternRootDir = nullptr);

FileOps getOps();

const std::string m_filesRegexPattern;

std::shared_ptr<Directory> m_patternRootDir;
std::vector<std::string> m_paths;
std::vector<std::unique_ptr<AggMethod>> m_aggMethods;
};
Expand Down
17 changes: 16 additions & 1 deletion include/telemetry/directory.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,18 @@ class Directory : public Node {
*/
[[nodiscard]] std::shared_ptr<Directory> addDir(std::string_view name);

/**
* @brief Add multiple subdirectories with the given @p name.
*
* The function is equivalent to calling addDir() for each path segment in the given
* path. If a file with the same name already exists, the subdirectory cannot be created.
*
* @param name Path to the subdirectory
* @return Shared pointer to the last subdirectory in the path
* @throw TelemetryException if there is already a file with the same name.
*/
[[nodiscard]] std::shared_ptr<Directory> addDirs(std::string_view name);

/**
* @brief Add a new file with the given @p name and @p ops I/O operations.
*
Expand Down Expand Up @@ -86,13 +98,16 @@ class Directory : public Node {
* @param name Name of the aggregated file
* @param aggFilesPattern Regular expression pattern used to match files for aggregation
* @param aggOps Vector of aggregation operations to be applied to the data
* @param patternRootDir Root directory for the pattern (default is the parent directory)
*
* @return Shared pointer to the newly created aggregated file
* @throw TelemetryException If an entry with the same name already exists in the directory
*/
[[nodiscard]] std::shared_ptr<AggregatedFile> addAggFile(
std::string_view name,
const std::string& aggFilesPattern,
const std::vector<AggOperation>& aggOps);
const std::vector<AggOperation>& aggOps,
std::shared_ptr<Directory> patternRootDir = nullptr);

/**
* @brief List all available entries of the directory.
Expand Down
18 changes: 14 additions & 4 deletions src/telemetry/aggFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,17 @@ Content AggregatedFile::read()
{
Content content;

const auto files = getFilesMatchingPattern(
m_filesRegexPattern,
std::dynamic_pointer_cast<Directory>(m_parent));
std::shared_ptr<Directory> patternRootDir;
if (m_patternRootDir) {
patternRootDir = m_patternRootDir;
} else {
patternRootDir = std::dynamic_pointer_cast<Directory>(m_parent);
}

const auto files = getFilesMatchingPattern(m_filesRegexPattern, patternRootDir);
if (files.empty()) {
return content;
}

std::vector<Content> fileContents;
fileContents.reserve(files.size());
Expand Down Expand Up @@ -148,9 +156,11 @@ AggregatedFile::AggregatedFile(
const std::shared_ptr<Node>& parent,
std::string_view name,
std::string aggFilesPattern,
const std::vector<AggOperation>& ops)
const std::vector<AggOperation>& ops,
std::shared_ptr<Directory> patternRootDir)
: File(parent, name, getOps())
, m_filesRegexPattern(std::move(aggFilesPattern))
, m_patternRootDir(std::move(patternRootDir))
{
validateAggOperations(ops);

Expand Down
1 change: 0 additions & 1 deletion src/telemetry/aggregator/tests/testAggSum.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,6 @@ TEST(AggSumTest, TestAggregate)
AggMethodSum aggMethodSum;
std::vector<Content> contents = {Scalar {true}, Scalar {5.0}};
EXPECT_THROW(aggMethodSum.aggregate(contents), TelemetryException);

}

// Test aggregation of incompatible scalar types (expect failure)
Expand Down
24 changes: 21 additions & 3 deletions src/telemetry/directory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
*/

#include <telemetry/directory.hpp>
#include <telemetry/utility.hpp>

#include <mutex>

Expand Down Expand Up @@ -47,6 +48,18 @@ std::shared_ptr<Directory> Directory::addDir(std::string_view name)
return newDir;
}

[[nodiscard]] std::shared_ptr<Directory> Directory::addDirs(std::string_view name)
{
const auto paths = utils::parsePath(std::string(name));

std::shared_ptr<Directory> dir = std::dynamic_pointer_cast<Directory>(shared_from_this());
for (const auto& path : paths) {
dir = dir->addDir(path);
}

return dir;
}

std::shared_ptr<File> Directory::addFile(std::string_view name, FileOps ops)
{
const std::lock_guard lock(getMutex());
Expand All @@ -64,7 +77,8 @@ std::shared_ptr<File> Directory::addFile(std::string_view name, FileOps ops)
std::shared_ptr<AggregatedFile> Directory::addAggFile(
std::string_view name,
const std::string& aggFilesPattern,
const std::vector<AggOperation>& aggOps)
const std::vector<AggOperation>& aggOps,
std::shared_ptr<Directory> patternRootDir)
{
const std::lock_guard lock(getMutex());
const std::shared_ptr<Node> entry = getEntryLocked(name);
Expand All @@ -73,8 +87,12 @@ std::shared_ptr<AggregatedFile> Directory::addAggFile(
throwEntryAlreadyExists(name);
}

auto newFile = std::shared_ptr<AggregatedFile>(
new AggregatedFile(shared_from_this(), name, aggFilesPattern, aggOps));
auto newFile = std::shared_ptr<AggregatedFile>(new AggregatedFile(
shared_from_this(),
name,
aggFilesPattern,
aggOps,
std::move(patternRootDir)));

addEntryLocked(newFile);
return newFile;
Expand Down
65 changes: 65 additions & 0 deletions src/telemetry/tests/testAggFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -282,4 +282,69 @@ TEST(TelemetryAggFile, read)
EXPECT_EQ(uint64_t(10), std::get<uint64_t>(ArrayValueJoin[2]));
}

TEST(TelemetryAggFile, readPatternDir)
{
auto root = Directory::create();

auto dir = root->addDir("dir");
auto data0 = root->addDirs("dir/data_0/");
auto data1 = root->addDirs("dir/data_1/");
auto data2 = root->addDirs("dir/data_2/");

FileOps ops1;
ops1.read = []() { return Dict({{"packets", Scalar {uint64_t(1)}}}); };
FileOps ops2;
ops2.read = []() { return Dict({{"packets", Scalar {uint64_t(4)}}}); };
FileOps ops3;
ops3.read = []() { return Dict({{"packets", Scalar {uint64_t(10)}}}); };

auto file1 = data0->addFile("file1", ops1);
auto file2 = data1->addFile("file2", ops2);
auto file3 = data2->addFile("file3", ops3);

AggOperation aggOp1 {AggMethodType::SUM, "packets", "sumPackets"};
AggOperation aggOp2 {AggMethodType::AVG, "packets", "avgPackets"};
AggOperation aggOp3 {AggMethodType::JOIN, "packets", "joinPackets"};

auto aggFile
= root->addAggFile("aggFile", R"(data_\d+/file\d+)", {aggOp1, aggOp2, aggOp3}, dir);
const auto content = aggFile->read();

EXPECT_TRUE(std::holds_alternative<Dict>(content));

const Dict& dict = std::get<Dict>(content);
EXPECT_EQ(3, dict.size());

const Scalar& scalarValueSum = std::get<Scalar>(dict.at("sumPackets"));
EXPECT_EQ(uint64_t(15), std::get<uint64_t>(scalarValueSum));

const Scalar& scalarValueAvg = std::get<Scalar>(dict.at("avgPackets"));
EXPECT_EQ(5.00, std::get<double>(scalarValueAvg));

const Array& ArrayValueJoin = std::get<Array>(dict.at("joinPackets"));
EXPECT_EQ(3, ArrayValueJoin.size());
EXPECT_EQ(uint64_t(1), std::get<uint64_t>(ArrayValueJoin[0]));
EXPECT_EQ(uint64_t(4), std::get<uint64_t>(ArrayValueJoin[1]));
EXPECT_EQ(uint64_t(10), std::get<uint64_t>(ArrayValueJoin[2]));
}

TEST(TelemetryAggFile, readNoMatchingPattern)
{
auto root = Directory::create();

auto data0 = root->addDirs("dir/data_0/");

FileOps ops1;
ops1.read = []() { return Dict({{"packets", Scalar {uint64_t(1)}}}); };

auto file1 = data0->addFile("file1", ops1);

AggOperation aggOp1 {AggMethodType::SUM, "packets", "sumPackets"};

auto aggFile = root->addAggFile("aggFile", R"(data_\d+/file\d+)", {aggOp1});
const auto content = aggFile->read();

EXPECT_TRUE(std::holds_alternative<Scalar>(content));
}

} // namespace telemetry
20 changes: 20 additions & 0 deletions src/telemetry/tests/testDirectory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,26 @@ TEST(TelemetryDirectory, addDir)
EXPECT_EQ(info, info2);
}

/**
* @test Test creating telemetry directories recursively.
*/
TEST(TelemetryDirectory, addDirs)
{
auto root = Directory::create();

auto test = root->addDirs("info/app/test");
EXPECT_EQ("test", test->getName());
EXPECT_EQ("/info/app/test", test->getFullPath());

auto app = test->addDirs("app");
EXPECT_EQ("app", app->getName());
EXPECT_EQ("/info/app/test/app", app->getFullPath());

auto rootDir = root->addDirs("");
EXPECT_EQ("", rootDir->getName());
EXPECT_EQ("/", rootDir->getFullPath());
}

/**
* @test Test creating invalid telemetry directories.
*/
Expand Down

0 comments on commit 0c9304a

Please sign in to comment.