Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add IoStatistics in ReadFile to collect storage statistics #12160

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 36 additions & 10 deletions velox/common/caching/CachedFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ template <
typename Value,
typename Generator,
typename Properties = void,
typename Stats = void,
typename Sizer = DefaultSizer<Value>,
typename Comparator = std::equal_to<Key>,
typename Hash = std::hash<Key>>
Expand Down Expand Up @@ -178,7 +179,8 @@ class CachedFactory {
/// will probably mess with your memory model, so really try to avoid it.
CachedPtr<Key, Value, Comparator, Hash> generate(
const Key& key,
const Properties* properties = nullptr);
const Properties* properties = nullptr,
Stats* ioStats = nullptr);

/// Looks up the cache entry of the given key if it exists, otherwise returns
/// null.
Expand Down Expand Up @@ -358,17 +360,25 @@ template <
typename Value,
typename Generator,
typename Properties,
typename Stats,
typename Sizer,
typename Comparator,
typename Hash>
CachedPtr<Key, Value, Comparator, Hash>
CachedFactory<Key, Value, Generator, Properties, Sizer, Comparator, Hash>::
generate(const Key& key, const Properties* properties) {
CachedPtr<Key, Value, Comparator, Hash> CachedFactory<
Key,
Value,
Generator,
Properties,
Stats,
Sizer,
Comparator,
Hash>::
generate(const Key& key, const Properties* properties, Stats* stats) {
process::TraceContext trace("CachedFactory::generate");
if (cache_ == nullptr) {
return CachedPtr<Key, Value, Comparator, Hash>{
/*fromCache=*/false,
(*generator_)(key, properties).release(),
(*generator_)(key, properties, stats).release(),
nullptr,
std::make_unique<Key>(key)};
}
Expand Down Expand Up @@ -408,7 +418,7 @@ CachedFactory<Key, Value, Generator, Properties, Sizer, Comparator, Hash>::
pendingCv_.notify_all();
};

std::unique_ptr<Value> generatedValue = (*generator_)(key, properties);
std::unique_ptr<Value> generatedValue = (*generator_)(key, properties, stats);
const uint64_t valueSize = Sizer()(*generatedValue);
Value* rawValue = generatedValue.release();
const bool inserted = addCache(key, rawValue, valueSize);
Expand All @@ -433,12 +443,19 @@ template <
typename Value,
typename Generator,
typename Properties,
typename Stats,
typename Sizer,
typename Comparator,
typename Hash>
CachedPtr<Key, Value, Comparator, Hash>
CachedFactory<Key, Value, Generator, Properties, Sizer, Comparator, Hash>::get(
const Key& key) {
CachedPtr<Key, Value, Comparator, Hash> CachedFactory<
Key,
Value,
Generator,
Properties,
Stats,
Sizer,
Comparator,
Hash>::get(const Key& key) {
if (cache_ == nullptr) {
return {};
}
Expand All @@ -460,10 +477,19 @@ template <
typename Value,
typename Generator,
typename Properties,
typename Stats,
typename Sizer,
typename Comparator,
typename Hash>
void CachedFactory<Key, Value, Generator, Properties, Sizer, Comparator, Hash>::
void CachedFactory<
Key,
Value,
Generator,
Properties,
Stats,
Sizer,
Comparator,
Hash>::
retrieveCached(
const std::vector<Key>& keys,
std::vector<std::pair<Key, CachedPtr<Key, Value, Comparator, Hash>>>&
Expand Down
9 changes: 6 additions & 3 deletions velox/common/caching/tests/CachedFactoryTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ namespace {
struct DoublerGenerator {
std::unique_ptr<int> operator()(
const int& value,
const void* properties = nullptr) {
const void* properties = nullptr,
const void* stats = nullptr) {
++generated;
return std::make_unique<int>(value * 2);
}
Expand All @@ -40,7 +41,8 @@ struct DoublerGenerator {
struct IdentityGenerator {
std::unique_ptr<int> operator()(
const int& value,
const void* properties = nullptr) {
const void* properties = nullptr,
const void* stats = nullptr) {
return std::make_unique<int>(value);
}
};
Expand Down Expand Up @@ -113,7 +115,8 @@ TEST(CachedFactoryTest, basicGeneration) {
struct DoublerWithExceptionsGenerator {
std::unique_ptr<int> operator()(
const int& value,
const void* properties = nullptr) {
const void* properties = nullptr,
const void* stats = nullptr) {
if (value == 3) {
VELOX_FAIL("3 is bad");
}
Expand Down
3 changes: 2 additions & 1 deletion velox/common/file/FileSystems.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,8 @@ class LocalFileSystem : public FileSystem {

std::unique_ptr<ReadFile> openFileForRead(
std::string_view path,
const FileOptions& options) override {
const FileOptions& options,
io::IoStatistics* ioStats) override {
return std::make_unique<LocalReadFile>(
extractPath(path), executor_.get(), options.bufferIo);
}
Expand Down
4 changes: 3 additions & 1 deletion velox/common/file/FileSystems.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#pragma once

#include "velox/common/base/Exceptions.h"
#include "velox/common/io/IoStatistics.h"
#include "velox/common/memory/MemoryPool.h"

#include <functional>
Expand Down Expand Up @@ -103,7 +104,8 @@ class FileSystem {
/// Returns a ReadFile handle for a given file path
virtual std::unique_ptr<ReadFile> openFileForRead(
std::string_view path,
const FileOptions& options = {}) = 0;
const FileOptions& options = {},
io::IoStatistics* ioStats = nullptr) = 0;

/// Returns a WriteFile handle for a given file path
virtual std::unique_ptr<WriteFile> openFileForWrite(
Expand Down
3 changes: 2 additions & 1 deletion velox/common/file/tests/FaultyFileSystem.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ fileSystemGenerator() {

std::unique_ptr<ReadFile> FaultyFileSystem::openFileForRead(
std::string_view path,
const FileOptions& options) {
const FileOptions& options,
io::IoStatistics* ioStats) {
const std::string delegatedPath = std::string(extractPath(path));
auto delegatedFile = getFileSystem(delegatedPath, config_)
->openFileForRead(delegatedPath, options);
Expand Down
3 changes: 2 additions & 1 deletion velox/common/file/tests/FaultyFileSystem.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ class FaultyFileSystem : public FileSystem {

std::unique_ptr<ReadFile> openFileForRead(
std::string_view path,
const FileOptions& options = {}) override;
const FileOptions& options = {},
io::IoStatistics* ioStats = nullptr) override;

std::unique_ptr<WriteFile> openFileForWrite(
std::string_view path,
Expand Down
23 changes: 23 additions & 0 deletions velox/common/io/IoStatistics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,24 @@ IoStatistics::operationStats() const {
return operationStats_;
}

/// Returns a copy of the storage statistics map. The copy is made while
/// holding storageStatsMutex_.
std::unordered_map<std::string, RuntimeMetric> IoStatistics::storageStats()
    const {
  std::lock_guard<std::mutex> guard(storageStatsMutex_);
  auto snapshot = storageStats_;
  return snapshot;
}

/// Records one observation of the storage statistic 'name'.
///
/// The first observation for a name inserts a RuntimeMetric constructed from
/// the counter's unit. Later observations must carry the same unit as the
/// stored metric, otherwise the VELOX_CHECK_EQ below fails before the value
/// is added. The counter's value is then passed to RuntimeMetric::addValue().
/// The whole update runs while holding storageStatsMutex_.
///
/// @param name Key of the statistic in storageStats_.
/// @param counter Value and unit of the observation to add.
void IoStatistics::addStorageStats(
    const std::string& name,
    const RuntimeCounter& counter) {
  std::lock_guard<std::mutex> lock{storageStatsMutex_};
  // try_emplace does the lookup and the optional insertion in one hash probe
  // and leaves the map untouched when 'name' is already present.
  auto result = storageStats_.try_emplace(name, RuntimeMetric(counter.unit));
  auto& metric = result.first->second;
  if (!result.second) {
    // An existing metric must not mix values recorded with different units.
    VELOX_CHECK_EQ(metric.unit, counter.unit);
  }
  metric.addValue(counter.value);
}

void IoStatistics::merge(const IoStatistics& other) {
rawBytesRead_ += other.rawBytesRead_;
rawBytesWritten_ += other.rawBytesWritten_;
Expand All @@ -123,6 +141,11 @@ void IoStatistics::merge(const IoStatistics& other) {
for (auto& item : other.operationStats_) {
operationStats_[item.first].merge(item.second);
}

std::lock_guard<std::mutex> l2(storageStatsMutex_);
for (auto& item : other.storageStats_) {
storageStats_[item.first].merge(item.second);
}
}

void OperationCounters::merge(const OperationCounters& other) {
Expand Down
7 changes: 7 additions & 0 deletions velox/common/io/IoStatistics.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
#include <unordered_map>

#include <folly/dynamic.h>
#include "velox/common/base/Exceptions.h"
#include "velox/common/base/RuntimeMetrics.h"

namespace facebook::velox::io {

Expand Down Expand Up @@ -140,6 +142,9 @@ class IoStatistics {
const uint64_t partialThrottleCount = 0);

std::unordered_map<std::string, OperationCounters> operationStats() const;
std::unordered_map<std::string, RuntimeMetric> storageStats() const;

void addStorageStats(const std::string& name, const RuntimeCounter& counter);

void merge(const IoStatistics& other);

Expand Down Expand Up @@ -172,7 +177,9 @@ class IoStatistics {
IoCounter queryThreadIoLatency_;

std::unordered_map<std::string, OperationCounters> operationStats_;
std::unordered_map<std::string, RuntimeMetric> storageStats_;
mutable std::mutex operationStatsMutex_;
mutable std::mutex storageStatsMutex_;
};

} // namespace facebook::velox::io
5 changes: 3 additions & 2 deletions velox/connectors/hive/FileHandle.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ std::string groupName(const std::string& filename) {

std::unique_ptr<FileHandle> FileHandleGenerator::operator()(
const std::string& filename,
const FileProperties* properties) {
const FileProperties* properties,
io::IoStatistics* ioStats) {
// We have seen cases where drivers are stuck when creating file handles.
// Adding a trace here to spot this more easily in future.
process::TraceContext trace("FileHandleGenerator::operator()");
Expand All @@ -55,7 +56,7 @@ std::unique_ptr<FileHandle> FileHandleGenerator::operator()(
options.fileSize = properties->fileSize;
}
fileHandle->file = filesystems::getFileSystem(filename, properties_)
->openFileForRead(filename, options);
->openFileForRead(filename, options, ioStats);
fileHandle->uuid = StringIdLease(fileIds(), filename);
fileHandle->groupId = StringIdLease(fileIds(), groupName(filename));
VLOG(1) << "Generating file handle for: " << filename
Expand Down
5 changes: 4 additions & 1 deletion velox/connectors/hive/FileHandle.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include "velox/common/caching/FileIds.h"
#include "velox/common/config/Config.h"
#include "velox/common/file/File.h"
#include "velox/common/io/IoStatistics.h"
#include "velox/connectors/hive/FileProperties.h"

namespace facebook::velox {
Expand Down Expand Up @@ -69,7 +70,8 @@ class FileHandleGenerator {
: properties_(std::move(properties)) {}
std::unique_ptr<FileHandle> operator()(
const std::string& filename,
const FileProperties* properties);
const FileProperties* properties,
io::IoStatistics* ioStats);

private:
const std::shared_ptr<const config::ConfigBase> properties_;
Expand All @@ -80,6 +82,7 @@ using FileHandleFactory = CachedFactory<
FileHandle,
FileHandleGenerator,
FileProperties,
io::IoStatistics,
FileHandleSizer>;

using FileHandleCachedPtr = CachedPtr<std::string, FileHandle>;
Expand Down
5 changes: 5 additions & 0 deletions velox/connectors/hive/HiveDataSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,11 @@ std::unordered_map<std::string, RuntimeCounter> HiveDataSource::runtimeStats() {
if (numBucketConversion_ > 0) {
res.insert({"numBucketConversion", RuntimeCounter(numBucketConversion_)});
}
for (const auto& storageStats : ioStats_->storageStats()) {
res.emplace(
storageStats.first,
RuntimeCounter(storageStats.second.sum, storageStats.second.unit));
}
return res;
}

Expand Down
4 changes: 2 additions & 2 deletions velox/connectors/hive/SplitReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -229,8 +229,8 @@ RowTypePtr SplitReader::createReader() {
try {
fileHandleCachePtr = fileHandleFactory_->generate(
hiveSplit_->filePath,
hiveSplit_->properties.has_value() ? &*hiveSplit_->properties
: nullptr);
hiveSplit_->properties.has_value() ? &*hiveSplit_->properties : nullptr,
ioStats_.get());
VELOX_CHECK_NOT_NULL(fileHandleCachePtr.get());
} catch (const VeloxRuntimeError& e) {
if (e.errorCode() == error_code::kFileNotFound &&
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,8 @@ std::string AbfsFileSystem::name() const {

std::unique_ptr<ReadFile> AbfsFileSystem::openFileForRead(
std::string_view path,
const FileOptions& options) {
const FileOptions& options,
io::IoStatistics* ioStats) {
auto abfsfile = std::make_unique<AbfsReadFile>(path, *config_);
abfsfile->initialize(options);
return abfsfile;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ class AbfsFileSystem : public FileSystem {

std::unique_ptr<ReadFile> openFileForRead(
std::string_view path,
const FileOptions& options = {}) override;
const FileOptions& options = {},
io::IoStatistics* ioStats = nullptr) override;

std::unique_ptr<WriteFile> openFileForWrite(
std::string_view path,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,8 @@ void GcsFileSystem::initializeClient() {

std::unique_ptr<ReadFile> GcsFileSystem::openFileForRead(
std::string_view path,
const FileOptions& options) {
const FileOptions& options,
io::IoStatistics* ioStats) {
const auto gcspath = gcsPath(path);
auto gcsfile = std::make_unique<GcsReadFile>(gcspath, impl_->getClient());
gcsfile->initialize(options);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ class GcsFileSystem : public FileSystem {
/// [[https://cloud.google.com/storage/docs/samples/storage-stream-file-download]].
std::unique_ptr<ReadFile> openFileForRead(
std::string_view path,
const FileOptions& options = {}) override;
const FileOptions& options = {},
io::IoStatistics* ioStats = nullptr) override;

/// Initialize a WriteFile
/// First the method google::cloud::storage::Client::GetObjectMetadata
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ std::string HdfsFileSystem::name() const {

std::unique_ptr<ReadFile> HdfsFileSystem::openFileForRead(
std::string_view path,
const FileOptions& /*unused*/) {
const FileOptions& /*unused*/,
io::IoStatistics* /*unused*/) {
// Only remove the schema for hdfs path.
if (path.find(kScheme) == 0) {
path.remove_prefix(kScheme.length());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ class HdfsFileSystem : public FileSystem {

std::unique_ptr<ReadFile> openFileForRead(
std::string_view path,
const FileOptions& options = {}) override;
const FileOptions& options = {},
io::IoStatistics* ioStats = nullptr) override;

std::unique_ptr<WriteFile> openFileForWrite(
std::string_view path,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -740,7 +740,8 @@ std::string S3FileSystem::getLogLevelName() const {

std::unique_ptr<ReadFile> S3FileSystem::openFileForRead(
std::string_view s3Path,
const FileOptions& options) {
const FileOptions& options,
io::IoStatistics* ioStats) {
const auto path = getPath(s3Path);
auto s3file = std::make_unique<S3ReadFile>(path, impl_->s3Client());
s3file->initialize(options);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ class S3FileSystem : public FileSystem {

std::unique_ptr<ReadFile> openFileForRead(
std::string_view s3Path,
const FileOptions& options = {}) override;
const FileOptions& options = {},
io::IoStatistics* ioStats = nullptr) override;

std::unique_ptr<WriteFile> openFileForWrite(
std::string_view s3Path,
Expand Down
Loading
Loading