From 67d0fa2c537b33249c8533c5e2bb3adf3ced42d9 Mon Sep 17 00:00:00 2001
From: Faisal
Date: Wed, 16 Oct 2024 17:13:31 -0300
Subject: [PATCH 1/3] fixes #334 (#338)

---
 datacompy/core.py         | 4 ++--
 datacompy/fugue.py        | 4 ++--
 datacompy/polars.py       | 4 ++--
 datacompy/spark/pandas.py | 4 ++--
 datacompy/spark/sql.py    | 4 ++--
 5 files changed, 10 insertions(+), 10 deletions(-)

diff --git a/datacompy/core.py b/datacompy/core.py
index f9a3a314..51c3fc74 100644
--- a/datacompy/core.py
+++ b/datacompy/core.py
@@ -606,8 +606,8 @@ def df_to_str(pdf: pd.DataFrame) -> str:
         report += render(
             "column_summary.txt",
             len(self.intersect_columns()),
-            len(self.df1_unq_columns()),
-            len(self.df2_unq_columns()),
+            f"{len(self.df1_unq_columns())} {self.df1_unq_columns().items}",
+            f"{len(self.df2_unq_columns())} {self.df2_unq_columns().items}",
             self.df1_name,
             self.df2_name,
         )
diff --git a/datacompy/fugue.py b/datacompy/fugue.py
index f2983c49..c4c27cbe 100644
--- a/datacompy/fugue.py
+++ b/datacompy/fugue.py
@@ -526,8 +526,8 @@ def _any(col: str) -> int:
     rpt += render(
         "column_summary.txt",
         len(first["intersect_columns"]),
-        len(first["df1_unq_columns"]),
-        len(first["df2_unq_columns"]),
+        f"{len(first['df1_unq_columns'])} {first['df1_unq_columns'].items}",
+        f"{len(first['df2_unq_columns'])} {first['df2_unq_columns'].items}",
         df1_name,
         df2_name,
     )
diff --git a/datacompy/polars.py b/datacompy/polars.py
index c9758548..0256de99 100644
--- a/datacompy/polars.py
+++ b/datacompy/polars.py
@@ -615,8 +615,8 @@ def df_to_str(pdf: "pl.DataFrame") -> str:
         report += render(
             "column_summary.txt",
             len(self.intersect_columns()),
-            len(self.df1_unq_columns()),
-            len(self.df2_unq_columns()),
+            f"{len(self.df1_unq_columns())} {self.df1_unq_columns().items}",
+            f"{len(self.df2_unq_columns())} {self.df2_unq_columns().items}",
             self.df1_name,
             self.df2_name,
         )
diff --git a/datacompy/spark/pandas.py b/datacompy/spark/pandas.py
index c946395d..6d27ecdf 100644
--- a/datacompy/spark/pandas.py
+++ b/datacompy/spark/pandas.py
@@ -666,8 +666,8 @@ def report(
         report += render(
             "column_summary.txt",
             len(self.intersect_columns()),
-            len(self.df1_unq_columns()),
-            len(self.df2_unq_columns()),
+            f"{len(self.df1_unq_columns())} {self.df1_unq_columns().items}",
+            f"{len(self.df2_unq_columns())} {self.df2_unq_columns().items}",
             self.df1_name,
             self.df2_name,
         )
diff --git a/datacompy/spark/sql.py b/datacompy/spark/sql.py
index f99b7174..404efe76 100644
--- a/datacompy/spark/sql.py
+++ b/datacompy/spark/sql.py
@@ -755,8 +755,8 @@ def report(
         report += render(
             "column_summary.txt",
             len(self.intersect_columns()),
-            len(self.df1_unq_columns()),
-            len(self.df2_unq_columns()),
+            f"{len(self.df1_unq_columns())} {self.df1_unq_columns().items}",
+            f"{len(self.df2_unq_columns())} {self.df2_unq_columns().items}",
             self.df1_name,
             self.df2_name,
         )
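The change above reworks the "Column Summary" section of ``report()``: the values rendered for the columns unique to each dataframe now carry the column names alongside the count (``OrderedSet`` from the ``ordered-set`` package keeps its elements in an ``.items`` list, which is what the patch formats into the f-string). A minimal sketch of the before/after value, using hypothetical column names:

```python
# Sketch only (not part of the patch): how the value passed to the
# column_summary.txt template changes. The column names are hypothetical.
from ordered_set import OrderedSet

df1_unq_columns = OrderedSet(["open_dt", "cusip"])  # columns present only in df1

old_value = len(df1_unq_columns)                               # 2
new_value = f"{len(df1_unq_columns)} {df1_unq_columns.items}"  # "2 ['open_dt', 'cusip']"
print(old_value, "->", new_value)
```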
From 1078a82ead024a2751669a848f2c54e56113b86b Mon Sep 17 00:00:00 2001
From: rhaffar <141745338+rhaffar@users.noreply.github.com>
Date: Thu, 17 Oct 2024 10:44:24 -0400
Subject: [PATCH 2/3] add diff for join columns (#339)

---
 datacompy/core.py         | 5 ++++-
 datacompy/polars.py       | 5 ++++-
 datacompy/spark/pandas.py | 5 ++++-
 datacompy/spark/sql.py    | 5 ++++-
 4 files changed, 16 insertions(+), 4 deletions(-)

diff --git a/datacompy/core.py b/datacompy/core.py
index 51c3fc74..889fb901 100644
--- a/datacompy/core.py
+++ b/datacompy/core.py
@@ -179,7 +179,10 @@ def _validate_dataframe(
             dataframe.columns = pd.Index([str(col) for col in dataframe.columns])
         # Check if join_columns are present in the dataframe
         if not set(self.join_columns).issubset(set(dataframe.columns)):
-            raise ValueError(f"{index} must have all columns from join_columns")
+            missing_cols = set(self.join_columns) - set(dataframe.columns)
+            raise ValueError(
+                f"{index} must have all columns from join_columns: {missing_cols}"
+            )
 
         if len(set(dataframe.columns)) < len(dataframe.columns):
             raise ValueError(f"{index} must have unique column names")
diff --git a/datacompy/polars.py b/datacompy/polars.py
index 0256de99..6b335c46 100644
--- a/datacompy/polars.py
+++ b/datacompy/polars.py
@@ -176,7 +176,10 @@ def _validate_dataframe(
 
         # Check if join_columns are present in the dataframe
         if not set(self.join_columns).issubset(set(dataframe.columns)):
-            raise ValueError(f"{index} must have all columns from join_columns")
+            missing_cols = set(self.join_columns) - set(dataframe.columns)
+            raise ValueError(
+                f"{index} must have all columns from join_columns: {missing_cols}"
+            )
 
         if len(set(dataframe.columns)) < len(dataframe.columns):
             raise ValueError(f"{index} must have unique column names")
diff --git a/datacompy/spark/pandas.py b/datacompy/spark/pandas.py
index 6d27ecdf..a7bfd1af 100644
--- a/datacompy/spark/pandas.py
+++ b/datacompy/spark/pandas.py
@@ -181,7 +181,10 @@ def _validate_dataframe(
             dataframe.columns = [str(col) for col in dataframe.columns]
         # Check if join_columns are present in the dataframe
         if not set(self.join_columns).issubset(set(dataframe.columns)):
-            raise ValueError(f"{index} must have all columns from join_columns")
+            missing_cols = set(self.join_columns) - set(dataframe.columns)
+            raise ValueError(
+                f"{index} must have all columns from join_columns: {missing_cols}"
+            )
 
         if len(set(dataframe.columns)) < len(dataframe.columns):
             raise ValueError(f"{index} must have unique column names")
diff --git a/datacompy/spark/sql.py b/datacompy/spark/sql.py
index 404efe76..61a2871d 100644
--- a/datacompy/spark/sql.py
+++ b/datacompy/spark/sql.py
@@ -241,7 +241,10 @@ def _validate_dataframe(
         # Check if join_columns are present in the dataframe
         dataframe = getattr(self, index)  # refresh
         if not set(self.join_columns).issubset(set(dataframe.columns)):
-            raise ValueError(f"{index} must have all columns from join_columns")
+            missing_cols = set(self.join_columns) - set(dataframe.columns)
+            raise ValueError(
+                f"{index} must have all columns from join_columns: {missing_cols}"
+            )
 
         if len(set(dataframe.columns)) < len(dataframe.columns):
             raise ValueError(f"{index} must have unique column names")
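The validation change above makes the failure actionable by naming the join columns that are missing from the offending dataframe. A rough sketch of the resulting behaviour through the public ``datacompy.Compare`` API (the dataframes and column names are made up for illustration):

```python
# Sketch only: exercise the improved error message from _validate_dataframe.
import pandas as pd
import datacompy

df1 = pd.DataFrame({"acct_id": [1, 2], "amount": [10.0, 20.0]})
df2 = pd.DataFrame({"account": [1, 2], "amount": [10.0, 20.0]})  # no acct_id

try:
    datacompy.Compare(df1, df2, join_columns=["acct_id"])
except ValueError as err:
    # before: "df2 must have all columns from join_columns"
    # after:  "df2 must have all columns from join_columns: {'acct_id'}"
    print(err)
```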
From 33bd5875a84779b0ab0343230dafba170408952a Mon Sep 17 00:00:00 2001
From: Faisal
Date: Wed, 23 Oct 2024 17:12:06 -0300
Subject: [PATCH 3/3] Numpy 2.0 support and edgetest bumps (#328)

* fix for numpy 2.0 support
* version bump to 0.13.3
* updating testing matrix
* updating testing matrix
* updating testing matrix
* updating docs and adding in pytest skips for py3.12
* updating matrix to include 3.12
---
 .github/workflows/test-package.yml    | 10 ++++-
 README.md                             | 24 ++++++-----
 datacompy/__init__.py                 | 19 +++++++--
 datacompy/spark/pandas.py             |  8 ++++
 docs/source/spark_usage.rst           | 58 ++++++++-------------------
 pyproject.toml                        |  2 +-
 tests/test_fugue/test_fugue_spark.py  |  5 +++
 tests/test_spark/test_legacy_spark.py |  4 ++
 tests/test_spark/test_pandas_spark.py |  6 +++
 tests/test_spark/test_sql_spark.py    |  3 ++
 10 files changed, 83 insertions(+), 56 deletions(-)

diff --git a/.github/workflows/test-package.yml b/.github/workflows/test-package.yml
index 1462fc91..26b18def 100644
--- a/.github/workflows/test-package.yml
+++ b/.github/workflows/test-package.yml
@@ -33,14 +33,19 @@ jobs:
     strategy:
       fail-fast: false
       matrix:
-        python-version: [3.9, '3.10', '3.11']
+        python-version: [3.9, '3.10', '3.11', '3.12']
         spark-version: [3.2.4, 3.3.4, 3.4.2, 3.5.1]
-        pandas-version: [2.2.2, 1.5.3]
+        pandas-version: [2.2.3, 1.5.3]
+        numpy-version: [2.1.2, 1.26.4]
         exclude:
           - python-version: '3.11'
             spark-version: 3.2.4
           - python-version: '3.11'
             spark-version: 3.3.4
+          - python-version: 3.9
+            numpy-version: 2.1.2
+          - pandas-version: 1.5.3
+            numpy-version: 2.1.2
     env:
       PYTHON_VERSION: ${{ matrix.python-version }}
       SPARK_VERSION: ${{ matrix.spark-version }}
@@ -65,6 +70,7 @@ jobs:
           python -m pip install pytest pytest-spark pypandoc
           python -m pip install pyspark[connect]==${{ matrix.spark-version }}
           python -m pip install pandas==${{ matrix.pandas-version }}
+          python -m pip install numpy==${{ matrix.numpy-version }}
           python -m pip install .[dev]
       - name: Test with pytest
        run: |
diff --git a/README.md b/README.md
index 4de6c86f..b60457ef 100644
--- a/README.md
+++ b/README.md
@@ -39,26 +39,32 @@ pip install datacompy[ray]
 
 ### Legacy Spark Deprecation
 
-With version ``v0.12.0`` the original ``SparkCompare`` was replaced with a 
-Pandas on Spark implementation. The original ``SparkCompare`` implementation differs 
-from all the other native implementations. To align the API better, and keep behaviour 
+With version ``v0.12.0`` the original ``SparkCompare`` was replaced with a
+Pandas on Spark implementation. The original ``SparkCompare`` implementation differs
+from all the other native implementations. To align the API better, and keep behaviour
 consistent we are deprecating the original ``SparkCompare`` into a new module ``LegacySparkCompare``
 Subsequently in ``v0.13.0`` a PySaprk DataFrame class has been introduced (``SparkSQLCompare``)
-which accepts ``pyspark.sql.DataFrame`` and should provide better performance. With this version 
-the Pandas on Spark implementation has been renamed to ``SparkPandasCompare`` and all the spark 
+which accepts ``pyspark.sql.DataFrame`` and should provide better performance. With this version
+the Pandas on Spark implementation has been renamed to ``SparkPandasCompare`` and all the spark
 logic is now under the ``spark`` submodule.
 
 If you wish to use the old SparkCompare moving forward you can import it like so:
 
 ```python
 from datacompy.spark.legacy import LegacySparkCompare
-```
+```
+
+### SparkPandasCompare Deprecation
+
+Starting with ``v0.14.1``, ``SparkPandasCompare`` is slated for deprecation. ``SparkSQLCompare`` is the preferred and much more performant option.
+Note that if you continue to use ``SparkPandasCompare``, ``numpy`` 2+ is not supported due to dependency issues.
+
 
 #### Supported versions and dependncies
 
-Different versions of Spark, Pandas, and Python interact differently. Below is a matrix of what we test with. 
-With the move to Pandas on Spark API and compatability issues with Pandas 2+ we will for the mean time note support Pandas 2 
+Different versions of Spark, Pandas, and Python interact differently. Below is a matrix of what we test with.
+With the move to the Pandas on Spark API and compatibility issues with Pandas 2+ we will for the time being not support Pandas 2
 with the Pandas on Spark implementation. Spark plans to support Pandas 2 in [Spark 4](https://issues.apache.org/jira/browse/SPARK-44101)
 
@@ -80,7 +86,7 @@ with the Pandas on Spark implementation. Spark plans to support Pandas 2 in [Spa
 
 
 > [!NOTE]
-> At the current time Python `3.12` is not supported by Spark and also Ray within Fugue. 
+> At the current time Python `3.12` is not supported by Spark and also Ray within Fugue.
 > If you are using Python `3.12` and above, please note that not all functioanlity will be supported.
 > Pandas and Polars support should work fine and are tested.
 
diff --git a/datacompy/__init__.py b/datacompy/__init__.py
index a1890224..a6d331e8 100644
--- a/datacompy/__init__.py
+++ b/datacompy/__init__.py
@@ -18,7 +18,7 @@
 Then extended to carry that functionality over to Spark Dataframes.
 """
 
-__version__ = "0.14.0"
+__version__ = "0.14.1"
 
 import platform
 from warnings import warn
@@ -43,14 +43,12 @@
     unq_columns,
 )
 from datacompy.polars import PolarsCompare
-from datacompy.spark.pandas import SparkPandasCompare
 from datacompy.spark.sql import SparkSQLCompare
 
 __all__ = [
     "BaseCompare",
     "Compare",
     "PolarsCompare",
-    "SparkPandasCompare",
     "SparkSQLCompare",
     "all_columns_match",
     "all_rows_overlap",
@@ -78,3 +76,18 @@
     UserWarning,
     stacklevel=2,
 )
+
+# numpy 2.0 check
+from numpy import __version__ as np_version
+
+if np_version.split(".")[0] >= "2":
+    warn(
+        "SparkPandasCompare currently only supports Numpy < 2."
+        "Please note that the SparkPandasCompare functionality will not work and currently is not supported.",
+        UserWarning,
+        stacklevel=2,
+    )
+else:
+    from datacompy.spark.pandas import SparkPandasCompare  # noqa: F401
+
+    __all__.append("SparkPandasCompare")
diff --git a/datacompy/spark/pandas.py b/datacompy/spark/pandas.py
index a7bfd1af..4ce48aad 100644
--- a/datacompy/spark/pandas.py
+++ b/datacompy/spark/pandas.py
@@ -24,6 +24,7 @@
 import logging
 import os
 from typing import List, Optional, Union
+from warnings import warn
 
 import pandas as pd
 from ordered_set import OrderedSet
@@ -40,6 +41,13 @@
 
 LOG = logging.getLogger(__name__)
 
+warn(
+    f"The module {__name__} is deprecated. In future versions SparkPandasCompare will be completely removed.",
+    DeprecationWarning,
+    stacklevel=2,
+)
+
+
 class SparkPandasCompare(BaseCompare):
     """Comparison class to be used to compare whether two Pandas on Spark dataframes are equal.
 
diff --git a/docs/source/spark_usage.rst b/docs/source/spark_usage.rst
index e064f0fb..7bd942cd 100644
--- a/docs/source/spark_usage.rst
+++ b/docs/source/spark_usage.rst
@@ -3,15 +3,15 @@ Spark Usage
 
 .. important::
 
-    With version ``v0.12.0`` the original ``SparkCompare`` was replaced with a 
-    Pandas on Spark implementation The original ``SparkCompare`` 
-    implementation differs from all the other native implementations. To align the API better, 
-    and keep behaviour consistent we are deprecating the original ``SparkCompare`` 
+    With version ``v0.12.0`` the original ``SparkCompare`` was replaced with a
+    Pandas on Spark implementation. The original ``SparkCompare``
+    implementation differs from all the other native implementations. To align the API better,
+    and keep behaviour consistent we are deprecating the original ``SparkCompare``
     into a new module ``LegacySparkCompare``
 
     Subsequently in ``v0.13.0`` a PySaprk DataFrame class has been introduced (``SparkSQLCompare``)
-    which accepts ``pyspark.sql.DataFrame`` and should provide better performance. With this version 
-    the Pandas on Spark implementation has been renamed to ``SparkPandasCompare`` and all the spark 
+    which accepts ``pyspark.sql.DataFrame`` and should provide better performance. With this version
+    the Pandas on Spark implementation has been renamed to ``SparkPandasCompare`` and all the spark
     logic is now under the ``spark`` submodule.
 
 If you wish to use the old SparkCompare moving forward you can import it like so:
@@ -19,29 +19,24 @@ Spark Usage
 .. code-block:: python
 
     from datacompy.spark.legacy import LegacySparkCompare
-
-For both ``SparkSQLCompare`` and ``SparkPandasCompare``
-
-- ``on_index`` is not supported.
-- Joining is done using ``<=>`` which is the equality test that is safe for null values.
-- ``SparkPandasCompare`` compares ``pyspark.pandas.DataFrame``'s
-- ``SparkSQLCompare`` compares ``pyspark.sql.DataFrame``'s
-
-Supported Version
------------------
 
 .. important::
 
-    Spark will not offically support Pandas 2 until Spark 4: https://issues.apache.org/jira/browse/SPARK-44101
+    Starting with ``v0.14.1``, ``SparkPandasCompare`` is slated for deprecation. ``SparkSQLCompare``
+    is the preferred and much more performant option. Note that if you continue to use ``SparkPandasCompare``,
+    ``numpy`` 2+ is not supported due to dependency issues.
+
+For ``SparkSQLCompare``
 
-Until then we will not be supporting Pandas 2 for the Pandas on Spark API implementaion.
-For Fugue, the Native Pandas (`Compare`), and `SparkSQLCompare` implementations, Pandas 2 is supported.
+- ``on_index`` is not supported.
+- Joining is done using ``<=>`` which is the equality test that is safe for null values.
+- ``SparkSQLCompare`` compares ``pyspark.sql.DataFrame``'s
 
-SparkPandasCompare and SparkSQLCompare Object Setup
----------------------------------------------------
+SparkSQLCompare
+---------------
 
 There is currently only one supported method for joining your dataframes - by
 join column(s).
@@ -52,7 +47,7 @@ join column(s).
     from io import StringIO
     import pandas as pd
    import pyspark.pandas as ps
-    from datacompy import SparkPandasCompare, SparkSQLCompare
+    from datacompy import SparkSQLCompare
     from pyspark.sql import SparkSession
 
     spark = SparkSession.builder.getOrCreate()
@@ -73,25 +68,6 @@ join column(s).
     10000001238,1.05,Loose Seal Bluth,111
     """
-    # SparkPandasCompare
-    df1 = ps.from_pandas(pd.read_csv(StringIO(data1)))
-    df2 = ps.from_pandas(pd.read_csv(StringIO(data2)))
-
-    compare = SparkPandasCompare(
-        df1,
-        df2,
-        join_columns='acct_id',  # You can also specify a list of columns
-        abs_tol=0,  # Optional, defaults to 0
-        rel_tol=0,  # Optional, defaults to 0
-        df1_name='Original',  # Optional, defaults to 'df1'
-        df2_name='New'  # Optional, defaults to 'df2'
-    )
-    compare.matches(ignore_extra_columns=False)
-    # False
-    # This method prints out a human-readable report summarizing and sampling differences
-    print(compare.report())
-
-    # SparkSQLCompare
     df1 = spark.createDataFrame(pd.read_csv(StringIO(data1)))
     df2 = spark.createDataFrame(pd.read_csv(StringIO(data2)))
diff --git a/pyproject.toml b/pyproject.toml
index 63a5464e..ed9c0f9b 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -11,7 +11,7 @@ maintainers = [
     { name="Faisal Dosani", email="faisal.dosani@capitalone.com" }
 ]
 license = {text = "Apache Software License"}
-dependencies = ["pandas<=2.2.2,>=0.25.0", "numpy<=1.26.4,>=1.22.0", "ordered-set<=4.1.0,>=4.0.2", "fugue<=0.9.1,>=0.8.7", "polars<=1.7.0,>=0.20.4"]
+dependencies = ["pandas<=2.2.3,>=0.25.0", "numpy<=2.1.2,>=1.22.0", "ordered-set<=4.1.0,>=4.0.2", "fugue<=0.9.1,>=0.8.7", "polars<=1.7.0,>=0.20.4"]
 requires-python = ">=3.9.0"
 classifiers = [
     "Intended Audience :: Developers",
diff --git a/tests/test_fugue/test_fugue_spark.py b/tests/test_fugue/test_fugue_spark.py
index ae317eb8..c74d059e 100644
--- a/tests/test_fugue/test_fugue_spark.py
+++ b/tests/test_fugue/test_fugue_spark.py
@@ -14,6 +14,8 @@
 # limitations under the License.
 """Test fugue and spark."""
 
+import sys
+
 import pytest
 from datacompy import (
     Compare,
@@ -31,6 +33,9 @@
 
 pyspark = pytest.importorskip("pyspark")
 
+if sys.version_info >= (3, 12):
+    pytest.skip("unsupported python version", allow_module_level=True)
+
 
 def test_is_match_spark(
     spark_session,
diff --git a/tests/test_spark/test_legacy_spark.py b/tests/test_spark/test_legacy_spark.py
index 74fa5668..cc71d90b 100644
--- a/tests/test_spark/test_legacy_spark.py
+++ b/tests/test_spark/test_legacy_spark.py
@@ -17,12 +17,16 @@
 import io
 import logging
 import re
+import sys
 from decimal import Decimal
 
 import pytest
 
 pytest.importorskip("pyspark")
 
+if sys.version_info >= (3, 12):
+    pytest.skip("unsupported python version", allow_module_level=True)
+
 from datacompy.spark.legacy import (
     NUMERIC_SPARK_TYPES,
     LegacySparkCompare,
diff --git a/tests/test_spark/test_pandas_spark.py b/tests/test_spark/test_pandas_spark.py
index f31f8ae8..433c5762 100644
--- a/tests/test_spark/test_pandas_spark.py
+++ b/tests/test_spark/test_pandas_spark.py
@@ -32,6 +32,12 @@
 
 pytest.importorskip("pyspark")
 
+if np.__version__ >= "2.0.0":
+    pytest.skip("unsupported numpy version", allow_module_level=True)
+
+if sys.version_info >= (3, 12):
+    pytest.skip("unsupported python version", allow_module_level=True)
+
 import pyspark.pandas as ps
 from datacompy.spark.pandas import (
     SparkPandasCompare,
diff --git a/tests/test_spark/test_sql_spark.py b/tests/test_spark/test_sql_spark.py
index 50fbe901..e93fbafc 100644
--- a/tests/test_spark/test_sql_spark.py
+++ b/tests/test_spark/test_sql_spark.py
@@ -32,6 +32,9 @@
 
 pyspark = pytest.importorskip("pyspark")
 
+if sys.version_info >= (3, 12):
+    pytest.skip("unsupported python version", allow_module_level=True)
+
 from datacompy.spark.sql import (
     SparkSQLCompare,
     _generate_id_within_group,
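Because the ``datacompy/__init__.py`` change above only exports ``SparkPandasCompare`` when ``numpy`` < 2 is installed (the name is appended to ``__all__`` conditionally), downstream code should not assume it is importable. One way a consumer might guard for it, sketched under that assumption rather than taken from the patch:

```python
# Sketch only: guard the optional export introduced in datacompy/__init__.py.
import datacompy

if "SparkPandasCompare" in datacompy.__all__:
    from datacompy import SparkPandasCompare  # only present when numpy < 2
else:
    # On numpy 2+ only SparkSQLCompare (the preferred comparator) is exported.
    from datacompy import SparkSQLCompare
```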