Skip to content

Commit

Permalink
Wrap the new PartitionedTable interface (deephaven#2446)
Browse files Browse the repository at this point in the history
* Wrap the new PartitionedTable interface
  • Loading branch information
jmao-denver committed Jun 1, 2022
1 parent 656e01a commit b778e11
Show file tree
Hide file tree
Showing 6 changed files with 311 additions and 59 deletions.
8 changes: 4 additions & 4 deletions py/server/deephaven/plot/selectable_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

from deephaven import DHError
from deephaven._wrapper import JObjectWrapper
from deephaven.table import Table
from deephaven.table import Table, PartitionedTable

_JSelectableDataSet = jpy.get_type("io.deephaven.plot.filters.SelectableDataSet")
_JTableDefinition = jpy.get_type("io.deephaven.engine.table.TableDefinition")
Expand Down Expand Up @@ -54,11 +54,11 @@ def one_click(t: Table, by: List[str] = None, require_all_filters: bool = False)
raise DHError(e, "failed in one_click.") from e


def one_click_partitioned_table(pt: jpy.JType, require_all_filters: bool = False) -> SelectableDataSet:
def one_click_partitioned_table(pt: PartitionedTable, require_all_filters: bool = False) -> SelectableDataSet:
""" Creates a SelectableDataSet with the specified columns from the table map.
Args:
pt (jpy.JType): the source partitioned table
pt (PartitionedTable): the source partitioned table
require_all_filters (bool): false to display data when not all oneclicks are selected; true to only
display data when appropriate oneclicks are selected
Expand All @@ -69,6 +69,6 @@ def one_click_partitioned_table(pt: jpy.JType, require_all_filters: bool = False
DHError
"""
try:
return SelectableDataSet(j_sds=_JSelectables.oneClick(pt, require_all_filters))
return SelectableDataSet(j_sds=_JSelectables.oneClick(pt.j_partitioned_table, require_all_filters))
except Exception as e:
raise DHError(e, "failed in one_click.") from e
264 changes: 214 additions & 50 deletions py/server/deephaven/table.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
#
# Copyright (c) 2016-2021 Deephaven Data Labs and Patent Pending
#
""" This module implements the Table class and functions that work with Tables. """
""" This module implements the Table and PartitionedTable classes which are the main instruments for working with
Deephaven refreshing and static data."""

from __future__ import annotations

from enum import Enum, auto
from typing import Union, Sequence, List
from typing import Union, Sequence, List, Any, Optional

import jpy

Expand All @@ -25,6 +27,7 @@
_JPair = jpy.get_type("io.deephaven.api.agg.Pair")
_JMatchPair = jpy.get_type("io.deephaven.engine.table.MatchPair")
_JLayoutHintBuilder = jpy.get_type("io.deephaven.engine.util.LayoutHintBuilder")
_JPartitionedTable = jpy.get_type("io.deephaven.engine.table.PartitionedTable")


class SortDirection(Enum):
Expand All @@ -44,6 +47,26 @@ class AsOfMatchRule(Enum):
GREATER_THAN = _JAsOfMatchRule.GREATER_THAN


def _sort_column(col, dir_):
return (_JSortColumn.desc(_JColumnName.of(col)) if dir_ == SortDirection.DESCENDING else _JSortColumn.asc(
_JColumnName.of(col)))


def _td_to_columns(table_definition):
cols = []
j_cols = table_definition.getColumnList().toArray()
for j_col in j_cols:
cols.append(
Column(
name=j_col.getName(),
data_type=dtypes.from_jtype(j_col.getDataType()),
component_type=dtypes.from_jtype(j_col.getComponentType()),
column_type=ColumnType(j_col.getColumnType()),
)
)
return cols


class Table(JObjectWrapper):
"""A Table represents a Deephaven table. It allows applications to perform powerful Deephaven table operations.
Expand Down Expand Up @@ -91,18 +114,7 @@ def columns(self) -> List[Column]:
if self._schema:
return self._schema

self._schema = []
j_col_list = self._definition.getColumnList()
for i in range(j_col_list.size()):
j_col = j_col_list.get(i)
self._schema.append(
Column(
name=j_col.getName(),
data_type=dtypes.from_jtype(j_col.getDataType()),
component_type=dtypes.from_jtype(j_col.getComponentType()),
column_type=ColumnType(j_col.getColumnType()),
)
)
self._schema = _td_to_columns(self._definition)
return self._schema

@property
Expand Down Expand Up @@ -615,8 +627,8 @@ def sort(self, order_by: Union[str, Sequence[str]],
Args:
order_by (Union[str, Sequence[str]]): the column(s) to be sorted on
order (Union[SortDirection, Sequence[SortDirection], optional): the corresponding sort directions for each sort
column, default is None. In the absence of explicit sort directions, data will be sorted in the ascending order.
order (Union[SortDirection, Sequence[SortDirection], optional): the corresponding sort directions for
each sort column, default is None, meaning ascending order for all the sort columns.
Returns:
a new table
Expand All @@ -625,24 +637,17 @@ def sort(self, order_by: Union[str, Sequence[str]],
DHError
"""

def sort_column(col, dir_):
return (
_JSortColumn.desc(_JColumnName.of(col))
if dir_ == SortDirection.DESCENDING
else _JSortColumn.asc(_JColumnName.of(col))
)

try:
order_by = to_sequence(order_by)
if not order:
order = (SortDirection.ASCENDING,) * len(order_by)
order = to_sequence(order)
if order:
sort_columns = [
sort_column(col, dir_) for col, dir_ in zip(order_by, order)
]
j_sc_list = j_array_list(sort_columns)
return Table(j_table=self.j_table.sort(j_sc_list))
else:
return Table(j_table=self.j_table.sort(*order_by))
if len(order_by) != len(order):
raise DHError(message="The number of sort columns must be the same as the number of sort directions.")

sort_columns = [_sort_column(col, dir_) for col, dir_ in zip(order_by, order)]
j_sc_list = j_array_list(sort_columns)
return Table(j_table=self.j_table.sort(j_sc_list))
except Exception as e:
raise DHError(e, "table sort operation failed.") from e

Expand Down Expand Up @@ -723,7 +728,8 @@ def exact_join(self, table: Table, on: Union[str, Sequence[str]], joins: Union[s
except Exception as e:
raise DHError(e, "table exact_join operation failed.") from e

def join(self, table: Table, on: Union[str, Sequence[str]] = None, joins: Union[str, Sequence[str]] = None) -> Table:
def join(self, table: Table, on: Union[str, Sequence[str]] = None,
joins: Union[str, Sequence[str]] = None) -> Table:
"""The join method creates a new table containing rows that have matching values in both tables. Rows that
do not have matching criteria will not be included in the result. If there are multiple matches between a row
from the left table and rows from the right table, all matching combinations will be included. If no columns
Expand Down Expand Up @@ -1171,24 +1177,6 @@ def agg_all_by(self, agg: Aggregation, by: Union[str, Sequence[str]] = None) ->

# endregion

def partition_by(self, by: Union[str, Sequence[str]]) -> jpy.JType:
""" Creates a TableMap (opaque) by dividing this table into sub-tables.
Args:
by (Union[str, Sequence[str]]): the column(s) by which to group data
Returns:
A TableMap containing a sub-table for each group
Raises:
DHError
"""
try:
by = to_sequence(by)
return self.j_table.partitionBy(*by)
except Exception as e:
raise DHError(e, "failed to create a TableMap.") from e

def format_columns(self, formulas: Union[str, List[str]]) -> Table:
""" Applies color formatting to the columns of the table.
Expand Down Expand Up @@ -1285,3 +1273,179 @@ def layout_hints(self, front: Union[str, List[str]] = None, back: Union[str, Lis
return Table(j_table=self.j_table.setLayoutHints(_j_layout_hint_builder.build()))
except Exception as e:
raise DHError(e, "failed to set layout hints on table") from e

def partition_by(self, by: Union[str, Sequence[str]], drop_keys: bool = False) -> PartitionedTable:
""" Creates a PartitionedTable from this table, partitioned according to the specified key columns.
Args:
by (Union[str, Sequence[str]]): the column(s) by which to group data
drop_keys (bool): whether to drop key columns in the constituent tables, default is False
Returns:
A PartitionedTable containing a sub-table for each group
Raises:
DHError
"""
try:
by = to_sequence(by)
return PartitionedTable(j_partitioned_table=self.j_table.partitionBy(drop_keys, *by))
except Exception as e:
raise DHError(e, "failed to create a partitioned table.") from e


class PartitionedTable(JObjectWrapper):
"""A partitioned table is a table with one column containing like-defined constituent tables, optionally with
key columns defined to allow binary operation based transformation or joins with other like-keyed partitioned
tables. """

j_object_type = _JPartitionedTable

@property
def j_object(self) -> jpy.JType:
self.j_partitioned_table

def __init__(self, j_partitioned_table):
self.j_partitioned_table = j_partitioned_table
self._schema = None
self._table = None
self._key_columns = None
self._unique_keys = None
self._constituent_column = None
self._constituent_changes_permitted = None

@property
def table(self) -> Table:
"""The underlying Table."""
if self._table is None:
self._table = Table(j_table=self.j_partitioned_table.table())
return self._table

@property
def key_columns(self) -> List[str]:
"""The partition key column names."""
if self._key_columns is None:
self._key_columns = list(self.j_partitioned_table.keyColumnNames().toArray())
return self._key_columns

@property
def unique_keys(self) -> bool:
"""Whether the keys in the underlying table are unique. If keys are unique, one can expect that
select_distinct(key_column_names) and view(key_column_names) operations produce equivalent tables."""
if self._unique_keys is None:
self._unique_keys = self.j_partitioned_table.uniqueKeys()
return self._unique_keys

@property
def constituent_column(self) -> str:
"""The constituent column name."""
if self._constituent_column is None:
self._constituent_column = self.j_partitioned_table.constituentColumnName()
return self._constituent_column

@property
def constituent_table_columns(self) -> List[Column]:
"""The column definitions shared by the constituent tables."""
if not self._schema:
self._schema = _td_to_columns(self.j_partitioned_table.constituentDefinition())

return self._schema

@property
def constituent_changes_permitted(self) -> bool:
"""Whether the constituents of the underlying partitioned table can change, specifically whether the values of
the constituent column can change.
Note, this is unrelated to whether the constituent tables are refreshing, or whether the underlying partitioned
table is refreshing. Also note that the underlying partitioned table must be refreshing if it contains
any refreshing constituents.
"""
if self._constituent_changes_permitted is None:
self._constituent_changes_permitted = self.j_partitioned_table.constituentChangesPermitted()
return self._constituent_changes_permitted

def merge(self) -> Table:
"""Makes a new Table that contains all the rows from all the constituent tables.
Returns:
a Table
Raises:
DHError
"""
try:
return Table(j_table=self.j_partitioned_table.merge())
except Exception as e:
raise DHError(e, "failed to merge all the constituent tables.")

def filter(self, filters: Union[str, Filter, Sequence[str], Sequence[Filter]]) -> PartitionedTable:
"""Makes a new PartitionedTable from the result of applying filters to the underlying partitioned table.
Args:
filters (Union[str, Filter, Sequence[str], Sequence[Filter]]): the filter condition expression(s) or
Filter object(s)
Returns:
a PartitionedTable
Raises:
DHError
"""
filters = to_sequence(filters)
if isinstance(filters[0], str):
filters = Filter.from_(filters)
filters = to_sequence(filters)
try:
return PartitionedTable(j_partitioned_table=self.j_partitioned_table.filter(j_array_list(filters)))
except Exception as e:
raise DHError(e, "failed to apply filters to the partitioned table.") from e

def sort(self, order_by: Union[str, Sequence[str]],
order: Union[SortDirection, Sequence[SortDirection]] = None) -> PartitionedTable:
"""Makes a new PartitionedTable from sorting the underlying partitioned table.
Args:
order_by (Union[str, Sequence[str]]): the column(s) to be sorted on, can't include the constituent column
order (Union[SortDirection, Sequence[SortDirection], optional): the corresponding sort directions for
each sort column, default is None, meaning ascending order for all the sort columns.
Returns:
a new PartitionedTable
Raises:
DHError
"""

try:
order_by = to_sequence(order_by)
if not order:
order = (SortDirection.ASCENDING,) * len(order_by)
order = to_sequence(order)
if len(order_by) != len(order):
raise DHError(message="The number of sort columns must be the same as the number of sort directions.")

sort_columns = [_sort_column(col, dir_) for col, dir_ in zip(order_by, order)]
j_sc_list = j_array_list(sort_columns)
return PartitionedTable(j_partitioned_table=self.j_partitioned_table.sort(j_sc_list))
except Exception as e:
raise DHError(e, "failed to sort the partitioned table.") from e

def get_constituent(self, key_values: Sequence[Any]) -> Optional[Table]:
"""Gets a single constituent table by its corresponding key column values.
Args:
key_values (Sequence[Any]): the values of the key columns
Returns:
a Table or None
"""
j_table = self.j_partitioned_table.constituentFor(*key_values)
if j_table:
return Table(j_table=j_table)
else:
return None

@property
def constituent_tables(self) -> List[Table]:
"""Returns all the current constituent tables."""
return list(map(Table, self.j_partitioned_table.constituents()))
Loading

0 comments on commit b778e11

Please sign in to comment.