From f71a0579d096fda7dd7b9c9fc2a681ef96b575a9 Mon Sep 17 00:00:00 2001 From: Alex Peters <80283343+alexpeters1208@users.noreply.github.com> Date: Fri, 16 Jun 2023 17:23:45 -0500 Subject: [PATCH] Update R Client to keep up with cpp client, add is_static and num_rows (#4001) * Update R Client to keep up with cpp client, add is_static and num_rows * Revert API, still keep up with cpp, appropriate docs changes * Fix formatting issue in cpp * Fix names in code, change error message for dynamic tables * Fix semicolons, nrow function name, error message --- R/rdeephaven/R/client_wrapper.R | 9 +++-- R/rdeephaven/R/table_handle_wrapper.R | 22 +++++++++- R/rdeephaven/man/ClientOptions.Rd | 2 +- R/rdeephaven/man/TableHandle.Rd | 28 +++++++++++++ R/rdeephaven/src/client.cpp | 58 +++++++++++++++++---------- 5 files changed, 91 insertions(+), 28 deletions(-) diff --git a/R/rdeephaven/R/client_wrapper.R b/R/rdeephaven/R/client_wrapper.R index b02eea406b6..7bea747ef39 100644 --- a/R/rdeephaven/R/client_wrapper.R +++ b/R/rdeephaven/R/client_wrapper.R @@ -63,7 +63,8 @@ Client <- R6Class("Client", return(TableHandle$new(private$tibble_to_dh_table(table_object))) } else if (table_object_class[[1]] == "RecordBatchReader") { - return(TableHandle$new(private$rbr_to_dh_table(table_object))) + num_rows = dim(table_object$read_table())[[1]] # TODO: delete when c++ api fix is merged + return(TableHandle$new(private$rbr_to_dh_table(table_object, num_rows))) # TODO: return(TableHandle$new(private$rbr_to_dh_table(table_object))) } else if ((length(table_object_class) == 4 && table_object_class[[1]] == "Table" && @@ -98,15 +99,15 @@ Client <- R6Class("Client", } }, - rbr_to_dh_table = function(rbr) { + rbr_to_dh_table = function(rbr, num_rows) { # TODO: rbr_to_dh_table = function(rbr) ptr = private$internal_client$new_arrow_array_stream_ptr() rbr$export_to_c(ptr) - return(private$internal_client$new_table_from_arrow_array_stream_ptr(ptr)) + return(private$internal_client$new_table_from_arrow_array_stream_ptr(ptr, num_rows)) # TODO: return(private$internal_client$new_table_from_arrow_array_stream_ptr(ptr)) }, arrow_to_dh_table = function(arrow_tbl) { rbr = as_record_batch_reader(arrow_tbl) - return(private$rbr_to_dh_table(rbr)) + return(private$rbr_to_dh_table(rbr, dim(arrow_tbl)[1])) # TODO: return(private$rbr_to_dh_table(rbr)) }, tibble_to_dh_table = function(tibbl) { diff --git a/R/rdeephaven/R/table_handle_wrapper.R b/R/rdeephaven/R/table_handle_wrapper.R index b10380008a4..50cfc871a3a 100644 --- a/R/rdeephaven/R/table_handle_wrapper.R +++ b/R/rdeephaven/R/table_handle_wrapper.R @@ -35,6 +35,24 @@ TableHandle <- R6Class("TableHandle", you are trying to call the constructor of TableHandle directly, which is not advised.") } private$internal_table_handle <- table_handle + private$is_static_field <- private$internal_table_handle$is_static() + }, + + #' @description + #' Whether the table referenced by this TableHandle is static or not. + #' @return TRUE if the table is static, or FALSE if the table is ticking. + is_static = function() { + return(private$is_static_field) + }, + + #' @description + #' Number of rows in the table referenced by this TableHandle, currently only implemented for static tables. + #' @return The number of rows in the table. + nrow = function() { + if(!private$is_static_field) { + stop("The number of rows is not yet supported for dynamic tables.") + } + return(private$internal_table_handle$num_rows()) }, #' @description @@ -82,9 +100,9 @@ TableHandle <- R6Class("TableHandle", arrow_tbl = self$to_arrow_table() return(as.data.frame(as.data.frame(arrow_tbl))) # TODO: for some reason as.data.frame on arrow table returns a tibble, not a data frame } - ), private = list( - internal_table_handle = NULL + internal_table_handle = NULL, + is_static_field = NULL ) ) diff --git a/R/rdeephaven/man/ClientOptions.Rd b/R/rdeephaven/man/ClientOptions.Rd index 0ec21bac5ef..0ff38b95b96 100644 --- a/R/rdeephaven/man/ClientOptions.Rd +++ b/R/rdeephaven/man/ClientOptions.Rd @@ -107,7 +107,7 @@ Use custom (general key/value based) authentication. \if{html}{\out{}} \if{latex}{\out{\hypertarget{method-ClientOptions-set_session_type}{}}} \subsection{Method \code{set_session_type()}}{ -Set the session type of the console (e.g. "python", "groovy", etc.). The session type must be a type supported on the server. +Set the session type of the console (e.g., "python", "groovy", etc.). The session type must be supported on the server. \subsection{Usage}{ \if{html}{\out{
}}\preformatted{ClientOptions$set_session_type(session_type)}\if{html}{\out{
}} } diff --git a/R/rdeephaven/man/TableHandle.Rd b/R/rdeephaven/man/TableHandle.Rd index 0c7164977e4..c2bc107af0c 100644 --- a/R/rdeephaven/man/TableHandle.Rd +++ b/R/rdeephaven/man/TableHandle.Rd @@ -32,6 +32,8 @@ new_table_handle2$bind_to_variable("new_table") \subsection{Public methods}{ \itemize{ \item \href{#method-TableHandle-new}{\code{TableHandle$new()}} +\item \href{#method-TableHandle-is_static}{\code{TableHandle$is_static()}} +\item \href{#method-TableHandle-nrow}{\code{TableHandle$nrow()}} \item \href{#method-TableHandle-bind_to_variable}{\code{TableHandle$bind_to_variable()}} \item \href{#method-TableHandle-to_arrow_record_batch_stream_reader}{\code{TableHandle$to_arrow_record_batch_stream_reader()}} \item \href{#method-TableHandle-to_arrow_table}{\code{TableHandle$to_arrow_table()}} @@ -48,6 +50,32 @@ new_table_handle2$bind_to_variable("new_table") \if{html}{\out{
}}\preformatted{TableHandle$new(table_handle)}\if{html}{\out{
}} } +} +\if{html}{\out{
}} +\if{html}{\out{}} +\if{latex}{\out{\hypertarget{method-TableHandle-is_static}{}}} +\subsection{Method \code{is_static()}}{ +Whether the table referenced by this TableHandle is static or not. +\subsection{Usage}{ +\if{html}{\out{
}}\preformatted{TableHandle$is_static()}\if{html}{\out{
}} +} + +\subsection{Returns}{ +TRUE if the table is static, or FALSE if the table is ticking. +} +} +\if{html}{\out{
}} +\if{html}{\out{}} +\if{latex}{\out{\hypertarget{method-TableHandle-nrow}{}}} +\subsection{Method \code{nrow()}}{ +Number of rows in the table referenced by this TableHandle, currently only implemented for static tables. +\subsection{Usage}{ +\if{html}{\out{
}}\preformatted{TableHandle$nrow()}\if{html}{\out{
}} +} + +\subsection{Returns}{ +The number of rows in the table. +} } \if{html}{\out{
}} \if{html}{\out{}} diff --git a/R/rdeephaven/src/client.cpp b/R/rdeephaven/src/client.cpp index 2a38c4c0f03..a6cca6570ba 100644 --- a/R/rdeephaven/src/client.cpp +++ b/R/rdeephaven/src/client.cpp @@ -6,6 +6,7 @@ #include "deephaven/client/client.h" #include "deephaven/client/flight.h" +#include "deephaven/client/utility/arrow_util.h" #include #include @@ -24,6 +25,20 @@ class TableHandleWrapper { // TODO: DEEPHAVEN QUERY METHODS WILL GO HERE + /** + * Whether the table was static at the time internal_tbl_hdl was created. + */ + bool isStatic() { + return internal_tbl_hdl.isStatic(); + } + + /** + * Number of rows in the table at the time internal_tbl_hdl was created. + */ + int64_t numRows() { + return internal_tbl_hdl.numRows(); + } + /** * Binds the table referenced by this table handle to a variable on the server called tableName. * Without this call, new tables are not accessible from the client. @@ -50,7 +65,7 @@ class TableHandleWrapper { // XPtr is needed here to ensure Rcpp can properly handle type casting, as it does not like raw pointers return Rcpp::XPtr(stream_ptr, true); - }; + } private: deephaven::client::TableHandle internal_tbl_hdl; @@ -63,23 +78,23 @@ class ClientOptionsWrapper { ClientOptionsWrapper() { internal_options = new deephaven::client::ClientOptions(); - }; + } void setDefaultAuthentication() { internal_options->setDefaultAuthentication(); - }; + } void setBasicAuthentication(const std::string &username, const std::string &password) { internal_options->setBasicAuthentication(username, password); - }; + } void setCustomAuthentication(const std::string &authenticationKey, const std::string &authenticationValue) { internal_options->setCustomAuthentication(authenticationKey, authenticationValue); - }; + } void setSessionType(const std::string &sessionType) { internal_options->setSessionType(sessionType); - }; + } private: @@ -95,11 +110,11 @@ class ClientWrapper { /** * Fetches a reference to a table named tableName on the server if it exists. * @param tableName Name of the table to search for. - * @return TableHandle reference to the fetched table. + * @return TableHandle reference to the fetched table. */ TableHandleWrapper* openTable(std::string tableName) { return new TableHandleWrapper(internal_tbl_hdl_mngr.fetchTable(tableName)); - }; + } /** * Runs a script on the server in the console language if a console was created. @@ -107,7 +122,7 @@ class ClientWrapper { */ void runScript(std::string code) { internal_tbl_hdl_mngr.runScript(code); - }; + } /** * Checks for the existence of a table named tableName on the server. @@ -123,7 +138,7 @@ class ClientWrapper { return false; } return true; - }; + } /** * Allocates memory for an ArrowArrayStream C struct and returns a pointer to the new chunk of memory. @@ -132,40 +147,39 @@ class ClientWrapper { SEXP newArrowArrayStreamPtr() { ArrowArrayStream* stream_ptr = new ArrowArrayStream(); return Rcpp::XPtr(stream_ptr, true); - }; + } /** * Uses a pointer to a populated ArrowArrayStream C struct to create a new table on the server from the data in the C struct. * @param stream_ptr Pointer to an existing and populated ArrayArrayStream, populated by a call to RecordBatchReader$export_to_c(ptr) from R. */ - TableHandleWrapper* newTableFromArrowArrayStreamPtr(Rcpp::XPtr stream_ptr) { + TableHandleWrapper* newTableFromArrowArrayStreamPtr(Rcpp::XPtr stream_ptr, int64_t numRows) { // TODO: remove numRows arg when c++ api fix is merged auto wrapper = internal_tbl_hdl_mngr.createFlightWrapper(); arrow::flight::FlightCallOptions options; wrapper.addAuthHeaders(&options); - // extract RecordBatchReader from the struct pointed to by the passed tream_ptr + // extract RecordBatchReader from the struct pointed to by the passed stream_ptr std::shared_ptr record_batch_reader = arrow::ImportRecordBatchReader(stream_ptr.get()).ValueOrDie(); auto schema = record_batch_reader.get()->schema(); // write RecordBatchReader data to table on server with DoPut std::unique_ptr fsw; std::unique_ptr fmr; - auto [new_tbl_hdl, fd] = internal_tbl_hdl_mngr.newTableHandleAndFlightDescriptor(); - DEEPHAVEN_EXPR_MSG(wrapper.flightClient()->DoPut(options, fd, schema, &fsw, &fmr)); // TODO: need to add okOrThrow + auto [new_tbl_hdl, fd] = internal_tbl_hdl_mngr.newTableHandleAndFlightDescriptor(numRows, true); // TODO: remove numRows arg when c++ api fix is merged + deephaven::client::utility::okOrThrow(DEEPHAVEN_EXPR_MSG(wrapper.flightClient()->DoPut(options, fd, schema, &fsw, &fmr))); while(true) { std::shared_ptr this_batch; - DEEPHAVEN_EXPR_MSG(record_batch_reader->ReadNext(&this_batch)); // TODO: need to add ok or throw + deephaven::client::utility::okOrThrow(DEEPHAVEN_EXPR_MSG(record_batch_reader->ReadNext(&this_batch))); if (this_batch == nullptr) { break; } - DEEPHAVEN_EXPR_MSG(fsw->WriteRecordBatch(*this_batch)); // TODO: need to add okOrThrow + deephaven::client::utility::okOrThrow(DEEPHAVEN_EXPR_MSG(fsw->WriteRecordBatch(*this_batch))); } - DEEPHAVEN_EXPR_MSG(fsw->DoneWriting()); // TODO: need to add okOrThrow - DEEPHAVEN_EXPR_MSG(fsw->Close()); // TODO: need to add okOrThrow - + deephaven::client::utility::okOrThrow(DEEPHAVEN_EXPR_MSG(fsw->DoneWriting())); + deephaven::client::utility::okOrThrow(DEEPHAVEN_EXPR_MSG(fsw->Close())); return new TableHandleWrapper(new_tbl_hdl); - }; + } private: ClientWrapper(deephaven::client::Client ref) : internal_client(std::move(ref)) {}; @@ -201,6 +215,8 @@ RCPP_EXPOSED_CLASS(ArrowArrayStream) RCPP_MODULE(DeephavenInternalModule) { class_("INTERNAL_TableHandle") + .method("is_static", &TableHandleWrapper::isStatic) + .method("num_rows", &TableHandleWrapper::numRows) .method("bind_to_variable", &TableHandleWrapper::bindToVariable) .method("get_arrow_array_stream_ptr", &TableHandleWrapper::getArrowArrayStreamPtr) ;