From 5eee07a822415995912ecacd7461ac3eb74182ac Mon Sep 17 00:00:00 2001
From: Tess Hayes
Date: Wed, 11 Sep 2024 13:00:41 -0400
Subject: [PATCH 1/2] Fixes #3762: Fix dataframe groupby aggregations when keys contain `NaN`s

Previously, using dataframe groupby with keys that contain `NaN`s would
cause the aggregations to fail (#3762). To resolve this, we mask out the
values that belong to the `NaN` segment.
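A minimal reproducer, adapted from the `test_count_nan_bug` test added
below (a sketch, assuming a locally running arkouda server):

    import numpy as np
    import arkouda as ak

    ak.connect()  # assumes a local arkouda server

    # the groupby key column "A" contains a NaN
    df = ak.DataFrame({"A": [1, 2, 2, np.nan], "B": [3, 4, 5, 6]})

    # previously this raised; the NaN key group is now dropped by
    # default, matching pandas' dropna=True behavior
    print(df.groupby("A").count())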
---
 arkouda/dataframe.py    | 48 +++++++++++++++++++++++++++++++++++------
 tests/dataframe_test.py | 31 ++++++++++++++++++++++++--
 2 files changed, 71 insertions(+), 8 deletions(-)

diff --git a/arkouda/dataframe.py b/arkouda/dataframe.py
index bdeed1b7a1..e1f25d1d19 100644
--- a/arkouda/dataframe.py
+++ b/arkouda/dataframe.py
@@ -4,6 +4,7 @@
 import os
 import random
 from collections import UserDict
+from functools import reduce
 from typing import Callable, Dict, List, Optional, Tuple, Union, cast
 from warnings import warn
 
@@ -24,10 +25,11 @@
 from arkouda.join import inner_join
 from arkouda.numpy import cast as akcast
 from arkouda.numpy import cumsum, where
-from arkouda.numpy.dtypes import bigint
+from arkouda.numpy.dtypes import _is_dtype_in_union, bigint
 from arkouda.numpy.dtypes import bool_ as akbool
 from arkouda.numpy.dtypes import float64 as akfloat64
 from arkouda.numpy.dtypes import int64 as akint64
+from arkouda.numpy.dtypes import numeric_scalars
 from arkouda.numpy.dtypes import uint64 as akuint64
 from arkouda.pdarrayclass import RegistrationError, pdarray
 from arkouda.pdarraycreation import arange, array, create_pdarray, full, zeros
@@ -104,7 +106,8 @@ class DataFrameGroupBy:
         If True the grouped values of the aggregation keys will be treated as an index.
     """
 
-    def __init__(self, gb, df, gb_key_names=None, as_index=True):
+    def __init__(self, gb, df, gb_key_names=None, as_index=True, dropna=True):
+
         self.gb = gb
         self.df = df
@@ -112,6 +115,39 @@ def __init__(self, gb, df, gb_key_names=None, as_index=True):
         for attr in ["nkeys", "permutation", "unique_keys", "segments"]:
             setattr(self, attr, getattr(gb, attr))
 
+        self.dropna = dropna
+        self.where_not_nan = None
+        self.all_non_nan = False
+
+        if dropna:
+            from arkouda import all as ak_all
+            from arkouda import isnan
+
+            # calculate ~isnan on each key, then & them all together
+            # keep track of whether they're all non-NaN, so we can skip indexing later
+            key_cols = (
+                [df[k] for k in gb_key_names] if isinstance(gb_key_names, List) else [df[gb_key_names]]
+            )
+            where_key_not_nan = [
+                ~isnan(col)
+                for col in key_cols
+                if isinstance(col, pdarray) and _is_dtype_in_union(col.dtype, numeric_scalars)
+            ]
+
+            if len(where_key_not_nan) == 0:
+                # if empty then none of the keys are pdarray, so none are NaN
+                self.all_non_nan = True
+            else:
+                self.where_not_nan = reduce(lambda x, y: x & y, where_key_not_nan)
+                self.all_non_nan = ak_all(self.where_not_nan)
+
+    def _get_df_col(self, c):
+        # helper function to mask out the values where the keys are NaN when dropna is True
+        if not self.dropna or self.all_non_nan:
+            return self.df.data[c]
+        else:
+            return self.df.data[c][self.where_not_nan]
+
     @classmethod
     def _make_aggop(cls, opname):
         numerical_dtypes = [akfloat64, akint64, akuint64]
@@ -148,18 +184,18 @@ def aggop(self, colnames=None):
             if isinstance(colnames, List):
                 if isinstance(self.gb_key_names, str):
                     return DataFrame(
-                        {c: self.gb.aggregate(self.df.data[c], opname)[1] for c in colnames},
+                        {c: self.gb.aggregate(self._get_df_col(c), opname)[1] for c in colnames},
                         index=Index(self.gb.unique_keys, name=self.gb_key_names),
                     )
                 elif isinstance(self.gb_key_names, list) and len(self.gb_key_names) == 1:
                     return DataFrame(
-                        {c: self.gb.aggregate(self.df.data[c], opname)[1] for c in colnames},
+                        {c: self.gb.aggregate(self._get_df_col(c), opname)[1] for c in colnames},
                         index=Index(self.gb.unique_keys, name=self.gb_key_names[0]),
                     )
                 elif isinstance(self.gb_key_names, list):
                     column_dict = dict(zip(self.gb_key_names, self.unique_keys))
                     for c in colnames:
-                        column_dict[c] = self.gb.aggregate(self.df.data[c], opname)[1]
+                        column_dict[c] = self.gb.aggregate(self._get_df_col(c), opname)[1]
                     return DataFrame(column_dict)
                 else:
                     return None
@@ -2678,7 +2714,7 @@ def GroupBy(self, keys, use_series=False, as_index=True, dropna=True):
         gb = akGroupBy(cols, dropna=dropna)
         if use_series:
-            gb = DataFrameGroupBy(gb, self, gb_key_names=keys, as_index=as_index)
+            gb = DataFrameGroupBy(gb, self, gb_key_names=keys, as_index=as_index, dropna=dropna)
         return gb
 
     def memory_usage(self, index=True, unit="B") -> Series:
diff --git a/tests/dataframe_test.py b/tests/dataframe_test.py
index 693d91f012..b536f73b0a 100644
--- a/tests/dataframe_test.py
+++ b/tests/dataframe_test.py
@@ -1,7 +1,5 @@
-import glob
 import itertools
 import os
-import tempfile
 
 import numpy as np
 import pandas as pd
@@ -664,6 +662,35 @@ def test_gb_aggregations_with_nans(self, agg):
         pd_result = getattr(pd_df.groupby(group_on, as_index=False), agg)()
         assert_frame_equal(ak_result.to_pandas(retain_index=True), pd_result)
 
+        # TODO aggregations of string columns not currently supported (even for count)
+        df.drop("key1", axis=1, inplace=True)
+        df.drop("key2", axis=1, inplace=True)
+        pd_df = df.to_pandas()
+
+        group_on = ["nums1", "nums2"]
+        ak_result = getattr(df.groupby(group_on), agg)()
+        pd_result = getattr(pd_df.groupby(group_on, as_index=False), agg)()
+        assert_frame_equal(ak_result.to_pandas(retain_index=True), pd_result)
+
+        # TODO aggregation mishandling NaN see issue #3765
+        df.drop("nums2", axis=1, inplace=True)
+        pd_df = df.to_pandas()
+        group_on = "nums1"
+        ak_result = getattr(df.groupby(group_on), agg)()
+        pd_result = getattr(pd_df.groupby(group_on), agg)()
+        assert_frame_equal(ak_result.to_pandas(retain_index=True), pd_result)
+
+    def test_count_nan_bug(self):
+        # verify reproducer for #3762 is fixed
+        df = ak.DataFrame({"A": [1, 2, 2, np.nan], "B": [3, 4, 5, 6], "C": [1, np.nan, 2, 3]})
+        ak_result = df.groupby("A").count()
+        pd_result = df.to_pandas().groupby("A").count()
+        assert_frame_equal(ak_result.to_pandas(retain_index=True), pd_result)
+
+        ak_result = df.groupby(["A", "C"], as_index=False).count()
+        pd_result = df.to_pandas().groupby(["A", "C"], as_index=False).count()
+        assert_frame_equal(ak_result.to_pandas(retain_index=True), pd_result)
+
     def test_gb_aggregations_return_dataframe(self):
         ak_df = self.build_ak_df_example2()
         pd_df = ak_df.to_pandas(retain_index=True)

From fc5f4ea3fe180b7a3a777f0af4c26ad3e4dc40f3 Mon Sep 17 00:00:00 2001
From: Tess Hayes
Date: Wed, 18 Sep 2024 14:25:32 -0400
Subject: [PATCH 2/2] Updated in response to PR feedback
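`DataFrameGroupBy` now reads `dropna` from the underlying `GroupBy`
instead of accepting a duplicate parameter, and the NaN-key tests are
parametrized over both `dropna` settings. For example (a sketch,
assuming a locally running arkouda server):

    import numpy as np
    import arkouda as ak

    ak.connect()  # assumes a local arkouda server

    df = ak.DataFrame({"A": [1, 2, 2, np.nan], "B": [3, 4, 5, 6]})

    # dropna=False keeps the NaN key as its own group, matching pandas
    print(df.groupby("A", dropna=False).count())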
""" - def __init__(self, gb, df, gb_key_names=None, as_index=True, dropna=True): + def __init__(self, gb, df, gb_key_names=None, as_index=True): self.gb = gb self.df = df @@ -115,11 +115,11 @@ def __init__(self, gb, df, gb_key_names=None, as_index=True, dropna=True): for attr in ["nkeys", "permutation", "unique_keys", "segments"]: setattr(self, attr, getattr(gb, attr)) - self.dropna = dropna + self.dropna = self.gb.dropna self.where_not_nan = None self.all_non_nan = False - if dropna: + if self.dropna: from arkouda import all as ak_all from arkouda import isnan @@ -2714,7 +2714,7 @@ def GroupBy(self, keys, use_series=False, as_index=True, dropna=True): gb = akGroupBy(cols, dropna=dropna) if use_series: - gb = DataFrameGroupBy(gb, self, gb_key_names=keys, as_index=as_index, dropna=dropna) + gb = DataFrameGroupBy(gb, self, gb_key_names=keys, as_index=as_index) return gb def memory_usage(self, index=True, unit="B") -> Series: diff --git a/tests/dataframe_test.py b/tests/dataframe_test.py index b536f73b0a..fd6c1f6b9c 100644 --- a/tests/dataframe_test.py +++ b/tests/dataframe_test.py @@ -650,16 +650,17 @@ def test_gb_aggregations_example_numeric_types(self, agg): pd_result = getattr(pd_df.groupby(group_on), agg)() assert_frame_equal(ak_result.to_pandas(retain_index=True), pd_result) + @pytest.mark.parametrize("dropna", [True, False]) @pytest.mark.parametrize("agg", ["count", "max", "mean", "median", "min", "std", "sum", "var"]) - def test_gb_aggregations_with_nans(self, agg): + def test_gb_aggregations_with_nans(self, agg, dropna): df = self.build_ak_df_with_nans() # @TODO handle bool columns correctly df.drop("bools", axis=1, inplace=True) pd_df = df.to_pandas() group_on = ["key1", "key2"] - ak_result = getattr(df.groupby(group_on), agg)() - pd_result = getattr(pd_df.groupby(group_on, as_index=False), agg)() + ak_result = getattr(df.groupby(group_on, dropna=dropna), agg)() + pd_result = getattr(pd_df.groupby(group_on, as_index=False, dropna=dropna), agg)() assert_frame_equal(ak_result.to_pandas(retain_index=True), pd_result) # TODO aggregations of string columns not currently supported (even for count) @@ -668,27 +669,28 @@ def test_gb_aggregations_with_nans(self, agg): pd_df = df.to_pandas() group_on = ["nums1", "nums2"] - ak_result = getattr(df.groupby(group_on), agg)() - pd_result = getattr(pd_df.groupby(group_on, as_index=False), agg)() + ak_result = getattr(df.groupby(group_on, dropna=dropna), agg)() + pd_result = getattr(pd_df.groupby(group_on, as_index=False, dropna=dropna), agg)() assert_frame_equal(ak_result.to_pandas(retain_index=True), pd_result) # TODO aggregation mishandling NaN see issue #3765 df.drop("nums2", axis=1, inplace=True) pd_df = df.to_pandas() group_on = "nums1" - ak_result = getattr(df.groupby(group_on), agg)() - pd_result = getattr(pd_df.groupby(group_on), agg)() + ak_result = getattr(df.groupby(group_on, dropna=dropna), agg)() + pd_result = getattr(pd_df.groupby(group_on, dropna=dropna), agg)() assert_frame_equal(ak_result.to_pandas(retain_index=True), pd_result) - def test_count_nan_bug(self): + @pytest.mark.parametrize("dropna", [True, False]) + def test_count_nan_bug(self, dropna): # verify reproducer for #3762 is fixed df = ak.DataFrame({"A": [1, 2, 2, np.nan], "B": [3, 4, 5, 6], "C": [1, np.nan, 2, 3]}) - ak_result = df.groupby("A").count() - pd_result = df.to_pandas().groupby("A").count() + ak_result = df.groupby("A", dropna=dropna).count() + pd_result = df.to_pandas().groupby("A", dropna=dropna).count() 
         assert_frame_equal(ak_result.to_pandas(retain_index=True), pd_result)
 
-        ak_result = df.groupby(["A", "C"], as_index=False).count()
-        pd_result = df.to_pandas().groupby(["A", "C"], as_index=False).count()
+        ak_result = df.groupby(["A", "C"], as_index=False, dropna=dropna).count()
+        pd_result = df.to_pandas().groupby(["A", "C"], as_index=False, dropna=dropna).count()
         assert_frame_equal(ak_result.to_pandas(retain_index=True), pd_result)
 
     def test_gb_aggregations_return_dataframe(self):