Skip to content

Commit

Permalink
Add S3 metrics collection and reporting.
Browse files Browse the repository at this point in the history
  • Loading branch information
athmaja-n committed Aug 21, 2024
1 parent 2518463 commit 1226bdf
Show file tree
Hide file tree
Showing 11 changed files with 548 additions and 4 deletions.
3 changes: 2 additions & 1 deletion velox/common/base/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,11 @@ add_library(
StatsReporter.cpp
SuccinctPrinter.cpp)

# velox_s3_metrics_aggregator is a private dependency: the periodic stats
# reporter in this directory reads the S3 counters it exposes.
target_link_libraries(
  velox_common_base
  PUBLIC velox_exception Folly::folly fmt::fmt xsimd
  PRIVATE velox_common_compression velox_process velox_test_util glog::glog
          velox_s3_metrics_aggregator)

if(${VELOX_BUILD_TESTING})
add_subdirectory(tests)
Expand Down
107 changes: 106 additions & 1 deletion velox/common/base/PeriodicStatsReporter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,11 @@ PeriodicStatsReporter::PeriodicStatsReporter(const Options& options)
cache_(options.cache),
arbitrator_(options.arbitrator),
spillMemoryPool_(options.spillMemoryPool),
options_(options) {}
#ifdef VELOX_ENABLE_S3
s3Metrics(options.s3Metrics),
#endif
options_(options) {
}

void PeriodicStatsReporter::start() {
LOG(INFO) << "Starting PeriodicStatsReporter with options "
Expand All @@ -84,6 +88,17 @@ void PeriodicStatsReporter::start() {
"report_spill_stats",
[this]() { reportSpillStats(); },
options_.spillStatsIntervalMs);
// Add S3 metrics task
#ifdef VELOX_ENABLE_S3
if (s3Metrics != nullptr) {
addTask(
"report_s3_metrics",
[this]() { reportS3Metrics(); },
options_.s3MetricsIntervalMs);
} else {
LOG(WARNING) << "S3 metrics are null, not adding the reporting task.";
}
#endif
}

void PeriodicStatsReporter::stop() {
Expand Down Expand Up @@ -256,4 +271,94 @@ void PeriodicStatsReporter::reportSpillStats() {
RECORD_METRIC_VALUE(kMetricSpillPeakMemoryBytes, spillMemoryStats.peakBytes);
}

// Publishes the current value of every S3 counter held by the process-wide
// S3MetricsAggregator to the registered stats reporter, and logs a short
// summary of the connection/upload counters.
//
// The whole definition is compiled only when VELOX_ENABLE_S3 is defined so
// that it matches the guarded declaration in PeriodicStatsReporter.h; an
// unconditional definition of an undeclared member breaks non-S3 builds.
#ifdef VELOX_ENABLE_S3
void PeriodicStatsReporter::reportS3Metrics() {
  namespace fs = facebook::velox::filesystems;

  // NOTE(review): this reads the singleton rather than the aggregator passed
  // in through Options (s3Metrics) — confirm which one is intended.
  const auto aggregator = fs::S3MetricsAggregator::getInstance();
  if (aggregator == nullptr) {
    // Nothing to report; avoid dereferencing a missing aggregator.
    return;
  }

  LOG(INFO) << "Updating S3 metrics: "
            << "ActiveConnections="
            << aggregator->getMetric(fs::kMetricS3ActiveConnections) << ", "
            << "StartedUploads="
            << aggregator->getMetric(fs::kMetricS3StartedUploads) << ", "
            << "FailedUploads="
            << aggregator->getMetric(fs::kMetricS3FailedUploads) << ", "
            << "SuccessfulUploads="
            << aggregator->getMetric(fs::kMetricS3SuccessfulUploads);

  // Reads one counter from the aggregator and forwards it to the reporter
  // under the same key.
  const auto record = [&aggregator](const auto& key) {
    RECORD_METRIC_VALUE(key, aggregator->getMetric(key));
  };

  // Connection and upload counters.
  record(fs::kMetricS3ActiveConnections);
  record(fs::kMetricS3StartedUploads);
  record(fs::kMetricS3FailedUploads);
  record(fs::kMetricS3SuccessfulUploads);

  // Call counters.
  record(fs::kMetricS3MetadataCalls);
  record(fs::kMetricS3ListStatusCalls);
  record(fs::kMetricS3ListLocatedStatusCalls);
  record(fs::kMetricS3ListObjectsCalls);

  // Error counters.
  record(fs::kMetricS3OtherReadErrors);
  record(fs::kMetricS3AwsAbortedExceptions);
  record(fs::kMetricS3SocketExceptions);
  record(fs::kMetricS3GetObjectErrors);
  record(fs::kMetricS3GetMetadataErrors);

  // Retry counters.
  record(fs::kMetricS3GetObjectRetries);
  record(fs::kMetricS3GetMetadataRetries);
  record(fs::kMetricS3ReadRetries);
}
#endif

} // namespace facebook::velox
27 changes: 26 additions & 1 deletion velox/common/base/PeriodicStatsReporter.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@
#include "velox/common/caching/SsdFile.h"
#include "velox/common/memory/MemoryArbitrator.h"

#ifdef VELOX_ENABLE_S3
#include "velox/connectors/hive/storage_adapters/s3fs/S3MetricsAggregator.h"
#include "velox/connectors/hive/storage_adapters/s3fs/S3Metrics.h"
#endif

namespace folly {
class CPUThreadPoolExecutor;
}
Expand Down Expand Up @@ -53,14 +58,25 @@ class PeriodicStatsReporter {
const memory::MemoryPool* spillMemoryPool{nullptr};
uint64_t spillStatsIntervalMs{60'000};

#ifdef VELOX_ENABLE_S3
std::shared_ptr<velox::filesystems::S3MetricsAggregator> s3Metrics{nullptr};
uint64_t s3MetricsIntervalMs{60'000};
#endif

std::string toString() const {
return fmt::format(
std::string result = fmt::format(
"allocatorStatsIntervalMs:{}, cacheStatsIntervalMs:{}, "
"arbitratorStatsIntervalMs:{}, spillStatsIntervalMs:{}",
allocatorStatsIntervalMs,
cacheStatsIntervalMs,
arbitratorStatsIntervalMs,
spillStatsIntervalMs);

#ifdef VELOX_ENABLE_S3
result += fmt::format(", s3MetricsIntervalMs:{}", s3MetricsIntervalMs);
#endif

return result;
}
};

Expand Down Expand Up @@ -96,10 +112,19 @@ class PeriodicStatsReporter {
void reportArbitratorStats();
void reportSpillStats();

#ifdef VELOX_ENABLE_S3
// Method for reporting S3 metrics
void reportS3Metrics();
#endif

const velox::memory::MemoryAllocator* const allocator_{nullptr};
const velox::cache::AsyncDataCache* const cache_{nullptr};
const velox::memory::MemoryArbitrator* const arbitrator_{nullptr};
const velox::memory::MemoryPool* const spillMemoryPool_{nullptr};
#ifdef VELOX_ENABLE_S3
const std::shared_ptr<velox::filesystems::S3MetricsAggregator> s3Metrics{
nullptr};
#endif
const Options options_;

cache::CacheStats lastCacheStats_;
Expand Down
51 changes: 51 additions & 0 deletions velox/common/base/tests/StatsReporterTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@
#include "velox/common/caching/SsdCache.h"
#include "velox/common/memory/MmapAllocator.h"

#ifdef VELOX_ENABLE_S3
#include "velox/connectors/hive/storage_adapters/s3fs/S3MetricsAggregator.h"
#include "velox/connectors/hive/storage_adapters/s3fs/S3Metrics.h"
#endif

namespace facebook::velox {

class TestReporter : public BaseStatsReporter {
Expand Down Expand Up @@ -612,6 +617,52 @@ TEST_F(PeriodicStatsReporterTest, allNullOption) {
ASSERT_NO_THROW(stopPeriodicStatsReporter());
}

// Verifies that PeriodicStatsReporter periodically publishes the counters held
// by S3MetricsAggregator: counters are bumped while the reporter is running
// and the values seen by the test reporter are checked after it stops.
//
// In builds without VELOX_ENABLE_S3 the test is reported as skipped instead of
// passing with an empty body.
TEST_F(PeriodicStatsReporterTest, s3MetricsReporting) {
#ifndef VELOX_ENABLE_S3
  GTEST_SKIP() << "S3 support (VELOX_ENABLE_S3) is not enabled in this build.";
#else
  auto s3Metrics = filesystems::S3MetricsAggregator::getInstance();
  ASSERT_NE(s3Metrics, nullptr) << "S3MetricsAggregator instance is null!";

  // The aggregator is a process-wide singleton; start from a clean slate.
  s3Metrics->resetAllMetrics();

  PeriodicStatsReporter::Options options;
  options.s3Metrics = s3Metrics;
  options.s3MetricsIntervalMs = 1000;

  PeriodicStatsReporter periodicReporter(options);
  periodicReporter.start();

  // Let one reporting interval elapse before touching the counters.
  std::this_thread::sleep_for(std::chrono::milliseconds(1100));

  // ActiveConnections -> 1, StartedUploads -> 2, FailedUploads -> 1,
  // SuccessfulUploads -> 3.
  s3Metrics->incrementMetric(filesystems::kMetricS3ActiveConnections);
  s3Metrics->incrementMetric(filesystems::kMetricS3StartedUploads);
  s3Metrics->incrementMetric(filesystems::kMetricS3StartedUploads);
  s3Metrics->incrementMetric(filesystems::kMetricS3FailedUploads);
  s3Metrics->incrementMetric(filesystems::kMetricS3SuccessfulUploads);
  s3Metrics->incrementMetric(filesystems::kMetricS3SuccessfulUploads);
  s3Metrics->incrementMetric(filesystems::kMetricS3SuccessfulUploads);

  // Wait long enough for at least one more report to pick up the new values.
  std::this_thread::sleep_for(std::chrono::milliseconds(1500));

  periodicReporter.stop();

  // The assertions below only read the reporter's map, so the singleton can be
  // cleaned up now; this keeps its state from leaking into other tests even if
  // an assertion fails and returns early.
  s3Metrics->resetAllMetrics();

  const auto& counterMap = reporter_->counterMap;

  ASSERT_EQ(counterMap.count(filesystems::kMetricS3ActiveConnections), 1);
  ASSERT_EQ(counterMap.count(filesystems::kMetricS3StartedUploads), 1);
  ASSERT_EQ(counterMap.count(filesystems::kMetricS3FailedUploads), 1);
  ASSERT_EQ(counterMap.count(filesystems::kMetricS3SuccessfulUploads), 1);
  ASSERT_EQ(counterMap.at(filesystems::kMetricS3ActiveConnections), 1);
  ASSERT_EQ(counterMap.at(filesystems::kMetricS3StartedUploads), 2);
  ASSERT_EQ(counterMap.at(filesystems::kMetricS3FailedUploads), 1);
  ASSERT_EQ(counterMap.at(filesystems::kMetricS3SuccessfulUploads), 3);
#endif
}

// Registering to folly Singleton with intended reporter type
folly::Singleton<BaseStatsReporter> reporter([]() {
return new TestReporter();
Expand Down
15 changes: 14 additions & 1 deletion velox/connectors/hive/storage_adapters/s3fs/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,26 @@

# for generated headers

# S3MetricsAggregator is built as its own static library so that other targets
# can link against it without introducing a cyclic dependency.
add_library(velox_s3_metrics_aggregator STATIC S3MetricsAggregator.cpp)

target_include_directories(velox_s3_metrics_aggregator
                           PUBLIC ${AWSSDK_INCLUDE_DIRS})

target_link_libraries(
  velox_s3_metrics_aggregator
  PUBLIC Folly::folly
  PRIVATE glog::glog)

add_library(velox_s3fs RegisterS3FileSystem.cpp)
if(VELOX_ENABLE_S3)
target_sources(velox_s3fs PRIVATE S3FileSystem.cpp S3Util.cpp)

target_include_directories(velox_s3fs PUBLIC ${AWSSDK_INCLUDE_DIRS})

target_link_libraries(velox_s3fs velox_dwio_common Folly::folly
${AWSSDK_LIBRARIES})
${AWSSDK_LIBRARIES} velox_s3_metrics_aggregator)

if(${VELOX_BUILD_TESTING})
add_subdirectory(tests)
Expand Down
28 changes: 28 additions & 0 deletions velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@
#include "velox/core/QueryConfig.h"
#include "velox/dwio/common/DataBuffer.h"

// Include the S3 metrics headers
#include "velox/connectors/hive/storage_adapters/s3fs/S3Metrics.h"
#include "velox/connectors/hive/storage_adapters/s3fs/S3MetricsAggregator.h"

#include <fmt/format.h>
#include <glog/logging.h>
#include <memory>
Expand Down Expand Up @@ -102,6 +106,10 @@ class S3ReadFile final : public ReadFile {
outcome, "Failed to get metadata for S3 object", bucket_, key_);
length_ = outcome.GetResult().GetContentLength();
VELOX_CHECK_GE(length_, 0);

// Increment the metadata call metric
filesystems::S3MetricsAggregator::getInstance()->incrementMetric(
filesystems::kMetricS3MetadataCalls);
}

std::string_view pread(uint64_t offset, uint64_t length, void* buffer)
Expand Down Expand Up @@ -182,6 +190,10 @@ class S3ReadFile final : public ReadFile {
AwsWriteableStreamFactory(position, length));
auto outcome = client_->GetObject(request);
VELOX_CHECK_AWS_OUTCOME(outcome, "Failed to get S3 object", bucket_, key_);

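// Count this GetObject request.
// NOTE(review): it is recorded under kMetricS3ListObjectsCalls rather than a
// GetObject-specific metric — confirm this is the intended counter.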
filesystems::S3MetricsAggregator::getInstance()->incrementMetric(
filesystems::kMetricS3ListObjectsCalls);
}

Aws::S3::S3Client* client_;
Expand Down Expand Up @@ -271,6 +283,10 @@ class S3WriteFile::Impl {
}

fileSize_ = 0;

// Increment the started uploads metric
filesystems::S3MetricsAggregator::getInstance()->incrementMetric(
filesystems::kMetricS3StartedUploads);
}

// Appends data to the end of the file.
Expand Down Expand Up @@ -315,6 +331,10 @@ class S3WriteFile::Impl {
outcome, "Failed to complete multiple part upload", bucket_, key_);
}
currentPart_->clear();

// Increment the successful uploads metric
filesystems::S3MetricsAggregator::getInstance()->incrementMetric(
filesystems::kMetricS3SuccessfulUploads);
}

// Current file size, i.e. the sum of all previous appends.
Expand Down Expand Up @@ -576,11 +596,19 @@ class S3FileSystem::Impl {
Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never,
hiveConfig_->s3UseVirtualAddressing());
++fileSystemCount;

// Increment the active connections metric
filesystems::S3MetricsAggregator::getInstance()->incrementMetric(
filesystems::kMetricS3ActiveConnections);
}

// Tears down this filesystem instance: resets client_, decrements
// fileSystemCount, then updates the active-connections metric.
~Impl() {
client_.reset();
--fileSystemCount;

// NOTE(review): the intent here is to decrement the active connections metric,
// but this calls incrementMetric() — the same call made right after
// ++fileSystemCount above — so the metric grows on both creation and
// destruction and never goes down. Replace with the aggregator's decrement
// operation (if one exists; none is visible from here).
filesystems::S3MetricsAggregator::getInstance()->incrementMetric(
filesystems::kMetricS3ActiveConnections);
}

// Configure and return an AWSCredentialsProvider with access key and secret
Expand Down
Loading

0 comments on commit 1226bdf

Please sign in to comment.