From 85e77085269bf3d07c8353429b11ee16e391c74f Mon Sep 17 00:00:00 2001 From: Corey Kosak Date: Sat, 17 Jun 2023 02:37:43 -0400 Subject: [PATCH] C++ client changes : change API for making TableHandles when using doPut, (#4024) and do a better job of dependency tracking for releasing TableHandles at the right time. --- cpp-client/deephaven/client/CMakeLists.txt | 8 +- .../deephaven/client/arrowutil/arrow_flight.h | 17 -- .../deephaven/client/arrowutil/arrow_traits.h | 88 ------ .../deephaven/client/impl/table_handle_impl.h | 37 +-- .../client/impl/table_handle_manager_impl.h | 12 +- .../private/deephaven/client/server/server.h | 82 +++--- .../include/public/deephaven/client/client.h | 23 +- .../include/public/deephaven/client/flight.h | 22 -- .../deephaven/client/utility/arrow_util.h | 5 + .../deephaven/client/utility/table_maker.h | 4 +- .../client/src/arrowutil/arrow_flight.cc | 17 -- cpp-client/deephaven/client/src/client.cc | 16 +- cpp-client/deephaven/client/src/flight.cc | 8 - .../client/src/impl/table_handle_impl.cc | 272 ++++++++++-------- .../src/impl/table_handle_manager_impl.cc | 37 ++- .../deephaven/client/src/server/server.cc | 166 +++++------ .../client/src/utility/arrow_util.cc | 12 + .../client/src/utility/table_maker.cc | 9 +- cpp-client/tests/attributes_test.cc | 9 + cpp-client/tests/new_table_test.cc | 2 +- cpp-client/tests/select_test.cc | 4 +- cpp-client/tests/sort_test.cc | 2 +- cpp-client/tests/test_util.cc | 2 +- .../create_table_with_arrow_flight/main.cc | 21 +- cpp-examples/read_csv/main.cc | 5 +- 25 files changed, 392 insertions(+), 488 deletions(-) delete mode 100644 cpp-client/deephaven/client/include/private/deephaven/client/arrowutil/arrow_flight.h delete mode 100644 cpp-client/deephaven/client/include/private/deephaven/client/arrowutil/arrow_traits.h delete mode 100644 cpp-client/deephaven/client/src/arrowutil/arrow_flight.cc diff --git a/cpp-client/deephaven/client/CMakeLists.txt b/cpp-client/deephaven/client/CMakeLists.txt index 79d5183345d..0d8aecfa624 100644 --- a/cpp-client/deephaven/client/CMakeLists.txt +++ b/cpp-client/deephaven/client/CMakeLists.txt @@ -44,13 +44,9 @@ set(ALL_FILES include/private/deephaven/client/impl/table_handle_manager_impl.h include/private/deephaven/client/impl/util.h - src/arrowutil/arrow_flight.cc - - include/private/deephaven/client/arrowutil/arrow_traits.h - include/private/deephaven/client/arrowutil/arrow_flight.h - include/private/deephaven/client/arrowutil/arrow_visitors.h - include/private/deephaven/client/arrowutil/arrow_value_converter.h include/private/deephaven/client/arrowutil/arrow_column_source.h + include/private/deephaven/client/arrowutil/arrow_value_converter.h + include/private/deephaven/client/arrowutil/arrow_visitors.h src/columns.cc src/expressions.cc diff --git a/cpp-client/deephaven/client/include/private/deephaven/client/arrowutil/arrow_flight.h b/cpp-client/deephaven/client/include/private/deephaven/client/arrowutil/arrow_flight.h deleted file mode 100644 index d32eec9d29f..00000000000 --- a/cpp-client/deephaven/client/include/private/deephaven/client/arrowutil/arrow_flight.h +++ /dev/null @@ -1,17 +0,0 @@ -/** - * Copyright (c) 2016-2022 Deephaven Data Labs and Patent Pending - */ -#pragma once -#include -#include -#include - -#include "deephaven/dhcore/types.h" - -namespace deephaven::client::arrowutil { -class ArrowUtil { -public: - static bool tryConvertTicketToFlightDescriptor(const std::string &ticket, - arrow::flight::FlightDescriptor *fd); -}; -} // namespace deephaven::client::arrowutil diff --git a/cpp-client/deephaven/client/include/private/deephaven/client/arrowutil/arrow_traits.h b/cpp-client/deephaven/client/include/private/deephaven/client/arrowutil/arrow_traits.h deleted file mode 100644 index c98863f8cf2..00000000000 --- a/cpp-client/deephaven/client/include/private/deephaven/client/arrowutil/arrow_traits.h +++ /dev/null @@ -1,88 +0,0 @@ -/** - * Copyright (c) 2016-2022 Deephaven Data Labs and Patent Pending - */ -#pragma once -#include -#include -#include -#include - -#include "deephaven/client/types.h" - -namespace deephaven::client::arrowutil { -/** - * Returns true iff type T is one of the numeric types. - */ -template -constexpr bool isNumericType() { - static_assert( - std::is_same_v || - std::is_same_v || - std::is_same_v || - std::is_same_v || - std::is_same_v || - std::is_same_v || - std::is_same_v || - std::is_same_v || - std::is_same_v , - "T is not one of the supported element types for Deephaven columns"); - - return std::is_same_v || - std::is_same_v || - std::is_same_v || - std::is_same_v || - std::is_same_v || - std::is_same_v; -} - -/** - * Maps the Deephaven element type to its corresponding Arrow array type. - */ -template -struct CorrespondingArrowArrayType {}; - -template<> -struct CorrespondingArrowArrayType { - typedef arrow::Int8Array type_t; -}; - -template<> -struct CorrespondingArrowArrayType { - typedef arrow::Int16Array type_t; -}; - -template<> -struct CorrespondingArrowArrayType { - typedef arrow::Int32Array type_t; -}; - -template<> -struct CorrespondingArrowArrayType { - typedef arrow::Int64Array type_t; -}; - -template<> -struct CorrespondingArrowArrayType { - typedef arrow::FloatArray type_t; -}; - -template<> -struct CorrespondingArrowArrayType { - typedef arrow::DoubleArray type_t; -}; - -template<> -struct CorrespondingArrowArrayType { - typedef arrow::BooleanArray type_t; -}; - -template<> -struct CorrespondingArrowArrayType { - typedef arrow::StringArray type_t; -}; - -template<> -struct CorrespondingArrowArrayType { - typedef arrow::TimestampArray type_t; -}; -} // namespace deephaven::client::arrowutil diff --git a/cpp-client/deephaven/client/include/private/deephaven/client/impl/table_handle_impl.h b/cpp-client/deephaven/client/include/private/deephaven/client/impl/table_handle_impl.h index a364517ff2a..89f83cc6d1c 100644 --- a/cpp-client/deephaven/client/include/private/deephaven/client/impl/table_handle_impl.h +++ b/cpp-client/deephaven/client/include/private/deephaven/client/impl/table_handle_impl.h @@ -42,20 +42,17 @@ class LazyStateInfo final { typedef io::deephaven::proto::backplane::grpc::Ticket Ticket; public: - LazyStateInfo(Ticket ticket, int64_t numRows, bool isStatic); + LazyStateInfo(int64_t numRows, bool isStatic); LazyStateInfo(const LazyStateInfo &other); LazyStateInfo &operator=(const LazyStateInfo &other); LazyStateInfo(LazyStateInfo &&other) noexcept; LazyStateInfo &operator=(LazyStateInfo &&other) noexcept; ~LazyStateInfo(); - Ticket &ticket() { return ticket_; } - const Ticket &ticket() const { return ticket_; } int64_t numRows() const { return numRows_; } bool isStatic() const { return isStatic_; } private: - Ticket ticket_; int64_t numRows_ = 0; bool isStatic_ = false; }; @@ -63,6 +60,7 @@ class LazyStateInfo final { class ExportedTableCreationCallback final : public deephaven::dhcore::utility::SFCallback { typedef io::deephaven::proto::backplane::grpc::ExportedTableCreationResponse ExportedTableCreationResponse; + typedef io::deephaven::proto::backplane::grpc::Ticket Ticket; typedef deephaven::client::server::Server Server; typedef deephaven::client::utility::Executor Executor; @@ -74,13 +72,17 @@ class ExportedTableCreationCallback final using CBFuture = deephaven::dhcore::utility::CBFuture; public: - explicit ExportedTableCreationCallback(CBPromise &&infoPromise); + ExportedTableCreationCallback(std::shared_ptr dependency, Ticket expectedTicket, + CBPromise infoPromise); ~ExportedTableCreationCallback() final; void onSuccess(ExportedTableCreationResponse item) final; void onFailure(std::exception_ptr ep) final; private: + // Hold a dependency on the parent until this callback is done. + std::shared_ptr dependency_; + Ticket expectedTicket_; CBPromise infoPromise_; }; @@ -100,12 +102,14 @@ class LazyState final { public: LazyState(std::shared_ptr server, std::shared_ptr flightExecutor, - CBFuture infoFuture); + CBFuture infoFuture, Ticket ticket); ~LazyState(); std::shared_ptr getSchema(); void getSchemaAsync(std::shared_ptr>> cb); + void releaseAsync(); + /** * Used in tests. */ @@ -117,6 +121,7 @@ class LazyState final { std::shared_ptr server_; std::shared_ptr flightExecutor_; CBFuture infoFuture_; + Ticket ticket_; std::atomic_flag schemaRequestSent_ = {}; CBPromise> schemaPromise_; @@ -145,12 +150,12 @@ class TableHandleImpl : public std::enable_shared_from_this { using SFCallback = deephaven::dhcore::utility::SFCallback; public: static std::pair, std::shared_ptr> - createEtcCallback(const TableHandleManagerImpl *thm); + createEtcCallback(std::shared_ptr dependency, const TableHandleManagerImpl *thm, Ticket resultTicket); - static std::shared_ptr create(std::shared_ptr parent, - std::shared_ptr thm, Ticket ticket, std::shared_ptr lazyState); - TableHandleImpl(Private, std::shared_ptr &&parent, std::shared_ptr &&thm, - Ticket &&ticket, std::shared_ptr &&lazyState); + static std::shared_ptr create(std::shared_ptr thm, Ticket ticket, + std::shared_ptr lazyState); + TableHandleImpl(Private, std::shared_ptr &&thm, Ticket &&ticket, + std::shared_ptr &&lazyState); ~TableHandleImpl(); std::shared_ptr select(std::vector columnSpecs); @@ -192,13 +197,13 @@ class TableHandleImpl : public std::enable_shared_from_this { std::shared_ptr merge(std::string keyColumn, std::vector sourceTickets); std::shared_ptr crossJoin(const TableHandleImpl &rightSide, - std::vector columnsToMatch, std::vector columnsToAdd) const; + std::vector columnsToMatch, std::vector columnsToAdd); std::shared_ptr naturalJoin(const TableHandleImpl &rightSide, - std::vector columnsToMatch, std::vector columnsToAdd) const; + std::vector columnsToMatch, std::vector columnsToAdd); std::shared_ptr exactJoin(const TableHandleImpl &rightSide, - std::vector columnsToMatch, std::vector columnsToAdd) const; + std::vector columnsToMatch, std::vector columnsToAdd); std::shared_ptr asOfJoin(AsOfJoinTablesRequest::MatchRule matchRule, const TableHandleImpl &rightSide, std::vector columnsToMatch, @@ -241,10 +246,6 @@ class TableHandleImpl : public std::enable_shared_from_this { std::shared_ptr headOrTailByHelper(int64_t n, bool head, std::vector columnSpecs); - /** - * This TableHandleImpl holds a dependency on its parent so that the parent's lifetime is as least as long as this. - */ - std::shared_ptr parent_; std::shared_ptr managerImpl_; Ticket ticket_; std::shared_ptr lazyState_; diff --git a/cpp-client/deephaven/client/include/private/deephaven/client/impl/table_handle_manager_impl.h b/cpp-client/deephaven/client/include/private/deephaven/client/impl/table_handle_manager_impl.h index 3e2095c06b4..e4db1eeed61 100644 --- a/cpp-client/deephaven/client/include/private/deephaven/client/impl/table_handle_manager_impl.h +++ b/cpp-client/deephaven/client/include/private/deephaven/client/impl/table_handle_manager_impl.h @@ -43,11 +43,15 @@ class TableHandleManagerImpl final : public std::enable_shared_from_this> callback); /** - * For locally creating a new ticket, e.g. when making a table with Arrow. numRows and isStatic are needed - * so that TableHandleImpl has something to report for TableHandleImpl::numRows() and TableHandleImpl::isStatic(). + * See the documentation for Server::newTicket(). */ - std::tuple, arrow::flight::FlightDescriptor> newTicket(int64_t numRows, - bool isStatic); + std::string newTicket() { + auto ticket = server_->newTicket(); + // our API only wants the internal string part. + return std::move(*ticket.mutable_ticket()); + } + + std::shared_ptr makeTableHandleFromTicket(std::string ticket); const std::optional &consoleId() const { return consoleId_; } const std::shared_ptr &server() const { return server_; } diff --git a/cpp-client/deephaven/client/include/private/deephaven/client/server/server.h b/cpp-client/deephaven/client/include/private/deephaven/client/server/server.h index 7beebe8b46d..5a765b91d25 100644 --- a/cpp-client/deephaven/client/include/private/deephaven/client/server/server.h +++ b/cpp-client/deephaven/client/include/private/deephaven/client/server/server.h @@ -131,8 +131,10 @@ class Server : public std::enable_shared_from_this { // TODO(kosak): decide on the multithreaded story here arrow::flight::FlightClient *flightClient() const { return flightClient_.get(); } + /** + * Allocates a new Ticket from client-managed namespace. + */ Ticket newTicket(); - std::tuple newTicketAndFlightDescriptor(); void getConfigurationConstantsAsync( std::shared_ptr> callback); @@ -142,7 +144,9 @@ class Server : public std::enable_shared_from_this { void executeCommandAsync(Ticket consoleId, std::string code, std::shared_ptr> callback); - Ticket emptyTableAsync(int64_t size, std::shared_ptr etcCallback); + void getExportedTableCreationResponseAsync(Ticket ticket, std::shared_ptr callback); + + void emptyTableAsync(int64_t size, std::shared_ptr etcCallback, Ticket result); // std::shared_ptr historicalTableAsync(std::shared_ptr nameSpace, // std::shared_ptr tableName, std::shared_ptr itdCallback); @@ -150,77 +154,77 @@ class Server : public std::enable_shared_from_this { // std::shared_ptr tempTableAsync(std::shared_ptr>> columnHolders, // std::shared_ptr itdCallback); - Ticket timeTableAsync(int64_t startTimeNanos, int64_t periodNanos, - std::shared_ptr etcCallback); + void timeTableAsync(int64_t startTimeNanos, int64_t periodNanos, std::shared_ptr etcCallback, + Ticket result); // // std::shared_ptr snapshotAsync(std::shared_ptr leftTableHandle, // std::shared_ptr rightTableHandle, // bool doInitialSnapshot, std::shared_ptr>> stampColumns, // std::shared_ptr itdCallback); - Ticket selectAsync(Ticket parentTicket, std::vector columnSpecs, - std::shared_ptr etcCallback); + void selectAsync(Ticket parentTicket, std::vector columnSpecs, + std::shared_ptr etcCallback, Ticket result); - Ticket updateAsync(Ticket parentTicket, std::vector columnSpecs, - std::shared_ptr etcCallback); + void updateAsync(Ticket parentTicket, std::vector columnSpecs, + std::shared_ptr etcCallback, Ticket result); - Ticket viewAsync(Ticket parentTicket, std::vector columnSpecs, - std::shared_ptr etcCallback); + void viewAsync(Ticket parentTicket, std::vector columnSpecs, + std::shared_ptr etcCallback, Ticket result); - Ticket updateViewAsync(Ticket parentTicket, std::vector columnSpecs, - std::shared_ptr etcCallback); + void updateViewAsync(Ticket parentTicket, std::vector columnSpecs, + std::shared_ptr etcCallback, Ticket result); - Ticket dropColumnsAsync(Ticket parentTicket, std::vector columnSpecs, - std::shared_ptr etcCallback); + void dropColumnsAsync(Ticket parentTicket, std::vector columnSpecs, + std::shared_ptr etcCallback, Ticket result); - Ticket whereAsync(Ticket parentTicket, std::string condition, - std::shared_ptr etcCallback); + void whereAsync(Ticket parentTicket, std::string condition, std::shared_ptr etcCallback, + Ticket result); - Ticket sortAsync(Ticket parentTicket, std::vector sortDescriptors, - std::shared_ptr etcCallback); + void sortAsync(Ticket parentTicket, std::vector sortDescriptors, + std::shared_ptr etcCallback, Ticket result); // std::shared_ptr preemptiveAsync(std::shared_ptr parentTableHandle, // int32_t sampleIntervalMs, std::shared_ptr itdCallback); - Ticket comboAggregateDescriptorAsync(Ticket parentTicket, + void comboAggregateDescriptorAsync(Ticket parentTicket, std::vector aggregates, std::vector groupByColumns, bool forceCombo, - std::shared_ptr etcCallback); + std::shared_ptr etcCallback, Ticket result); - Ticket headOrTailByAsync(Ticket parentTicket, bool head, int64_t n, - std::vector columnSpecs, std::shared_ptr etcCallback); + void headOrTailByAsync(Ticket parentTicket, bool head, int64_t n, + std::vector columnSpecs, std::shared_ptr etcCallback, Ticket result); - Ticket headOrTailAsync(Ticket parentTicket, - bool head, int64_t n, std::shared_ptr etcCallback); + void headOrTailAsync(Ticket parentTicket, bool head, int64_t n, std::shared_ptr etcCallback, + Ticket result); - Ticket ungroupAsync(Ticket parentTicket, bool nullFill, std::vector groupByColumns, - std::shared_ptr etcCallback); + void ungroupAsync(Ticket parentTicket, bool nullFill, std::vector groupByColumns, + std::shared_ptr etcCallback, Ticket result); - Ticket mergeAsync(std::vector sourceTickets, std::string keyColumn, - std::shared_ptr etcCallback); + void mergeAsync(std::vector sourceTickets, std::string keyColumn, + std::shared_ptr etcCallback, Ticket result); - Ticket crossJoinAsync(Ticket leftTableTicket, Ticket rightTableTicket, + void crossJoinAsync(Ticket leftTableTicket, Ticket rightTableTicket, std::vector columnsToMatch, std::vector columnsToAdd, - std::shared_ptr etcCallback); + std::shared_ptr etcCallback, Ticket result); - Ticket naturalJoinAsync(Ticket leftTableTicket, Ticket rightTableTicket, + void naturalJoinAsync(Ticket leftTableTicket, Ticket rightTableTicket, std::vector columnsToMatch, std::vector columnsToAdd, - std::shared_ptr etcCallback); + std::shared_ptr etcCallback, Ticket result); - Ticket exactJoinAsync(Ticket leftTableTicket, Ticket rightTableTicket, + void exactJoinAsync(Ticket leftTableTicket, Ticket rightTableTicket, std::vector columnsToMatch, std::vector columnsToAdd, - std::shared_ptr etcCallback); + std::shared_ptr etcCallback, Ticket result); - Ticket asOfJoinAsync(AsOfJoinTablesRequest::MatchRule matchRule, Ticket leftTableTicket, + void asOfJoinAsync(AsOfJoinTablesRequest::MatchRule matchRule, Ticket leftTableTicket, Ticket rightTableTicket, std::vector columnsToMatch, - std::vector columnsToAdd, std::shared_ptr etcCallback); + std::vector columnsToAdd, std::shared_ptr etcCallback, Ticket result); void bindToVariableAsync(const Ticket &consoleId, const Ticket &tableId, std::string variable, std::shared_ptr> callback); void releaseAsync(Ticket ticket, std::shared_ptr> callback); - Ticket fetchTableAsync(std::string tableName, std::shared_ptr callback); + void fetchTableAsync(std::string tableName, std::shared_ptr callback, Ticket result); template void sendRpc(const TReq &req, std::shared_ptr> responseCallback, @@ -238,8 +242,8 @@ class Server : public std::enable_shared_from_this { (TableService::Stub::*selectOrUpdateMethod_t)(::grpc::ClientContext *context, const SelectOrUpdateRequest &request, ::grpc::CompletionQueue *cq); - Ticket selectOrUpdateHelper(Ticket parentTicket, std::vector columnSpecs, - std::shared_ptr etcCallback, selectOrUpdateMethod_t method); + void selectOrUpdateHelper(Ticket parentTicket, std::vector columnSpecs, + std::shared_ptr etcCallback, Ticket result, selectOrUpdateMethod_t method); static void processCompletionQueueForever(const std::shared_ptr &self); bool processNextCompletionQueueItem(); diff --git a/cpp-client/deephaven/client/include/public/deephaven/client/client.h b/cpp-client/deephaven/client/include/public/deephaven/client/client.h index 93962a7c0af..39758d64f86 100644 --- a/cpp-client/deephaven/client/include/public/deephaven/client/client.h +++ b/cpp-client/deephaven/client/include/public/deephaven/client/client.h @@ -18,7 +18,6 @@ */ namespace deephaven::client { class FlightWrapper; -class TableHandleAndFlightDescriptor; } // namespace deephaven::client /** @@ -119,15 +118,21 @@ class TableHandleManager { TableHandle timeTable(std::chrono::system_clock::time_point startTime, std::chrono::system_clock::duration period) const; /** - * Allocate a fresh TableHandle and return both it and its corresponding Arrow FlightDescriptor. - * This is used when the caller wants to do an Arrow DoPut operation. - * The object returned is only forward-referenced in this file. If you want to use it, you will - * also need to include deephaven/client/flight.h. - * @param numRows The number of table rows (reflected back when you call TableHandle::numRows()) - * @param isStatic Whether the table is static (reflected back when youcall TableHandle::isStatic()) - * @return A TableHandle and Arrow FlightDescriptor referring to the new table. + * Allocate a fresh client ticket. This is a low level operation, typically used when the caller wants to do an Arrow + * doPut operation. + * @example + * auto ticket = manager.newTicket(); + * auto flightDescriptor = convertTicketToFlightDescriptor(ticket); + * // [do arrow operations here to put your table to the server] + * // Once that is done, you can bind the ticket to a TableHandle + * auto tableHandle = manager.makeTableHandleFromTicket(ticket); */ - TableHandleAndFlightDescriptor newTableHandleAndFlightDescriptor(int64_t numRows, bool isStatic) const; + std::string newTicket() const; + /** + * Creates a TableHandle that owns the underlying ticket and its resources. The ticket argument is typically + * created with newTicket() and then populated e.g. with Arrow operations. + */ + TableHandle makeTableHandleFromTicket(std::string ticket) const; /** * Execute a script on the server. This assumes that the Client was created with a sessionType corresponding to * the language of the script (typically either "python" or "groovy") and that the code matches that language. diff --git a/cpp-client/deephaven/client/include/public/deephaven/client/flight.h b/cpp-client/deephaven/client/include/public/deephaven/client/flight.h index b8dd75ac106..29677babff2 100644 --- a/cpp-client/deephaven/client/include/public/deephaven/client/flight.h +++ b/cpp-client/deephaven/client/include/public/deephaven/client/flight.h @@ -64,26 +64,4 @@ class FlightWrapper { private: std::shared_ptr impl_; }; - -/** - * The return type for TableHandleManager::newTableHandleAndFlightDescriptor(), defined in - * deephaven/client/client.h. - */ -class TableHandleAndFlightDescriptor { -public: - TableHandleAndFlightDescriptor(TableHandle tableHandle, - arrow::flight::FlightDescriptor flightDescriptor); - TableHandleAndFlightDescriptor(TableHandleAndFlightDescriptor &&other) noexcept; - TableHandleAndFlightDescriptor &operator=(TableHandleAndFlightDescriptor &&other) noexcept; - ~TableHandleAndFlightDescriptor(); - - TableHandle &tableHandle() { return tableHandle_; } - const TableHandle &tableHandle() const { return tableHandle_; } - - arrow::flight::FlightDescriptor &flightDescriptor() { return flightDescriptor_; } - const arrow::flight::FlightDescriptor &flightDescriptor() const { return flightDescriptor_; } - - TableHandle tableHandle_; - arrow::flight::FlightDescriptor flightDescriptor_; -}; } // namespace deephaven::client diff --git a/cpp-client/deephaven/client/include/public/deephaven/client/utility/arrow_util.h b/cpp-client/deephaven/client/include/public/deephaven/client/utility/arrow_util.h index a8039d6267f..c38757bd436 100644 --- a/cpp-client/deephaven/client/include/public/deephaven/client/utility/arrow_util.h +++ b/cpp-client/deephaven/client/include/public/deephaven/client/utility/arrow_util.h @@ -8,9 +8,14 @@ #include #include #include +#include +#include + #include "deephaven/dhcore/utility/utility.h" namespace deephaven::client::utility { +arrow::flight::FlightDescriptor convertTicketToFlightDescriptor(const std::string &ticket); + /** * If result's status is OK, do nothing. Otherwise throw a runtime error with an informative message. * @param debugInfo A DebugInfo object, typically as provided by DEEPHAVEN_EXPR_MSG. diff --git a/cpp-client/deephaven/client/include/public/deephaven/client/utility/table_maker.h b/cpp-client/deephaven/client/include/public/deephaven/client/utility/table_maker.h index 35af250fa17..e734d9adb94 100644 --- a/cpp-client/deephaven/client/include/public/deephaven/client/utility/table_maker.h +++ b/cpp-client/deephaven/client/include/public/deephaven/client/utility/table_maker.h @@ -100,11 +100,9 @@ class TableMaker { /** * Make the table. Call this after all your calls to addColumn(). * @param manager The TableHandleManager - * @param numRows The number of rows in your table. This will be reflected in TableHandle::numRows() - * @param isStatic Whether your table is static. This will be reflected in TableHandle::isStatic(). * @return The TableHandle referencing the newly-created table. */ - TableHandle makeTable(const TableHandleManager &manager, int64_t numRows, bool isStatic); + TableHandle makeTable(const TableHandleManager &manager); private: void finishAddColumn(std::string name, internal::TypeConverter info); diff --git a/cpp-client/deephaven/client/src/arrowutil/arrow_flight.cc b/cpp-client/deephaven/client/src/arrowutil/arrow_flight.cc deleted file mode 100644 index 59942ce4d52..00000000000 --- a/cpp-client/deephaven/client/src/arrowutil/arrow_flight.cc +++ /dev/null @@ -1,17 +0,0 @@ -/** - * Copyright (c) 2016-2022 Deephaven Data Labs and Patent Pending - */ -#include "deephaven/client/arrowutil/arrow_flight.h" - -namespace deephaven::client::arrowutil { -bool ArrowUtil::tryConvertTicketToFlightDescriptor(const std::string &ticket, - arrow::flight::FlightDescriptor *fd) { - if (ticket.length() != 5 || ticket[0] != 'e') { - return false; - } - uint32_t value; - memcpy(&value, ticket.data() + 1, sizeof(uint32_t)); - *fd = arrow::flight::FlightDescriptor::Path({"export", std::to_string(value)}); - return true; -}; -} // namespace deephaven::client::arrowutil diff --git a/cpp-client/deephaven/client/src/client.cc b/cpp-client/deephaven/client/src/client.cc index a80c81402f1..5921a558518 100644 --- a/cpp-client/deephaven/client/src/client.cc +++ b/cpp-client/deephaven/client/src/client.cc @@ -88,13 +88,6 @@ TableHandle TableHandleManager::timeTable(int64_t startTimeNanos, int64_t period return TableHandle(std::move(qsImpl)); } -TableHandleAndFlightDescriptor TableHandleManager::newTableHandleAndFlightDescriptor(int64_t numRows, - bool isStatic) const { - auto [thImpl, fd] = impl_->newTicket(numRows, isStatic); - TableHandle th(std::move(thImpl)); - return {std::move(th), std::move(fd)}; -} - TableHandle TableHandleManager::timeTable(std::chrono::system_clock::time_point startTime, std::chrono::system_clock::duration period) const { auto stNanos = std::chrono::duration_cast(startTime.time_since_epoch()).count(); @@ -102,6 +95,15 @@ TableHandle TableHandleManager::timeTable(std::chrono::system_clock::time_point return timeTable(stNanos, dNanos); } +std::string TableHandleManager::newTicket() const { + return impl_->newTicket(); +} + +TableHandle TableHandleManager::makeTableHandleFromTicket(std::string ticket) const { + auto handleImpl = impl_->makeTableHandleFromTicket(std::move(ticket)); + return TableHandle(std::move(handleImpl)); +} + void TableHandleManager::runScript(std::string code) const { auto res = SFCallback<>::createForFuture(); impl_->runScriptAsync(std::move(code), std::move(res.first)); diff --git a/cpp-client/deephaven/client/src/flight.cc b/cpp-client/deephaven/client/src/flight.cc index da155bb393b..e82b4f973d2 100644 --- a/cpp-client/deephaven/client/src/flight.cc +++ b/cpp-client/deephaven/client/src/flight.cc @@ -41,12 +41,4 @@ arrow::flight::FlightClient *FlightWrapper::flightClient() const { const auto *server = impl_->server().get(); return server->flightClient(); } - -TableHandleAndFlightDescriptor::TableHandleAndFlightDescriptor(TableHandle tableHandle, - arrow::flight::FlightDescriptor flightDescriptor) : tableHandle_(std::move(tableHandle)), - flightDescriptor_(std::move(flightDescriptor)) {} -TableHandleAndFlightDescriptor::~TableHandleAndFlightDescriptor() = default; - -TableHandleAndFlightDescriptor::TableHandleAndFlightDescriptor(TableHandleAndFlightDescriptor &&other) noexcept = default; -TableHandleAndFlightDescriptor &TableHandleAndFlightDescriptor::operator=(TableHandleAndFlightDescriptor &&other) noexcept = default; } // namespace deephaven::client diff --git a/cpp-client/deephaven/client/src/impl/table_handle_impl.cc b/cpp-client/deephaven/client/src/impl/table_handle_impl.cc index d7058853cf1..3c722d98241 100644 --- a/cpp-client/deephaven/client/src/impl/table_handle_impl.cc +++ b/cpp-client/deephaven/client/src/impl/table_handle_impl.cc @@ -13,7 +13,6 @@ #include #include #include -#include "deephaven/client/arrowutil/arrow_flight.h" #include "deephaven/client/impl/boolean_expression_impl.h" #include "deephaven/client/impl/columns_impl.h" #include "deephaven/client/impl/table_handle_manager_impl.h" @@ -36,7 +35,6 @@ using io::deephaven::proto::backplane::grpc::TableReference; using io::deephaven::proto::backplane::grpc::Ticket; using io::deephaven::proto::backplane::script::grpc::BindTableToVariableResponse; using deephaven::client::SortDirection; -using deephaven::client::arrowutil::ArrowUtil; using deephaven::client::impl::ColumnImpl; using deephaven::client::impl::DateTimeColImpl; using deephaven::client::impl::NumColImpl; @@ -44,6 +42,7 @@ using deephaven::client::impl::StrColImpl; using deephaven::client::server::Server; using deephaven::client::subscription::SubscriptionThread; using deephaven::client::subscription::SubscriptionHandle; +using deephaven::client::utility::convertTicketToFlightDescriptor; using deephaven::client::utility::Executor; using deephaven::client::utility::okOrThrow; using deephaven::client::utility::okOrThrow; @@ -71,89 +70,87 @@ using deephaven::dhcore::utility::stringf; namespace deephaven::client { namespace impl { std::pair, std::shared_ptr> -TableHandleImpl::createEtcCallback(const TableHandleManagerImpl *thm) { +TableHandleImpl::createEtcCallback(std::shared_ptr dependency, const TableHandleManagerImpl *thm, + Ticket resultTicket) { CBPromise infoPromise; - auto ticketFuture = infoPromise.makeFuture(); - auto cb = std::make_shared(std::move(infoPromise)); - auto ls = std::make_shared(thm->server(), thm->flightExecutor(), - std::move(ticketFuture)); + auto infoFuture = infoPromise.makeFuture(); + auto cb = std::make_shared(std::move(dependency), resultTicket, + std::move(infoPromise)); + auto ls = std::make_shared(thm->server(), thm->flightExecutor(), std::move(infoFuture), + std::move(resultTicket)); return std::make_pair(std::move(cb), std::move(ls)); } -std::shared_ptr TableHandleImpl::create(std::shared_ptr parent, - std::shared_ptr thm, Ticket ticket, std::shared_ptr lazyState) { - return std::make_shared(Private(), std::move(parent), std::move(thm), std::move(ticket), - std::move(lazyState)); +std::shared_ptr TableHandleImpl::create(std::shared_ptr thm, Ticket ticket, + std::shared_ptr lazyState) { + return std::make_shared(Private(), std::move(thm), std::move(ticket), std::move(lazyState)); } -TableHandleImpl::TableHandleImpl(Private, std::shared_ptr &&parent, - std::shared_ptr &&thm, Ticket &&ticket, std::shared_ptr &&lazyState) : - parent_(std::move(parent)), managerImpl_(std::move(thm)), ticket_(std::move(ticket)), +TableHandleImpl::TableHandleImpl(Private, std::shared_ptr &&thm, Ticket &&ticket, + std::shared_ptr &&lazyState) : managerImpl_(std::move(thm)), ticket_(std::move(ticket)), lazyState_(std::move(lazyState)) { } TableHandleImpl::~TableHandleImpl() { - struct ReleaseCallback final : public SFCallback { - void onSuccess(ReleaseResponse resp) final { - // Do nothing - } - - void onFailure(std::exception_ptr ep) final { - // Do nothing - } - }; - auto cb = std::make_shared(); - managerImpl_->server()->releaseAsync(ticket_, std::move(cb)); + this->lazyState_->releaseAsync(); } std::shared_ptr TableHandleImpl::select(std::vector columnSpecs) { - auto [cb, ls] = TableHandleImpl::createEtcCallback(managerImpl_.get()); - auto resultTicket = managerImpl_->server()->selectAsync(ticket_, std::move(columnSpecs), - std::move(cb)); - return TableHandleImpl::create(shared_from_this(), managerImpl_, std::move(resultTicket), std::move(ls)); + auto *server = managerImpl_->server().get(); + auto resultTicket = server->newTicket(); + auto [cb, ls] = TableHandleImpl::createEtcCallback(shared_from_this(), managerImpl_.get(), resultTicket); + server->selectAsync(ticket_, std::move(columnSpecs), std::move(cb), resultTicket); + return TableHandleImpl::create(managerImpl_, std::move(resultTicket), std::move(ls)); } std::shared_ptr TableHandleImpl::update(std::vector columnSpecs) { - auto [cb, ls] = TableHandleImpl::createEtcCallback(managerImpl_.get()); - auto resultTicket = managerImpl_->server()->updateAsync(ticket_, std::move(columnSpecs), - std::move(cb)); - return TableHandleImpl::create(shared_from_this(), managerImpl_, std::move(resultTicket), std::move(ls)); + auto *server = managerImpl_->server().get(); + auto resultTicket = server->newTicket(); + auto [cb, ls] = TableHandleImpl::createEtcCallback(shared_from_this(), managerImpl_.get(), resultTicket); + server->updateAsync(ticket_, std::move(columnSpecs), std::move(cb), resultTicket); + return TableHandleImpl::create(managerImpl_, std::move(resultTicket), std::move(ls)); } std::shared_ptr TableHandleImpl::view(std::vector columnSpecs) { - auto [cb, ls] = TableHandleImpl::createEtcCallback(managerImpl_.get()); - auto resultTicket = managerImpl_->server()->viewAsync(ticket_, std::move(columnSpecs), - std::move(cb)); - return TableHandleImpl::create(shared_from_this(), managerImpl_, std::move(resultTicket), std::move(ls)); + auto *server = managerImpl_->server().get(); + auto resultTicket = server->newTicket(); + auto [cb, ls] = TableHandleImpl::createEtcCallback(shared_from_this(), managerImpl_.get(), resultTicket); + server->viewAsync(ticket_, std::move(columnSpecs), std::move(cb), resultTicket); + return TableHandleImpl::create(managerImpl_, std::move(resultTicket), std::move(ls)); } std::shared_ptr TableHandleImpl::dropColumns(std::vector columnSpecs) { - auto [cb, ls] = TableHandleImpl::createEtcCallback(managerImpl_.get()); - auto resultTicket = managerImpl_->server()->dropColumnsAsync(ticket_, std::move(columnSpecs), - std::move(cb)); - return TableHandleImpl::create(shared_from_this(), managerImpl_, std::move(resultTicket), std::move(ls)); + auto *server = managerImpl_->server().get(); + auto resultTicket = server->newTicket(); + auto [cb, ls] = TableHandleImpl::createEtcCallback(shared_from_this(), managerImpl_.get(), resultTicket); + server->dropColumnsAsync(ticket_, std::move(columnSpecs), std::move(cb), resultTicket); + return TableHandleImpl::create(managerImpl_, std::move(resultTicket), std::move(ls)); } std::shared_ptr TableHandleImpl::updateView(std::vector columnSpecs) { - auto [cb, ls] = TableHandleImpl::createEtcCallback(managerImpl_.get()); - auto resultTicket = managerImpl_->server()->updateViewAsync(ticket_, std::move(columnSpecs), - std::move(cb)); - return TableHandleImpl::create(shared_from_this(), managerImpl_, std::move(resultTicket), std::move(ls)); + auto *server = managerImpl_->server().get(); + auto resultTicket = server->newTicket(); + auto [cb, ls] = TableHandleImpl::createEtcCallback(shared_from_this(), managerImpl_.get(), resultTicket); + server->updateViewAsync(ticket_, std::move(columnSpecs), std::move(cb), resultTicket); + return TableHandleImpl::create(managerImpl_, std::move(resultTicket), std::move(ls)); } std::shared_ptr TableHandleImpl::where(std::string condition) { - auto [cb, ls] = TableHandleImpl::createEtcCallback(managerImpl_.get()); - auto resultTicket = managerImpl_->server()->whereAsync(ticket_, std::move(condition), - std::move(cb)); - return TableHandleImpl::create(shared_from_this(), managerImpl_, std::move(resultTicket), std::move(ls)); + auto *server = managerImpl_->server().get(); + auto resultTicket = server->newTicket(); + auto [cb, ls] = TableHandleImpl::createEtcCallback(shared_from_this(), managerImpl_.get(), resultTicket); + server->whereAsync(ticket_, std::move(condition), std::move(cb), resultTicket); + return TableHandleImpl::create(managerImpl_, std::move(resultTicket), std::move(ls)); } std::shared_ptr TableHandleImpl::sort(std::vector sortPairs) { - auto [cb, ls] = TableHandleImpl::createEtcCallback(managerImpl_.get()); + auto *server = managerImpl_->server().get(); + auto resultTicket = server->newTicket(); + auto [cb, ls] = TableHandleImpl::createEtcCallback(shared_from_this(), managerImpl_.get(), resultTicket); std::vector sortDescriptors; sortDescriptors.reserve(sortPairs.size()); - for (auto &sp : sortPairs) { + for (auto &sp: sortPairs) { auto which = sp.direction() == SortDirection::Ascending ? SortDescriptor::ASCENDING : SortDescriptor::DESCENDING; SortDescriptor sd; @@ -162,9 +159,8 @@ std::shared_ptr TableHandleImpl::sort(std::vector sor sd.set_direction(which); sortDescriptors.push_back(std::move(sd)); } - auto resultTicket = managerImpl_->server()->sortAsync(ticket_, std::move(sortDescriptors), - std::move(cb)); - return TableHandleImpl::create(shared_from_this(), managerImpl_, std::move(resultTicket), std::move(ls)); + server->sortAsync(ticket_, std::move(sortDescriptors), std::move(cb), resultTicket); + return TableHandleImpl::create(managerImpl_, std::move(resultTicket), std::move(ls)); } std::shared_ptr TableHandleImpl::preemptive(int32_t sampleIntervalMs) { @@ -177,14 +173,16 @@ std::shared_ptr TableHandleImpl::preemptive(int32_t sampleInter std::shared_ptr TableHandleImpl::defaultAggregateByDescriptor( ComboAggregateRequest::Aggregate descriptor, std::vector columnSpecs) { + auto *server = managerImpl_->server().get(); + auto resultTicket = server->newTicket(); std::vector descriptors; descriptors.reserve(1); descriptors.push_back(std::move(descriptor)); - auto [cb, ls] = TableHandleImpl::createEtcCallback(managerImpl_.get()); - auto resultTicket = managerImpl_->server()->comboAggregateDescriptorAsync(ticket_, - std::move(descriptors), std::move(columnSpecs), false, std::move(cb)); - return TableHandleImpl::create(shared_from_this(), managerImpl_, std::move(resultTicket), std::move(ls)); + auto [cb, ls] = TableHandleImpl::createEtcCallback(shared_from_this(), managerImpl_.get(), resultTicket); + server->comboAggregateDescriptorAsync(ticket_, std::move(descriptors), std::move(columnSpecs), + false, std::move(cb), resultTicket); + return TableHandleImpl::create(managerImpl_, std::move(resultTicket), std::move(ls)); } std::shared_ptr TableHandleImpl::defaultAggregateByType( @@ -201,10 +199,12 @@ std::shared_ptr TableHandleImpl::by(std::vector co std::shared_ptr TableHandleImpl::by( std::vector descriptors, std::vector groupByColumns) { - auto [cb, ls] = TableHandleImpl::createEtcCallback(managerImpl_.get()); - auto resultTicket = managerImpl_->server()->comboAggregateDescriptorAsync(ticket_, - std::move(descriptors), std::move(groupByColumns), false, std::move(cb)); - return TableHandleImpl::create(shared_from_this(), managerImpl_, std::move(resultTicket), std::move(ls)); + auto *server = managerImpl_->server().get(); + auto resultTicket = server->newTicket(); + auto [cb, ls] = TableHandleImpl::createEtcCallback(shared_from_this(), managerImpl_.get(), resultTicket); + server->comboAggregateDescriptorAsync(ticket_, std::move(descriptors), std::move(groupByColumns), + false, std::move(cb), resultTicket); + return TableHandleImpl::create(managerImpl_, std::move(resultTicket), std::move(ls)); } std::shared_ptr TableHandleImpl::minBy(std::vector columnSpecs) { @@ -289,10 +289,11 @@ std::shared_ptr TableHandleImpl::headBy(int64_t n, std::shared_ptr TableHandleImpl::headOrTailByHelper(int64_t n, bool head, std::vector columnSpecs) { - auto [cb, ls] = TableHandleImpl::createEtcCallback(managerImpl_.get()); - auto resultTicket = managerImpl_->server()->headOrTailByAsync(ticket_, head, n, - std::move(columnSpecs), std::move(cb)); - return TableHandleImpl::create(shared_from_this(), managerImpl_, std::move(resultTicket), std::move(ls)); + auto *server = managerImpl_->server().get(); + auto resultTicket = server->newTicket(); + auto [cb, ls] = TableHandleImpl::createEtcCallback(shared_from_this(), managerImpl_.get(), resultTicket); + server->headOrTailByAsync(ticket_, head, n, std::move(columnSpecs), std::move(cb), resultTicket); + return TableHandleImpl::create(managerImpl_, std::move(resultTicket), std::move(ls)); } std::shared_ptr TableHandleImpl::tail(int64_t n) { @@ -304,58 +305,70 @@ std::shared_ptr TableHandleImpl::head(int64_t n) { } std::shared_ptr TableHandleImpl::headOrTailHelper(bool head, int64_t n) { - auto [cb, ls] = TableHandleImpl::createEtcCallback(managerImpl_.get()); - auto resultTicket = managerImpl_->server()->headOrTailAsync(ticket_, head, n, std::move(cb)); - return TableHandleImpl::create(shared_from_this(), managerImpl_, std::move(resultTicket), std::move(ls)); + auto *server = managerImpl_->server().get(); + auto resultTicket = server->newTicket(); + auto [cb, ls] = TableHandleImpl::createEtcCallback(shared_from_this(), managerImpl_.get(), resultTicket); + server->headOrTailAsync(ticket_, head, n, std::move(cb), resultTicket); + return TableHandleImpl::create(managerImpl_, std::move(resultTicket), std::move(ls)); } std::shared_ptr TableHandleImpl::ungroup(bool nullFill, std::vector groupByColumns) { - auto [cb, ls] = TableHandleImpl::createEtcCallback(managerImpl_.get()); - auto resultTicket = managerImpl_->server()->ungroupAsync(ticket_, nullFill, std::move(groupByColumns), - std::move(cb)); - return TableHandleImpl::create(shared_from_this(), managerImpl_, std::move(resultTicket), std::move(ls)); + auto *server = managerImpl_->server().get(); + auto resultTicket = server->newTicket(); + auto [cb, ls] = TableHandleImpl::createEtcCallback(shared_from_this(), managerImpl_.get(), resultTicket); + server->ungroupAsync(ticket_, nullFill, std::move(groupByColumns), std::move(cb), resultTicket); + return TableHandleImpl::create(managerImpl_, std::move(resultTicket), std::move(ls)); } std::shared_ptr TableHandleImpl::merge(std::string keyColumn, std::vector sourceTickets) { - auto [cb, ls] = TableHandleImpl::createEtcCallback(managerImpl_.get()); - auto resultTicket = managerImpl_->server()->mergeAsync(std::move(sourceTickets), - std::move(keyColumn), std::move(cb)); - return TableHandleImpl::create(shared_from_this(), managerImpl_, std::move(resultTicket), std::move(ls)); + auto *server = managerImpl_->server().get(); + auto resultTicket = server->newTicket(); + auto [cb, ls] = TableHandleImpl::createEtcCallback(shared_from_this(), managerImpl_.get(), resultTicket); + server->mergeAsync(std::move(sourceTickets), std::move(keyColumn), std::move(cb), resultTicket); + return TableHandleImpl::create(managerImpl_, std::move(resultTicket), std::move(ls)); } std::shared_ptr TableHandleImpl::crossJoin(const TableHandleImpl &rightSide, - std::vector columnsToMatch, std::vector columnsToAdd) const { - auto [cb, ls] = TableHandleImpl::createEtcCallback(managerImpl_.get()); - auto resultTicket = managerImpl_->server()->crossJoinAsync(ticket_, rightSide.ticket_, - std::move(columnsToMatch), std::move(columnsToAdd), std::move(cb)); - return TableHandleImpl::create(shared_from_this(), managerImpl_, std::move(resultTicket), std::move(ls)); + std::vector columnsToMatch, std::vector columnsToAdd) { + auto *server = managerImpl_->server().get(); + auto resultTicket = server->newTicket(); + auto [cb, ls] = TableHandleImpl::createEtcCallback(shared_from_this(), managerImpl_.get(), resultTicket); + server->crossJoinAsync(ticket_, rightSide.ticket_, std::move(columnsToMatch), std::move(columnsToAdd), + std::move(cb), resultTicket); + return TableHandleImpl::create(managerImpl_, std::move(resultTicket), std::move(ls)); } std::shared_ptr TableHandleImpl::naturalJoin(const TableHandleImpl &rightSide, - std::vector columnsToMatch, std::vector columnsToAdd) const { - auto [cb, ls] = TableHandleImpl::createEtcCallback(managerImpl_.get()); - auto resultTicket = managerImpl_->server()->naturalJoinAsync(ticket_, rightSide.ticket_, - std::move(columnsToMatch), std::move(columnsToAdd), std::move(cb)); - return TableHandleImpl::create(shared_from_this(), managerImpl_, std::move(resultTicket), std::move(ls)); + std::vector columnsToMatch, std::vector columnsToAdd) { + auto *server = managerImpl_->server().get(); + auto resultTicket = server->newTicket(); + auto [cb, ls] = TableHandleImpl::createEtcCallback(shared_from_this(), managerImpl_.get(), resultTicket); + server->naturalJoinAsync(ticket_, rightSide.ticket_, std::move(columnsToMatch), + std::move(columnsToAdd), std::move(cb), resultTicket); + return TableHandleImpl::create(managerImpl_, std::move(resultTicket), std::move(ls)); } std::shared_ptr TableHandleImpl::exactJoin(const TableHandleImpl &rightSide, - std::vector columnsToMatch, std::vector columnsToAdd) const { - auto [cb, ls] = TableHandleImpl::createEtcCallback(managerImpl_.get()); - auto resultTicket = managerImpl_->server()->exactJoinAsync(ticket_, rightSide.ticket_, - std::move(columnsToMatch), std::move(columnsToAdd), std::move(cb)); - return TableHandleImpl::create(shared_from_this(), managerImpl_, std::move(resultTicket), std::move(ls)); + std::vector columnsToMatch, std::vector columnsToAdd) { + auto *server = managerImpl_->server().get(); + auto resultTicket = server->newTicket(); + auto [cb, ls] = TableHandleImpl::createEtcCallback(shared_from_this(), managerImpl_.get(), resultTicket); + server->exactJoinAsync(ticket_, rightSide.ticket_, std::move(columnsToMatch), std::move(columnsToAdd), + std::move(cb), resultTicket); + return TableHandleImpl::create(managerImpl_, std::move(resultTicket), std::move(ls)); } std::shared_ptr TableHandleImpl::asOfJoin(AsOfJoinTablesRequest::MatchRule matchRule, const TableHandleImpl &rightSide, std::vector columnsToMatch, std::vector columnsToAdd) { - auto [cb, ls] = TableHandleImpl::createEtcCallback(managerImpl_.get()); - auto resultTicket = managerImpl_->server()->asOfJoinAsync(matchRule, ticket_, - rightSide.ticket(), std::move(columnsToMatch), std::move(columnsToAdd), std::move(cb)); - return TableHandleImpl::create(shared_from_this(), managerImpl_, std::move(resultTicket), std::move(ls)); + auto *server = managerImpl_->server().get(); + auto resultTicket = server->newTicket(); + auto [cb, ls] = TableHandleImpl::createEtcCallback(shared_from_this(), managerImpl_.get(), resultTicket); + server->asOfJoinAsync(matchRule, ticket_, rightSide.ticket(), std::move(columnsToMatch), + std::move(columnsToAdd), std::move(cb), resultTicket); + return TableHandleImpl::create(managerImpl_, std::move(resultTicket), std::move(ls)); } std::shared_ptr TableHandleImpl::subscribe( @@ -502,16 +515,16 @@ bool TableHandleImpl::isStatic() { } namespace internal { -LazyStateInfo::LazyStateInfo(Ticket ticket, int64_t numRows, bool isStatic) : ticket_(std::move(ticket)), - numRows_(numRows), isStatic_(isStatic) {} +LazyStateInfo::LazyStateInfo(int64_t numRows, bool isStatic) : numRows_(numRows), isStatic_(isStatic) {} LazyStateInfo::LazyStateInfo(const LazyStateInfo &other) = default; LazyStateInfo &LazyStateInfo::operator=(const LazyStateInfo &other) = default; LazyStateInfo::LazyStateInfo(LazyStateInfo &&other) noexcept = default; LazyStateInfo &LazyStateInfo::operator=(LazyStateInfo &&other) noexcept = default; LazyStateInfo::~LazyStateInfo() = default; -ExportedTableCreationCallback::ExportedTableCreationCallback(CBPromise &&infoPromise) : - infoPromise_(std::move(infoPromise)) {} +ExportedTableCreationCallback::ExportedTableCreationCallback(std::shared_ptr dependency, + Ticket expectedTicket, CBPromise infoPromise) : dependency_(std::move(dependency)), + expectedTicket_(std::move(expectedTicket)), infoPromise_(std::move(infoPromise)) {} ExportedTableCreationCallback::~ExportedTableCreationCallback() = default; void ExportedTableCreationCallback::onSuccess(ExportedTableCreationResponse item) { @@ -521,7 +534,15 @@ void ExportedTableCreationCallback::onSuccess(ExportedTableCreationResponse item onFailure(std::move(ep)); return; } - LazyStateInfo info(std::move(*item.mutable_result_id()->mutable_ticket()), item.size(), item.is_static()); + + if (item.result_id().ticket().ticket() != expectedTicket_.ticket()) { + const char *message = "Result ticket was not equal to expected ticket"; + auto ep = std::make_exception_ptr(std::runtime_error(DEEPHAVEN_DEBUG_MSG(message))); + onFailure(std::move(ep)); + return; + } + + LazyStateInfo info(item.size(), item.is_static()); infoPromise_.setValue(std::move(info)); } @@ -601,8 +622,10 @@ class GetSchemaCallback final : public std::enable_shared_from_this { public: GetSchemaCallback(std::shared_ptr server, std::shared_ptr flightExecutor, - CBPromise> schemaPromise) : server_(std::move(server)), - flightExecutor_(std::move(flightExecutor)), schemaPromise_(std::move(schemaPromise)) {} + CBPromise> schemaPromise, Ticket ticket) : server_(std::move(server)), + flightExecutor_(std::move(flightExecutor)), schemaPromise_(std::move(schemaPromise)), + ticket_(std::move(ticket)) { + } ~GetSchemaCallback() final = default; void onFailure(std::exception_ptr ep) final { @@ -610,7 +633,6 @@ class GetSchemaCallback final : } void onSuccess(LazyStateInfo info) final { - ticket_ = std::move(info.ticket()); flightExecutor_->invoke(shared_from_this()); } @@ -630,12 +652,7 @@ class GetSchemaCallback final : } ); - arrow::flight::FlightDescriptor fd; - if (!ArrowUtil::tryConvertTicketToFlightDescriptor(ticket_.ticket(), &fd)) { - auto message = stringf("Couldn't convert ticket %o to a flight descriptor", ticket_.ticket()); - throw std::runtime_error(DEEPHAVEN_DEBUG_MSG(message)); - } - + auto fd = convertTicketToFlightDescriptor(ticket_.ticket()); std::unique_ptr schemaResult; auto gsResult = server_->flightClient()->GetSchema(options, fd, &schemaResult); okOrThrow(DEEPHAVEN_EXPR_MSG(gsResult)); @@ -663,9 +680,9 @@ class GetSchemaCallback final : }; LazyState::LazyState(std::shared_ptr server, std::shared_ptr flightExecutor, - CBFuture infoFuture) : server_(std::move(server)), + CBFuture infoFuture, Ticket ticket) : server_(std::move(server)), flightExecutor_(std::move(flightExecutor)), infoFuture_(std::move(infoFuture)), - schemaRequestSent_(false), schemaFuture_(schemaPromise_.makeFuture()) {} + ticket_(std::move(ticket)), schemaRequestSent_(false), schemaFuture_(schemaPromise_.makeFuture()) {} LazyState::~LazyState() = default; @@ -689,9 +706,38 @@ void LazyState::getSchemaAsync(std::shared_ptr(server_, flightExecutor_, std::move(schemaPromise_)); + auto cdCallback = std::make_shared(server_, flightExecutor_, std::move(schemaPromise_), ticket_); infoFuture_.invoke(std::move(cdCallback)); } + +void LazyState::releaseAsync() { + struct dualCallback_t final : public SFCallback, public SFCallback, + public std::enable_shared_from_this { + dualCallback_t(std::shared_ptr server, Ticket ticket) : server_(std::move(server)), + ticket_(std::move(ticket)) {} + ~dualCallback_t() final = default; + + void onSuccess(LazyStateInfo info) final { + // Once the ExportedTableCreationResponse has come back, then we can issue an Release, using ourself + // as a callback object again. + server_->releaseAsync(ticket_, shared_from_this()); + } + + void onSuccess(ReleaseResponse resp) final { + // Do nothing + } + + void onFailure(std::exception_ptr ep) final { + // Do nothing + } + + std::shared_ptr server_; + Ticket ticket_; + }; + + auto cb = std::make_shared(server_, ticket_); + infoFuture_.invoke(std::move(cb)); +} } // namespace internal } // namespace impl } // namespace deephaven::client diff --git a/cpp-client/deephaven/client/src/impl/table_handle_manager_impl.cc b/cpp-client/deephaven/client/src/impl/table_handle_manager_impl.cc index 6eb60bd6f5f..a1bfa17580d 100644 --- a/cpp-client/deephaven/client/src/impl/table_handle_manager_impl.cc +++ b/cpp-client/deephaven/client/src/impl/table_handle_manager_impl.cc @@ -36,22 +36,25 @@ TableHandleManagerImpl::TableHandleManagerImpl(Private, std::optional && TableHandleManagerImpl::~TableHandleManagerImpl() = default; std::shared_ptr TableHandleManagerImpl::emptyTable(int64_t size) { - auto [cb, ls] = TableHandleImpl::createEtcCallback(this); - auto resultTicket = server_->emptyTableAsync(size, cb); - return TableHandleImpl::create(nullptr, shared_from_this(), std::move(resultTicket), std::move(ls)); + auto resultTicket = server_->newTicket(); + auto [cb, ls] = TableHandleImpl::createEtcCallback(nullptr, this, resultTicket); + server_->emptyTableAsync(size, cb, resultTicket); + return TableHandleImpl::create(shared_from_this(), std::move(resultTicket), std::move(ls)); } std::shared_ptr TableHandleManagerImpl::fetchTable(std::string tableName) { - auto [cb, ls] = TableHandleImpl::createEtcCallback(this); - auto resultTicket = server_->fetchTableAsync(std::move(tableName), cb); - return TableHandleImpl::create(nullptr, shared_from_this(), std::move(resultTicket), std::move(ls)); + auto resultTicket = server_->newTicket(); + auto [cb, ls] = TableHandleImpl::createEtcCallback(nullptr, this, resultTicket); + server_->fetchTableAsync(std::move(tableName), cb, resultTicket); + return TableHandleImpl::create(shared_from_this(), std::move(resultTicket), std::move(ls)); } std::shared_ptr TableHandleManagerImpl::timeTable(int64_t startTimeNanos, int64_t periodNanos) { - auto [cb, ls] = TableHandleImpl::createEtcCallback(this); - auto resultTicket = server_->timeTableAsync(startTimeNanos, periodNanos, std::move(cb)); - return TableHandleImpl::create(nullptr, shared_from_this(), std::move(resultTicket), std::move(ls)); + auto resultTicket = server_->newTicket(); + auto [cb, ls] = TableHandleImpl::createEtcCallback(nullptr, this, resultTicket); + server_->timeTableAsync(startTimeNanos, periodNanos, std::move(cb), resultTicket); + return TableHandleImpl::create(shared_from_this(), std::move(resultTicket), std::move(ls)); } void TableHandleManagerImpl::runScriptAsync(std::string code, std::shared_ptr> callback) { @@ -78,15 +81,11 @@ void TableHandleManagerImpl::runScriptAsync(std::string code, std::shared_ptrexecuteCommandAsync(*consoleId_, std::move(code), std::move(cb)); } -std::tuple, arrow::flight::FlightDescriptor> -TableHandleManagerImpl::newTicket(int64_t numRows, bool isStatic) { - auto[ticket, fd] = server_->newTicketAndFlightDescriptor(); - - CBPromise infoPromise; - internal::LazyStateInfo info(ticket, numRows, isStatic); - infoPromise.setValue(std::move(info)); - auto ls = std::make_shared(server_, flightExecutor_, infoPromise.makeFuture()); - auto th = TableHandleImpl::create(nullptr, shared_from_this(), std::move(ticket), std::move(ls)); - return std::make_tuple(std::move(th), std::move(fd)); +std::shared_ptr TableHandleManagerImpl::makeTableHandleFromTicket(std::string ticket) { + Ticket resultTicket; + *resultTicket.mutable_ticket() = std::move(ticket); + auto [cb, ls] = TableHandleImpl::createEtcCallback(nullptr, this, resultTicket); + server_->getExportedTableCreationResponseAsync(resultTicket, std::move(cb)); + return TableHandleImpl::create(shared_from_this(), std::move(resultTicket), std::move(ls)); } } // namespace deephaven::client::impl diff --git a/cpp-client/deephaven/client/src/server/server.cc b/cpp-client/deephaven/client/src/server/server.cc index cc6d3a215af..68c24822a10 100644 --- a/cpp-client/deephaven/client/src/server/server.cc +++ b/cpp-client/deephaven/client/src/server/server.cc @@ -254,13 +254,6 @@ Ticket Server::newTicket() { return makeNewTicket(ticketId); } -std::tuple Server::newTicketAndFlightDescriptor() { - auto ticketId = nextFreeTicketId_++; - auto ticket = makeNewTicket(ticketId); - auto fd = arrow::flight::FlightDescriptor::Path({"export", std::to_string(ticketId)}); - return std::make_tuple(std::move(ticket), std::move(fd)); -} - void Server::getConfigurationConstantsAsync( std::shared_ptr> callback) { ConfigurationConstantsRequest req; @@ -284,119 +277,105 @@ void Server::executeCommandAsync(Ticket consoleId, std::string code, sendRpc(req, std::move(callback), consoleStub(), &ConsoleService::Stub::AsyncExecuteCommand); } -Ticket Server::emptyTableAsync(int64_t size, std::shared_ptr etcCallback) { - auto result = newTicket(); +void Server::getExportedTableCreationResponseAsync(Ticket ticket, std::shared_ptr callback) { + sendRpc(ticket, std::move(callback), tableStub(), &TableService::Stub::AsyncGetExportedTableCreationResponse); +} + +void Server::emptyTableAsync(int64_t size, std::shared_ptr etcCallback, Ticket result) { EmptyTableRequest req; - *req.mutable_result_id() = result; + *req.mutable_result_id() = std::move(result); req.set_size(size); sendRpc(req, std::move(etcCallback), tableStub(), &TableService::Stub::AsyncEmptyTable); - return result; } -Ticket Server::fetchTableAsync(std::string tableName, std::shared_ptr callback) { - auto result = newTicket(); +void Server::fetchTableAsync(std::string tableName, std::shared_ptr callback, Ticket result) { FetchTableRequest req; *req.mutable_source_id()->mutable_ticket() = makeScopeReference(tableName); - *req.mutable_result_id() = result; + *req.mutable_result_id() = std::move(result); sendRpc(req, std::move(callback), tableStub(), &TableService::Stub::AsyncFetchTable); - return result; } -Ticket Server::timeTableAsync(int64_t startTimeNanos, int64_t periodNanos, - std::shared_ptr callback) { - auto result = newTicket(); +void Server::timeTableAsync(int64_t startTimeNanos, int64_t periodNanos, std::shared_ptr callback, + Ticket result) { TimeTableRequest req; - *req.mutable_result_id() = result; + *req.mutable_result_id() = std::move(result); req.set_start_time_nanos(startTimeNanos); req.set_period_nanos(periodNanos); sendRpc(req, std::move(callback), tableStub(), &TableService::Stub::AsyncTimeTable); - return result; } -Ticket Server::selectAsync(Ticket parentTicket, std::vector columnSpecs, - std::shared_ptr etcCallback) { - return selectOrUpdateHelper(std::move(parentTicket), std::move(columnSpecs), - std::move(etcCallback), +void Server::selectAsync(Ticket parentTicket, std::vector columnSpecs, + std::shared_ptr etcCallback, Ticket result) { + selectOrUpdateHelper(std::move(parentTicket), std::move(columnSpecs), std::move(etcCallback), std::move(result), &TableService::Stub::AsyncSelect); } -Ticket Server::updateAsync(Ticket parentTicket, - std::vector columnSpecs, std::shared_ptr etcCallback) { - return selectOrUpdateHelper(std::move(parentTicket), std::move(columnSpecs), - std::move(etcCallback), +void Server::updateAsync(Ticket parentTicket, std::vector columnSpecs, + std::shared_ptr etcCallback, Ticket result) { + selectOrUpdateHelper(std::move(parentTicket), std::move(columnSpecs), std::move(etcCallback), std::move(result), &TableService::Stub::AsyncUpdate); } -Ticket Server::viewAsync(Ticket parentTicket, std::vector columnSpecs, - std::shared_ptr etcCallback) { - return selectOrUpdateHelper(std::move(parentTicket), std::move(columnSpecs), - std::move(etcCallback), +void Server::viewAsync(Ticket parentTicket, std::vector columnSpecs, + std::shared_ptr etcCallback, Ticket result) { + selectOrUpdateHelper(std::move(parentTicket), std::move(columnSpecs), std::move(etcCallback), std::move(result), &TableService::Stub::AsyncView); } -Ticket Server::updateViewAsync(Ticket parentTicket, - std::vector columnSpecs, std::shared_ptr etcCallback) { - return selectOrUpdateHelper(std::move(parentTicket), std::move(columnSpecs), - std::move(etcCallback), +void Server::updateViewAsync(Ticket parentTicket, std::vector columnSpecs, + std::shared_ptr etcCallback, Ticket result) { + selectOrUpdateHelper(std::move(parentTicket), std::move(columnSpecs), std::move(etcCallback), std::move(result), &TableService::Stub::AsyncUpdateView); } -Ticket Server::selectOrUpdateHelper(Ticket parentTicket, std::vector columnSpecs, - std::shared_ptr etcCallback, selectOrUpdateMethod_t method) { - auto result = newTicket(); +void Server::selectOrUpdateHelper(Ticket parentTicket, std::vector columnSpecs, + std::shared_ptr etcCallback, Ticket result, selectOrUpdateMethod_t method) { SelectOrUpdateRequest req; - *req.mutable_result_id() = result; + *req.mutable_result_id() = std::move(result); *req.mutable_source_id()->mutable_ticket() = std::move(parentTicket); for (auto &cs: columnSpecs) { *req.mutable_column_specs()->Add() = std::move(cs); } sendRpc(req, std::move(etcCallback), tableStub(), method); - return result; } -Ticket Server::dropColumnsAsync(Ticket parentTicket, std::vector columnSpecs, - std::shared_ptr etcCallback) { - auto result = newTicket(); +void Server::dropColumnsAsync(Ticket parentTicket, std::vector columnSpecs, + std::shared_ptr etcCallback, Ticket result) { DropColumnsRequest req; - *req.mutable_result_id() = result; + *req.mutable_result_id() = std::move(result); *req.mutable_source_id()->mutable_ticket() = std::move(parentTicket); moveVectorData(std::move(columnSpecs), req.mutable_column_names()); sendRpc(req, std::move(etcCallback), tableStub(), &TableService::Stub::AsyncDropColumns); - return result; } -Ticket Server::whereAsync(Ticket parentTicket, std::string condition, - std::shared_ptr etcCallback) { - auto result = newTicket(); +void Server::whereAsync(Ticket parentTicket, std::string condition,std::shared_ptr etcCallback, + Ticket result) { UnstructuredFilterTableRequest req; - *req.mutable_result_id() = result; + *req.mutable_result_id() = std::move(result); *req.mutable_source_id()->mutable_ticket() = std::move(parentTicket); *req.mutable_filters()->Add() = std::move(condition); sendRpc(req, std::move(etcCallback), tableStub(), &TableService::Stub::AsyncUnstructuredFilter); - return result; } -Ticket Server::sortAsync(Ticket parentTicket, std::vector sortDescriptors, - std::shared_ptr etcCallback) { - auto result = newTicket(); +void Server::sortAsync(Ticket parentTicket, std::vector sortDescriptors, + std::shared_ptr etcCallback, Ticket result) { SortTableRequest req; - *req.mutable_result_id() = result; + *req.mutable_result_id() = std::move(result); *req.mutable_source_id()->mutable_ticket() = std::move(parentTicket); for (auto &sd: sortDescriptors) { *req.mutable_sorts()->Add() = std::move(sd); } sendRpc(req, std::move(etcCallback), tableStub(), &TableService::Stub::AsyncSort); - return result; } -Ticket Server::comboAggregateDescriptorAsync(Ticket parentTicket, +void Server::comboAggregateDescriptorAsync(Ticket parentTicket, std::vector aggregates, std::vector groupByColumns, bool forceCombo, - std::shared_ptr etcCallback) { + std::shared_ptr etcCallback, + Ticket result) { - auto result = newTicket(); ComboAggregateRequest req; - *req.mutable_result_id() = result; + *req.mutable_result_id() = std::move(result); *req.mutable_source_id()->mutable_ticket() = std::move(parentTicket); for (auto &agg: aggregates) { *req.mutable_aggregates()->Add() = std::move(agg); @@ -406,14 +385,13 @@ Ticket Server::comboAggregateDescriptorAsync(Ticket parentTicket, } req.set_force_combo(forceCombo); sendRpc(req, std::move(etcCallback), tableStub(), &TableService::Stub::AsyncComboAggregate); - return result; } -Ticket Server::headOrTailByAsync(Ticket parentTicket, bool head, - int64_t n, std::vector columnSpecs, std::shared_ptr etcCallback) { - auto result = newTicket(); +void Server::headOrTailByAsync(Ticket parentTicket, bool head, + int64_t n, std::vector columnSpecs, std::shared_ptr etcCallback, + Ticket result) { HeadOrTailByRequest req; - *req.mutable_result_id() = result; + *req.mutable_result_id() = std::move(result); *req.mutable_source_id()->mutable_ticket() = std::move(parentTicket); req.set_num_rows(n); for (auto &cs: columnSpecs) { @@ -421,101 +399,87 @@ Ticket Server::headOrTailByAsync(Ticket parentTicket, bool head, } const auto &which = head ? &TableService::Stub::AsyncHeadBy : &TableService::Stub::AsyncTailBy; sendRpc(req, std::move(etcCallback), tableStub(), which); - return result; } -Ticket Server::headOrTailAsync(Ticket parentTicket, - bool head, int64_t n, std::shared_ptr etcCallback) { - auto result = newTicket(); +void Server::headOrTailAsync(Ticket parentTicket, bool head, int64_t n, std::shared_ptr etcCallback, + Ticket result) { HeadOrTailRequest req; - *req.mutable_result_id() = result; + *req.mutable_result_id() = std::move(result); *req.mutable_source_id()->mutable_ticket() = std::move(parentTicket); req.set_num_rows(n); const auto &which = head ? &TableService::Stub::AsyncHead : &TableService::Stub::AsyncTail; sendRpc(req, std::move(etcCallback), tableStub(), which); - return result; } -Ticket Server::ungroupAsync(Ticket parentTicket, bool nullFill, - std::vector groupByColumns, std::shared_ptr etcCallback) { - auto result = newTicket(); +void Server::ungroupAsync(Ticket parentTicket, bool nullFill, std::vector groupByColumns, + std::shared_ptr etcCallback, Ticket result) { UngroupRequest req; - *req.mutable_result_id() = result; + *req.mutable_result_id() = std::move(result); *req.mutable_source_id()->mutable_ticket() = std::move(parentTicket); req.set_null_fill(nullFill); moveVectorData(std::move(groupByColumns), req.mutable_columns_to_ungroup()); sendRpc(req, std::move(etcCallback), tableStub(), &TableService::Stub::AsyncUngroup); - return result; } -Ticket Server::mergeAsync(std::vector sourceTickets, std::string keyColumn, - std::shared_ptr etcCallback) { - auto result = newTicket(); +void Server::mergeAsync(std::vector sourceTickets, std::string keyColumn, + std::shared_ptr etcCallback, Ticket result) { MergeTablesRequest req; - *req.mutable_result_id() = result; + *req.mutable_result_id() = std::move(result); for (auto &t: sourceTickets) { *req.mutable_source_ids()->Add()->mutable_ticket() = std::move(t); } req.set_key_column(std::move(keyColumn)); sendRpc(req, std::move(etcCallback), tableStub(), &TableService::Stub::AsyncMergeTables); - return result; } -Ticket Server::crossJoinAsync(Ticket leftTableTicket, Ticket rightTableTicket, +void Server::crossJoinAsync(Ticket leftTableTicket, Ticket rightTableTicket, std::vector columnsToMatch, std::vector columnsToAdd, - std::shared_ptr etcCallback) { - auto result = newTicket(); + std::shared_ptr etcCallback, Ticket result) { CrossJoinTablesRequest req; - *req.mutable_result_id() = result; + *req.mutable_result_id() = std::move(result); *req.mutable_left_id()->mutable_ticket() = std::move(leftTableTicket); *req.mutable_right_id()->mutable_ticket() = std::move(rightTableTicket); moveVectorData(std::move(columnsToMatch), req.mutable_columns_to_match()); moveVectorData(std::move(columnsToAdd), req.mutable_columns_to_add()); sendRpc(req, std::move(etcCallback), tableStub(), &TableService::Stub::AsyncCrossJoinTables); - return result; } -Ticket Server::naturalJoinAsync(Ticket leftTableTicket, Ticket rightTableTicket, +void Server::naturalJoinAsync(Ticket leftTableTicket, Ticket rightTableTicket, std::vector columnsToMatch, std::vector columnsToAdd, - std::shared_ptr etcCallback) { - auto result = newTicket(); + std::shared_ptr etcCallback, Ticket result) { NaturalJoinTablesRequest req; - *req.mutable_result_id() = result; + *req.mutable_result_id() = std::move(result); *req.mutable_left_id()->mutable_ticket() = std::move(leftTableTicket); *req.mutable_right_id()->mutable_ticket() = std::move(rightTableTicket); moveVectorData(std::move(columnsToMatch), req.mutable_columns_to_match()); moveVectorData(std::move(columnsToAdd), req.mutable_columns_to_add()); sendRpc(req, std::move(etcCallback), tableStub(), &TableService::Stub::AsyncNaturalJoinTables); - return result; } -Ticket Server::exactJoinAsync(Ticket leftTableTicket, Ticket rightTableTicket, +void Server::exactJoinAsync(Ticket leftTableTicket, Ticket rightTableTicket, std::vector columnsToMatch, std::vector columnsToAdd, - std::shared_ptr etcCallback) { - auto result = newTicket(); + std::shared_ptr etcCallback, Ticket result) { ExactJoinTablesRequest req; - *req.mutable_result_id() = result; + *req.mutable_result_id() = std::move(result); *req.mutable_left_id()->mutable_ticket() = std::move(leftTableTicket); *req.mutable_right_id()->mutable_ticket() = std::move(rightTableTicket); moveVectorData(std::move(columnsToMatch), req.mutable_columns_to_match()); moveVectorData(std::move(columnsToAdd), req.mutable_columns_to_add()); sendRpc(req, std::move(etcCallback), tableStub(), &TableService::Stub::AsyncExactJoinTables); - return result; } -Ticket Server::asOfJoinAsync(AsOfJoinTablesRequest::MatchRule matchRule, Ticket leftTableTicket, +void Server::asOfJoinAsync(AsOfJoinTablesRequest::MatchRule matchRule, Ticket leftTableTicket, Ticket rightTableTicket, std::vector columnsToMatch, - std::vector columnsToAdd, std::shared_ptr etcCallback) { - auto result = newTicket(); + std::vector columnsToAdd, std::shared_ptr etcCallback, + Ticket result) { AsOfJoinTablesRequest req; - *req.mutable_result_id() = result; + *req.mutable_result_id() = std::move(result); *req.mutable_left_id()->mutable_ticket() = std::move(leftTableTicket); *req.mutable_right_id()->mutable_ticket() = std::move(rightTableTicket); moveVectorData(std::move(columnsToMatch), req.mutable_columns_to_match()); moveVectorData(std::move(columnsToAdd), req.mutable_columns_to_add()); req.set_as_of_match_rule(matchRule); sendRpc(req, std::move(etcCallback), tableStub(), &TableService::Stub::AsyncAsOfJoinTables); - return result; } void diff --git a/cpp-client/deephaven/client/src/utility/arrow_util.cc b/cpp-client/deephaven/client/src/utility/arrow_util.cc index b657b18ba48..8dacb3e4ff6 100644 --- a/cpp-client/deephaven/client/src/utility/arrow_util.cc +++ b/cpp-client/deephaven/client/src/utility/arrow_util.cc @@ -6,6 +6,8 @@ #include #include #include +#include +#include #include "deephaven/dhcore/utility/utility.h" using namespace std; @@ -20,4 +22,14 @@ void okOrThrow(const deephaven::dhcore::utility::DebugInfo &debugInfo, auto msg = stringf("Status: %o. Caller: %o", status, debugInfo); throw std::runtime_error(msg); } + +arrow::flight::FlightDescriptor convertTicketToFlightDescriptor(const std::string &ticket) { + if (ticket.length() != 5 || ticket[0] != 'e') { + const char *message = "Ticket is not in correct format for export"; + throw std::runtime_error(DEEPHAVEN_DEBUG_MSG(message)); + } + uint32_t value; + memcpy(&value, ticket.data() + 1, sizeof(uint32_t)); + return arrow::flight::FlightDescriptor::Path({"export", std::to_string(value)}); +}; } // namespace deephaven::client::utility diff --git a/cpp-client/deephaven/client/src/utility/table_maker.cc b/cpp-client/deephaven/client/src/utility/table_maker.cc index 417103464f0..3a86af22c51 100644 --- a/cpp-client/deephaven/client/src/utility/table_maker.cc +++ b/cpp-client/deephaven/client/src/utility/table_maker.cc @@ -37,18 +37,19 @@ void TableMaker::finishAddColumn(std::string name, internal::TypeConverter info) columns_.push_back(std::move(info.column())); } -TableHandle TableMaker::makeTable(const TableHandleManager &manager, int64_t numRows, bool isStatic) { +TableHandle TableMaker::makeTable(const TableHandleManager &manager) { auto schema = valueOrThrow(DEEPHAVEN_EXPR_MSG(schemaBuilder_.Finish())); auto wrapper = manager.createFlightWrapper(); - auto thfd = manager.newTableHandleAndFlightDescriptor(numRows, isStatic); + auto ticket = manager.newTicket(); + auto flightDescriptor = convertTicketToFlightDescriptor(ticket); arrow::flight::FlightCallOptions options; wrapper.addHeaders(&options); std::unique_ptr fsw; std::unique_ptr fmr; - okOrThrow(DEEPHAVEN_EXPR_MSG(wrapper.flightClient()->DoPut(options, thfd.flightDescriptor(), + okOrThrow(DEEPHAVEN_EXPR_MSG(wrapper.flightClient()->DoPut(options, flightDescriptor, schema, &fsw, &fmr))); auto batch = arrow::RecordBatch::Make(schema, numRows_, std::move(columns_)); @@ -58,7 +59,7 @@ TableHandle TableMaker::makeTable(const TableHandleManager &manager, int64_t num std::shared_ptr buf; okOrThrow(DEEPHAVEN_EXPR_MSG(fmr->ReadMetadata(&buf))); okOrThrow(DEEPHAVEN_EXPR_MSG(fsw->Close())); - return std::move(thfd.tableHandle()); + return manager.makeTableHandleFromTicket(std::move(ticket)); } namespace internal { diff --git a/cpp-client/tests/attributes_test.cc b/cpp-client/tests/attributes_test.cc index b056311c2df..c8ccd550712 100644 --- a/cpp-client/tests/attributes_test.cc +++ b/cpp-client/tests/attributes_test.cc @@ -39,4 +39,13 @@ TEST_CASE("TableHandle Dynamic Attributes", "[attributes]") { auto t = thm.timeTable(0, 1'000'000'000).update("II = ii"); CHECK(!t.isStatic()); } + +TEST_CASE("TableHandle Created by DoPut", "[attributes]") { + auto tm = TableMakerForTests::create(); + auto table = tm.table(); + CHECK(table.isStatic()); + // The columns all have the same size, so look at the source data for any one of them and get its size + auto expectedSize = int64_t(tm.columnData().importDate().size()); + CHECK(table.numRows() == expectedSize); +} } // namespace deephaven::client::tests diff --git a/cpp-client/tests/new_table_test.cc b/cpp-client/tests/new_table_test.cc index d17b40cc188..bdf80254be1 100644 --- a/cpp-client/tests/new_table_test.cc +++ b/cpp-client/tests/new_table_test.cc @@ -37,7 +37,7 @@ TEST_CASE("New Table", "[newtable]") { maker.addColumn("FloatValue", floatData); maker.addColumn("DoubleValue", doubleData); maker.addColumn("StringValue", stringData); - auto temp = maker.makeTable(tm.client().getManager(), (int64_t)byteData.size(), true); + auto temp = maker.makeTable(tm.client().getManager()); std::cout << temp.stream(true) << '\n'; } } // namespace deephaven::client::tests diff --git a/cpp-client/tests/select_test.cc b/cpp-client/tests/select_test.cc index 2540f8484a0..74bae373df2 100644 --- a/cpp-client/tests/select_test.cc +++ b/cpp-client/tests/select_test.cc @@ -70,7 +70,7 @@ TEST_CASE("Support all types", "[select]") { maker.addColumn("doubleData", doubleData); maker.addColumn("stringData", stringData); - auto t = maker.makeTable(tm.client().getManager(), (int64_t)boolData.size(), true); + auto t = maker.makeTable(tm.client().getManager()); std::cout << t.stream(true) << '\n'; @@ -99,7 +99,7 @@ TEST_CASE("Create / update / fetch a table", "[select]") { maker.addColumn("IntValue", intData); maker.addColumn("DoubleValue", doubleData); maker.addColumn("StringValue", stringData); - auto t = maker.makeTable(tm.client().getManager(), (int64_t)intData.size(), true); + auto t = maker.makeTable(tm.client().getManager()); auto t2 = t.update("Q2 = IntValue * 100"); std::cout << t2.stream(true) << '\n'; auto t3 = t2.update("Q3 = Q2 + 10"); diff --git a/cpp-client/tests/sort_test.cc b/cpp-client/tests/sort_test.cc index 05764163071..9adebcb065e 100644 --- a/cpp-client/tests/sort_test.cc +++ b/cpp-client/tests/sort_test.cc @@ -69,7 +69,7 @@ TEST_CASE("Sort temp table", "[sort]") { maker.addColumn("IntValue3", intData3); std::string tableName("sortData"); - auto tempTable = maker.makeTable(tm.client().getManager(), (int64_t)intData0.size(), true); + auto tempTable = maker.makeTable(tm.client().getManager()); auto iv0 = tempTable.getNumCol("IntValue0"); auto iv1 = tempTable.getNumCol("IntValue1"); diff --git a/cpp-client/tests/test_util.cc b/cpp-client/tests/test_util.cc index 930e4d8388d..df611c07983 100644 --- a/cpp-client/tests/test_util.cc +++ b/cpp-client/tests/test_util.cc @@ -110,7 +110,7 @@ TableMakerForTests TableMakerForTests::create() { maker.addColumn(cn.close(), cd.close()); maker.addColumn(cn.volume(), cd.volume()); - auto testTable = maker.makeTable(manager, (int64_t)cd.importDate().size(), true); + auto testTable = maker.makeTable(manager); return TableMakerForTests(std::move(client), std::move(testTable), std::move(cn), std::move(cd)); } diff --git a/cpp-examples/create_table_with_arrow_flight/main.cc b/cpp-examples/create_table_with_arrow_flight/main.cc index 0ea2bd8e36d..ad1489778a3 100644 --- a/cpp-examples/create_table_with_arrow_flight/main.cc +++ b/cpp-examples/create_table_with_arrow_flight/main.cc @@ -10,9 +10,10 @@ using deephaven::client::NumCol; using deephaven::client::Client; using deephaven::client::TableHandle; using deephaven::client::TableHandleManager; +using deephaven::client::utility::convertTicketToFlightDescriptor; using deephaven::client::utility::okOrThrow; -using deephaven::client::utility::valueOrThrow; using deephaven::client::utility::TableMaker; +using deephaven::client::utility::valueOrThrow; namespace { void doit(const TableHandleManager &manager); @@ -104,30 +105,36 @@ void doit(const TableHandleManager &manager) { // 8. Get a Deephaven "FlightWrapper" object to access Arrow Flight auto wrapper = manager.createFlightWrapper(); - // 9. Allocate a TableHandle and get its corresponding Arrow flight descriptor - auto [table, fd] = manager.newTableHandleAndFlightDescriptor(); + // 9. Allocate a Ticket to be used to reference the result + auto ticket = manager.newTicket(); // 10. DoPut takes FlightCallOptions, which need to at least contain the Deephaven // authentication headers for this session. arrow::flight::FlightCallOptions options; wrapper.addHeaders(&options); - // 11. Perform the doPut + // 11. Make a FlightDescriptor from the ticket + auto fd = deephaven::client::utility::convertTicketToFlightDescriptor(ticket); + + // 12. Perform the doPut std::unique_ptr fsw; std::unique_ptr fmr; okOrThrow(DEEPHAVEN_EXPR_MSG(wrapper.flightClient()->DoPut(options, fd, schema, &fsw, &fmr))); - // 12. Make a RecordBatch containing both the schema and the data + // 13. Make a RecordBatch containing both the schema and the data auto batch = arrow::RecordBatch::Make(schema, numRows, std::move(columns)); okOrThrow(DEEPHAVEN_EXPR_MSG(fsw->WriteRecordBatch(*batch))); okOrThrow(DEEPHAVEN_EXPR_MSG(fsw->DoneWriting())); - // 13. Read back a metadata message (ignored), then close the Writer + // 14. Read back a metadata message (ignored), then close the Writer std::shared_ptr buf; okOrThrow(DEEPHAVEN_EXPR_MSG(fmr->ReadMetadata(&buf))); okOrThrow(DEEPHAVEN_EXPR_MSG(fsw->Close())); - // 14. Use Deephaven high level operations to fetch the table and print it + // 15. Now that the table is ready, bind the ticket to a TableHandle. + auto table = manager.makeTableHandleFromTicket(ticket); + + // 16. Use Deephaven high level operations to fetch the table and print it std::cout << "table is:\n" << table.stream(true) << std::endl; } } // namespace diff --git a/cpp-examples/read_csv/main.cc b/cpp-examples/read_csv/main.cc index 1ead5a9c20e..df823a259f6 100644 --- a/cpp-examples/read_csv/main.cc +++ b/cpp-examples/read_csv/main.cc @@ -16,6 +16,7 @@ using deephaven::client::TableHandleManager; using deephaven::client::Client; +using deephaven::client::utility::convertTicketToFlightDescriptor; using deephaven::client::utility::okOrThrow; using deephaven::client::utility::valueOrThrow; @@ -67,11 +68,12 @@ arrow::Status doit(const TableHandleManager &manager, const std::string &csvfn) auto wrapper = manager.createFlightWrapper(); - auto [table_handle, fd] = manager.newTableHandleAndFlightDescriptor(); + auto ticket = manager.newTicket(); arrow::flight::FlightCallOptions options; wrapper.addHeaders(&options); + auto fd = convertTicketToFlightDescriptor(ticket); std::unique_ptr fsw; std::unique_ptr fmr; okOrThrow(DEEPHAVEN_EXPR_MSG( @@ -95,6 +97,7 @@ arrow::Status doit(const TableHandleManager &manager, const std::string &csvfn) okOrThrow(DEEPHAVEN_EXPR_MSG(fmr->ReadMetadata(&buf))); okOrThrow(DEEPHAVEN_EXPR_MSG(fsw->Close())); + auto table_handle = manager.makeTableHandleFromTicket(ticket); std::cout << "table is:\n" << table_handle.stream(true) << std::endl; table_handle.bindToVariable("showme"); return arrow::Status::OK();