From 46cea1223744765dabc2ba02dd04ca6873cc5dba Mon Sep 17 00:00:00 2001 From: Alex Peters <80283343+alexpeters1208@users.noreply.github.com> Date: Tue, 20 Jun 2023 16:18:24 -0500 Subject: [PATCH] Modify C++ and R client wrapper to keep up with cpp client changes (#4034) * Add more informative error message for bad message header * Modify newTableFromArrowArrayStreamPtr and client_wrapper to keep with cpp client * Revert change in ArrowFlightUtil * Remove ok or throw comment * Delete Read-and-delete-me --- R/rdeephaven/R/client_wrapper.R | 9 ++++----- R/rdeephaven/Read-and-delete-me | 8 -------- R/rdeephaven/src/client.cpp | 11 ++++++++--- 3 files changed, 12 insertions(+), 16 deletions(-) delete mode 100644 R/rdeephaven/Read-and-delete-me diff --git a/R/rdeephaven/R/client_wrapper.R b/R/rdeephaven/R/client_wrapper.R index 7bea747ef39..b02eea406b6 100644 --- a/R/rdeephaven/R/client_wrapper.R +++ b/R/rdeephaven/R/client_wrapper.R @@ -63,8 +63,7 @@ Client <- R6Class("Client", return(TableHandle$new(private$tibble_to_dh_table(table_object))) } else if (table_object_class[[1]] == "RecordBatchReader") { - num_rows = dim(table_object$read_table())[[1]] # TODO: delete when c++ api fix is merged - return(TableHandle$new(private$rbr_to_dh_table(table_object, num_rows))) # TODO: return(TableHandle$new(private$rbr_to_dh_table(table_object))) + return(TableHandle$new(private$rbr_to_dh_table(table_object))) } else if ((length(table_object_class) == 4 && table_object_class[[1]] == "Table" && @@ -99,15 +98,15 @@ Client <- R6Class("Client", } }, - rbr_to_dh_table = function(rbr, num_rows) { # TODO: rbr_to_dh_table = function(rbr) + rbr_to_dh_table = function(rbr) { ptr = private$internal_client$new_arrow_array_stream_ptr() rbr$export_to_c(ptr) - return(private$internal_client$new_table_from_arrow_array_stream_ptr(ptr, num_rows)) # TODO: return(private$internal_client$new_table_from_arrow_array_stream_ptr(ptr)) + return(private$internal_client$new_table_from_arrow_array_stream_ptr(ptr)) }, arrow_to_dh_table = function(arrow_tbl) { rbr = as_record_batch_reader(arrow_tbl) - return(private$rbr_to_dh_table(rbr, dim(arrow_tbl)[1])) # TODO: return(private$rbr_to_dh_table(rbr)) + return(private$rbr_to_dh_table(rbr)) }, tibble_to_dh_table = function(tibbl) { diff --git a/R/rdeephaven/Read-and-delete-me b/R/rdeephaven/Read-and-delete-me deleted file mode 100644 index dcaca43dc12..00000000000 --- a/R/rdeephaven/Read-and-delete-me +++ /dev/null @@ -1,8 +0,0 @@ -* Edit the help file skeletons in 'man', possibly combining help files for multiple functions. -* Edit the exports in 'NAMESPACE', and add necessary imports. -* Put any C/C++/Fortran code in 'src'. -* If you have compiled code, add a useDynLib() directive to 'NAMESPACE'. -* Run R CMD build to build the package tarball. -* Run R CMD check to check the package tarball. - -Read "Writing R Extensions" for more information. diff --git a/R/rdeephaven/src/client.cpp b/R/rdeephaven/src/client.cpp index 334cd56598b..121450fd838 100644 --- a/R/rdeephaven/src/client.cpp +++ b/R/rdeephaven/src/client.cpp @@ -57,7 +57,7 @@ class TableHandleWrapper { std::shared_ptr fsr = internal_tbl_hdl.getFlightStreamReader(); std::vector> empty_record_batches; - DEEPHAVEN_EXPR_MSG(fsr->ReadAll(&empty_record_batches)); // TODO: need to add OK or throw + deephaven::client::utility::okOrThrow(DEEPHAVEN_EXPR_MSG(fsr->ReadAll(&empty_record_batches))); std::shared_ptr record_batch_reader = arrow::RecordBatchReader::Make(empty_record_batches).ValueOrDie(); ArrowArrayStream* stream_ptr = new ArrowArrayStream(); @@ -153,7 +153,7 @@ class ClientWrapper { * Uses a pointer to a populated ArrowArrayStream C struct to create a new table on the server from the data in the C struct. * @param stream_ptr Pointer to an existing and populated ArrayArrayStream, populated by a call to RecordBatchReader$export_to_c(ptr) from R. */ - TableHandleWrapper* newTableFromArrowArrayStreamPtr(Rcpp::XPtr stream_ptr, int64_t numRows) { // TODO: remove numRows arg when c++ api fix is merged + TableHandleWrapper* newTableFromArrowArrayStreamPtr(Rcpp::XPtr stream_ptr) { auto wrapper = internal_tbl_hdl_mngr.createFlightWrapper(); arrow::flight::FlightCallOptions options; @@ -166,7 +166,10 @@ class ClientWrapper { // write RecordBatchReader data to table on server with DoPut std::unique_ptr fsw; std::unique_ptr fmr; - auto [new_tbl_hdl, fd] = internal_tbl_hdl_mngr.newTableHandleAndFlightDescriptor(numRows, true); // TODO: remove numRows arg when c++ api fix is merged + + auto ticket = internal_tbl_hdl_mngr.newTicket(); + auto fd = deephaven::client::utility::convertTicketToFlightDescriptor(ticket); + deephaven::client::utility::okOrThrow(DEEPHAVEN_EXPR_MSG(wrapper.flightClient()->DoPut(options, fd, schema, &fsw, &fmr))); while(true) { std::shared_ptr this_batch; @@ -178,6 +181,8 @@ class ClientWrapper { } deephaven::client::utility::okOrThrow(DEEPHAVEN_EXPR_MSG(fsw->DoneWriting())); deephaven::client::utility::okOrThrow(DEEPHAVEN_EXPR_MSG(fsw->Close())); + + auto new_tbl_hdl = internal_tbl_hdl_mngr.makeTableHandleFromTicket(ticket); return new TableHandleWrapper(new_tbl_hdl); }