Skip to content

Commit

Permalink
feat: Add storage stats into IoStatistics
Browse files Browse the repository at this point in the history
  • Loading branch information
kewang1024 committed Jan 25, 2025
1 parent 9002fc9 commit 115951c
Show file tree
Hide file tree
Showing 23 changed files with 93 additions and 33 deletions.
47 changes: 37 additions & 10 deletions velox/common/caching/CachedFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ template <
typename Value,
typename Generator,
typename Properties = void,
typename Stats = void,
typename Sizer = DefaultSizer<Value>,
typename Comparator = std::equal_to<Key>,
typename Hash = std::hash<Key>>
Expand Down Expand Up @@ -178,7 +179,8 @@ class CachedFactory {
/// will probably mess with your memory model, so really try to avoid it.
CachedPtr<Key, Value, Comparator, Hash> generate(
const Key& key,
const Properties* properties = nullptr);
const Properties* properties = nullptr,
Stats* ioStats = nullptr);

/// Looks up the cache entry of the given key if it exists, otherwise returns
/// null.
Expand Down Expand Up @@ -358,17 +360,25 @@ template <
typename Value,
typename Generator,
typename Properties,
typename Stats,
typename Sizer,
typename Comparator,
typename Hash>
CachedPtr<Key, Value, Comparator, Hash>
CachedFactory<Key, Value, Generator, Properties, Sizer, Comparator, Hash>::
generate(const Key& key, const Properties* properties) {
CachedPtr<Key, Value, Comparator, Hash> CachedFactory<
Key,
Value,
Generator,
Properties,
Stats,
Sizer,
Comparator,
Hash>::
generate(const Key& key, const Properties* properties, Stats* stats) {
process::TraceContext trace("CachedFactory::generate");
if (cache_ == nullptr) {
return CachedPtr<Key, Value, Comparator, Hash>{
/*fromCache=*/false,
(*generator_)(key, properties).release(),
(*generator_)(key, properties, stats).release(),
nullptr,
std::make_unique<Key>(key)};
}
Expand Down Expand Up @@ -408,7 +418,8 @@ CachedFactory<Key, Value, Generator, Properties, Sizer, Comparator, Hash>::
pendingCv_.notify_all();
};

std::unique_ptr<Value> generatedValue = (*generator_)(key, properties);
std::unique_ptr<Value> generatedValue =
(*generator_)(key, properties, stats);
const uint64_t valueSize = Sizer()(*generatedValue);
Value* rawValue = generatedValue.release();
const bool inserted = addCache(key, rawValue, valueSize);
Expand All @@ -433,12 +444,19 @@ template <
typename Value,
typename Generator,
typename Properties,
typename Stats,
typename Sizer,
typename Comparator,
typename Hash>
CachedPtr<Key, Value, Comparator, Hash>
CachedFactory<Key, Value, Generator, Properties, Sizer, Comparator, Hash>::get(
const Key& key) {
CachedPtr<Key, Value, Comparator, Hash> CachedFactory<
Key,
Value,
Generator,
Properties,
Stats,
Sizer,
Comparator,
Hash>::get(const Key& key) {
if (cache_ == nullptr) {
return {};
}
Expand All @@ -460,10 +478,19 @@ template <
typename Value,
typename Generator,
typename Properties,
typename Stats,
typename Sizer,
typename Comparator,
typename Hash>
void CachedFactory<Key, Value, Generator, Properties, Sizer, Comparator, Hash>::
void CachedFactory<
Key,
Value,
Generator,
Properties,
Stats,
Sizer,
Comparator,
Hash>::
retrieveCached(
const std::vector<Key>& keys,
std::vector<std::pair<Key, CachedPtr<Key, Value, Comparator, Hash>>>&
Expand Down
9 changes: 6 additions & 3 deletions velox/common/caching/tests/CachedFactoryTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ namespace {
struct DoublerGenerator {
std::unique_ptr<int> operator()(
const int& value,
const void* properties = nullptr) {
const void* properties = nullptr,
const void* stats = nullptr) {
++generated;
return std::make_unique<int>(value * 2);
}
Expand All @@ -40,7 +41,8 @@ struct DoublerGenerator {
// Test generator whose call operator returns a newly allocated int holding
// the same value as the key it was given. The `properties` and `stats`
// arguments are accepted but never read.
struct IdentityGenerator {
std::unique_ptr<int> operator()(
const int& value,
const void* properties = nullptr) {
const void* properties = nullptr,
const void* stats = nullptr) {
return std::make_unique<int>(value);
}
};
Expand Down Expand Up @@ -113,7 +115,8 @@ TEST(CachedFactoryTest, basicGeneration) {
struct DoublerWithExceptionsGenerator {
std::unique_ptr<int> operator()(
const int& value,
const void* properties = nullptr) {
const void* properties = nullptr,
const void* stats = nullptr) {
if (value == 3) {
VELOX_FAIL("3 is bad");
}
Expand Down
3 changes: 2 additions & 1 deletion velox/common/file/FileSystems.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,8 @@ class LocalFileSystem : public FileSystem {

/// Opens 'path' for reading and returns it as a LocalReadFile built from
/// extractPath(path), the file system's executor and options.bufferIo.
/// NOTE(review): 'ioStats' is accepted to match the overridden signature but
/// is not read anywhere in this body, so nothing is recorded for local
/// files here -- confirm that is intended.
std::unique_ptr<ReadFile> openFileForRead(
std::string_view path,
const FileOptions& options) override {
const FileOptions& options,
io::IoStatistics* ioStats) override {
return std::make_unique<LocalReadFile>(
extractPath(path), executor_.get(), options.bufferIo);
}
Expand Down
4 changes: 3 additions & 1 deletion velox/common/file/FileSystems.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#pragma once

#include "velox/common/base/Exceptions.h"
#include "velox/common/io/IoStatistics.h"
#include "velox/common/memory/MemoryPool.h"

#include <functional>
Expand Down Expand Up @@ -103,7 +104,8 @@ class FileSystem {
/// Returns a ReadFile handle for a given file path
virtual std::unique_ptr<ReadFile> openFileForRead(
std::string_view path,
const FileOptions& options = {}) = 0;
const FileOptions& options = {},
io::IoStatistics* ioStats = {}) = 0;

/// Returns a WriteFile handle for a given file path
virtual std::unique_ptr<WriteFile> openFileForWrite(
Expand Down
3 changes: 2 additions & 1 deletion velox/common/file/tests/FaultyFileSystem.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ fileSystemGenerator() {

std::unique_ptr<ReadFile> FaultyFileSystem::openFileForRead(
std::string_view path,
const FileOptions& options) {
const FileOptions& options,
io::IoStatistics* ioStats) {
const std::string delegatedPath = std::string(extractPath(path));
auto delegatedFile = getFileSystem(delegatedPath, config_)
->openFileForRead(delegatedPath, options);
Expand Down
3 changes: 2 additions & 1 deletion velox/common/file/tests/FaultyFileSystem.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ class FaultyFileSystem : public FileSystem {

std::unique_ptr<ReadFile> openFileForRead(
std::string_view path,
const FileOptions& options = {}) override;
const FileOptions& options = {},
io::IoStatistics* ioStats = {}) override;

std::unique_ptr<WriteFile> openFileForWrite(
std::string_view path,
Expand Down
5 changes: 5 additions & 0 deletions velox/common/io/IoStatistics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,11 @@ IoStatistics::operationStats() const {
return operationStats_;
}

// Returns a pointer to the internal storageStats_ map. The pointer is
// non-const, so callers can both read and modify the map in place.
//
// NOTE(review): the lock_guard below is destroyed when this function returns,
// so storageStatsMutex_ is held only while the address is taken. It does NOT
// protect any read or write the caller performs through the returned pointer.
// If the map can be touched from more than one thread, callers need their own
// synchronization, or this accessor should return a copy / run a callback
// under the lock -- confirm the intended threading model.
std::unordered_map<std::string, RuntimeCounter>* IoStatistics::storageStats() {
std::lock_guard<std::mutex> lock{storageStatsMutex_};
return &storageStats_;
}

void IoStatistics::merge(const IoStatistics& other) {
rawBytesRead_ += other.rawBytesRead_;
rawBytesWritten_ += other.rawBytesWritten_;
Expand Down
4 changes: 4 additions & 0 deletions velox/common/io/IoStatistics.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <unordered_map>

#include <folly/dynamic.h>
#include "velox/common/base/RuntimeMetrics.h"

namespace facebook::velox::io {

Expand Down Expand Up @@ -140,6 +141,7 @@ class IoStatistics {
const uint64_t partialThrottleCount = 0);

std::unordered_map<std::string, OperationCounters> operationStats() const;
/// Returns a mutable pointer to the internal storage-stats map (keyed by
/// stat name). NOTE(review): the mutex taken inside this accessor is released
/// before the caller uses the pointer, so access through the returned pointer
/// is not synchronized by this class -- confirm callers are single-threaded
/// or lock externally.
std::unordered_map<std::string, RuntimeCounter>* storageStats();

void merge(const IoStatistics& other);

Expand Down Expand Up @@ -172,7 +174,9 @@ class IoStatistics {
IoCounter queryThreadIoLatency_;

std::unordered_map<std::string, OperationCounters> operationStats_;
std::unordered_map<std::string, RuntimeCounter> storageStats_;
mutable std::mutex operationStatsMutex_;
mutable std::mutex storageStatsMutex_;
};

} // namespace facebook::velox::io
5 changes: 3 additions & 2 deletions velox/connectors/hive/FileHandle.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ std::string groupName(const std::string& filename) {

std::unique_ptr<FileHandle> FileHandleGenerator::operator()(
const std::string& filename,
const FileProperties* properties) {
const FileProperties* properties,
io::IoStatistics* ioStats) {
// We have seen cases where drivers are stuck when creating file handles.
// Adding a trace here to spot this more easily in future.
process::TraceContext trace("FileHandleGenerator::operator()");
Expand All @@ -55,7 +56,7 @@ std::unique_ptr<FileHandle> FileHandleGenerator::operator()(
options.fileSize = properties->fileSize;
}
fileHandle->file = filesystems::getFileSystem(filename, properties_)
->openFileForRead(filename, options);
->openFileForRead(filename, options, ioStats);
fileHandle->uuid = StringIdLease(fileIds(), filename);
fileHandle->groupId = StringIdLease(fileIds(), groupName(filename));
VLOG(1) << "Generating file handle for: " << filename
Expand Down
5 changes: 4 additions & 1 deletion velox/connectors/hive/FileHandle.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include "velox/common/caching/FileIds.h"
#include "velox/common/config/Config.h"
#include "velox/common/file/File.h"
#include "velox/common/io/IoStatistics.h"
#include "velox/connectors/hive/FileProperties.h"

namespace facebook::velox {
Expand Down Expand Up @@ -69,7 +70,8 @@ class FileHandleGenerator {
: properties_(std::move(properties)) {}
std::unique_ptr<FileHandle> operator()(
const std::string& filename,
const FileProperties* properties);
const FileProperties* properties,
io::IoStatistics* ioStats);

private:
const std::shared_ptr<const config::ConfigBase> properties_;
Expand All @@ -80,6 +82,7 @@ using FileHandleFactory = CachedFactory<
FileHandle,
FileHandleGenerator,
FileProperties,
io::IoStatistics,
FileHandleSizer>;

using FileHandleCachedPtr = CachedPtr<std::string, FileHandle>;
Expand Down
3 changes: 3 additions & 0 deletions velox/connectors/hive/HiveDataSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,9 @@ std::unordered_map<std::string, RuntimeCounter> HiveDataSource::runtimeStats() {
if (numBucketConversion_ > 0) {
res.insert({"numBucketConversion", RuntimeCounter(numBucketConversion_)});
}
for (const auto& storageStats : *ioStats_->storageStats()) {
res.insert({storageStats.first, storageStats.second});
}
return res;
}

Expand Down
4 changes: 2 additions & 2 deletions velox/connectors/hive/SplitReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -229,8 +229,8 @@ RowTypePtr SplitReader::createReader() {
try {
fileHandleCachePtr = fileHandleFactory_->generate(
hiveSplit_->filePath,
hiveSplit_->properties.has_value() ? &*hiveSplit_->properties
: nullptr);
hiveSplit_->properties.has_value() ? &*hiveSplit_->properties : nullptr,
ioStats_.get());
VELOX_CHECK_NOT_NULL(fileHandleCachePtr.get());
} catch (const VeloxRuntimeError& e) {
if (e.errorCode() == error_code::kFileNotFound &&
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,8 @@ std::string AbfsFileSystem::name() const {

std::unique_ptr<ReadFile> AbfsFileSystem::openFileForRead(
std::string_view path,
const FileOptions& options) {
const FileOptions& options,
io::IoStatistics* ioStats) {
auto abfsfile = std::make_unique<AbfsReadFile>(path, *config_);
abfsfile->initialize(options);
return abfsfile;
Expand Down
3 changes: 2 additions & 1 deletion velox/connectors/hive/storage_adapters/abfs/AbfsFileSystem.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ class AbfsFileSystem : public FileSystem {

std::unique_ptr<ReadFile> openFileForRead(
std::string_view path,
const FileOptions& options = {}) override;
const FileOptions& options = {},
io::IoStatistics* ioStats = {}) override;

std::unique_ptr<WriteFile> openFileForWrite(
std::string_view path,
Expand Down
3 changes: 2 additions & 1 deletion velox/connectors/hive/storage_adapters/gcs/GcsFileSystem.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,8 @@ void GcsFileSystem::initializeClient() {

std::unique_ptr<ReadFile> GcsFileSystem::openFileForRead(
std::string_view path,
const FileOptions& options) {
const FileOptions& options,
io::IoStatistics* ioStats) {
const auto gcspath = gcsPath(path);
auto gcsfile = std::make_unique<GcsReadFile>(gcspath, impl_->getClient());
gcsfile->initialize(options);
Expand Down
3 changes: 2 additions & 1 deletion velox/connectors/hive/storage_adapters/gcs/GcsFileSystem.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ class GcsFileSystem : public FileSystem {
/// [[https://cloud.google.com/storage/docs/samples/storage-stream-file-download]].
std::unique_ptr<ReadFile> openFileForRead(
std::string_view path,
const FileOptions& options = {}) override;
const FileOptions& options = {},
io::IoStatistics* ioStats = {}) override;

/// Initialize a WriteFile
/// First the method google::cloud::storage::Client::GetObjectMetadata
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ std::string HdfsFileSystem::name() const {

std::unique_ptr<ReadFile> HdfsFileSystem::openFileForRead(
std::string_view path,
const FileOptions& /*unused*/) {
const FileOptions& /*unused*/,
io::IoStatistics* ioStats) {
// Only remove the schema for hdfs path.
if (path.find(kScheme) == 0) {
path.remove_prefix(kScheme.length());
Expand Down
3 changes: 2 additions & 1 deletion velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ class HdfsFileSystem : public FileSystem {

std::unique_ptr<ReadFile> openFileForRead(
std::string_view path,
const FileOptions& options = {}) override;
const FileOptions& options = {},
io::IoStatistics* ioStats = {}) override;

std::unique_ptr<WriteFile> openFileForWrite(
std::string_view path,
Expand Down
3 changes: 2 additions & 1 deletion velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -740,7 +740,8 @@ std::string S3FileSystem::getLogLevelName() const {

std::unique_ptr<ReadFile> S3FileSystem::openFileForRead(
std::string_view s3Path,
const FileOptions& options) {
const FileOptions& options,
io::IoStatistics* ioStats) {
const auto path = getPath(s3Path);
auto s3file = std::make_unique<S3ReadFile>(path, impl_->s3Client());
s3file->initialize(options);
Expand Down
3 changes: 2 additions & 1 deletion velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ class S3FileSystem : public FileSystem {

std::unique_ptr<ReadFile> openFileForRead(
std::string_view s3Path,
const FileOptions& options = {}) override;
const FileOptions& options = {},
io::IoStatistics* ioStats = {}) override;

std::unique_ptr<WriteFile> openFileForWrite(
std::string_view s3Path,
Expand Down
1 change: 1 addition & 0 deletions velox/dwio/common/Throttler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,7 @@ uint64_t Throttler::calculateBackoffDurationAndUpdateThrottleCache(
/// Builds a new ThrottleSignal initialized with the value 1 and hands its
/// ownership to the caller. None of the three arguments is read.
/// NOTE(review): the (key, properties, stats) parameter shape presumably
/// mirrors the generator call made by CachedFactory -- confirm against the
/// factory alias that uses this generator.
std::unique_ptr<Throttler::ThrottleSignal>
Throttler::ThrottleSignalGenerator::operator()(
    const std::string& /*unused*/,
    const void* /*unused*/,
    const void* /*unused*/) {
  // Plain `new` with brace-initialization is kept on purpose so the way
  // ThrottleSignal is constructed stays exactly as before.
  std::unique_ptr<ThrottleSignal> signal{new ThrottleSignal{1}};
  return signal;
}
Expand Down
1 change: 1 addition & 0 deletions velox/dwio/common/Throttler.h
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ class Throttler {

std::unique_ptr<ThrottleSignal> operator()(
const std::string& /*unused*/,
const void* /*unused*/,
const void* /*unused*/);
};

Expand Down
5 changes: 2 additions & 3 deletions velox/experimental/wave/common/KernelCache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,8 @@ class AsyncCompiledKernel : public CompiledKernel {

class KernelGenerator {
public:
std::unique_ptr<ModulePtr> operator()(
const std::string,
const KernelGenFunc* gen) {
std::unique_ptr<ModulePtr>
operator()(const std::string, const KernelGenFunc* gen, const void* stats) {
using ModulePromise = folly::Promise<ModulePtr>;
struct PromiseHolder {
ModulePromise promise;
Expand Down

0 comments on commit 115951c

Please sign in to comment.