Skip to content

Commit

Permalink
Update R Client to keep up with cpp client, add is_static and num_rows (
Browse files Browse the repository at this point in the history
#4001)

* Update R Client to keep up with cpp client, add is_static and num_rows

* Revert API, still keep up with cpp, appropriate docs changes

* Fix formatting issue in cpp

* Fix names in code, change error message for dynamic tables

* Fix semicolons, nrow function name, error message
  • Loading branch information
alexpeters1208 committed Jun 16, 2023
1 parent 2b2630a commit f71a057
Show file tree
Hide file tree
Showing 5 changed files with 91 additions and 28 deletions.
9 changes: 5 additions & 4 deletions R/rdeephaven/R/client_wrapper.R
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ Client <- R6Class("Client",
return(TableHandle$new(private$tibble_to_dh_table(table_object)))
}
else if (table_object_class[[1]] == "RecordBatchReader") {
return(TableHandle$new(private$rbr_to_dh_table(table_object)))
num_rows = dim(table_object$read_table())[[1]] # TODO: delete when c++ api fix is merged
return(TableHandle$new(private$rbr_to_dh_table(table_object, num_rows))) # TODO: return(TableHandle$new(private$rbr_to_dh_table(table_object)))
}
else if ((length(table_object_class) == 4 &&
table_object_class[[1]] == "Table" &&
Expand Down Expand Up @@ -98,15 +99,15 @@ Client <- R6Class("Client",
}
},

rbr_to_dh_table = function(rbr) {
rbr_to_dh_table = function(rbr, num_rows) { # TODO: rbr_to_dh_table = function(rbr)
ptr = private$internal_client$new_arrow_array_stream_ptr()
rbr$export_to_c(ptr)
return(private$internal_client$new_table_from_arrow_array_stream_ptr(ptr))
return(private$internal_client$new_table_from_arrow_array_stream_ptr(ptr, num_rows)) # TODO: return(private$internal_client$new_table_from_arrow_array_stream_ptr(ptr))
},

arrow_to_dh_table = function(arrow_tbl) {
rbr = as_record_batch_reader(arrow_tbl)
return(private$rbr_to_dh_table(rbr))
return(private$rbr_to_dh_table(rbr, dim(arrow_tbl)[1])) # TODO: return(private$rbr_to_dh_table(rbr))
},

tibble_to_dh_table = function(tibbl) {
Expand Down
22 changes: 20 additions & 2 deletions R/rdeephaven/R/table_handle_wrapper.R
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,24 @@ TableHandle <- R6Class("TableHandle",
you are trying to call the constructor of TableHandle directly, which is not advised.")
}
private$internal_table_handle <- table_handle
private$is_static_field <- private$internal_table_handle$is_static()
},

#' @description
#' Whether the table referenced by this TableHandle is static or not.
#' @return TRUE if the table is static, or FALSE if the table is ticking.
is_static = function() {
return(private$is_static_field)
},

#' @description
#' Number of rows in the table referenced by this TableHandle, currently only implemented for static tables.
#' @return The number of rows in the table.
nrow = function() {
if(!private$is_static_field) {
stop("The number of rows is not yet supported for dynamic tables.")
}
return(private$internal_table_handle$num_rows())
},

#' @description
Expand Down Expand Up @@ -82,9 +100,9 @@ TableHandle <- R6Class("TableHandle",
arrow_tbl = self$to_arrow_table()
return(as.data.frame(as.data.frame(arrow_tbl))) # TODO: for some reason as.data.frame on arrow table returns a tibble, not a data frame
}

),
private = list(
internal_table_handle = NULL
internal_table_handle = NULL,
is_static_field = NULL
)
)
2 changes: 1 addition & 1 deletion R/rdeephaven/man/ClientOptions.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

28 changes: 28 additions & 0 deletions R/rdeephaven/man/TableHandle.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

58 changes: 37 additions & 21 deletions R/rdeephaven/src/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

#include "deephaven/client/client.h"
#include "deephaven/client/flight.h"
#include "deephaven/client/utility/arrow_util.h"

#include <arrow/c/abi.h>
#include <arrow/c/bridge.h>
Expand All @@ -24,6 +25,20 @@ class TableHandleWrapper {

// TODO: DEEPHAVEN QUERY METHODS WILL GO HERE

/**
* Whether the table was static at the time internal_tbl_hdl was created.
*/
bool isStatic() {
return internal_tbl_hdl.isStatic();
}

/**
* Number of rows in the table at the time internal_tbl_hdl was created.
*/
int64_t numRows() {
return internal_tbl_hdl.numRows();
}

/**
* Binds the table referenced by this table handle to a variable on the server called tableName.
* Without this call, new tables are not accessible from the client.
Expand All @@ -50,7 +65,7 @@ class TableHandleWrapper {

// XPtr is needed here to ensure Rcpp can properly handle type casting, as it does not like raw pointers
return Rcpp::XPtr<ArrowArrayStream>(stream_ptr, true);
};
}

private:
deephaven::client::TableHandle internal_tbl_hdl;
Expand All @@ -63,23 +78,23 @@ class ClientOptionsWrapper {

ClientOptionsWrapper() {
internal_options = new deephaven::client::ClientOptions();
};
}

void setDefaultAuthentication() {
internal_options->setDefaultAuthentication();
};
}

void setBasicAuthentication(const std::string &username, const std::string &password) {
internal_options->setBasicAuthentication(username, password);
};
}

void setCustomAuthentication(const std::string &authenticationKey, const std::string &authenticationValue) {
internal_options->setCustomAuthentication(authenticationKey, authenticationValue);
};
}

void setSessionType(const std::string &sessionType) {
internal_options->setSessionType(sessionType);
};
}

private:

Expand All @@ -95,19 +110,19 @@ class ClientWrapper {
/**
* Fetches a reference to a table named tableName on the server if it exists.
* @param tableName Name of the table to search for.
* @return TableHandle reference to the fetched table.
* @return TableHandle reference to the fetched table.
*/
TableHandleWrapper* openTable(std::string tableName) {
return new TableHandleWrapper(internal_tbl_hdl_mngr.fetchTable(tableName));
};
}

/**
* Runs a script on the server in the console language if a console was created.
* @param code String of the code to be executed on the server.
*/
void runScript(std::string code) {
internal_tbl_hdl_mngr.runScript(code);
};
}

/**
* Checks for the existence of a table named tableName on the server.
Expand All @@ -123,7 +138,7 @@ class ClientWrapper {
return false;
}
return true;
};
}

/**
* Allocates memory for an ArrowArrayStream C struct and returns a pointer to the new chunk of memory.
Expand All @@ -132,40 +147,39 @@ class ClientWrapper {
SEXP newArrowArrayStreamPtr() {
ArrowArrayStream* stream_ptr = new ArrowArrayStream();
return Rcpp::XPtr<ArrowArrayStream>(stream_ptr, true);
};
}

/**
* Uses a pointer to a populated ArrowArrayStream C struct to create a new table on the server from the data in the C struct.
* @param stream_ptr Pointer to an existing and populated ArrayArrayStream, populated by a call to RecordBatchReader$export_to_c(ptr) from R.
*/
TableHandleWrapper* newTableFromArrowArrayStreamPtr(Rcpp::XPtr<ArrowArrayStream> stream_ptr) {
TableHandleWrapper* newTableFromArrowArrayStreamPtr(Rcpp::XPtr<ArrowArrayStream> stream_ptr, int64_t numRows) { // TODO: remove numRows arg when c++ api fix is merged

auto wrapper = internal_tbl_hdl_mngr.createFlightWrapper();
arrow::flight::FlightCallOptions options;
wrapper.addAuthHeaders(&options);

// extract RecordBatchReader from the struct pointed to by the passed tream_ptr
// extract RecordBatchReader from the struct pointed to by the passed stream_ptr
std::shared_ptr<arrow::RecordBatchReader> record_batch_reader = arrow::ImportRecordBatchReader(stream_ptr.get()).ValueOrDie();
auto schema = record_batch_reader.get()->schema();

// write RecordBatchReader data to table on server with DoPut
std::unique_ptr<arrow::flight::FlightStreamWriter> fsw;
std::unique_ptr<arrow::flight::FlightMetadataReader> fmr;
auto [new_tbl_hdl, fd] = internal_tbl_hdl_mngr.newTableHandleAndFlightDescriptor();
DEEPHAVEN_EXPR_MSG(wrapper.flightClient()->DoPut(options, fd, schema, &fsw, &fmr)); // TODO: need to add okOrThrow
auto [new_tbl_hdl, fd] = internal_tbl_hdl_mngr.newTableHandleAndFlightDescriptor(numRows, true); // TODO: remove numRows arg when c++ api fix is merged
deephaven::client::utility::okOrThrow(DEEPHAVEN_EXPR_MSG(wrapper.flightClient()->DoPut(options, fd, schema, &fsw, &fmr)));
while(true) {
std::shared_ptr<arrow::RecordBatch> this_batch;
DEEPHAVEN_EXPR_MSG(record_batch_reader->ReadNext(&this_batch)); // TODO: need to add ok or throw
deephaven::client::utility::okOrThrow(DEEPHAVEN_EXPR_MSG(record_batch_reader->ReadNext(&this_batch)));
if (this_batch == nullptr) {
break;
}
DEEPHAVEN_EXPR_MSG(fsw->WriteRecordBatch(*this_batch)); // TODO: need to add okOrThrow
deephaven::client::utility::okOrThrow(DEEPHAVEN_EXPR_MSG(fsw->WriteRecordBatch(*this_batch)));
}
DEEPHAVEN_EXPR_MSG(fsw->DoneWriting()); // TODO: need to add okOrThrow
DEEPHAVEN_EXPR_MSG(fsw->Close()); // TODO: need to add okOrThrow

deephaven::client::utility::okOrThrow(DEEPHAVEN_EXPR_MSG(fsw->DoneWriting()));
deephaven::client::utility::okOrThrow(DEEPHAVEN_EXPR_MSG(fsw->Close()));
return new TableHandleWrapper(new_tbl_hdl);
};
}

private:
ClientWrapper(deephaven::client::Client ref) : internal_client(std::move(ref)) {};
Expand Down Expand Up @@ -201,6 +215,8 @@ RCPP_EXPOSED_CLASS(ArrowArrayStream)
RCPP_MODULE(DeephavenInternalModule) {

class_<TableHandleWrapper>("INTERNAL_TableHandle")
.method("is_static", &TableHandleWrapper::isStatic)
.method("num_rows", &TableHandleWrapper::numRows)
.method("bind_to_variable", &TableHandleWrapper::bindToVariable)
.method("get_arrow_array_stream_ptr", &TableHandleWrapper::getArrowArrayStreamPtr)
;
Expand Down

0 comments on commit f71a057

Please sign in to comment.