Add conv_null option in to_pandas() to convert DH nulls to pd.NA #4237

Merged 4 commits on Aug 2, 2023
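This PR adds a conv_null flag (default True) to to_pandas(): when set, Deephaven null sentinels in the
output are replaced with pd.NA and the affected columns use pandas nullable extension dtypes. A minimal
usage sketch (the table and column names are illustrative, not taken from this PR):

    from deephaven import new_table
    from deephaven.column import int_col
    from deephaven.constants import NULL_INT
    from deephaven.pandas import to_pandas

    t = new_table([int_col("X", [1, NULL_INT])])

    df = to_pandas(t)                       # default conv_null=True: NULL_INT -> pd.NA, dtype Int32
    df_raw = to_pandas(t, conv_null=False)  # raw sentinel kept, plain numpy int32 dtype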
1 change: 1 addition & 0 deletions py/server/deephaven/numpy.py
@@ -17,6 +17,7 @@
_JPrimitiveArrayConversionUtility = jpy.get_type("io.deephaven.integrations.common.PrimitiveArrayConversionUtility")
_JDataAccessHelpers = jpy.get_type("io.deephaven.engine.table.impl.DataAccessHelpers")


def _to_column_name(name: str) -> str:
""" Transforms the given name string into a valid table column name. """
tmp_name = re.sub(r"\W+", " ", str(name)).strip()
78 changes: 59 additions & 19 deletions py/server/deephaven/pandas.py
@@ -3,31 +3,45 @@
#

""" This module supports the conversion between Deephaven tables and pandas DataFrames. """
from typing import List
from typing import List, Dict, Tuple, Union

import jpy
import numpy as np
import pandas as pd
import pyarrow as pa

from deephaven import DHError, new_table, dtypes, arrow
from deephaven.arrow import SUPPORTED_ARROW_TYPES
from deephaven.column import Column
from deephaven.constants import NULL_BYTE, NULL_SHORT, NULL_CHAR, NULL_INT, NULL_LONG, NULL_FLOAT, NULL_DOUBLE
from deephaven.constants import NULL_BYTE, NULL_SHORT, NULL_INT, NULL_LONG, NULL_FLOAT, NULL_DOUBLE, NULL_CHAR
from deephaven.dtypes import DType
from deephaven.numpy import column_to_numpy_array, _make_input_column
from deephaven.table import Table

_NULL_BOOLEAN_AS_BYTE = jpy.get_type("io.deephaven.util.BooleanUtils").NULL_BOOLEAN_AS_BYTE
_JPrimitiveArrayConversionUtility = jpy.get_type("io.deephaven.integrations.common.PrimitiveArrayConversionUtility")
_JDataAccessHelpers = jpy.get_type("io.deephaven.engine.table.impl.DataAccessHelpers")
_is_dtype_backend_supported = pd.__version__ >= "2.0.0"

_DTYPE_NULL_MAPPING: Dict[DType, Tuple] = {
dtypes.bool_: (_NULL_BOOLEAN_AS_BYTE, pd.BooleanDtype),
dtypes.byte: (NULL_BYTE, pd.Int8Dtype),
dtypes.short: (NULL_SHORT, pd.Int16Dtype),
dtypes.char: (NULL_CHAR, pd.UInt16Dtype),
dtypes.int32: (NULL_INT, pd.Int32Dtype),
dtypes.int64: (NULL_LONG, pd.Int64Dtype),
dtypes.float32: (NULL_FLOAT, pd.Float32Dtype),
dtypes.float64: (NULL_DOUBLE, pd.Float64Dtype),
}


def _column_to_series(table: Table, col_def: Column) -> pd.Series:
def _column_to_series(table: Table, col_def: Column, conv_null: bool) -> pd.Series:
"""Produce a copy of the specified column as a pandas.Series object.

Args:
table (Table): the table
col_def (Column): the column definition
conv_null (bool): whether to check for Deephaven nulls in the data and automatically replace them with
pd.NA.

Returns:
a pandas Series
@@ -37,9 +51,22 @@ def _column_to_series(table: Table, col_def: Column) -> pd.Series:
"""
try:
data_col = _JDataAccessHelpers.getColumn(table.j_table, col_def.name)
np_array = column_to_numpy_array(col_def, data_col.getDirect())
if conv_null and col_def.data_type == dtypes.bool_:
j_array = _JPrimitiveArrayConversionUtility.translateArrayBooleanToByte(data_col.getDirect())
np_array = np.frombuffer(j_array, dtype=np.byte)
s = pd.Series(data=np_array, dtype=pd.Int8Dtype(), copy=False)
s.mask(s == _NULL_BOOLEAN_AS_BYTE, inplace=True)
return s.astype(pd.BooleanDtype(), copy=False)

return pd.Series(data=np_array, copy=False)
np_array = column_to_numpy_array(col_def, data_col.getDirect())
if conv_null and (null_pair := _DTYPE_NULL_MAPPING.get(col_def.data_type)) is not None:
nv = null_pair[0]
pd_ex_dtype = null_pair[1]
s = pd.Series(data=np_array, dtype=pd_ex_dtype(), copy=False)
s.mask(s == nv, inplace=True)
else:
s = pd.Series(data=np_array, copy=False)
return s
except DHError:
raise
except Exception as e:
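The conversion pattern above, wrapping the raw buffer in a Series with a nullable extension dtype and then
masking the sentinel, avoids an extra copy where pandas allows it. A standalone sketch of the technique
(the sentinel shown is Deephaven's NULL_INT, i.e. Integer.MIN_VALUE):

    import numpy as np
    import pandas as pd

    NULL_INT = -2147483648
    raw = np.array([1, NULL_INT, 3], dtype=np.int32)
    s = pd.Series(raw, dtype=pd.Int32Dtype(), copy=False)
    s.mask(s == NULL_INT, inplace=True)  # sentinel positions become pd.NA
    # s now reads [1, <NA>, 3] with dtype Int32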
@@ -85,7 +112,8 @@ def _column_to_series(table: Table, col_def: Column) -> pd.Series:
}


def to_pandas(table: Table, cols: List[str] = None, dtype_backend: str = None) -> pd.DataFrame:
def to_pandas(table: Table, cols: List[str] = None, dtype_backend: str = None, conv_null: bool = True) -> \
pd.DataFrame:
"""Produces a pandas DataFrame from a table.

Note that the **entire table** is going to be cloned into memory, so the total number of entries in the table
@@ -99,6 +127,8 @@ def to_pandas(table: Table, cols: List[str] = None, dtype_backend: str = None) -
nullable dtypes are used for all dtypes that have a nullable implementation when "numpy_nullable" is set,
and pyarrow dtypes are used for all columns when "pyarrow" is set. Default is None, meaning NumPy-backed
DataFrames with no nullable dtypes.
conv_null (bool): when dtype_backend is not set, whether to check for Deephaven nulls in the data and
automatically replace them with pd.NA. Default is True.

Returns:
a pandas DataFrame
@@ -111,9 +141,12 @@ def to_pandas(table: Table, cols: List[str] = None, dtype_backend: str = None) -
raise DHError(message=f"the dtype_backend ({dtype_backend}) option is only available for pandas 2.0.0 and "
f"above. {pd.__version__} is being used.")

type_mapper = _PYARROW_TO_PANDAS_TYPE_MAPPERS.get(dtype_backend)
if dtype_backend is not None and not conv_null:
raise DHError(message="conv_null can't be turned off when dtype_backend is either numpy_nullable or "
"pyarrow")

# if nullable dtypes (pandas or pyarrow) are requested
if type_mapper:
if type_mapper := _PYARROW_TO_PANDAS_TYPE_MAPPERS.get(dtype_backend):
pa_table = arrow.to_arrow(table=table, cols=cols)
df = pa_table.to_pandas(types_mapper=type_mapper)
del pa_table
@@ -134,7 +167,7 @@ def to_pandas(table: Table, cols: List[str] = None, dtype_backend: str = None) -

data = {}
for col in cols:
series = _column_to_series(table, col_def_dict[col])
series = _column_to_series(table, col_def_dict[col], conv_null)
data[col] = series

dtype_set = set([v.dtype for k, v in data.items()])
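For reference, a sketch of how the two options interact under the checks above (assumes pandas >= 2.0 and
a table t as in the earlier sketch):

    from deephaven import DHError

    df1 = to_pandas(t, dtype_backend="numpy_nullable")  # nullable pandas dtypes, via Arrow
    df2 = to_pandas(t, dtype_backend="pyarrow")         # pyarrow-backed dtypes

    try:
        to_pandas(t, dtype_backend="pyarrow", conv_null=False)  # contradictory: raises DHError
    except DHError as e:
        print(e)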
@@ -153,7 +186,7 @@ def to_pandas(table: Table, cols: List[str] = None, dtype_backend: str = None) -
_EX_DTYPE_NULL_MAP = {
# This reflects the fact that in the server we use NULL_BOOLEAN_AS_BYTE - the byte encoding of null boolean to
# translate boxed Boolean to/from primitive bytes
pd.BooleanDtype: NULL_BYTE,
pd.BooleanDtype: _NULL_BOOLEAN_AS_BYTE,
pd.Int8Dtype: NULL_BYTE,
pd.Int16Dtype: NULL_SHORT,
pd.UInt16Dtype: NULL_CHAR,
@@ -173,20 +206,27 @@ def to_pandas(table: Table, cols: List[str] = None, dtype_backend: str = None) -
}
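
This map drives the reverse direction: when to_table() receives columns with nullable extension dtypes,
_map_na swaps pd.NA back to the matching Deephaven sentinel. A round-trip sketch (column name illustrative):

    import pandas as pd
    from deephaven.pandas import to_table

    df = pd.DataFrame({"X": pd.array([1, pd.NA], dtype=pd.Int64Dtype())})
    t = to_table(df)  # the pd.NA entry becomes Deephaven's NULL_LONG in a long column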


def _map_na(np_array: np.ndarray):
def _map_na(array: Union[np.ndarray, pd.api.extensions.ExtensionArray]):
"""Replaces pd.NA values in the array with the matching Deephaven null if it is of a nullable pandas ExtensionDtype."""
pd_dtype = np_array.dtype
pd_dtype = array.dtype
if not isinstance(pd_dtype, pd.api.extensions.ExtensionDtype):
return np_array
return array

dh_null = _EX_DTYPE_NULL_MAP.get(type(pd_dtype)) or _EX_DTYPE_NULL_MAP.get(pd_dtype)
if isinstance(pd_dtype, pd.StringDtype) or isinstance(pd_dtype, pd.BooleanDtype) or pd_dtype == pd.ArrowDtype(
pa.bool_()):
np_array = np.array(list(map(lambda v: dh_null if v is pd.NA else v, np_array)))
# Special-case floating point arrays to preserve genuine NaNs: pandas doesn't distinguish NaN from NA as
# far as NA testing is concerned, so its fillna() method would replace both NaN and NA in the data.
if isinstance(pd_dtype, (pd.Float32Dtype, pd.Float64Dtype)) and isinstance(getattr(array, "_data"), np.ndarray):
np_array = array._data
null_mask = np.logical_and(array._mask, np.logical_not(np.isnan(np_array)))
np_array[null_mask] = dh_null
return np_array

if isinstance(pd_dtype, (pd.StringDtype, pd.BooleanDtype)) or pd_dtype == pd.ArrowDtype(pa.bool_()):
array = np.array(list(map(lambda v: dh_null if v is pd.NA else v, array)))
elif dh_null is not None:
np_array = np_array.fillna(dh_null)
array = array.fillna(dh_null)

return np_array
return array


def to_table(df: pd.DataFrame, cols: List[str] = None) -> Table:
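The floating-point special case in _map_na exists because pandas treats NaN and NA identically for NA
testing, so a plain fillna() would overwrite genuine NaNs as well. A sketch of the masked-array internals
the code relies on (_data and _mask are pandas implementation details, used the same way above):

    import numpy as np
    import pandas as pd

    raw = np.array([1.5, np.nan, 3.0], dtype=np.float64)
    arr = pd.Series(raw, dtype=pd.Float64Dtype(), copy=False).array
    # arr._data keeps the raw float64 values (NaN included); arr._mask marks
    # every position pandas considers missing, NaN among them.
    null_mask = np.logical_and(arr._mask, np.logical_not(np.isnan(arr._data)))
    # null_mask is True only for NA entries that are not genuine NaNs, so writing
    # the Deephaven sentinel through it leaves NaNs untouched.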
2 changes: 1 addition & 1 deletion py/server/tests/test_learn_gather.py
@@ -141,7 +141,7 @@ def base_test(self, source, model, np_dtype):
gatherer_colmajor = lambda rowset, colset: gather.table_to_numpy_2d(rowset, colset,
gather.MemoryLayout.COLUMN_MAJOR, np_dtype)

array_from_table = to_pandas(source).values
array_from_table = to_pandas(source, conv_null=False).values

gathered_rowmajor = gatherer_rowmajor(rows, cols)
gathered_colmajor = gatherer_colmajor(rows, cols)
58 changes: 42 additions & 16 deletions py/server/tests/test_pandas.py
@@ -10,14 +10,15 @@
import pandas as pd
import pyarrow as pa

from deephaven import dtypes, new_table, DHError
from deephaven import dtypes, new_table, DHError, empty_table
from deephaven.column import byte_col, char_col, short_col, bool_col, int_col, long_col, float_col, double_col, \
string_col, datetime_col, pyobj_col, jobj_col
from deephaven.constants import NULL_LONG, NULL_SHORT, NULL_INT, NULL_BYTE, NULL_CHAR
from deephaven.constants import NULL_LONG, NULL_SHORT, NULL_INT, NULL_BYTE, NULL_CHAR, NULL_FLOAT, NULL_DOUBLE, \
NULL_BOOLEAN
from deephaven.jcompat import j_array_list
from deephaven.pandas import to_pandas, to_table
from deephaven.time import parse_instant, epoch_nanos_to_instant
from tests.testbase import BaseTestCase
from tests.testbase import BaseTestCase, table_equals


@dataclass
@@ -54,8 +55,8 @@ def tearDown(self) -> None:
self.test_table = None
super().tearDown()

def test_to_pandas(self):
df = to_pandas(self.test_table)
def test_to_pandas_no_conv_null(self):
df = to_pandas(self.test_table, conv_null=False)
self.assertEqual(len(df.columns), len(self.test_table.columns))
self.assertEqual(df.size, 2 * len(self.test_table.columns))
df_series = [df[col] for col in list(df.columns)]
@@ -71,7 +72,7 @@ def test_to_pandas_remaps(self):
prepared_table = self.test_table.update(
formulas=["Long = isNull(Long_) ? Double.NaN : Long_"])

df = to_pandas(prepared_table, cols=["Boolean", "Long"])
df = to_pandas(prepared_table, cols=["Boolean", "Long"], conv_null=False)
self.assertEqual(df['Long'].dtype, np.float64)
self.assertEqual(df['Boolean'].values.dtype, np.bool_)

@@ -115,7 +116,7 @@ def test_to_table(self):
double_col(name="Double", data=[1.01, -1.01]),
]
test_table = new_table(cols=input_cols)
df = to_pandas(test_table)
df = to_pandas(test_table, conv_null=False)
table_from_df = to_table(df)
self.assert_table_equals(table_from_df, test_table)

@@ -124,7 +125,7 @@ def test_to_table_boolean_with_none(self):
table_with_null_bool = new_table(cols=input_cols)
prepared_table = table_with_null_bool.update(
formulas=["Boolean = isNull(Boolean) ? (byte)NULL_BYTE : (Boolean == true ? 1: 0)"])
df = to_pandas(prepared_table)
df = to_pandas(prepared_table, conv_null=False)
table_from_df = to_table(df)
self.assert_table_equals(table_from_df, prepared_table)

@@ -212,7 +213,8 @@ def test_to_table_nullable(self):

self.assertEqual(table.size, 3)
table_string = table.to_string()
self.assertEqual(10, table_string.count("null"))
self.assertEqual(8, table_string.count("null"))
self.assertEqual(2, table_string.count("NaN"))

def test_arrow_backend(self):
with self.subTest("pyarrow-backend"):
@@ -296,19 +298,43 @@ def test_numpy_array(self):

def test_pandas_category_type(self):
df = pd.DataFrame({
'zipcode': {17384: 98125, 2680: 98107, 722: 98005, 18754: 98109, 14554: 98155},
'bathrooms': {17384: 1.5, 2680: 0.75, 722: 3.25, 18754: 1.0, 14554: 2.5},
'sqft_lot': {17384: 1650, 2680: 3700, 722: 51836, 18754: 2640, 14554: 9603},
'bedrooms': {17384: 2, 2680: 2, 722: 4, 18754: 2, 14554: 4},
'sqft_living': {17384: 1430, 2680: 1440, 722: 4670, 18754: 1130, 14554: 3180},
'floors': {17384: 3.0, 2680: 1.0, 722: 2.0, 18754: 1.0, 14554: 2.0}
})
'zipcode': {17384: 98125, 2680: 98107, 722: 98005, 18754: 98109, 14554: 98155},
'bathrooms': {17384: 1.5, 2680: 0.75, 722: 3.25, 18754: 1.0, 14554: 2.5},
'sqft_lot': {17384: 1650, 2680: 3700, 722: 51836, 18754: 2640, 14554: 9603},
'bedrooms': {17384: 2, 2680: 2, 722: 4, 18754: 2, 14554: 4},
'sqft_living': {17384: 1430, 2680: 1440, 722: 4670, 18754: 1130, 14554: 3180},
'floors': {17384: 3.0, 2680: 1.0, 722: 2.0, 18754: 1.0, 14554: 2.0}
})
df['zipcode'] = df.zipcode.astype('category')
df['bathrooms'] = df.bathrooms.astype('category')
t = to_table(df)
self.assertEqual(t.columns[0].data_type, dtypes.int64)
self.assertEqual(t.columns[1].data_type, dtypes.double)

def test_conv_null(self):
input_cols = [
bool_col(name="Boolean", data=(True, NULL_BOOLEAN)),
byte_col(name="Byte", data=(1, NULL_BYTE)),
char_col(name="Char", data=(1, NULL_CHAR)),
short_col(name="Short", data=[1, NULL_SHORT]),
int_col(name="Int_", data=[1, NULL_INT]),
long_col(name="Long_", data=[1, NULL_LONG]),
float_col(name="Float_", data=[np.nan, NULL_FLOAT]),
double_col(name="Double_", data=[np.nan, NULL_DOUBLE]),
datetime_col(name="Datetime", data=[epoch_nanos_to_instant(1), None]),
]
t = new_table(cols=input_cols)
df = to_pandas(t, conv_null=True)
dh_table = to_table(df)
self.assert_table_equals(t, dh_table)

dtype_backends = ["numpy_nullable", "pyarrow"]
for dbe in dtype_backends:
with self.subTest(dbe):
df = to_pandas(t, dtype_backend=dbe)
dh_table = to_table(df)
self.assert_table_equals(t, dh_table)


if __name__ == '__main__':
unittest.main()
12 changes: 6 additions & 6 deletions py/server/tests/test_parquet.py
@@ -4,19 +4,19 @@

import os
import shutil
import unittest
import tempfile
import unittest

import pandas
import pyarrow.parquet

from deephaven.pandas import to_pandas, to_table

from deephaven import empty_table, dtypes, new_table
from deephaven.column import InputColumn
from deephaven.pandas import to_pandas, to_table
from deephaven.parquet import write, batch_write, read, delete, ColumnInstruction

from tests.testbase import BaseTestCase


class ParquetTestCase(BaseTestCase):
""" Test cases for the deephaven.ParquetTools module (performed locally) """

@@ -282,7 +283,8 @@ def round_trip_with_compression(self, compression_codec_name, dh_table, vector_c
self.assert_table_equals(dh_table, result_table)

# Write the pandas dataframe back to parquet (via pyarrow) and read it back using deephaven.parquet to compare
dataframe.to_parquet('data_from_pandas.parquet', compression=None if compression_codec_name is 'UNCOMPRESSED' else compression_codec_name)
dataframe.to_parquet('data_from_pandas.parquet',
compression=None if compression_codec_name == 'UNCOMPRESSED' else compression_codec_name)
result_table = read('data_from_pandas.parquet')
self.assert_table_equals(dh_table, result_table)

@@ -294,6 +295,5 @@ def round_trip_with_compression(self, compression_codec_name, dh_table, vector_c
# self.assert_table_equals(dh_table, result_table)



if __name__ == '__main__':
unittest.main()