Skip to content

Commit

Permalink
C++ client changes : change API for making TableHandles when using do…
Browse files Browse the repository at this point in the history
…Put, (#4024)

and do a better job of dependency tracking for releasing TableHandles
at the right time.
  • Loading branch information
kosak committed Jun 17, 2023
1 parent 5c0e5d0 commit 85e7708
Show file tree
Hide file tree
Showing 25 changed files with 392 additions and 488 deletions.
8 changes: 2 additions & 6 deletions cpp-client/deephaven/client/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,9 @@ set(ALL_FILES
include/private/deephaven/client/impl/table_handle_manager_impl.h
include/private/deephaven/client/impl/util.h

src/arrowutil/arrow_flight.cc

include/private/deephaven/client/arrowutil/arrow_traits.h
include/private/deephaven/client/arrowutil/arrow_flight.h
include/private/deephaven/client/arrowutil/arrow_visitors.h
include/private/deephaven/client/arrowutil/arrow_value_converter.h
include/private/deephaven/client/arrowutil/arrow_column_source.h
include/private/deephaven/client/arrowutil/arrow_value_converter.h
include/private/deephaven/client/arrowutil/arrow_visitors.h

src/columns.cc
src/expressions.cc
Expand Down

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -42,27 +42,25 @@ class LazyStateInfo final {
typedef io::deephaven::proto::backplane::grpc::Ticket Ticket;

public:
LazyStateInfo(Ticket ticket, int64_t numRows, bool isStatic);
LazyStateInfo(int64_t numRows, bool isStatic);
LazyStateInfo(const LazyStateInfo &other);
LazyStateInfo &operator=(const LazyStateInfo &other);
LazyStateInfo(LazyStateInfo &&other) noexcept;
LazyStateInfo &operator=(LazyStateInfo &&other) noexcept;
~LazyStateInfo();

Ticket &ticket() { return ticket_; }
const Ticket &ticket() const { return ticket_; }
int64_t numRows() const { return numRows_; }
bool isStatic() const { return isStatic_; }

private:
Ticket ticket_;
int64_t numRows_ = 0;
bool isStatic_ = false;
};

class ExportedTableCreationCallback final
: public deephaven::dhcore::utility::SFCallback<io::deephaven::proto::backplane::grpc::ExportedTableCreationResponse> {
typedef io::deephaven::proto::backplane::grpc::ExportedTableCreationResponse ExportedTableCreationResponse;
typedef io::deephaven::proto::backplane::grpc::Ticket Ticket;
typedef deephaven::client::server::Server Server;
typedef deephaven::client::utility::Executor Executor;

Expand All @@ -74,13 +72,17 @@ class ExportedTableCreationCallback final
using CBFuture = deephaven::dhcore::utility::CBFuture<T>;

public:
explicit ExportedTableCreationCallback(CBPromise<LazyStateInfo> &&infoPromise);
ExportedTableCreationCallback(std::shared_ptr<TableHandleImpl> dependency, Ticket expectedTicket,
CBPromise<LazyStateInfo> infoPromise);
~ExportedTableCreationCallback() final;

void onSuccess(ExportedTableCreationResponse item) final;
void onFailure(std::exception_ptr ep) final;

private:
// Hold a dependency on the parent until this callback is done.
std::shared_ptr<TableHandleImpl> dependency_;
Ticket expectedTicket_;
CBPromise<LazyStateInfo> infoPromise_;
};

Expand All @@ -100,12 +102,14 @@ class LazyState final {

public:
LazyState(std::shared_ptr<Server> server, std::shared_ptr<Executor> flightExecutor,
CBFuture<LazyStateInfo> infoFuture);
CBFuture<LazyStateInfo> infoFuture, Ticket ticket);
~LazyState();

std::shared_ptr<Schema> getSchema();
void getSchemaAsync(std::shared_ptr<SFCallback<std::shared_ptr<Schema>>> cb);

void releaseAsync();

/**
* Used in tests.
*/
Expand All @@ -117,6 +121,7 @@ class LazyState final {
std::shared_ptr<Server> server_;
std::shared_ptr<Executor> flightExecutor_;
CBFuture<LazyStateInfo> infoFuture_;
Ticket ticket_;

std::atomic_flag schemaRequestSent_ = {};
CBPromise<std::shared_ptr<Schema>> schemaPromise_;
Expand Down Expand Up @@ -145,12 +150,12 @@ class TableHandleImpl : public std::enable_shared_from_this<TableHandleImpl> {
using SFCallback = deephaven::dhcore::utility::SFCallback<Args...>;
public:
static std::pair<std::shared_ptr<internal::ExportedTableCreationCallback>, std::shared_ptr<internal::LazyState>>
createEtcCallback(const TableHandleManagerImpl *thm);
createEtcCallback(std::shared_ptr<TableHandleImpl> dependency, const TableHandleManagerImpl *thm, Ticket resultTicket);

static std::shared_ptr<TableHandleImpl> create(std::shared_ptr<const TableHandleImpl> parent,
std::shared_ptr<TableHandleManagerImpl> thm, Ticket ticket, std::shared_ptr<internal::LazyState> lazyState);
TableHandleImpl(Private, std::shared_ptr<const TableHandleImpl> &&parent, std::shared_ptr<TableHandleManagerImpl> &&thm,
Ticket &&ticket, std::shared_ptr<internal::LazyState> &&lazyState);
static std::shared_ptr<TableHandleImpl> create(std::shared_ptr<TableHandleManagerImpl> thm, Ticket ticket,
std::shared_ptr<internal::LazyState> lazyState);
TableHandleImpl(Private, std::shared_ptr<TableHandleManagerImpl> &&thm, Ticket &&ticket,
std::shared_ptr<internal::LazyState> &&lazyState);
~TableHandleImpl();

std::shared_ptr<TableHandleImpl> select(std::vector<std::string> columnSpecs);
Expand Down Expand Up @@ -192,13 +197,13 @@ class TableHandleImpl : public std::enable_shared_from_this<TableHandleImpl> {
std::shared_ptr<TableHandleImpl> merge(std::string keyColumn, std::vector<Ticket> sourceTickets);

std::shared_ptr<TableHandleImpl> crossJoin(const TableHandleImpl &rightSide,
std::vector<std::string> columnsToMatch, std::vector<std::string> columnsToAdd) const;
std::vector<std::string> columnsToMatch, std::vector<std::string> columnsToAdd);

std::shared_ptr<TableHandleImpl> naturalJoin(const TableHandleImpl &rightSide,
std::vector<std::string> columnsToMatch, std::vector<std::string> columnsToAdd) const;
std::vector<std::string> columnsToMatch, std::vector<std::string> columnsToAdd);

std::shared_ptr<TableHandleImpl> exactJoin(const TableHandleImpl &rightSide,
std::vector<std::string> columnsToMatch, std::vector<std::string> columnsToAdd) const;
std::vector<std::string> columnsToMatch, std::vector<std::string> columnsToAdd);

std::shared_ptr<TableHandleImpl> asOfJoin(AsOfJoinTablesRequest::MatchRule matchRule,
const TableHandleImpl &rightSide, std::vector<std::string> columnsToMatch,
Expand Down Expand Up @@ -241,10 +246,6 @@ class TableHandleImpl : public std::enable_shared_from_this<TableHandleImpl> {
std::shared_ptr<TableHandleImpl> headOrTailByHelper(int64_t n, bool head,
std::vector<std::string> columnSpecs);

/**
* This TableHandleImpl holds a dependency on its parent so that the parent's lifetime is as least as long as this.
*/
std::shared_ptr<const TableHandleImpl> parent_;
std::shared_ptr<TableHandleManagerImpl> managerImpl_;
Ticket ticket_;
std::shared_ptr<internal::LazyState> lazyState_;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,15 @@ class TableHandleManagerImpl final : public std::enable_shared_from_this<TableHa
void runScriptAsync(std::string code, std::shared_ptr<SFCallback<>> callback);

/**
* For locally creating a new ticket, e.g. when making a table with Arrow. numRows and isStatic are needed
* so that TableHandleImpl has something to report for TableHandleImpl::numRows() and TableHandleImpl::isStatic().
* See the documentation for Server::newTicket().
*/
std::tuple<std::shared_ptr<TableHandleImpl>, arrow::flight::FlightDescriptor> newTicket(int64_t numRows,
bool isStatic);
std::string newTicket() {
auto ticket = server_->newTicket();
// our API only wants the internal string part.
return std::move(*ticket.mutable_ticket());
}

std::shared_ptr<TableHandleImpl> makeTableHandleFromTicket(std::string ticket);

const std::optional<Ticket> &consoleId() const { return consoleId_; }
const std::shared_ptr<Server> &server() const { return server_; }
Expand Down
Loading

0 comments on commit 85e7708

Please sign in to comment.