Skip to content

Commit

Permalink
Modify C++ and R client wrapper to keep up with cpp client changes (#…
Browse files Browse the repository at this point in the history
…4034)

* Add more informative error message for bad message header

* Modify newTableFromArrowArrayStreamPtr and client_wrapper to keep with cpp client

* Revert change in ArrowFlightUtil

* Remove ok or throw comment

* Delete Read-and-delete-me
  • Loading branch information
alexpeters1208 authored Jun 20, 2023
1 parent 98fc858 commit 46cea12
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 16 deletions.
9 changes: 4 additions & 5 deletions R/rdeephaven/R/client_wrapper.R
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,7 @@ Client <- R6Class("Client",
return(TableHandle$new(private$tibble_to_dh_table(table_object)))
}
else if (table_object_class[[1]] == "RecordBatchReader") {
num_rows = dim(table_object$read_table())[[1]] # TODO: delete when c++ api fix is merged
return(TableHandle$new(private$rbr_to_dh_table(table_object, num_rows))) # TODO: return(TableHandle$new(private$rbr_to_dh_table(table_object)))
return(TableHandle$new(private$rbr_to_dh_table(table_object)))
}
else if ((length(table_object_class) == 4 &&
table_object_class[[1]] == "Table" &&
Expand Down Expand Up @@ -99,15 +98,15 @@ Client <- R6Class("Client",
}
},

rbr_to_dh_table = function(rbr, num_rows) { # TODO: rbr_to_dh_table = function(rbr)
rbr_to_dh_table = function(rbr) {
ptr = private$internal_client$new_arrow_array_stream_ptr()
rbr$export_to_c(ptr)
return(private$internal_client$new_table_from_arrow_array_stream_ptr(ptr, num_rows)) # TODO: return(private$internal_client$new_table_from_arrow_array_stream_ptr(ptr))
return(private$internal_client$new_table_from_arrow_array_stream_ptr(ptr))
},

arrow_to_dh_table = function(arrow_tbl) {
rbr = as_record_batch_reader(arrow_tbl)
return(private$rbr_to_dh_table(rbr, dim(arrow_tbl)[1])) # TODO: return(private$rbr_to_dh_table(rbr))
return(private$rbr_to_dh_table(rbr))
},

tibble_to_dh_table = function(tibbl) {
Expand Down
8 changes: 0 additions & 8 deletions R/rdeephaven/Read-and-delete-me

This file was deleted.

11 changes: 8 additions & 3 deletions R/rdeephaven/src/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ class TableHandleWrapper {
std::shared_ptr<arrow::flight::FlightStreamReader> fsr = internal_tbl_hdl.getFlightStreamReader();

std::vector<std::shared_ptr<arrow::RecordBatch>> empty_record_batches;
DEEPHAVEN_EXPR_MSG(fsr->ReadAll(&empty_record_batches)); // TODO: need to add OK or throw
deephaven::client::utility::okOrThrow(DEEPHAVEN_EXPR_MSG(fsr->ReadAll(&empty_record_batches)));

std::shared_ptr<arrow::RecordBatchReader> record_batch_reader = arrow::RecordBatchReader::Make(empty_record_batches).ValueOrDie();
ArrowArrayStream* stream_ptr = new ArrowArrayStream();
Expand Down Expand Up @@ -153,7 +153,7 @@ class ClientWrapper {
* Uses a pointer to a populated ArrowArrayStream C struct to create a new table on the server from the data in the C struct.
* @param stream_ptr Pointer to an existing and populated ArrayArrayStream, populated by a call to RecordBatchReader$export_to_c(ptr) from R.
*/
TableHandleWrapper* newTableFromArrowArrayStreamPtr(Rcpp::XPtr<ArrowArrayStream> stream_ptr, int64_t numRows) { // TODO: remove numRows arg when c++ api fix is merged
TableHandleWrapper* newTableFromArrowArrayStreamPtr(Rcpp::XPtr<ArrowArrayStream> stream_ptr) {

auto wrapper = internal_tbl_hdl_mngr.createFlightWrapper();
arrow::flight::FlightCallOptions options;
Expand All @@ -166,7 +166,10 @@ class ClientWrapper {
// write RecordBatchReader data to table on server with DoPut
std::unique_ptr<arrow::flight::FlightStreamWriter> fsw;
std::unique_ptr<arrow::flight::FlightMetadataReader> fmr;
auto [new_tbl_hdl, fd] = internal_tbl_hdl_mngr.newTableHandleAndFlightDescriptor(numRows, true); // TODO: remove numRows arg when c++ api fix is merged

auto ticket = internal_tbl_hdl_mngr.newTicket();
auto fd = deephaven::client::utility::convertTicketToFlightDescriptor(ticket);

deephaven::client::utility::okOrThrow(DEEPHAVEN_EXPR_MSG(wrapper.flightClient()->DoPut(options, fd, schema, &fsw, &fmr)));
while(true) {
std::shared_ptr<arrow::RecordBatch> this_batch;
Expand All @@ -178,6 +181,8 @@ class ClientWrapper {
}
deephaven::client::utility::okOrThrow(DEEPHAVEN_EXPR_MSG(fsw->DoneWriting()));
deephaven::client::utility::okOrThrow(DEEPHAVEN_EXPR_MSG(fsw->Close()));

auto new_tbl_hdl = internal_tbl_hdl_mngr.makeTableHandleFromTicket(ticket);
return new TableHandleWrapper(new_tbl_hdl);
}

Expand Down

0 comments on commit 46cea12

Please sign in to comment.