Skip to content

Commit

Permalink
C++ Client: Add more GRPC entry points: Aj, Raj, LazyUpdate, WhereIn,…
Browse files Browse the repository at this point in the history
… etc (#4404)
  • Loading branch information
kosak authored Aug 30, 2023
1 parent 975475a commit 84199c7
Show file tree
Hide file tree
Showing 14 changed files with 719 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,8 @@ class TableHandleImpl : public std::enable_shared_from_this<TableHandleImpl> {
[[nodiscard]]
std::shared_ptr<TableHandleImpl> Update(std::vector<std::string> column_specs);
[[nodiscard]]
std::shared_ptr<TableHandleImpl> LazyUpdate(std::vector<std::string> column_specs);
[[nodiscard]]
std::shared_ptr<TableHandleImpl> View(std::vector<std::string> column_specs);
[[nodiscard]]
std::shared_ptr<TableHandleImpl> DropColumns(std::vector<std::string> column_specs);
Expand Down Expand Up @@ -245,14 +247,28 @@ class TableHandleImpl : public std::enable_shared_from_this<TableHandleImpl> {
std::vector<std::string> columns_to_match, std::vector<std::string> columns_to_add);

[[nodiscard]]
std::shared_ptr<TableHandleImpl> AsOfJoin(AsOfJoinTablesRequest::MatchRule match_rule,
const TableHandleImpl &right_side, std::vector<std::string> columns_to_match,
std::vector<std::string> columns_to_add);
std::shared_ptr<TableHandleImpl> Aj(const TableHandleImpl &right_side,
std::vector<std::string> on, std::vector<std::string> joins);

[[nodiscard]]
std::shared_ptr<TableHandleImpl> Raj(const TableHandleImpl &right_side,
std::vector<std::string> on, std::vector<std::string> joins);

[[nodiscard]]
std::shared_ptr<TableHandleImpl> LeftOuterJoin(const TableHandleImpl &right_side,
std::vector<std::string> on, std::vector<std::string> joins);

[[nodiscard]]
std::shared_ptr<TableHandleImpl> UpdateBy(std::vector<std::shared_ptr<UpdateByOperationImpl>> ops,
std::vector<std::string> by);

[[nodiscard]]
std::shared_ptr<TableHandleImpl> SelectDistinct(std::vector<std::string> columns);

[[nodiscard]]
std::shared_ptr<TableHandleImpl> WhereIn(const TableHandleImpl &filter_table,
std::vector<std::string> columns);

[[nodiscard]]
std::vector<std::shared_ptr<ColumnImpl>> GetColumnImpls();
[[nodiscard]]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,9 @@ class Server : public std::enable_shared_from_this<Server> {
void UpdateAsync(Ticket parent_ticket, std::vector<std::string> column_specs,
std::shared_ptr<EtcCallback> etc_callback, Ticket result);

void LazyUpdateAsync(Ticket parent_ticket, std::vector<std::string> column_specs,
std::shared_ptr<EtcCallback> etc_callback, Ticket result);

void ViewAsync(Ticket parent_ticket, std::vector<std::string> column_specs,
std::shared_ptr<EtcCallback> etc_callback, Ticket result);

Expand Down Expand Up @@ -231,14 +234,28 @@ class Server : public std::enable_shared_from_this<Server> {
std::vector<std::string> columns_to_match, std::vector<std::string> columns_to_add,
std::shared_ptr<EtcCallback> etc_callback, Ticket result);

void AsOfJoinAsync(AsOfJoinTablesRequest::MatchRule match_rule, Ticket left_table_ticket,
Ticket right_table_ticket, std::vector<std::string> columns_to_match,
std::vector<std::string> columns_to_add, std::shared_ptr<EtcCallback> etc_callback, Ticket result);
void AjAsync(Ticket left_table_ticket, Ticket right_table_ticket,
std::vector<std::string> on, std::vector<std::string> joins,
std::shared_ptr<EtcCallback> etc_callback, Ticket result);

void RajAsync(Ticket left_table_ticket, Ticket right_table_ticket,
std::vector<std::string> on, std::vector<std::string> joins,
std::shared_ptr<EtcCallback> etc_callback, Ticket result);

void LeftOuterJoinAsync(Ticket left_table_ticket, Ticket right_table_ticket,
std::vector<std::string> on, std::vector<std::string> joins,
std::shared_ptr<EtcCallback> etc_callback, Ticket result);

void UpdateByAsync(Ticket source, std::vector<UpdateByOperation> operations,
std::vector<std::string> group_by_columns,
std::shared_ptr<EtcCallback> etc_callback, Ticket result);

void SelectDistinctAsync(Ticket source, std::vector<std::string> columns,
std::shared_ptr<EtcCallback> etc_callback, Ticket result);

void WhereInAsync(Ticket left_table_ticket, Ticket right_table_ticket,
std::vector<std::string> columns, std::shared_ptr<EtcCallback> etc_callback, Ticket result);

void BindToVariableAsync(const Ticket &console_id, const Ticket &table_id, std::string variable,
std::shared_ptr<SFCallback<BindTableToVariableResponse>> callback);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -788,13 +788,22 @@ class TableHandle {
TableHandle Update(Args &&... args) const;
/**
* Creates a new table from this table, but including the additional specified columns.
* @param columnSpecs The columnSpecs to add. For exampe, {"X = A + 5", "Y = X * 2"}.
* @param columnSpecs The columnSpecs to add. For example, {"X = A + 5", "Y = X * 2"}.
* See the Deephaven documentation for the difference between Update() and UpdateView().
* @return A TableHandle referencing the new table
*/
[[nodiscard]]
TableHandle Update(std::vector<std::string> column_specs) const;

/**
* Creates a new table containing a new cached formula column for each argument.
* @param columnSpecs The columnSpecs to add. For exampe, {"X = A + 5", "Y = X * 2"}.
* See the Deephaven documentation for the difference between Update() and LazyUpdate().
* @return A TableHandle referencing the new table
*/
[[nodiscard]]
TableHandle LazyUpdate(std::vector<std::string> column_specs) const;

/**
* A variadic form of UpdateView(std::vector<std::string>) const that takes a combination of
* argument types.
Expand Down Expand Up @@ -1351,15 +1360,96 @@ class TableHandle {
* @param rightSide The table to join with this table
* @param columnsToMatch The columns to join on
* @param columnsToAdd The columns from the right side to add, and possibly rename.
* @return
* @return A TableHandle referencing the new table
*/
[[nodiscard]]
TableHandle ExactJoin(const TableHandle &right_side, std::vector<MatchWithColumn> columnsToMatch,
std::vector<SelectColumn> columns_to_add) const;

/**
* Creates a new table containing all the rows and columns of the left table, plus additional
* columns containing data from the right table. For columns appended to the left table (joins),
* row values equal the row values from the right table where the keys from the left table most
* closely match the keys from the right table without going over. If there is no matching key in
* the right table, appended row values are NULL.
*
* @param right_side The table to join with this table
* @param on The column(s) to match, can be a common name or a match condition of two
* columns, e.g. 'col_a = col_b'. The first 'N-1' matches are exact matches. The final match is
* an inexact match. The inexact match can use either '>' or '>='. If a common name is used
* for the inexact match, '>=' is used for the comparison.
* @param joins The column(s) to be added from the right table to the result table, can be
* renaming expressions, i.e. "new_col = col"; default is empty, which means all the columns
* from the right table, excluding those specified in 'on'
*/
[[nodiscard]]
TableHandle Aj(const TableHandle &right_side, std::vector<std::string> on,
std::vector<std::string> joins = {}) const;

/**
* Creates a new table containing all the rows and columns of the left table, plus additional
* columns containing data from the right table. For columns appended to the left table (joins),
* row values equal the row values from the right table where the keys from the left table most closely
* match the keys from the right table without going under. If there is no matching key in the
* right table, appended row values are NULL.
*
* @param right_side The table to join with this table
* @param on The column(s) to match, can be a common name or a match condition of two
* columns, e.g. 'col_a = col_b'. The first 'N-1' matches are exact matches. The final match is
* an inexact match. The inexact match can use either '<' or '<='. If a common name is used
* for the inexact match, '<=' is used for the comparison.
* @param joins The column(s) to be added from the right table to the result table, can be
* renaming expressions, i.e. "new_col = col"; default is empty, which means all the columns
* from the right table, excluding those specified in 'on'
*/
[[nodiscard]]
TableHandle Raj(const TableHandle &right_side, std::vector<std::string> on,
std::vector<std::string> joins = {}) const;

// [[nodiscard]]
// TableHandle RangeJoin(const TableHandle &right_side, std::vector<std::string> on,
// std::vector<Aggregate> aggregations) const;


[[nodiscard]]
TableHandle LeftOuterJoin(const TableHandle &right_side, std::vector<std::string> on,
std::vector<std::string> joins) const;

/**
* Performs one or more UpdateByOperation ops grouped by zero or more key columns to calculate
* cumulative or window-based aggregations of columns in a source table. Operations include
* cumulative sums, moving averages, EMAs, etc. The aggregations are defined by the provided
* operations, which support incremental aggregations over the corresponding rows in the source
* table. Cumulative aggregations use all rows in the source table, whereas rolling aggregations
* will apply position or time-based windowing relative to the current row. Calculations are
* performed over all rows or each row group as identified by the provided key columns.
* @param ops The requested UpdateByOperation ops
* @param by The columns to group by
* @return A TableHandle referencing the new table
*/
[[nodiscard]]
TableHandle UpdateBy(std::vector<UpdateByOperation> ops, std::vector<std::string> by) const;

/**
* Creates a new table containing all of the unique values for a set of key columns.
* When used on multiple columns, it looks for distinct sets of values in the selected columns.
* @param columns The set of key columns
* @return A TableHandle referencing the new table
*/
[[nodiscard]]
TableHandle SelectDistinct(std::vector<std::string> columns) const;

/**
* Creates a new table containing rows from the source table, where the rows match values in the
* filter table. The filter is updated whenever either table changes. See the Deephaven
* documentation for the difference between "Where" and "WhereIn".
* @param filter_table The table containing the set of values to filter on
* @param columns The columns to match on
* @return
*/
[[nodiscard]]
TableHandle WhereIn(const TableHandle &filter_table, std::vector<std::string> columns) const;

/**
* Binds this table to a variable name in the QueryScope.
* @param variable The QueryScope variable to bind to.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,34 @@ TypeConverter TypeConverter::CreateNew(const std::vector<T> &values) {
auto array = builder_res.ValueUnsafe();
return TypeConverter(std::move(data_type), traits_t::kDeephavenTypeName, std::move(array));
}

template<>
inline TypeConverter TypeConverter::CreateNew(const std::vector<deephaven::dhcore::DateTime> &values) {
using deephaven::client::utility::OkOrThrow;
using deephaven::dhcore::utility::Stringf;

// TODO(kosak): put somewhere
constexpr const char *kDeephavenTypeName = "java.time.ZonedDateTime";

auto data_type = arrow::timestamp(arrow::TimeUnit::NANO, "UTC");
arrow::TimestampBuilder builder(data_type, arrow::default_memory_pool());
for (const auto &value : values) {
bool valid;
const auto *contained_value = TryGetContainedValue(&value, &valid);
if (valid) {
OkOrThrow(DEEPHAVEN_EXPR_MSG(builder.Append(contained_value->Nanos())));
} else {
OkOrThrow(DEEPHAVEN_EXPR_MSG(builder.AppendNull()));
}
}
auto builder_res = builder.Finish();
if (!builder_res.ok()) {
auto message = Stringf("Error building array of type %o: %o",
kDeephavenTypeName, builder_res.status().ToString());
}
auto array = builder_res.ValueUnsafe();
return TypeConverter(std::move(data_type), kDeephavenTypeName, std::move(array));
}
} // namespace internal

template<typename T>
Expand Down
38 changes: 38 additions & 0 deletions cpp-client/deephaven/dhclient/src/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,10 @@ Aggregate Aggregate::First(std::vector<std::string> column_specs) {
return createAggForMatchPairs(ComboAggregateRequest::FIRST, std::move(column_specs));
}

Aggregate Aggregate::Group(std::vector<std::string> column_specs) {
return createAggForMatchPairs(ComboAggregateRequest::GROUP, std::move(column_specs));
}

Aggregate Aggregate::Last(std::vector<std::string> column_specs) {
return createAggForMatchPairs(ComboAggregateRequest::LAST, std::move(column_specs));
}
Expand Down Expand Up @@ -318,6 +322,11 @@ TableHandle TableHandle::Update(std::vector<std::string> columnSpecs) const {
return TableHandle(std::move(qt_impl));
}

TableHandle TableHandle::LazyUpdate(std::vector<std::string> columnSpecs) const {
auto qt_impl = impl_->LazyUpdate(std::move(columnSpecs));
return TableHandle(std::move(qt_impl));
}

TableHandle TableHandle::View(std::vector<std::string> columnSpecs) const {
auto qt_impl = impl_->View(std::move(columnSpecs));
return TableHandle(std::move(qt_impl));
Expand Down Expand Up @@ -499,6 +508,24 @@ TableHandle TableHandle::ExactJoin(const TableHandle &rightSide,
return TableHandle(std::move(qt_impl));
}

TableHandle TableHandle::Aj(const TableHandle &right_side,
std::vector<std::string> on, std::vector<std::string> joins) const {
auto qt_impl = impl_->Aj(*right_side.impl_, std::move(on), std::move(joins));
return TableHandle(std::move(qt_impl));
}

TableHandle TableHandle::Raj(const TableHandle &right_side,
std::vector<std::string> on, std::vector<std::string> joins) const {
auto qt_impl = impl_->Raj(*right_side.impl_, std::move(on), std::move(joins));
return TableHandle(std::move(qt_impl));
}

TableHandle TableHandle::LeftOuterJoin(const TableHandle &right_side, std::vector<std::string> on,
std::vector<std::string> joins) const {
auto qt_impl = impl_->LeftOuterJoin(*right_side.impl_, std::move(on), std::move(joins));
return TableHandle(std::move(qt_impl));
}

TableHandle TableHandle::ExactJoin(const TableHandle &rightSide,
std::vector<MatchWithColumn> columnsToMatch, std::vector<SelectColumn> columnsToAdd) const {
auto ctm_strings = toIrisRepresentation(columnsToMatch);
Expand All @@ -515,6 +542,17 @@ TableHandle TableHandle::UpdateBy(std::vector<UpdateByOperation> ops, std::vecto
return TableHandle(std::move(th_impl));
}

TableHandle TableHandle::SelectDistinct(std::vector<std::string> columns) const {
auto qt_impl = impl_->SelectDistinct(std::move(columns));
return TableHandle(std::move(qt_impl));
}

TableHandle TableHandle::WhereIn(const TableHandle &filter_table,
std::vector<std::string> columns) const {
auto th_impl = impl_->WhereIn(*filter_table.impl_, std::move(columns));
return TableHandle(std::move(th_impl));
}

void TableHandle::BindToVariable(std::string variable) const {
auto res = SFCallback<>::CreateForFuture();
BindToVariableAsync(std::move(variable), std::move(res.first));
Expand Down
56 changes: 51 additions & 5 deletions cpp-client/deephaven/dhclient/src/impl/table_handle_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,14 @@ std::shared_ptr<TableHandleImpl> TableHandleImpl::Update(std::vector<std::string
return TableHandleImpl::Create(managerImpl_, std::move(result_ticket), std::move(ls));
}

std::shared_ptr<TableHandleImpl> TableHandleImpl::LazyUpdate(std::vector<std::string> column_specs) {
auto *server = managerImpl_->Server().get();
auto result_ticket = server->NewTicket();
auto [cb, ls] = TableHandleImpl::CreateEtcCallback(shared_from_this(), managerImpl_.get(), result_ticket);
server->LazyUpdateAsync(ticket_, std::move(column_specs), std::move(cb), result_ticket);
return TableHandleImpl::Create(managerImpl_, std::move(result_ticket), std::move(ls));
}

std::shared_ptr<TableHandleImpl> TableHandleImpl::View(std::vector<std::string> column_specs) {
auto *server = managerImpl_->Server().get();
auto result_ticket = server->NewTicket();
Expand Down Expand Up @@ -361,14 +369,33 @@ std::shared_ptr<TableHandleImpl> TableHandleImpl::ExactJoin(const TableHandleImp
return TableHandleImpl::Create(managerImpl_, std::move(result_ticket), std::move(ls));
}

std::shared_ptr<TableHandleImpl> TableHandleImpl::AsOfJoin(AsOfJoinTablesRequest::MatchRule match_rule,
const TableHandleImpl &right_side, std::vector<std::string> columns_to_match,
std::vector<std::string> columns_to_add) {
std::shared_ptr<TableHandleImpl> TableHandleImpl::Aj(const TableHandleImpl &right_side,
std::vector<std::string> on, std::vector<std::string> joins) {
auto *server = managerImpl_->Server().get();
auto result_ticket = server->NewTicket();
auto [cb, ls] = TableHandleImpl::CreateEtcCallback(shared_from_this(), managerImpl_.get(), result_ticket);
server->AsOfJoinAsync(match_rule, ticket_, right_side.Ticket(), std::move(columns_to_match),
std::move(columns_to_add), std::move(cb), result_ticket);
server->AjAsync(ticket_, right_side.ticket_, std::move(on), std::move(joins),
std::move(cb), result_ticket);
return TableHandleImpl::Create(managerImpl_, std::move(result_ticket), std::move(ls));
}

std::shared_ptr<TableHandleImpl> TableHandleImpl::Raj(const TableHandleImpl &right_side,
std::vector<std::string> on, std::vector<std::string> joins) {
auto *server = managerImpl_->Server().get();
auto result_ticket = server->NewTicket();
auto [cb, ls] = TableHandleImpl::CreateEtcCallback(shared_from_this(), managerImpl_.get(), result_ticket);
server->RajAsync(ticket_, right_side.ticket_, std::move(on), std::move(joins),
std::move(cb), result_ticket);
return TableHandleImpl::Create(managerImpl_, std::move(result_ticket), std::move(ls));
}

std::shared_ptr<TableHandleImpl> TableHandleImpl::LeftOuterJoin(const TableHandleImpl &right_side,
std::vector<std::string> on, std::vector<std::string> joins) {
auto *server = managerImpl_->Server().get();
auto result_ticket = server->NewTicket();
auto [cb, ls] = TableHandleImpl::CreateEtcCallback(shared_from_this(), managerImpl_.get(), result_ticket);
server->LeftOuterJoinAsync(ticket_, right_side.ticket_, std::move(on), std::move(joins),
std::move(cb), result_ticket);
return TableHandleImpl::Create(managerImpl_, std::move(result_ticket), std::move(ls));
}

Expand All @@ -386,6 +413,25 @@ TableHandleImpl::UpdateBy(std::vector<std::shared_ptr<UpdateByOperationImpl>> op
return TableHandleImpl::Create(managerImpl_, std::move(result_ticket), std::move(ls));
}

std::shared_ptr<TableHandleImpl>
TableHandleImpl::SelectDistinct(std::vector<std::string> columns) {
auto *server = managerImpl_->Server().get();
auto result_ticket = server->NewTicket();
auto [cb, ls] = TableHandleImpl::CreateEtcCallback(shared_from_this(), managerImpl_.get(), result_ticket);
server->SelectDistinctAsync(ticket_, std::move(columns), std::move(cb), result_ticket);
return TableHandleImpl::Create(managerImpl_, std::move(result_ticket), std::move(ls));
}

std::shared_ptr<TableHandleImpl>
TableHandleImpl::WhereIn(const deephaven::client::impl::TableHandleImpl &filter_table,
std::vector<std::string> columns) {
auto *server = managerImpl_->Server().get();
auto result_ticket = server->NewTicket();
auto [cb, ls] = TableHandleImpl::CreateEtcCallback(shared_from_this(), managerImpl_.get(), result_ticket);
server->WhereInAsync(ticket_, filter_table.ticket_, std::move(columns), std::move(cb), result_ticket);
return TableHandleImpl::Create(managerImpl_, std::move(result_ticket), std::move(ls));
}

namespace {
class CStyleTickingCallback final : public TickingCallback {
public:
Expand Down
Loading

0 comments on commit 84199c7

Please sign in to comment.