diff --git a/.github/workflows/edgetest.yml b/.github/workflows/edgetest.yml
index e12e56c5..b53cf46c 100644
--- a/.github/workflows/edgetest.yml
+++ b/.github/workflows/edgetest.yml
@@ -1,5 +1,5 @@
 # This workflow runs edgetest on datacompy @ 17:30 UTC Fridays.
-# https://github.com/fdosani/run-edgetest-action
+# https://github.com/edgetest-dev/run-edgetest-action
 name: Run edgetest

 on:
@@ -14,7 +14,7 @@ jobs:
       with:
         ref: develop
     - id: run-edgetest
-      uses: fdosani/run-edgetest-action@v1.2
+      uses: edgetest-dev/run-edgetest-action@v1.4
       with:
         edgetest-flags: '-c pyproject.toml --export'
         base-branch: 'develop'
diff --git a/datacompy/__init__.py b/datacompy/__init__.py
index 3ddb0b77..7608f8cb 100644
--- a/datacompy/__init__.py
+++ b/datacompy/__init__.py
@@ -13,11 +13,12 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.

-__version__ = "0.10.4"
+__version__ = "0.10.5"

 from datacompy.core import *
 from datacompy.fugue import (
     all_columns_match,
+    all_rows_overlap,
     intersect_columns,
     is_match,
     report,
diff --git a/datacompy/fugue.py b/datacompy/fugue.py
index 729f6913..80038aa2 100644
--- a/datacompy/fugue.py
+++ b/datacompy/fugue.py
@@ -195,6 +195,102 @@ def is_match(
     return all(matches)


+def all_rows_overlap(
+    df1: AnyDataFrame,
+    df2: AnyDataFrame,
+    join_columns: Union[str, List[str]],
+    abs_tol: float = 0,
+    rel_tol: float = 0,
+    df1_name: str = "df1",
+    df2_name: str = "df2",
+    ignore_spaces: bool = False,
+    ignore_case: bool = False,
+    cast_column_names_lower: bool = True,
+    parallelism: Optional[int] = None,
+    strict_schema: bool = False,
+) -> bool:
+    """Check if the rows are all present in both dataframes.
+
+    Parameters
+    ----------
+    df1 : ``AnyDataFrame``
+        First dataframe to check
+    df2 : ``AnyDataFrame``
+        Second dataframe to check
+    join_columns : list or str
+        Column(s) to join dataframes on. If a string is passed in, that one
+        column will be used.
+    abs_tol : float, optional
+        Absolute tolerance between two values.
+    rel_tol : float, optional
+        Relative tolerance between two values.
+    df1_name : str, optional
+        A string name for the first dataframe. This allows the reporting to
+        print out an actual name instead of "df1", and allows human users to
+        more easily track the dataframes.
+    df2_name : str, optional
+        A string name for the second dataframe
+    ignore_spaces : bool, optional
+        Flag to strip whitespace (including newlines) from string columns
+        (including any join columns)
+    ignore_case : bool, optional
+        Flag to ignore the case of string columns
+    cast_column_names_lower : bool, optional
+        Boolean indicator that controls whether column names will be cast to
+        lower case
+    parallelism : int, optional
+        An integer representing the amount of parallelism. Entering a value
+        for this will force the use of Fugue over plain pandas.
+    strict_schema : bool, optional
+        The schema must match exactly if set to ``True``. This includes the
+        names and types. Allows for a fast fail.
+
+    Returns
+    -------
+    bool
+        True if all rows in df1 are present in df2 and vice versa (based on
+        the existence of the join column values)
+    """
+    if (
+        isinstance(df1, pd.DataFrame)
+        and isinstance(df2, pd.DataFrame)
+        and parallelism is None  # user did not specify parallelism
+        and fa.get_current_parallelism() == 1  # currently on a local execution engine
+    ):
+        comp = Compare(
+            df1=df1,
+            df2=df2,
+            join_columns=join_columns,
+            abs_tol=abs_tol,
+            rel_tol=rel_tol,
+            df1_name=df1_name,
+            df2_name=df2_name,
+            ignore_spaces=ignore_spaces,
+            ignore_case=ignore_case,
+            cast_column_names_lower=cast_column_names_lower,
+        )
+        return comp.all_rows_overlap()
+
+    try:
+        overlap = _distributed_compare(
+            df1=df1,
+            df2=df2,
+            join_columns=join_columns,
+            return_obj_func=lambda comp: comp.all_rows_overlap(),
+            abs_tol=abs_tol,
+            rel_tol=rel_tol,
+            df1_name=df1_name,
+            df2_name=df2_name,
+            ignore_spaces=ignore_spaces,
+            ignore_case=ignore_case,
+            cast_column_names_lower=cast_column_names_lower,
+            parallelism=parallelism,
+            strict_schema=strict_schema,
+        )
+    except _StrictSchemaError:
+        return False
+
+    return all(overlap)
+
+
 def report(
     df1: AnyDataFrame,
     df2: AnyDataFrame,
@@ -210,7 +306,7 @@ def report(
     column_count=10,
     html_file=None,
     parallelism: Optional[int] = None,
-) -> None:
+) -> str:
     """Returns a string representation of a report.  The representation can
     then be printed or saved to a file.
@@ -460,7 +556,7 @@ def _distributed_compare(
     parallelism: Optional[int] = None,
     strict_schema: bool = False,
 ) -> List[Any]:
-    """Compare the data distributedly using the core Compare class
+    """Compare the data in a distributed fashion using the core Compare class

     Both df1 and df2 should be dataframes containing all of the join_columns,
     with unique column names. Differences between values are compared to
@@ -541,6 +637,7 @@ def _distributed_compare(

     def _serialize(dfs: Iterable[pd.DataFrame], left: bool) -> Iterable[Dict[str, Any]]:
         for df in dfs:
+            df = df.convert_dtypes()
             cols = {}
             for name in df.columns:
                 col = df[name]
@@ -577,8 +674,28 @@ def _deserialize(
     arr = [pickle.loads(r["data"]) for r in df if r["left"] == left]
     if len(arr) > 0:
         return pd.concat(arr).sort_values(schema.names).reset_index(drop=True)
-    return pd.DataFrame(
-        {k: pd.Series(dtype=v) for k, v in schema.pandas_dtype.items()}
+    # The following is how to construct an empty pandas dataframe with
+    # the correct schema; it avoids pandas schema inference, which gets it wrong.
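+    # Concretely: build one dummy row with a representative value for each
+    # Arrow type, cast it to the schema's pandas dtypes, then drop the row
+    # with head(0) so only a correctly typed empty frame remains.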
+    # This is not needed when upgrading to Fugue >= 0.8.7
+    sample_row: List[Any] = []
+    for field in schema.fields:
+        if pa.types.is_string(field.type):
+            sample_row.append("x")
+        elif pa.types.is_integer(field.type):
+            sample_row.append(1)
+        elif pa.types.is_floating(field.type):
+            sample_row.append(1.1)
+        elif pa.types.is_boolean(field.type):
+            sample_row.append(True)
+        elif pa.types.is_timestamp(field.type):
+            sample_row.append(pd.NaT)
+        else:
+            sample_row.append(None)
+    return (
+        pd.DataFrame([sample_row], columns=schema.names)
+        .astype(schema.pandas_dtype)
+        .convert_dtypes()
+        .head(0)
     )

     def _comp(df: List[Dict[str, Any]]) -> List[List[Any]]:
diff --git a/docs/source/fugue_usage.rst b/docs/source/fugue_usage.rst
index 1d28c6fc..34791c34 100644
--- a/docs/source/fugue_usage.rst
+++ b/docs/source/fugue_usage.rst
@@ -10,6 +10,10 @@ Basic Usage

 The Fugue implementation can be accessed via the:

+- ``datacompy.unq_columns``
+- ``datacompy.intersect_columns``
+- ``datacompy.all_columns_match``
+- ``datacompy.all_rows_overlap``
 - ``datacompy.is_match``
 - and ``datacompy.report`` functions
diff --git a/pyproject.toml b/pyproject.toml
index 0b68eb38..ae11c20c 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -15,7 +15,7 @@ dependencies = [
     "pandas<=2.0.2,>=0.25.0",
     "numpy<=1.26.0,>=1.22.0",
     "ordered-set<=4.1.0,>=4.0.2",
-    "fugue<=0.9.0,>=0.8.6",
+    "fugue<=0.8.7,>=0.8.7",
 ]
 requires-python = ">=3.8.0"
 classifiers = [
@@ -94,12 +94,13 @@ dev = [
     "datacompy[build]",
 ]

-[isort]
+[tool.isort]
 multi_line_output = 3
 include_trailing_comma = true
 force_grid_wrap = 0
 use_parentheses = true
 line_length = 88
+profile = "black"

 [edgetest.envs.core]
 python_version = "3.9"
diff --git a/tests/test_fugue.py b/tests/test_fugue.py
index b65e6f71..47f7008b 100644
--- a/tests/test_fugue.py
+++ b/tests/test_fugue.py
@@ -29,6 +29,7 @@ from datacompy import (
     Compare,
     all_columns_match,
+    all_rows_overlap,
     intersect_columns,
     is_match,
     report,
@@ -50,7 +51,14 @@ def ref_df():
     df1_copy = df1.copy()
     df2 = df1.copy().drop(columns=["c"])
     df3 = df1.copy().drop(columns=["a", "b"])
-    return [df1, df1_copy, df2, df3]
+    df4 = pd.DataFrame(
+        dict(
+            a=np.random.randint(1, 12, 100),  # shift the join col
+            b=np.random.rand(100),
+            c=np.random.choice(["aaa", "b_c", "csd"], 100),
+        )
+    )
+    return [df1, df1_copy, df2, df3, df4]


 @pytest.fixture
@@ -80,37 +88,39 @@ def upper_col_df(shuffle_df):

 @pytest.fixture
 def simple_diff_df1():
-    return pd.DataFrame(dict(aa=[0, 1, 0], bb=[2.1, 3.1, 4.1]))
+    return pd.DataFrame(dict(aa=[0, 1, 0], bb=[2.1, 3.1, 4.1])).convert_dtypes()


 @pytest.fixture
 def simple_diff_df2():
-    return pd.DataFrame(dict(aa=[1, 0, 1], bb=[3.1, 4.1, 5.1], cc=["a", "b", "c"]))
+    return pd.DataFrame(
+        dict(aa=[1, 0, 1], bb=[3.1, 4.1, 5.1], cc=["a", "b", "c"])
+    ).convert_dtypes()


 @pytest.fixture
 def no_intersection_diff_df1():
     np.random.seed(0)
-    return pd.DataFrame(dict(x=["a"], y=[0.1]))
+    return pd.DataFrame(dict(x=["a"], y=[0.1])).convert_dtypes()


 @pytest.fixture
 def no_intersection_diff_df2():
-    return pd.DataFrame(dict(x=["b"], y=[1.1]))
+    return pd.DataFrame(dict(x=["b"], y=[1.1])).convert_dtypes()


 @pytest.fixture
 def large_diff_df1():
     np.random.seed(0)
     data = np.random.randint(0, 7, size=10000)
-    return pd.DataFrame({"x": data, "y": np.array([9] * 10000)})
+    return pd.DataFrame({"x": data, "y": np.array([9] * 10000)}).convert_dtypes()


 @pytest.fixture
 def large_diff_df2():
     np.random.seed(0)
     data = np.random.randint(6, 11, size=10000)
-    return pd.DataFrame({"x": data, "y": np.array([9] * 10000)})
+    return pd.DataFrame({"x": data, "y": np.array([9] * 10000)}).convert_dtypes()


 def test_is_match_native(
@@ -590,3 +600,73 @@ def test_all_columns_match_duckdb(ref_df):
     assert all_columns_match(df1, df3) is False
     assert all_columns_match(df1_copy, df1) is True
     assert all_columns_match(df3, df2) is False
+
+
+def test_all_rows_overlap_native(
+    ref_df,
+    shuffle_df,
+):
+    # defaults to Compare class
+    assert all_rows_overlap(ref_df[0], ref_df[0].copy(), join_columns="a")
+    assert all_rows_overlap(ref_df[0], shuffle_df, join_columns="a")
+    assert not all_rows_overlap(ref_df[0], ref_df[4], join_columns="a")
+    # Fugue
+    assert all_rows_overlap(ref_df[0], shuffle_df, join_columns="a", parallelism=2)
+    assert not all_rows_overlap(ref_df[0], ref_df[4], join_columns="a", parallelism=2)
+
+
+def test_all_rows_overlap_spark(
+    spark_session,
+    ref_df,
+    shuffle_df,
+):
+    ref_df[0].iteritems = ref_df[0].items  # pandas 2 compatibility
+    ref_df[4].iteritems = ref_df[4].items  # pandas 2 compatibility
+    shuffle_df.iteritems = shuffle_df.items  # pandas 2 compatibility
+    rdf = spark_session.createDataFrame(ref_df[0])
+    rdf_copy = spark_session.createDataFrame(ref_df[0])
+    rdf4 = spark_session.createDataFrame(ref_df[4])
+    sdf = spark_session.createDataFrame(shuffle_df)
+
+    assert all_rows_overlap(rdf, rdf_copy, join_columns="a")
+    assert all_rows_overlap(rdf, sdf, join_columns="a")
+    assert not all_rows_overlap(rdf, rdf4, join_columns="a")
+    assert all_rows_overlap(
+        spark_session.sql("SELECT 'a' AS a, 'b' AS b"),
+        spark_session.sql("SELECT 'a' AS a, 'b' AS b"),
+        join_columns="a",
+    )
+
+
+def test_all_rows_overlap_polars(
+    ref_df,
+    shuffle_df,
+):
+    rdf = pl.from_pandas(ref_df[0])
+    rdf_copy = pl.from_pandas(ref_df[0].copy())
+    rdf4 = pl.from_pandas(ref_df[4])
+    sdf = pl.from_pandas(shuffle_df)
+
+    assert all_rows_overlap(rdf, rdf_copy, join_columns="a")
+    assert all_rows_overlap(rdf, sdf, join_columns="a")
+    assert not all_rows_overlap(rdf, rdf4, join_columns="a")
+
+
+def test_all_rows_overlap_duckdb(
+    ref_df,
+    shuffle_df,
+):
+    with duckdb.connect():
+        rdf = duckdb.from_df(ref_df[0])
+        rdf_copy = duckdb.from_df(ref_df[0].copy())
+        rdf4 = duckdb.from_df(ref_df[4])
+        sdf = duckdb.from_df(shuffle_df)
+
+        assert all_rows_overlap(rdf, rdf_copy, join_columns="a")
+        assert all_rows_overlap(rdf, sdf, join_columns="a")
+        assert not all_rows_overlap(rdf, rdf4, join_columns="a")
+        assert all_rows_overlap(
+            duckdb.sql("SELECT 'a' AS a, 'b' AS b"),
+            duckdb.sql("SELECT 'a' AS a, 'b' AS b"),
+            join_columns="a",
+        )
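For reviewers, a minimal sketch (not part of the patch) of how the new ``all_rows_overlap`` entry point behaves, mirroring the native test above; the toy frames below are illustrative only:

# Illustrative only -- assumes datacompy with this patch installed.
import pandas as pd

from datacompy import all_rows_overlap

df1 = pd.DataFrame({"a": [1, 2, 3], "b": [4.0, 5.0, 6.0]})
df2 = pd.DataFrame({"a": [3, 2, 1], "b": [6.0, 5.0, 4.0]})

# Same join-key values on both sides, so all rows overlap despite the
# differing row order; with parallelism unset on local pandas input this
# short-circuits to the plain Compare class.
assert all_rows_overlap(df1, df2, join_columns="a")

# Dropping a key from one side leaves df1 with an unmatched row, so the
# check fails in at least one direction.
assert not all_rows_overlap(df1, df2.head(2), join_columns="a")

# Passing parallelism forces the distributed (Fugue) code path instead.
assert all_rows_overlap(df1, df2, join_columns="a", parallelism=2)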