From f3f1de8930a536fbb8b40d1538a14a16aab0856b Mon Sep 17 00:00:00 2001 From: Eddie Gelberg <32350869+egelberg@users.noreply.github.com> Date: Fri, 6 Oct 2023 15:44:36 -0400 Subject: [PATCH] Closes #2716: Add dataframe merge functionality (#2781) * add merge functionality * moving functionality to dataframe.py * remove numeric import * change exception error to TypeError * int col float behavior * remove extraneous code and fix type errors * change the float cast from np to ak * Update arkouda/dataframe.py Co-authored-by: pierce <48131946+pierce314159@users.noreply.github.com> * Update arkouda/dataframe.py Co-authored-by: pierce <48131946+pierce314159@users.noreply.github.com> * Update arkouda/dataframe.py Co-authored-by: pierce <48131946+pierce314159@users.noreply.github.com> * address some of Pierce's comments * identical column suffixes * added df.merge functions * bug fix for the right_join_merge method * add merge test for dataframe * temp test fix, order is wonky but not wrong * Update arkouda/dataframe.py --------- Co-authored-by: Eddie Co-authored-by: pierce <48131946+pierce314159@users.noreply.github.com> Co-authored-by: Pierce Hayes --- arkouda/dataframe.py | 298 +++++++++++++++++++++++++++++++++++++++- tests/dataframe_test.py | 67 +++++++++ 2 files changed, 364 insertions(+), 1 deletion(-) diff --git a/arkouda/dataframe.py b/arkouda/dataframe.py index e00eb16d0b..fd21e40c78 100644 --- a/arkouda/dataframe.py +++ b/arkouda/dataframe.py @@ -11,6 +11,7 @@ import pandas as pd # type: ignore from typeguard import typechecked +from arkouda.alignment import find from arkouda.categorical import Categorical from arkouda.client import generic_msg, maxTransferBytes from arkouda.client_dtypes import BitVector, Fields, IPv4 @@ -21,11 +22,12 @@ from arkouda.groupbyclass import GroupBy as akGroupBy from arkouda.groupbyclass import unique from arkouda.index import Index +from arkouda.join import inner_join from arkouda.numeric import cast as akcast from arkouda.numeric import cumsum, where from arkouda.pdarrayclass import RegistrationError, pdarray from arkouda.pdarraycreation import arange, array, create_pdarray, zeros -from arkouda.pdarraysetops import concatenate, in1d, intersect1d +from arkouda.pdarraysetops import concatenate, in1d, intersect1d, setdiff1d from arkouda.row import Row from arkouda.segarray import SegArray from arkouda.series import Series @@ -43,6 +45,9 @@ "intersect", "invert_permutation", "intx", + "inner_join_merge", + "right_join_merge", + "merge", ] @@ -2238,6 +2243,118 @@ def numeric_help(d): ret_dict = json.loads(generic_msg(cmd="corrMatrix", args=args)) return DataFrame({c: create_pdarray(ret_dict[c]) for c in self.columns}) + @typechecked + def inner_join_merge( + self, + right: DataFrame, + on: str, + left_suffix: str = "_x", + right_suffix: str = "_y", + ) -> DataFrame: + """ + Utilizes the ak.join.inner_join function to return an ak + DataFrame object containing only rows that are in both + self and right Dataframes, (based on the "on" param), + as well as their associated values. For this function self + is considered the left dataframe. + + Parameters + ---------- + right : DataFrame + The Right DataFrame to be joined + on : str + The name of the DataFrame column the join is being + performed on + left_suffix: str = "_x" + A string indicating the suffix to add to columns from self for overlapping + column names in both left and right. Defaults to "_x" + right_suffix: str = "_y" + A string indicating the suffix to add to columns from the other dataframe for overlapping + column names in both left and right. Defaults to "_y" + + Returns + ------- + DataFrame + Inner-Joined Arkouda DataFrame + """ + return inner_join_merge(self, right, on, left_suffix, right_suffix) + + def right_join_merge(self, right: DataFrame, on: str) -> DataFrame: + """ + Utilizes the ak.join.inner_join_merge function to return an + ak DataFrame object containing all the rows in the right Dataframe, + as well as corresponding rows in self (based on the "on" param), + and all of their associated values. For this function self + is considered the left dataframe. + Based on pandas merge functionality. + + Parameters + ---------- + right : DataFrame + The Right DataFrame to be joined + on : str + The name of the DataFrame column the join is being + performed on + + Returns + ------- + DataFrame + Right-Joined Arkouda DataFrame + """ + return right_join_merge(self, right, on) + + @typechecked + def merge( + self, + right: DataFrame, + on: str, + how: str, + left_suffix: str = "_x", + right_suffix: str = "_y", + ) -> DataFrame: + """ + Utilizes the ak.join.inner_join_merge and the ak.join.right_join_merge + functions to return a merged Arkouda DataFrame object + containing rows from both DataFrames as specified by the merge + condition (based on the "how" and "on" parameters). For this function self + is considered the left dataframe. + Based on pandas merge functionality. + https://github.com/pandas-dev/pandas/blob/main/pandas/core/reshape/merge.py#L137 + + Parameters + ---------- + right: DataFrame + The Right DataFrame to be joined + on: str + The name of the DataFrame column the join is being + performed on + how: str + The merge condition. + Must be "inner", "left", or "right" + left_suffix: str = "_x" + A string indicating the suffix to add to columns from the left dataframe for overlapping + column names in both left and right. Defaults to "_x". Only used when how is "inner" + right_suffix: str = "_y" + A string indicating the suffix to add to columns from the right dataframe for overlapping + column names in both left and right. Defaults to "_y". Only used when how is "inner" + + Returns + ------- + DataFrame + Joined Arkouda DataFrame + """ + + if how == "inner": + return inner_join_merge(self, right, on, left_suffix, right_suffix) + elif how == "right": + return right_join_merge(self, right, on) + elif how == "left": + return right_join_merge(right, self, on) + else: + raise ValueError( + f"Unexpected value of {how} for how. Must choose: 'inner', 'left', or 'right'" + ) + @typechecked def register(self, user_defined_name: str) -> DataFrame: """ @@ -2706,3 +2823,182 @@ def invert_permutation(perm): if (unique(perm).size != perm.size) and (perm.size != rng + 1): raise ValueError("The array is not a permutation.") return coargsort([perm, arange(perm.size)]) + + +@typechecked +def inner_join_merge( + left: DataFrame, + right: DataFrame, + on: str, + left_suffix: str = "_x", + right_suffix: str = "_y", +) -> DataFrame: + """ + Utilizes the ak.join.inner_join function to return an ak + DataFrame object containing only rows that are in both + the left and right Dataframes, (based on the "on" param), + as well as their associated values. + Parameters + ---------- + left: DataFrame + The Left DataFrame to be joined + right: DataFrame + The Right DataFrame to be joined + on: str + The name of the DataFrame column the join is being + performed on + left_suffix: str = "_x" + A string indicating the suffix to add to columns from the left dataframe for overlapping + column names in both left and right. Defaults to "_x" + right_suffix: str = "_y" + A string indicating the suffix to add to columns from the right dataframe for overlapping + column names in both left and right. Defaults to "_y" + Returns + ------- + DataFrame + Inner-Joined Arkouda DataFrame + """ + + left_inds, right_inds = inner_join(left[on], right[on]) + + left_cols = left.columns.copy() + left_cols.remove(on) + right_cols = right.columns.copy() + right_cols.remove(on) + + new_dict = {on: left[on][left_inds]} + + for col in left_cols: + if col in right_cols: + new_col = col + left_suffix + else: + new_col = col + new_dict[new_col] = left[col][left_inds] + for col in right_cols: + if col in left_cols: + new_col = col + right_suffix + else: + new_col = col + new_dict[new_col] = right[col][right_inds] + + return DataFrame(new_dict) + + +def right_join_merge( + left: DataFrame, + right: DataFrame, + on: str, + left_suffix: str = "_x", + right_suffix: str = "_y", +) -> DataFrame: + """ + Utilizes the ak.join.inner_join_merge function to return an + ak DataFrame object containing all the rows in the right Dataframe, + as well as corresponding rows in the left (based on the "on" param), + and all of their associated values. + Based on pandas merge functionality. + Parameters + ---------- + left: DataFrame + The Left DataFrame to be joined + right: DataFrame + The Right DataFrame to be joined + on: str + The name of the DataFrame column the join is being + performed on + left_suffix: str = "_x" + A string indicating the suffix to add to columns from the left dataframe for overlapping + column names in both left and right. Defaults to "_x" + right_suffix: str = "_y" + A string indicating the suffix to add to columns from the right dataframe for overlapping + column names in both left and right. Defaults to "_y" + Returns + ------- + DataFrame + Right-Joined Arkouda DataFrame + """ + + left_cols = left.columns.copy() + left_cols.remove(on) + + in_left = inner_join_merge(left, right, on, left_suffix, right_suffix) + in_left_cols = in_left.columns.copy() + in_left_cols.remove(on) + + not_in_left = right[find(setdiff1d(right[on], left[on]), right[on])] + for col in not_in_left.columns: + if col in left_cols: + new_col = col + right_suffix + not_in_left[new_col] = not_in_left[col] + not_in_left = not_in_left.drop(col, axis=1) + + nan_cols = list(set(in_left) - set(in_left).intersection(set(not_in_left))) + + for col in nan_cols: + # Create a nan array for all values not in the left df + nan_arr = zeros(len(not_in_left)) + nan_arr.fill(np.nan) + left_col_type = type(in_left[col]) + if in_left[col].dtype == int: + in_left[col] = akcast(in_left[col], akfloat64) + else: + nan_arr = akcast(nan_arr, in_left[col].dtype) + + try: + not_in_left[col] = left_col_type(nan_arr) + except TypeError: + not_in_left[col] = nan_arr + + right_ak_df = DataFrame.append(in_left, not_in_left) + + return right_ak_df + + +@typechecked +def merge( + left: DataFrame, + right: DataFrame, + on: str, + how: str, + left_suffix: str = "_x", + right_suffix: str = "_y", +) -> DataFrame: + """ + Utilizes the ak.join.inner_join_merge and the ak.join.right_join_merge + functions to return a merged Arkouda DataFrame object + containing rows from both DataFrames as specified by the merge + condition (based on the "how" and "on" parameters). + Based on pandas merge functionality. + https://github.com/pandas-dev/pandas/blob/main/pandas/core/reshape/merge.py#L137 + Parameters + ---------- + left: DataFrame + The Left DataFrame to be joined + right: DataFrame + The Right DataFrame to be joined + on: str + The name of the DataFrame column the join is being + performed on + how: str + The merge condition. + Must be "inner", "left", or "right" + left_suffix: str = "_x" + A string indicating the suffix to add to columns from the left dataframe for overlapping + column names in both left and right. Defaults to "_x". Only used when how is "inner" + right_suffix: str = "_y" + A string indicating the suffix to add to columns from the right dataframe for overlapping + column names in both left and right. Defaults to "_y". Only used when how is "inner" + Returns + ------- + DataFrame + Joined Arkouda DataFrame + """ + + if how == 'inner': + return inner_join_merge(left, right, on, left_suffix, right_suffix) + elif how == 'right': + return right_join_merge(left, right, on, left_suffix, right_suffix) + elif how == 'left': + return right_join_merge(right, left, on, right_suffix, left_suffix) + else: + raise ValueError(f"Unexpected value of {how} for how. Must choose: 'inner', 'left', or 'right'") diff --git a/tests/dataframe_test.py b/tests/dataframe_test.py index 4c1c06c5e8..85579584c9 100644 --- a/tests/dataframe_test.py +++ b/tests/dataframe_test.py @@ -746,3 +746,70 @@ def test_subset(self): self.assertListEqual(df.index.to_list(), df2.index.to_list()) self.assertListEqual(df["a"].to_list(), df2["a"].to_list()) self.assertListEqual(df["b"].to_list(), df2["b"].to_list()) + + def test_merge(self): + df1 = ak.DataFrame( + { + "key": ak.arange(4), + "value1": ak.array(["A", "B", "C", "D"]), + } + ) + + df2 = ak.DataFrame( + { + "key": ak.arange(2, 6, 1), + "value1": ak.array(["A", "B", "D", "F"]), + "value2": ak.array(["apple", "banana", "cherry", "date"]), + } + ) + + ij_expected_df = ak.DataFrame( + { + "key": ak.array([2, 3]), + "value1_x": ak.array(["C", "D"]), + "value1_y": ak.array(["A", "B"]), + "value2": ak.array(["apple", "banana"]) + } + ) + + ij_merged_df = ak.merge(df1, df2, how="inner", on="key") + + self.assertListEqual(ij_expected_df.columns, ij_merged_df.columns) + self.assertListEqual(ij_expected_df["key"].to_list(), ij_merged_df["key"].to_list()) + self.assertListEqual(ij_expected_df["value1_x"].to_list(), ij_merged_df["value1_x"].to_list()) + self.assertListEqual(ij_expected_df["value1_y"].to_list(), ij_merged_df["value1_y"].to_list()) + self.assertListEqual(ij_expected_df["value2"].to_list(), ij_merged_df["value2"].to_list()) + + rj_expected_df = ak.DataFrame( + { + "key": ak.array([2, 3, 4, 5]), + "value1_x": ak.array(["C", "D", "nan", "nan"]), + "value1_y": ak.array(["A", "B", "D", "F"]), + "value2": ak.array(["apple", "banana", "cherry", "date"]) + } + ) + + rj_merged_df = ak.merge(df1, df2, how="right", on="key") + + self.assertListEqual(rj_expected_df.columns, rj_merged_df.columns) + self.assertListEqual(rj_expected_df["key"].to_list(), rj_merged_df["key"].to_list()) + self.assertListEqual(rj_expected_df["value1_x"].to_list(), rj_merged_df["value1_x"].to_list()) + self.assertListEqual(rj_expected_df["value1_y"].to_list(), rj_merged_df["value1_y"].to_list()) + self.assertListEqual(rj_expected_df["value2"].to_list(), rj_merged_df["value2"].to_list()) + + lj_expected_df = ak.DataFrame( + { + "key": ak.array([2, 3, 0, 1]), + "value1_y": ak.array(["A", "B", "nan", "nan"]), + "value2": ak.array(["apple", "banana", "nan", "nan"]), + "value1_x": ak.array(["C", "D", "A", "B"]), + } + ) + + lj_merged_df = ak.merge(df1, df2, how="left", on="key") + + self.assertListEqual(lj_expected_df.columns, lj_merged_df.columns) + self.assertListEqual(lj_expected_df["key"].to_list(), lj_merged_df["key"].to_list()) + self.assertListEqual(lj_expected_df["value1_x"].to_list(), lj_merged_df["value1_x"].to_list()) + self.assertListEqual(lj_expected_df["value1_y"].to_list(), lj_merged_df["value1_y"].to_list()) + self.assertListEqual(lj_expected_df["value2"].to_list(), lj_merged_df["value2"].to_list())