Merge pull request #341 from capitalone/develop
Release v0.14.1
fdosani authored Oct 24, 2024
2 parents b1dc886 + 998ee4f commit 0e15a75
Showing 14 changed files with 109 additions and 70 deletions.
10 changes: 8 additions & 2 deletions .github/workflows/test-package.yml
@@ -33,14 +33,19 @@ jobs:
strategy:
fail-fast: false
matrix:
python-version: [3.9, '3.10', '3.11']
python-version: [3.9, '3.10', '3.11', '3.12']
spark-version: [3.2.4, 3.3.4, 3.4.2, 3.5.1]
pandas-version: [2.2.2, 1.5.3]
pandas-version: [2.2.3, 1.5.3]
numpy-version: [2.1.2, 1.26.4]
exclude:
- python-version: '3.11'
spark-version: 3.2.4
- python-version: '3.11'
spark-version: 3.3.4
- python-version: 3.9
numpy-version: 2.1.2
- pandas-version: 1.5.3
numpy-version: 2.1.2
env:
PYTHON_VERSION: ${{ matrix.python-version }}
SPARK_VERSION: ${{ matrix.spark-version }}
@@ -65,6 +70,7 @@ jobs:
python -m pip install pytest pytest-spark pypandoc
python -m pip install pyspark[connect]==${{ matrix.spark-version }}
python -m pip install pandas==${{ matrix.pandas-version }}
python -m pip install numpy==${{ matrix.numpy-version }}
python -m pip install .[dev]
- name: Test with pytest
run: |
24 changes: 15 additions & 9 deletions README.md
@@ -39,26 +39,32 @@ pip install datacompy[ray]

### Legacy Spark Deprecation

With version ``v0.12.0`` the original ``SparkCompare`` was replaced with a
Pandas on Spark implementation. The original ``SparkCompare`` implementation differs
from all the other native implementations. To align the API better, and keep behaviour
consistent, we are deprecating the original ``SparkCompare`` into a new module, ``LegacySparkCompare``.

Subsequently in ``v0.13.0`` a PySpark DataFrame class has been introduced (``SparkSQLCompare``)
which accepts ``pyspark.sql.DataFrame`` and should provide better performance. With this version
the Pandas on Spark implementation has been renamed to ``SparkPandasCompare`` and all the spark
logic is now under the ``spark`` submodule.

If you wish to use the old SparkCompare moving forward you can import it like so:

```python
from datacompy.spark.legacy import LegacySparkCompare
```

### SparkPandasCompare Deprecation

Starting with ``v0.14.1``, ``SparkPandasCompare`` is slated for deprecation. ``SparkSQLCompare`` is preferred and much more performant.
Note that if you continue to use ``SparkPandasCompare``, ``numpy`` 2+ is not supported due to dependency issues.
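
As a rough sketch of what moving over looks like (illustrative only — it assumes a local ``SparkSession`` and the ``SparkSQLCompare(spark_session, df1, df2, join_columns=...)`` signature shown in the project docs):

```python
from io import StringIO

import pandas as pd
from pyspark.sql import SparkSession

from datacompy import SparkSQLCompare

spark = SparkSession.builder.getOrCreate()

# Small illustrative data sets; not taken from the real docs examples.
data1 = """acct_id,dollar_amt,name
10000001234,123.45,George Michael Bluth
10000001235,0.45,Michael Bluth
"""

data2 = """acct_id,dollar_amt,name
10000001234,123.4,George Michael Bluth
10000001235,0.45,Michael Bluth
"""

# SparkSQLCompare works directly on pyspark.sql.DataFrame objects,
# so no pyspark.pandas conversion is needed.
df1 = spark.createDataFrame(pd.read_csv(StringIO(data1)))
df2 = spark.createDataFrame(pd.read_csv(StringIO(data2)))

compare = SparkSQLCompare(spark, df1, df2, join_columns="acct_id")
print(compare.report())
```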


#### Supported versions and dependencies

Different versions of Spark, Pandas, and Python interact differently. Below is a matrix of what we test with.
With the move to the Pandas on Spark API and compatibility issues with Pandas 2+, we will, for the time being, not support Pandas 2
with the Pandas on Spark implementation. Spark plans to support Pandas 2 in [Spark 4](https://issues.apache.org/jira/browse/SPARK-44101)


@@ -80,7 +86,7 @@ with the Pandas on Spark implementation. Spark plans to support Pandas 2 in [Spa


> [!NOTE]
> At the current time Python `3.12` is not supported by Spark or by Ray within Fugue.
> If you are using Python `3.12` and above, please note that not all functionality will be supported.
> Pandas and Polars support should work fine and are tested.
19 changes: 16 additions & 3 deletions datacompy/__init__.py
@@ -18,7 +18,7 @@
Then extended to carry that functionality over to Spark Dataframes.
"""

__version__ = "0.14.0"
__version__ = "0.14.1"

import platform
from warnings import warn
@@ -43,14 +43,12 @@
unq_columns,
)
from datacompy.polars import PolarsCompare
from datacompy.spark.pandas import SparkPandasCompare
from datacompy.spark.sql import SparkSQLCompare

__all__ = [
"BaseCompare",
"Compare",
"PolarsCompare",
"SparkPandasCompare",
"SparkSQLCompare",
"all_columns_match",
"all_rows_overlap",
@@ -78,3 +76,18 @@
UserWarning,
stacklevel=2,
)

# numpy 2.0 check
from numpy import __version__ as np_version

if np_version.split(".")[0] >= "2":
warn(
"SparkPandasCompare currently only supports Numpy < 2."
"Please note that the SparkPandasCompare functionality will not work and currently is not supported.",
UserWarning,
stacklevel=2,
)
else:
from datacompy.spark.pandas import SparkPandasCompare # noqa: F401

__all__.append("SparkPandasCompare")
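
A side effect of this conditional export (the sketch below is illustrative, not part of the commit): code that still imports ``SparkPandasCompare`` should be prepared for the name to be absent when NumPy 2+ is installed, for example:

```python
# Hypothetical defensive import: SparkPandasCompare is only exported by
# datacompy when NumPy < 2 is installed, so guard against its absence.
try:
    from datacompy import SparkPandasCompare
except ImportError:
    SparkPandasCompare = None  # not available on this environment

if SparkPandasCompare is None:
    from datacompy import SparkSQLCompare  # preferred, more performant option
```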
9 changes: 6 additions & 3 deletions datacompy/core.py
@@ -179,7 +179,10 @@ def _validate_dataframe(
dataframe.columns = pd.Index([str(col) for col in dataframe.columns])
# Check if join_columns are present in the dataframe
if not set(self.join_columns).issubset(set(dataframe.columns)):
raise ValueError(f"{index} must have all columns from join_columns")
missing_cols = set(self.join_columns) - set(dataframe.columns)
raise ValueError(
f"{index} must have all columns from join_columns: {missing_cols}"
)

if len(set(dataframe.columns)) < len(dataframe.columns):
raise ValueError(f"{index} must have unique column names")
@@ -606,8 +609,8 @@ def df_to_str(pdf: pd.DataFrame) -> str:
report += render(
"column_summary.txt",
len(self.intersect_columns()),
len(self.df1_unq_columns()),
len(self.df2_unq_columns()),
f"{len(self.df1_unq_columns())} {self.df1_unq_columns().items}",
f"{len(self.df2_unq_columns())} {self.df2_unq_columns().items}",
self.df1_name,
self.df2_name,
)
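
For illustration, a small sketch of what the sharpened error message surfaces (hypothetical data; the wording comes from the f-string in the hunk above):

```python
import pandas as pd

from datacompy import Compare

df1 = pd.DataFrame({"acct_id": [1, 2], "amount": [10.0, 20.0]})
df2 = pd.DataFrame({"account": [1, 2], "amount": [10.0, 20.5]})  # no acct_id column

try:
    Compare(df1, df2, join_columns="acct_id")
except ValueError as err:
    # The error now names the missing join columns, e.g.
    # "df2 must have all columns from join_columns: {'acct_id'}"
    print(err)
```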
4 changes: 2 additions & 2 deletions datacompy/fugue.py
@@ -526,8 +526,8 @@ def _any(col: str) -> int:
rpt += render(
"column_summary.txt",
len(first["intersect_columns"]),
len(first["df1_unq_columns"]),
len(first["df2_unq_columns"]),
f"{len(first['df1_unq_columns'])} {first['df1_unq_columns'].items}",
f"{len(first['df2_unq_columns'])} {first['df2_unq_columns'].items}",
df1_name,
df2_name,
)
9 changes: 6 additions & 3 deletions datacompy/polars.py
@@ -176,7 +176,10 @@ def _validate_dataframe(

# Check if join_columns are present in the dataframe
if not set(self.join_columns).issubset(set(dataframe.columns)):
raise ValueError(f"{index} must have all columns from join_columns")
missing_cols = set(self.join_columns) - set(dataframe.columns)
raise ValueError(
f"{index} must have all columns from join_columns: {missing_cols}"
)

if len(set(dataframe.columns)) < len(dataframe.columns):
raise ValueError(f"{index} must have unique column names")
@@ -615,8 +618,8 @@ def df_to_str(pdf: "pl.DataFrame") -> str:
report += render(
"column_summary.txt",
len(self.intersect_columns()),
len(self.df1_unq_columns()),
len(self.df2_unq_columns()),
f"{len(self.df1_unq_columns())} {self.df1_unq_columns().items}",
f"{len(self.df2_unq_columns())} {self.df2_unq_columns().items}",
self.df1_name,
self.df2_name,
)
17 changes: 14 additions & 3 deletions datacompy/spark/pandas.py
@@ -24,6 +24,7 @@
import logging
import os
from typing import List, Optional, Union
from warnings import warn

import pandas as pd
from ordered_set import OrderedSet
@@ -40,6 +41,13 @@
LOG = logging.getLogger(__name__)


warn(
f"The module {__name__} is deprecated. In future versions SparkPandasCompare will be completely removed.",
DeprecationWarning,
stacklevel=2,
)


class SparkPandasCompare(BaseCompare):
"""Comparison class to be used to compare whether two Pandas on Spark dataframes are equal.
@@ -181,7 +189,10 @@ def _validate_dataframe(
dataframe.columns = [str(col) for col in dataframe.columns]
# Check if join_columns are present in the dataframe
if not set(self.join_columns).issubset(set(dataframe.columns)):
raise ValueError(f"{index} must have all columns from join_columns")
missing_cols = set(self.join_columns) - set(dataframe.columns)
raise ValueError(
f"{index} must have all columns from join_columns: {missing_cols}"
)

if len(set(dataframe.columns)) < len(dataframe.columns):
raise ValueError(f"{index} must have unique column names")
@@ -666,8 +677,8 @@ def report(
report += render(
"column_summary.txt",
len(self.intersect_columns()),
len(self.df1_unq_columns()),
len(self.df2_unq_columns()),
f"{len(self.df1_unq_columns())} {self.df1_unq_columns().items}",
f"{len(self.df2_unq_columns())} {self.df2_unq_columns().items}",
self.df1_name,
self.df2_name,
)
9 changes: 6 additions & 3 deletions datacompy/spark/sql.py
@@ -241,7 +241,10 @@ def _validate_dataframe(
# Check if join_columns are present in the dataframe
dataframe = getattr(self, index) # refresh
if not set(self.join_columns).issubset(set(dataframe.columns)):
raise ValueError(f"{index} must have all columns from join_columns")
missing_cols = set(self.join_columns) - set(dataframe.columns)
raise ValueError(
f"{index} must have all columns from join_columns: {missing_cols}"
)

if len(set(dataframe.columns)) < len(dataframe.columns):
raise ValueError(f"{index} must have unique column names")
@@ -755,8 +758,8 @@ def report(
report += render(
"column_summary.txt",
len(self.intersect_columns()),
len(self.df1_unq_columns()),
len(self.df2_unq_columns()),
f"{len(self.df1_unq_columns())} {self.df1_unq_columns().items}",
f"{len(self.df2_unq_columns())} {self.df2_unq_columns().items}",
self.df1_name,
self.df2_name,
)
58 changes: 17 additions & 41 deletions docs/source/spark_usage.rst
@@ -3,45 +3,40 @@ Spark Usage

.. important::

With version ``v0.12.0`` the original ``SparkCompare`` was replaced with a
Pandas on Spark implementation. The original ``SparkCompare``
implementation differs from all the other native implementations. To align the API better,
and keep behaviour consistent, we are deprecating the original ``SparkCompare``
into a new module ``LegacySparkCompare``

Subsequently in ``v0.13.0`` a PySpark DataFrame class has been introduced (``SparkSQLCompare``)
which accepts ``pyspark.sql.DataFrame`` and should provide better performance. With this version
the Pandas on Spark implementation has been renamed to ``SparkPandasCompare`` and all the spark
logic is now under the ``spark`` submodule.

If you wish to use the old SparkCompare moving forward you can import it like so:

.. code-block:: python

    from datacompy.spark.legacy import LegacySparkCompare

For both ``SparkSQLCompare`` and ``SparkPandasCompare``

- ``on_index`` is not supported.
- Joining is done using ``<=>`` which is the equality test that is safe for null values.
- ``SparkPandasCompare`` compares ``pyspark.pandas.DataFrame``'s
- ``SparkSQLCompare`` compares ``pyspark.sql.DataFrame``'s

Supported Version
------------------
.. important::

Spark will not officially support Pandas 2 until Spark 4: https://issues.apache.org/jira/browse/SPARK-44101
Starting with ``v0.14.1``, ``SparkPandasCompare`` is slated for deprecation. ``SparkSQLCompare``
is preferred and much more performant. Note that if you continue to use ``SparkPandasCompare``,
``numpy`` 2+ is not supported due to dependency issues.


For ``SparkSQLCompare``

Until then we will not be supporting Pandas 2 for the Pandas on Spark API implementation.
For Fugue, the Native Pandas (`Compare`), and `SparkSQLCompare` implementations, Pandas 2 is supported.
- ``on_index`` is not supported.
- Joining is done using ``<=>`` which is the equality test that is safe for null values.
- ``SparkSQLCompare`` compares ``pyspark.sql.DataFrame``'s


SparkPandasCompare and SparkSQLCompare Object Setup
---------------------------------------------------
SparkSQLCompare
---------------

There is currently only one supported method for joining your dataframes - by
join column(s).
@@ -52,7 +47,7 @@ join column(s).
from io import StringIO
import pandas as pd
import pyspark.pandas as ps
from datacompy import SparkPandasCompare, SparkSQLCompare
from datacompy import SparkSQLCompare
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
@@ -73,25 +68,6 @@ join column(s).
10000001238,1.05,Loose Seal Bluth,111
"""
# SparkPandasCompare
df1 = ps.from_pandas(pd.read_csv(StringIO(data1)))
df2 = ps.from_pandas(pd.read_csv(StringIO(data2)))
compare = SparkPandasCompare(
df1,
df2,
join_columns='acct_id', # You can also specify a list of columns
abs_tol=0, # Optional, defaults to 0
rel_tol=0, # Optional, defaults to 0
df1_name='Original', # Optional, defaults to 'df1'
df2_name='New' # Optional, defaults to 'df2'
)
compare.matches(ignore_extra_columns=False)
# False
# This method prints out a human-readable report summarizing and sampling differences
print(compare.report())
# SparkSQLCompare
df1 = spark.createDataFrame(pd.read_csv(StringIO(data1)))
df2 = spark.createDataFrame(pd.read_csv(StringIO(data2)))
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -11,7 +11,7 @@ maintainers = [
{ name="Faisal Dosani", email="faisal.dosani@capitalone.com" }
]
license = {text = "Apache Software License"}
dependencies = ["pandas<=2.2.2,>=0.25.0", "numpy<=1.26.4,>=1.22.0", "ordered-set<=4.1.0,>=4.0.2", "fugue<=0.9.1,>=0.8.7", "polars<=1.7.0,>=0.20.4"]
dependencies = ["pandas<=2.2.3,>=0.25.0", "numpy<=2.1.2,>=1.22.0", "ordered-set<=4.1.0,>=4.0.2", "fugue<=0.9.1,>=0.8.7", "polars<=1.7.0,>=0.20.4"]
requires-python = ">=3.9.0"
classifiers = [
"Intended Audience :: Developers",
5 changes: 5 additions & 0 deletions tests/test_fugue/test_fugue_spark.py
@@ -14,6 +14,8 @@
# limitations under the License.
"""Test fugue and spark."""

import sys

import pytest
from datacompy import (
Compare,
@@ -31,6 +33,9 @@

pyspark = pytest.importorskip("pyspark")

if sys.version_info >= (3, 12):
pytest.skip("unsupported python version", allow_module_level=True)


def test_is_match_spark(
spark_session,
4 changes: 4 additions & 0 deletions tests/test_spark/test_legacy_spark.py
@@ -17,12 +17,16 @@
import io
import logging
import re
import sys
from decimal import Decimal

import pytest

pytest.importorskip("pyspark")

if sys.version_info >= (3, 12):
pytest.skip("unsupported python version", allow_module_level=True)

from datacompy.spark.legacy import (
NUMERIC_SPARK_TYPES,
LegacySparkCompare,