Skip to content

Commit

Permalink
feat: Add storage stats into IoStatistics
Browse files Browse the repository at this point in the history
  • Loading branch information
kewang1024 committed Jan 25, 2025
1 parent 9002fc9 commit 81fa1ee
Show file tree
Hide file tree
Showing 23 changed files with 113 additions and 33 deletions.
46 changes: 36 additions & 10 deletions velox/common/caching/CachedFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ template <
typename Value,
typename Generator,
typename Properties = void,
typename Stats = void,
typename Sizer = DefaultSizer<Value>,
typename Comparator = std::equal_to<Key>,
typename Hash = std::hash<Key>>
Expand Down Expand Up @@ -178,7 +179,8 @@ class CachedFactory {
/// will probably mess with your memory model, so really try to avoid it.
CachedPtr<Key, Value, Comparator, Hash> generate(
const Key& key,
const Properties* properties = nullptr);
const Properties* properties = nullptr,
Stats* ioStats = nullptr);

/// Looks up the cache entry of the given key if it exists, otherwise returns
/// null.
Expand Down Expand Up @@ -358,17 +360,25 @@ template <
typename Value,
typename Generator,
typename Properties,
typename Stats,
typename Sizer,
typename Comparator,
typename Hash>
CachedPtr<Key, Value, Comparator, Hash>
CachedFactory<Key, Value, Generator, Properties, Sizer, Comparator, Hash>::
generate(const Key& key, const Properties* properties) {
CachedPtr<Key, Value, Comparator, Hash> CachedFactory<
Key,
Value,
Generator,
Properties,
Stats,
Sizer,
Comparator,
Hash>::
generate(const Key& key, const Properties* properties, Stats* stats) {
process::TraceContext trace("CachedFactory::generate");
if (cache_ == nullptr) {
return CachedPtr<Key, Value, Comparator, Hash>{
/*fromCache=*/false,
(*generator_)(key, properties).release(),
(*generator_)(key, properties, stats).release(),
nullptr,
std::make_unique<Key>(key)};
}
Expand Down Expand Up @@ -408,7 +418,7 @@ CachedFactory<Key, Value, Generator, Properties, Sizer, Comparator, Hash>::
pendingCv_.notify_all();
};

std::unique_ptr<Value> generatedValue = (*generator_)(key, properties);
std::unique_ptr<Value> generatedValue = (*generator_)(key, properties, stats);
const uint64_t valueSize = Sizer()(*generatedValue);
Value* rawValue = generatedValue.release();
const bool inserted = addCache(key, rawValue, valueSize);
Expand All @@ -433,12 +443,19 @@ template <
typename Value,
typename Generator,
typename Properties,
typename Stats,
typename Sizer,
typename Comparator,
typename Hash>
CachedPtr<Key, Value, Comparator, Hash>
CachedFactory<Key, Value, Generator, Properties, Sizer, Comparator, Hash>::get(
const Key& key) {
CachedPtr<Key, Value, Comparator, Hash> CachedFactory<
Key,
Value,
Generator,
Properties,
Stats,
Sizer,
Comparator,
Hash>::get(const Key& key) {
if (cache_ == nullptr) {
return {};
}
Expand All @@ -460,10 +477,19 @@ template <
typename Value,
typename Generator,
typename Properties,
typename Stats,
typename Sizer,
typename Comparator,
typename Hash>
void CachedFactory<Key, Value, Generator, Properties, Sizer, Comparator, Hash>::
void CachedFactory<
Key,
Value,
Generator,
Properties,
Stats,
Sizer,
Comparator,
Hash>::
retrieveCached(
const std::vector<Key>& keys,
std::vector<std::pair<Key, CachedPtr<Key, Value, Comparator, Hash>>>&
Expand Down
9 changes: 6 additions & 3 deletions velox/common/caching/tests/CachedFactoryTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ namespace {
struct DoublerGenerator {
std::unique_ptr<int> operator()(
const int& value,
const void* properties = nullptr) {
const void* properties = nullptr,
const void* stats = nullptr) {
++generated;
return std::make_unique<int>(value * 2);
}
Expand All @@ -40,7 +41,8 @@ struct DoublerGenerator {
struct IdentityGenerator {
std::unique_ptr<int> operator()(
const int& value,
const void* properties = nullptr) {
const void* properties = nullptr,
const void* stats = nullptr) {
return std::make_unique<int>(value);
}
};
Expand Down Expand Up @@ -113,7 +115,8 @@ TEST(CachedFactoryTest, basicGeneration) {
struct DoublerWithExceptionsGenerator {
std::unique_ptr<int> operator()(
const int& value,
const void* properties = nullptr) {
const void* properties = nullptr,
const void* stats = nullptr) {
if (value == 3) {
VELOX_FAIL("3 is bad");
}
Expand Down
3 changes: 2 additions & 1 deletion velox/common/file/FileSystems.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,8 @@ class LocalFileSystem : public FileSystem {

std::unique_ptr<ReadFile> openFileForRead(
std::string_view path,
const FileOptions& options) override {
const FileOptions& options,
io::IoStatistics* ioStats) override {
return std::make_unique<LocalReadFile>(
extractPath(path), executor_.get(), options.bufferIo);
}
Expand Down
4 changes: 3 additions & 1 deletion velox/common/file/FileSystems.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#pragma once

#include "velox/common/base/Exceptions.h"
#include "velox/common/io/IoStatistics.h"
#include "velox/common/memory/MemoryPool.h"

#include <functional>
Expand Down Expand Up @@ -103,7 +104,8 @@ class FileSystem {
/// Returns a ReadFile handle for a given file path
virtual std::unique_ptr<ReadFile> openFileForRead(
std::string_view path,
const FileOptions& options = {}) = 0;
const FileOptions& options = {},
io::IoStatistics* ioStats = {}) = 0;

/// Returns a WriteFile handle for a given file path
virtual std::unique_ptr<WriteFile> openFileForWrite(
Expand Down
3 changes: 2 additions & 1 deletion velox/common/file/tests/FaultyFileSystem.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ fileSystemGenerator() {

std::unique_ptr<ReadFile> FaultyFileSystem::openFileForRead(
std::string_view path,
const FileOptions& options) {
const FileOptions& options,
io::IoStatistics* ioStats) {
const std::string delegatedPath = std::string(extractPath(path));
auto delegatedFile = getFileSystem(delegatedPath, config_)
->openFileForRead(delegatedPath, options);
Expand Down
3 changes: 2 additions & 1 deletion velox/common/file/tests/FaultyFileSystem.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ class FaultyFileSystem : public FileSystem {

std::unique_ptr<ReadFile> openFileForRead(
std::string_view path,
const FileOptions& options = {}) override;
const FileOptions& options = {},
io::IoStatistics* ioStats = {}) override;

std::unique_ptr<WriteFile> openFileForWrite(
std::string_view path,
Expand Down
23 changes: 23 additions & 0 deletions velox/common/io/IoStatistics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,24 @@ IoStatistics::operationStats() const {
return operationStats_;
}

std::unordered_map<std::string, RuntimeMetric> IoStatistics::storageStats()
const {
std::lock_guard<std::mutex> lock{storageStatsMutex_};
return storageStats_;
}

void IoStatistics::addStorageStats(
const std::string& name,
const RuntimeCounter& counter) {
std::lock_guard<std::mutex> lock{storageStatsMutex_};
if (storageStats_.count(name) == 0) {
storageStats_.emplace(name, RuntimeMetric(counter.unit));
} else {
VELOX_CHECK_EQ(storageStats_.at(name).unit, counter.unit);
}
storageStats_.at(name).addValue(counter.value);
}

void IoStatistics::merge(const IoStatistics& other) {
rawBytesRead_ += other.rawBytesRead_;
rawBytesWritten_ += other.rawBytesWritten_;
Expand All @@ -123,6 +141,11 @@ void IoStatistics::merge(const IoStatistics& other) {
for (auto& item : other.operationStats_) {
operationStats_[item.first].merge(item.second);
}

std::lock_guard<std::mutex> l2(storageStatsMutex_);
for (auto& item : other.storageStats_) {
storageStats_[item.first].merge(item.second);
}
}

void OperationCounters::merge(const OperationCounters& other) {
Expand Down
7 changes: 7 additions & 0 deletions velox/common/io/IoStatistics.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
#include <unordered_map>

#include <folly/dynamic.h>
#include "velox/common/base/Exceptions.h"
#include "velox/common/base/RuntimeMetrics.h"

namespace facebook::velox::io {

Expand Down Expand Up @@ -140,6 +142,9 @@ class IoStatistics {
const uint64_t partialThrottleCount = 0);

std::unordered_map<std::string, OperationCounters> operationStats() const;
std::unordered_map<std::string, RuntimeMetric> storageStats() const;

void addStorageStats(const std::string& name, const RuntimeCounter& counter);

void merge(const IoStatistics& other);

Expand Down Expand Up @@ -172,7 +177,9 @@ class IoStatistics {
IoCounter queryThreadIoLatency_;

std::unordered_map<std::string, OperationCounters> operationStats_;
std::unordered_map<std::string, RuntimeMetric> storageStats_;
mutable std::mutex operationStatsMutex_;
mutable std::mutex storageStatsMutex_;
};

} // namespace facebook::velox::io
5 changes: 3 additions & 2 deletions velox/connectors/hive/FileHandle.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ std::string groupName(const std::string& filename) {

std::unique_ptr<FileHandle> FileHandleGenerator::operator()(
const std::string& filename,
const FileProperties* properties) {
const FileProperties* properties,
io::IoStatistics* ioStats) {
// We have seen cases where drivers are stuck when creating file handles.
// Adding a trace here to spot this more easily in future.
process::TraceContext trace("FileHandleGenerator::operator()");
Expand All @@ -55,7 +56,7 @@ std::unique_ptr<FileHandle> FileHandleGenerator::operator()(
options.fileSize = properties->fileSize;
}
fileHandle->file = filesystems::getFileSystem(filename, properties_)
->openFileForRead(filename, options);
->openFileForRead(filename, options, ioStats);
fileHandle->uuid = StringIdLease(fileIds(), filename);
fileHandle->groupId = StringIdLease(fileIds(), groupName(filename));
VLOG(1) << "Generating file handle for: " << filename
Expand Down
5 changes: 4 additions & 1 deletion velox/connectors/hive/FileHandle.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include "velox/common/caching/FileIds.h"
#include "velox/common/config/Config.h"
#include "velox/common/file/File.h"
#include "velox/common/io/IoStatistics.h"
#include "velox/connectors/hive/FileProperties.h"

namespace facebook::velox {
Expand Down Expand Up @@ -69,7 +70,8 @@ class FileHandleGenerator {
: properties_(std::move(properties)) {}
std::unique_ptr<FileHandle> operator()(
const std::string& filename,
const FileProperties* properties);
const FileProperties* properties,
io::IoStatistics* ioStats);

private:
const std::shared_ptr<const config::ConfigBase> properties_;
Expand All @@ -80,6 +82,7 @@ using FileHandleFactory = CachedFactory<
FileHandle,
FileHandleGenerator,
FileProperties,
io::IoStatistics,
FileHandleSizer>;

using FileHandleCachedPtr = CachedPtr<std::string, FileHandle>;
Expand Down
3 changes: 3 additions & 0 deletions velox/connectors/hive/HiveDataSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,9 @@ std::unordered_map<std::string, RuntimeCounter> HiveDataSource::runtimeStats() {
if (numBucketConversion_ > 0) {
res.insert({"numBucketConversion", RuntimeCounter(numBucketConversion_)});
}
for (const auto& storageStats : ioStats_->storageStats()) {
res.emplace(storageStats.first, storageStats.second.count);
}
return res;
}

Expand Down
4 changes: 2 additions & 2 deletions velox/connectors/hive/SplitReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -229,8 +229,8 @@ RowTypePtr SplitReader::createReader() {
try {
fileHandleCachePtr = fileHandleFactory_->generate(
hiveSplit_->filePath,
hiveSplit_->properties.has_value() ? &*hiveSplit_->properties
: nullptr);
hiveSplit_->properties.has_value() ? &*hiveSplit_->properties : nullptr,
ioStats_.get());
VELOX_CHECK_NOT_NULL(fileHandleCachePtr.get());
} catch (const VeloxRuntimeError& e) {
if (e.errorCode() == error_code::kFileNotFound &&
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,8 @@ std::string AbfsFileSystem::name() const {

std::unique_ptr<ReadFile> AbfsFileSystem::openFileForRead(
std::string_view path,
const FileOptions& options) {
const FileOptions& options,
io::IoStatistics* ioStats) {
auto abfsfile = std::make_unique<AbfsReadFile>(path, *config_);
abfsfile->initialize(options);
return abfsfile;
Expand Down
3 changes: 2 additions & 1 deletion velox/connectors/hive/storage_adapters/abfs/AbfsFileSystem.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ class AbfsFileSystem : public FileSystem {

std::unique_ptr<ReadFile> openFileForRead(
std::string_view path,
const FileOptions& options = {}) override;
const FileOptions& options = {},
io::IoStatistics* ioStats = {}) override;

std::unique_ptr<WriteFile> openFileForWrite(
std::string_view path,
Expand Down
3 changes: 2 additions & 1 deletion velox/connectors/hive/storage_adapters/gcs/GcsFileSystem.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,8 @@ void GcsFileSystem::initializeClient() {

std::unique_ptr<ReadFile> GcsFileSystem::openFileForRead(
std::string_view path,
const FileOptions& options) {
const FileOptions& options,
io::IoStatistics* ioStats) {
const auto gcspath = gcsPath(path);
auto gcsfile = std::make_unique<GcsReadFile>(gcspath, impl_->getClient());
gcsfile->initialize(options);
Expand Down
3 changes: 2 additions & 1 deletion velox/connectors/hive/storage_adapters/gcs/GcsFileSystem.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ class GcsFileSystem : public FileSystem {
/// [[https://cloud.google.com/storage/docs/samples/storage-stream-file-download]].
std::unique_ptr<ReadFile> openFileForRead(
std::string_view path,
const FileOptions& options = {}) override;
const FileOptions& options = {},
io::IoStatistics* ioStats = {}) override;

/// Initialize a WriteFile
/// First the method google::cloud::storage::Client::GetObjectMetadata
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ std::string HdfsFileSystem::name() const {

std::unique_ptr<ReadFile> HdfsFileSystem::openFileForRead(
std::string_view path,
const FileOptions& /*unused*/) {
const FileOptions& /*unused*/,
io::IoStatistics* ioStats) {
// Only remove the schema for hdfs path.
if (path.find(kScheme) == 0) {
path.remove_prefix(kScheme.length());
Expand Down
3 changes: 2 additions & 1 deletion velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ class HdfsFileSystem : public FileSystem {

std::unique_ptr<ReadFile> openFileForRead(
std::string_view path,
const FileOptions& options = {}) override;
const FileOptions& options = {},
io::IoStatistics* ioStats = {}) override;

std::unique_ptr<WriteFile> openFileForWrite(
std::string_view path,
Expand Down
3 changes: 2 additions & 1 deletion velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -740,7 +740,8 @@ std::string S3FileSystem::getLogLevelName() const {

std::unique_ptr<ReadFile> S3FileSystem::openFileForRead(
std::string_view s3Path,
const FileOptions& options) {
const FileOptions& options,
io::IoStatistics* ioStats) {
const auto path = getPath(s3Path);
auto s3file = std::make_unique<S3ReadFile>(path, impl_->s3Client());
s3file->initialize(options);
Expand Down
3 changes: 2 additions & 1 deletion velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ class S3FileSystem : public FileSystem {

std::unique_ptr<ReadFile> openFileForRead(
std::string_view s3Path,
const FileOptions& options = {}) override;
const FileOptions& options = {},
io::IoStatistics* ioStats = {}) override;

std::unique_ptr<WriteFile> openFileForWrite(
std::string_view s3Path,
Expand Down
1 change: 1 addition & 0 deletions velox/dwio/common/Throttler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,7 @@ uint64_t Throttler::calculateBackoffDurationAndUpdateThrottleCache(
std::unique_ptr<Throttler::ThrottleSignal>
Throttler::ThrottleSignalGenerator::operator()(
const std::string& /*unused*/,
const void* /*unused*/,
const void* /*unused*/) {
return std::unique_ptr<ThrottleSignal>(new ThrottleSignal{1});
}
Expand Down
Loading

0 comments on commit 81fa1ee

Please sign in to comment.