Skip to content

Commit

Permalink
tsuba: overlap work on the read side
Browse files Browse the repository at this point in the history
Signed-off-by: Tyler Hunt <thunt@katanagraph.com>
  • Loading branch information
Tyler Hunt authored and tylershunt committed Apr 29, 2021
1 parent 05cd1cf commit 535d9b5
Show file tree
Hide file tree
Showing 12 changed files with 352 additions and 144 deletions.
2 changes: 2 additions & 0 deletions libtsuba/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ add_dependencies(lib tsuba tsuba-preload)

set(sources
src/AddProperties.cpp
src/AsyncOpGroup.cpp
src/Errors.cpp
src/FaultTest.cpp
src/file.cpp
Expand All @@ -34,6 +35,7 @@ set(sources
src/RDGPartHeader.cpp
src/RDGPrefix.cpp
src/RDGSlice.cpp
src/ReadGroup.cpp
src/tsuba.cpp
src/WriteGroup.cpp
)
Expand Down
39 changes: 39 additions & 0 deletions libtsuba/include/tsuba/AsyncOpGroup.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
#ifndef KATANA_LIBTSUBA_TSUBA_ASYNCOPGROUP_H_
#define KATANA_LIBTSUBA_TSUBA_ASYNCOPGROUP_H_

#include <future>
#include <list>

#include "katana/Result.h"

namespace tsuba {

class AsyncOpGroup {
public:
struct AsyncOp {
std::future<katana::Result<void>> result;
std::string location;
std::function<katana::Result<void>()> on_complete;
};

/// Add future to the list of futures this descriptor will wait for, note
/// the file name for debugging.
void AddOp(
std::future<katana::Result<void>> future, std::string file,
const std::function<katana::Result<void>()>& on_complete);

/// Wait until all operations this descriptor knows about have completed
katana::Result<void> Finish();
/// wait for the op at the head of the list, return true if there was one
bool FinishOne();

private:
std::list<AsyncOp> pending_ops_;
uint64_t errors_{0};
uint64_t total_{0};
katana::Result<void> last_error_{katana::ResultSuccess()};
};

} // namespace tsuba

#endif
1 change: 1 addition & 0 deletions libtsuba/include/tsuba/RDG.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "tsuba/FileView.h"
#include "tsuba/PartitionMetadata.h"
#include "tsuba/RDGLineage.h"
#include "tsuba/ReadGroup.h"
#include "tsuba/WriteGroup.h"
#include "tsuba/tsuba.h"

Expand Down
63 changes: 63 additions & 0 deletions libtsuba/include/tsuba/ReadGroup.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
#ifndef KATANA_LIBTSUBA_TSUBA_READGROUP_H_
#define KATANA_LIBTSUBA_TSUBA_READGROUP_H_

#include <future>
#include <list>
#include <memory>

#include "katana/Result.h"
#include "tsuba/AsyncOpGroup.h"

namespace tsuba {

/// Track multiple, outstanding async writes and provide a mechanism to ensure
/// that they have all completed
class ReadGroup {
public:
static katana::Result<std::unique_ptr<ReadGroup>> Make();

/// Wait until all operations this descriptor knows about have completed
katana::Result<void> Finish();

/// Add future to the list of futures this ReadGroup will wait for, note
/// the file name for debugging. `on_complete` is guaranteed to be called
/// in FIFO order
void AddOp(
std::future<katana::Result<void>> future, std::string file,
const std::function<katana::Result<void>()>& on_complete);

/// same as AddOp, but the future may return a data type which can then be
/// consumed by on_complete
template <typename RetType>
void AddReturnsOp(
std::future<katana::Result<RetType>> future, const std::string& file,
const std::function<katana::Result<void>(RetType)>& on_complete) {
// n.b., make shared instead of unique because move capture below prevents
// passing generic_complete_fn as a std::function
auto ret_val = std::make_shared<RetType>();
auto new_future = std::async(
std::launch::deferred,
[future = std::move(future),
&ret_val_storage = *ret_val]() mutable -> katana::Result<void> {
auto res = future.get();
if (!res) {
return res.error();
}
ret_val_storage = res.value();
return katana::ResultSuccess();
});

std::function<katana::Result<void>()> generic_complete_fn =
[ret_val, on_complete]() -> katana::Result<void> {
return on_complete(std::move(*ret_val));
};
AddOp(std::move(new_future), file, generic_complete_fn);
}

private:
AsyncOpGroup async_op_group_;
};

} // namespace tsuba

#endif
10 changes: 2 additions & 8 deletions libtsuba/include/tsuba/WriteGroup.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <memory>

#include "katana/Result.h"
#include "tsuba/AsyncOpGroup.h"
#include "tsuba/FileFrame.h"
#include "tsuba/file.h"

Expand All @@ -21,18 +22,11 @@ class WriteGroup {
};

std::string tag_;
std::list<AsyncOp> pending_ops_;
std::atomic<uint64_t> outstanding_size_{0};
uint64_t errors_{0};
uint64_t total_{0};
katana::Result<void> last_error_{katana::ResultSuccess()};
AsyncOpGroup async_op_group_;

WriteGroup(std::string tag) : tag_(std::move(tag)){};

/// Wait for the next op if there is one, account errors. Returns true if
/// there was a next op
bool Drain();

public:
static constexpr uint64_t kMaxOutstandingSize = 10ULL << 30; // 10 GB

Expand Down
106 changes: 103 additions & 3 deletions libtsuba/src/AddProperties.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#include "AddProperties.h"

#include <memory>

#include <arrow/chunked_array.h>

#include "katana/Result.h"
Expand Down Expand Up @@ -52,7 +54,7 @@ tsuba::LoadProperties(
return DoLoadProperties(expected_name, file_path);
} catch (const std::exception& exp) {
return KATANA_ERROR(
ErrorCode::ArrowError, "arrow exception: {}", exp.what());
tsuba::ErrorCode::ArrowError, "arrow exception: {}", exp.what());
}
}

Expand All @@ -63,9 +65,107 @@ tsuba::LoadPropertySlice(
try {
return DoLoadProperties(
expected_name, file_path,
ParquetReader::Slice{.offset = offset, .length = length});
tsuba::ParquetReader::Slice{.offset = offset, .length = length});
} catch (const std::exception& exp) {
return KATANA_ERROR(
ErrorCode::ArrowError, "arrow exception: {}", exp.what());
tsuba::ErrorCode::ArrowError, "arrow exception: {}", exp.what());
}
}

katana::Result<void>
tsuba::AddProperties(
const katana::Uri& uri,
const std::vector<tsuba::PropStorageInfo>& properties, ReadGroup* grp,
const std::function<katana::Result<void>(std::shared_ptr<arrow::Table>)>&
add_fn) {
for (const tsuba::PropStorageInfo& prop : properties) {
const std::string& name = prop.name;
const katana::Uri& path = uri.Join(prop.path);
std::future<katana::Result<std::shared_ptr<arrow::Table>>> future =
std::async(
std::launch::async,
[name, path]() -> katana::Result<std::shared_ptr<arrow::Table>> {
auto load_result = LoadProperties(name, path);
if (!load_result) {
return load_result.error().WithContext(
"error loading {}", path);
}
return load_result.value();
});
auto on_complete = [add_fn,
name](const std::shared_ptr<arrow::Table>& props)
-> katana::Result<void> {
auto add_result = add_fn(props);
if (!add_result) {
return add_result.error().WithContext("adding {}", std::quoted(name));
}
return katana::ResultSuccess();
};
if (grp) {
grp->AddReturnsOp<std::shared_ptr<arrow::Table>>(
std::move(future), path.string(), on_complete);
continue;
}
auto read_res = future.get();
if (!read_res) {
return read_res.error();
}
auto on_complete_res = on_complete(read_res.value());
if (!on_complete_res) {
return on_complete_res.error();
}
}

return katana::ResultSuccess();
}

katana::Result<void>
tsuba::AddPropertySlice(
const katana::Uri& dir,
const std::vector<tsuba::PropStorageInfo>& properties,
std::pair<uint64_t, uint64_t> range, ReadGroup* grp,
const std::function<katana::Result<void>(std::shared_ptr<arrow::Table>)>&
add_fn) {
uint64_t begin = range.first;
uint64_t size = range.second - range.first;
for (const tsuba::PropStorageInfo& prop : properties) {
const std::string& name = prop.name;
const katana::Uri& path = dir.Join(prop.path);
std::future<katana::Result<std::shared_ptr<arrow::Table>>> future =
std::async(
std::launch::async,
[name, path, begin,
size]() -> katana::Result<std::shared_ptr<arrow::Table>> {
auto load_result = LoadPropertySlice(name, path, begin, size);
if (!load_result) {
return load_result.error().WithContext(
"error loading {}", path);
}
return load_result.value();
});
auto on_complete = [add_fn,
name](const std::shared_ptr<arrow::Table>& props)
-> katana::Result<void> {
auto add_result = add_fn(props);
if (!add_result) {
return add_result.error().WithContext("adding {}", std::quoted(name));
}
return katana::ResultSuccess();
};
if (grp) {
grp->AddReturnsOp<std::shared_ptr<arrow::Table>>(
std::move(future), path.string(), on_complete);
continue;
}
auto read_res = future.get();
if (!read_res) {
return read_res.error();
}
auto on_complete_res = on_complete(read_res.value());
if (!on_complete_res) {
return on_complete_res.error();
}
}

return katana::ResultSuccess();
}
55 changes: 9 additions & 46 deletions libtsuba/src/AddProperties.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include "RDGPartHeader.h"
#include "katana/Result.h"
#include "katana/Uri.h"
#include "tsuba/ReadGroup.h"

namespace tsuba {

Expand All @@ -16,56 +17,18 @@ KATANA_EXPORT katana::Result<std::shared_ptr<arrow::Table>> LoadPropertySlice(
const std::string& expected_name, const katana::Uri& file_path,
int64_t offset, int64_t length);

template <typename AddFn>
katana::Result<void>
AddProperties(
KATANA_EXPORT katana::Result<void> AddProperties(
const katana::Uri& uri,
const std::vector<tsuba::PropStorageInfo>& properties, AddFn add_fn) {
for (const tsuba::PropStorageInfo& properties : properties) {
auto p_path = uri.Join(properties.path);
const std::vector<tsuba::PropStorageInfo>& properties, ReadGroup* grp,
const std::function<katana::Result<void>(std::shared_ptr<arrow::Table>)>&
add_fn);

auto load_result = LoadProperties(properties.name, p_path);
if (!load_result) {
return load_result.error().WithContext("error loading {}", p_path);
}

std::shared_ptr<arrow::Table> props = load_result.value();

auto add_result = add_fn(props);
if (!add_result) {
return add_result.error().WithContext(
"adding {}", std::quoted(properties.name));
}
}

return katana::ResultSuccess();
}

template <typename AddFn>
katana::Result<void>
AddPropertySlice(
KATANA_EXPORT katana::Result<void> AddPropertySlice(
const katana::Uri& dir,
const std::vector<tsuba::PropStorageInfo>& properties,
std::pair<uint64_t, uint64_t> range, AddFn add_fn) {
for (const tsuba::PropStorageInfo& properties : properties) {
katana::Uri p_path = dir.Join(properties.path);

auto load_result = LoadPropertySlice(
properties.name, p_path, range.first, range.second - range.first);
if (!load_result) {
return load_result.error();
}

std::shared_ptr<arrow::Table> props = load_result.value();

auto add_result = add_fn(props);
if (!add_result) {
return add_result.error();
}
}

return katana::ResultSuccess();
}
std::pair<uint64_t, uint64_t> range, ReadGroup* grp,
const std::function<katana::Result<void>(std::shared_ptr<arrow::Table>)>&
add_fn);

} // namespace tsuba

Expand Down
Loading

0 comments on commit 535d9b5

Please sign in to comment.