From b778e11bc76958bd8668d1de3387d25bb395aa75 Mon Sep 17 00:00:00 2001 From: Jianfeng Mao <4297243+jmao-denver@users.noreply.github.com> Date: Wed, 1 Jun 2022 12:30:23 -0600 Subject: [PATCH] Wrap the new PartitionedTable interface (#2446) * Wrap the new PartitionedTable interface --- .../deephaven/plot/selectable_dataset.py | 8 +- py/server/deephaven/table.py | 264 ++++++++++++++---- py/server/tests/test_partitioned_table.py | 81 ++++++ .../test_plot/test_selectable_dataset.py | 1 + py/server/tests/test_table.py | 10 +- py/server/tests/testbase.py | 6 +- 6 files changed, 311 insertions(+), 59 deletions(-) create mode 100644 py/server/tests/test_partitioned_table.py diff --git a/py/server/deephaven/plot/selectable_dataset.py b/py/server/deephaven/plot/selectable_dataset.py index d1c1e36d462..af1b7c6bf06 100644 --- a/py/server/deephaven/plot/selectable_dataset.py +++ b/py/server/deephaven/plot/selectable_dataset.py @@ -10,7 +10,7 @@ from deephaven import DHError from deephaven._wrapper import JObjectWrapper -from deephaven.table import Table +from deephaven.table import Table, PartitionedTable _JSelectableDataSet = jpy.get_type("io.deephaven.plot.filters.SelectableDataSet") _JTableDefinition = jpy.get_type("io.deephaven.engine.table.TableDefinition") @@ -54,11 +54,11 @@ def one_click(t: Table, by: List[str] = None, require_all_filters: bool = False) raise DHError(e, "failed in one_click.") from e -def one_click_partitioned_table(pt: jpy.JType, require_all_filters: bool = False) -> SelectableDataSet: +def one_click_partitioned_table(pt: PartitionedTable, require_all_filters: bool = False) -> SelectableDataSet: """ Creates a SelectableDataSet with the specified columns from the table map. Args: - pt (jpy.JType): the source partitioned table + pt (PartitionedTable): the source partitioned table require_all_filters (bool): false to display data when not all oneclicks are selected; true to only display data when appropriate oneclicks are selected @@ -69,6 +69,6 @@ def one_click_partitioned_table(pt: jpy.JType, require_all_filters: bool = False DHError """ try: - return SelectableDataSet(j_sds=_JSelectables.oneClick(pt, require_all_filters)) + return SelectableDataSet(j_sds=_JSelectables.oneClick(pt.j_partitioned_table, require_all_filters)) except Exception as e: raise DHError(e, "failed in one_click.") from e diff --git a/py/server/deephaven/table.py b/py/server/deephaven/table.py index 0538305c8b1..e3a2da77423 100644 --- a/py/server/deephaven/table.py +++ b/py/server/deephaven/table.py @@ -1,11 +1,13 @@ # # Copyright (c) 2016-2021 Deephaven Data Labs and Patent Pending # -""" This module implements the Table class and functions that work with Tables. """ +""" This module implements the Table and PartitionedTable classes which are the main instruments for working with +Deephaven refreshing and static data.""" + from __future__ import annotations from enum import Enum, auto -from typing import Union, Sequence, List +from typing import Union, Sequence, List, Any, Optional import jpy @@ -25,6 +27,7 @@ _JPair = jpy.get_type("io.deephaven.api.agg.Pair") _JMatchPair = jpy.get_type("io.deephaven.engine.table.MatchPair") _JLayoutHintBuilder = jpy.get_type("io.deephaven.engine.util.LayoutHintBuilder") +_JPartitionedTable = jpy.get_type("io.deephaven.engine.table.PartitionedTable") class SortDirection(Enum): @@ -44,6 +47,26 @@ class AsOfMatchRule(Enum): GREATER_THAN = _JAsOfMatchRule.GREATER_THAN +def _sort_column(col, dir_): + return (_JSortColumn.desc(_JColumnName.of(col)) if dir_ == SortDirection.DESCENDING else _JSortColumn.asc( + _JColumnName.of(col))) + + +def _td_to_columns(table_definition): + cols = [] + j_cols = table_definition.getColumnList().toArray() + for j_col in j_cols: + cols.append( + Column( + name=j_col.getName(), + data_type=dtypes.from_jtype(j_col.getDataType()), + component_type=dtypes.from_jtype(j_col.getComponentType()), + column_type=ColumnType(j_col.getColumnType()), + ) + ) + return cols + + class Table(JObjectWrapper): """A Table represents a Deephaven table. It allows applications to perform powerful Deephaven table operations. @@ -91,18 +114,7 @@ def columns(self) -> List[Column]: if self._schema: return self._schema - self._schema = [] - j_col_list = self._definition.getColumnList() - for i in range(j_col_list.size()): - j_col = j_col_list.get(i) - self._schema.append( - Column( - name=j_col.getName(), - data_type=dtypes.from_jtype(j_col.getDataType()), - component_type=dtypes.from_jtype(j_col.getComponentType()), - column_type=ColumnType(j_col.getColumnType()), - ) - ) + self._schema = _td_to_columns(self._definition) return self._schema @property @@ -615,8 +627,8 @@ def sort(self, order_by: Union[str, Sequence[str]], Args: order_by (Union[str, Sequence[str]]): the column(s) to be sorted on - order (Union[SortDirection, Sequence[SortDirection], optional): the corresponding sort directions for each sort - column, default is None. In the absence of explicit sort directions, data will be sorted in the ascending order. + order (Union[SortDirection, Sequence[SortDirection], optional): the corresponding sort directions for + each sort column, default is None, meaning ascending order for all the sort columns. Returns: a new table @@ -625,24 +637,17 @@ def sort(self, order_by: Union[str, Sequence[str]], DHError """ - def sort_column(col, dir_): - return ( - _JSortColumn.desc(_JColumnName.of(col)) - if dir_ == SortDirection.DESCENDING - else _JSortColumn.asc(_JColumnName.of(col)) - ) - try: order_by = to_sequence(order_by) + if not order: + order = (SortDirection.ASCENDING,) * len(order_by) order = to_sequence(order) - if order: - sort_columns = [ - sort_column(col, dir_) for col, dir_ in zip(order_by, order) - ] - j_sc_list = j_array_list(sort_columns) - return Table(j_table=self.j_table.sort(j_sc_list)) - else: - return Table(j_table=self.j_table.sort(*order_by)) + if len(order_by) != len(order): + raise DHError(message="The number of sort columns must be the same as the number of sort directions.") + + sort_columns = [_sort_column(col, dir_) for col, dir_ in zip(order_by, order)] + j_sc_list = j_array_list(sort_columns) + return Table(j_table=self.j_table.sort(j_sc_list)) except Exception as e: raise DHError(e, "table sort operation failed.") from e @@ -723,7 +728,8 @@ def exact_join(self, table: Table, on: Union[str, Sequence[str]], joins: Union[s except Exception as e: raise DHError(e, "table exact_join operation failed.") from e - def join(self, table: Table, on: Union[str, Sequence[str]] = None, joins: Union[str, Sequence[str]] = None) -> Table: + def join(self, table: Table, on: Union[str, Sequence[str]] = None, + joins: Union[str, Sequence[str]] = None) -> Table: """The join method creates a new table containing rows that have matching values in both tables. Rows that do not have matching criteria will not be included in the result. If there are multiple matches between a row from the left table and rows from the right table, all matching combinations will be included. If no columns @@ -1171,24 +1177,6 @@ def agg_all_by(self, agg: Aggregation, by: Union[str, Sequence[str]] = None) -> # endregion - def partition_by(self, by: Union[str, Sequence[str]]) -> jpy.JType: - """ Creates a TableMap (opaque) by dividing this table into sub-tables. - - Args: - by (Union[str, Sequence[str]]): the column(s) by which to group data - - Returns: - A TableMap containing a sub-table for each group - - Raises: - DHError - """ - try: - by = to_sequence(by) - return self.j_table.partitionBy(*by) - except Exception as e: - raise DHError(e, "failed to create a TableMap.") from e - def format_columns(self, formulas: Union[str, List[str]]) -> Table: """ Applies color formatting to the columns of the table. @@ -1285,3 +1273,179 @@ def layout_hints(self, front: Union[str, List[str]] = None, back: Union[str, Lis return Table(j_table=self.j_table.setLayoutHints(_j_layout_hint_builder.build())) except Exception as e: raise DHError(e, "failed to set layout hints on table") from e + + def partition_by(self, by: Union[str, Sequence[str]], drop_keys: bool = False) -> PartitionedTable: + """ Creates a PartitionedTable from this table, partitioned according to the specified key columns. + + Args: + by (Union[str, Sequence[str]]): the column(s) by which to group data + drop_keys (bool): whether to drop key columns in the constituent tables, default is False + + Returns: + A PartitionedTable containing a sub-table for each group + + Raises: + DHError + """ + try: + by = to_sequence(by) + return PartitionedTable(j_partitioned_table=self.j_table.partitionBy(drop_keys, *by)) + except Exception as e: + raise DHError(e, "failed to create a partitioned table.") from e + + +class PartitionedTable(JObjectWrapper): + """A partitioned table is a table with one column containing like-defined constituent tables, optionally with + key columns defined to allow binary operation based transformation or joins with other like-keyed partitioned + tables. """ + + j_object_type = _JPartitionedTable + + @property + def j_object(self) -> jpy.JType: + self.j_partitioned_table + + def __init__(self, j_partitioned_table): + self.j_partitioned_table = j_partitioned_table + self._schema = None + self._table = None + self._key_columns = None + self._unique_keys = None + self._constituent_column = None + self._constituent_changes_permitted = None + + @property + def table(self) -> Table: + """The underlying Table.""" + if self._table is None: + self._table = Table(j_table=self.j_partitioned_table.table()) + return self._table + + @property + def key_columns(self) -> List[str]: + """The partition key column names.""" + if self._key_columns is None: + self._key_columns = list(self.j_partitioned_table.keyColumnNames().toArray()) + return self._key_columns + + @property + def unique_keys(self) -> bool: + """Whether the keys in the underlying table are unique. If keys are unique, one can expect that + select_distinct(key_column_names) and view(key_column_names) operations produce equivalent tables.""" + if self._unique_keys is None: + self._unique_keys = self.j_partitioned_table.uniqueKeys() + return self._unique_keys + + @property + def constituent_column(self) -> str: + """The constituent column name.""" + if self._constituent_column is None: + self._constituent_column = self.j_partitioned_table.constituentColumnName() + return self._constituent_column + + @property + def constituent_table_columns(self) -> List[Column]: + """The column definitions shared by the constituent tables.""" + if not self._schema: + self._schema = _td_to_columns(self.j_partitioned_table.constituentDefinition()) + + return self._schema + + @property + def constituent_changes_permitted(self) -> bool: + """Whether the constituents of the underlying partitioned table can change, specifically whether the values of + the constituent column can change. + + Note, this is unrelated to whether the constituent tables are refreshing, or whether the underlying partitioned + table is refreshing. Also note that the underlying partitioned table must be refreshing if it contains + any refreshing constituents. + """ + if self._constituent_changes_permitted is None: + self._constituent_changes_permitted = self.j_partitioned_table.constituentChangesPermitted() + return self._constituent_changes_permitted + + def merge(self) -> Table: + """Makes a new Table that contains all the rows from all the constituent tables. + + Returns: + a Table + + Raises: + DHError + """ + try: + return Table(j_table=self.j_partitioned_table.merge()) + except Exception as e: + raise DHError(e, "failed to merge all the constituent tables.") + + def filter(self, filters: Union[str, Filter, Sequence[str], Sequence[Filter]]) -> PartitionedTable: + """Makes a new PartitionedTable from the result of applying filters to the underlying partitioned table. + + Args: + filters (Union[str, Filter, Sequence[str], Sequence[Filter]]): the filter condition expression(s) or + Filter object(s) + + Returns: + a PartitionedTable + + Raises: + DHError + """ + filters = to_sequence(filters) + if isinstance(filters[0], str): + filters = Filter.from_(filters) + filters = to_sequence(filters) + try: + return PartitionedTable(j_partitioned_table=self.j_partitioned_table.filter(j_array_list(filters))) + except Exception as e: + raise DHError(e, "failed to apply filters to the partitioned table.") from e + + def sort(self, order_by: Union[str, Sequence[str]], + order: Union[SortDirection, Sequence[SortDirection]] = None) -> PartitionedTable: + """Makes a new PartitionedTable from sorting the underlying partitioned table. + + Args: + order_by (Union[str, Sequence[str]]): the column(s) to be sorted on, can't include the constituent column + order (Union[SortDirection, Sequence[SortDirection], optional): the corresponding sort directions for + each sort column, default is None, meaning ascending order for all the sort columns. + + Returns: + a new PartitionedTable + + Raises: + DHError + """ + + try: + order_by = to_sequence(order_by) + if not order: + order = (SortDirection.ASCENDING,) * len(order_by) + order = to_sequence(order) + if len(order_by) != len(order): + raise DHError(message="The number of sort columns must be the same as the number of sort directions.") + + sort_columns = [_sort_column(col, dir_) for col, dir_ in zip(order_by, order)] + j_sc_list = j_array_list(sort_columns) + return PartitionedTable(j_partitioned_table=self.j_partitioned_table.sort(j_sc_list)) + except Exception as e: + raise DHError(e, "failed to sort the partitioned table.") from e + + def get_constituent(self, key_values: Sequence[Any]) -> Optional[Table]: + """Gets a single constituent table by its corresponding key column values. + + Args: + key_values (Sequence[Any]): the values of the key columns + + Returns: + a Table or None + """ + j_table = self.j_partitioned_table.constituentFor(*key_values) + if j_table: + return Table(j_table=j_table) + else: + return None + + @property + def constituent_tables(self) -> List[Table]: + """Returns all the current constituent tables.""" + return list(map(Table, self.j_partitioned_table.constituents())) diff --git a/py/server/tests/test_partitioned_table.py b/py/server/tests/test_partitioned_table.py new file mode 100644 index 00000000000..2c65afad56d --- /dev/null +++ b/py/server/tests/test_partitioned_table.py @@ -0,0 +1,81 @@ +# +# Copyright (c) 2016-2022 Deephaven Data Labs and Patent Pending +# +import unittest + +from deephaven.filters import Filter + +from deephaven import read_csv, DHError +from tests.testbase import BaseTestCase + + +class PartitionedTableTestCase(BaseTestCase): + def setUp(self): + self.test_table = read_csv("tests/data/test_table.csv") + self.partitioned_table = self.test_table.partition_by(by=["c", "e"]) + + def tearDown(self): + self.partitioned_table = None + self.test_table = None + + def test_table(self): + self.assertIsNotNone(self.partitioned_table.table) + + def test_key_columns(self): + self.assertEqual(self.partitioned_table.key_columns, ["c", "e"]) + + def test_constituent_column(self): + self.assertEqual(self.partitioned_table.constituent_column, "__CONSTITUENT__") + + def test_unique_keys(self): + self.assertTrue(self.partitioned_table.unique_keys) + + def test_constituent_change_permitted(self): + self.assertFalse(self.partitioned_table.constituent_changes_permitted) + + def test_constituent_table_columns(self): + self.assertEqual(self.test_table.columns, self.partitioned_table.constituent_table_columns) + + def test_merge(self): + t = self.partitioned_table.merge() + self.assert_table_equals(t, self.test_table) + + def test_filter(self): + conditions = ["c < 0", "e > 0"] + filters = Filter.from_(conditions) + pt = self.partitioned_table.filter(filters) + self.assertIsNotNone(pt) + + filters = ["c < 0", "e > 0"] + pt = self.partitioned_table.filter(filters) + self.assertIsNotNone(pt) + + with self.assertRaises(DHError) as cm: + conditions = ["a > 100", "b < 1000"] + filters = Filter.from_(conditions) + pt = self.partitioned_table.filter(filters) + self.assertIn("RuntimeError", str(cm.exception)) + + def test_sort(self): + new_pt = self.partitioned_table.sort(order_by=["c"]) + self.assertIsNotNone(new_pt) + + with self.assertRaises(DHError) as cm: + new_pt = self.partitioned_table.sort(order_by=["a", "b"]) + self.assertIn("NoSuchColumnException", str(cm.exception)) + + with self.assertRaises(DHError) as cm: + new_pt = self.partitioned_table.sort(order_by=self.partitioned_table.constituent_column) + self.assertIn("Unsupported sort on constituent column", str(cm.exception)) + + def test_get_constituent(self): + keys = [917, 167] + self.assertIsNotNone(self.partitioned_table.get_constituent(keys)) + + def test_constituents(self): + constituent_tables = self.partitioned_table.constituent_tables + self.assertGreater(len(constituent_tables), 0) + + +if __name__ == '__main__': + unittest.main() diff --git a/py/server/tests/test_plot/test_selectable_dataset.py b/py/server/tests/test_plot/test_selectable_dataset.py index d39b0b1a838..3cc8f4a6d36 100644 --- a/py/server/tests/test_plot/test_selectable_dataset.py +++ b/py/server/tests/test_plot/test_selectable_dataset.py @@ -34,5 +34,6 @@ def test_one_click_tm(self): sds = one_click_partitioned_table(pt, require_all_filters=True) self.assertIsNotNone(sds) + if __name__ == '__main__': unittest.main() diff --git a/py/server/tests/test_table.py b/py/server/tests/test_table.py index 0cbaf18639e..543fe071918 100644 --- a/py/server/tests/test_table.py +++ b/py/server/tests/test_table.py @@ -20,7 +20,7 @@ def tearDown(self) -> None: def test_repr(self): regex = r"deephaven\.table\.Table\(io\.deephaven\.engine\.table\.Table\(objectRef=0x.+\{.+\}\)\)" for i in range(0, 8): - t = empty_table(10**i).update("a=i") + t = empty_table(10 ** i).update("a=i") result = repr(t) self.assertRegex(result, regex) self.assertLessEqual(len(result), 120) @@ -186,11 +186,15 @@ def test_head_tail_pct(self): # Table operation category: Sort # def test_sort(self): - sorted_table = self.test_table.sort(order_by=["a", "b"], order=[SortDirection.DESCENDING]) + sorted_table = self.test_table.sort(order_by=["a", "b"], + order=[SortDirection.DESCENDING, SortDirection.ASCENDING]) + self.assertEqual(sorted_table.size, self.test_table.size) + with self.assertRaises(DHError) as cm: + sorted_table = self.test_table.sort(order_by=["a", "b"], order=[SortDirection.DESCENDING]) self.assertEqual(sorted_table.size, self.test_table.size) sorted_table = self.test_table.sort(order_by="a", order=SortDirection.DESCENDING) self.assertEqual(sorted_table.size, self.test_table.size) - sorted_table = self.test_table.sort(order_by=[], order=SortDirection.DESCENDING) + sorted_table = self.test_table.sort(order_by=[], order=[]) self.assertEqual(sorted_table, self.test_table) def test_restrict_sort_to(self): diff --git a/py/server/tests/testbase.py b/py/server/tests/testbase.py index db1f3cb3cd7..f8e8b39aabb 100644 --- a/py/server/tests/testbase.py +++ b/py/server/tests/testbase.py @@ -6,12 +6,14 @@ import unittest import jpy +from deephaven import DHError from deephaven._ugp import ugp_exclusive_lock from deephaven.table import Table _JTableTools = jpy.get_type("io.deephaven.engine.util.TableTools") + def table_equals(table_a: Table, table_b: Table) -> bool: try: return False if _JTableTools.diff(table_a.j_table, table_b.j_table, 1) else True @@ -43,10 +45,10 @@ def wait_ticking_table_update(self, table: Table, row_count: int, timeout: int): timeout (int): the number of seconds to wait """ with ugp_exclusive_lock(): - timeout *= 10**9 + timeout *= 10 ** 9 while table.size < row_count and timeout > 0: s_time = time.time_ns() - table.j_table.awaitUpdate(timeout // 10**6) + table.j_table.awaitUpdate(timeout // 10 ** 6) timeout -= time.time_ns() - s_time self.assertGreaterEqual(table.size, row_count)