Skip to content

Commit

Permalink
Merge pull request #246 from capitalone/develop
Browse files Browse the repository at this point in the history
Release v0.10.5
  • Loading branch information
fdosani authored Nov 15, 2023
2 parents abcc6e1 + 3b078e6 commit 77711ee
Show file tree
Hide file tree
Showing 6 changed files with 219 additions and 16 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/edgetest.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# This workflow runs edgetest on datacompy @ 17:30 UTC Fridays.
# https://github.com/fdosani/run-edgetest-action
# https://github.com/edgetest-dev/run-edgetest-action

name: Run edgetest
on:
Expand All @@ -14,7 +14,7 @@ jobs:
with:
ref: develop
- id: run-edgetest
uses: fdosani/run-edgetest-action@v1.2
uses: edgetest-dev/run-edgetest-action@v1.4
with:
edgetest-flags: '-c pyproject.toml --export'
base-branch: 'develop'
Expand Down
3 changes: 2 additions & 1 deletion datacompy/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.

__version__ = "0.10.4"
__version__ = "0.10.5"

from datacompy.core import *
from datacompy.fugue import (
all_columns_match,
all_rows_overlap,
intersect_columns,
is_match,
report,
Expand Down
125 changes: 121 additions & 4 deletions datacompy/fugue.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,102 @@ def is_match(
return all(matches)


def all_rows_overlap(
    df1: AnyDataFrame,
    df2: AnyDataFrame,
    join_columns: Union[str, List[str]],
    abs_tol: float = 0,
    rel_tol: float = 0,
    df1_name: str = "df1",
    df2_name: str = "df2",
    ignore_spaces: bool = False,
    ignore_case: bool = False,
    cast_column_names_lower: bool = True,
    parallelism: Optional[int] = None,
    strict_schema: bool = False,
) -> bool:
    """Check if the rows are all present in both dataframes

    Parameters
    ----------
    df1 : ``AnyDataFrame``
        First dataframe to check
    df2 : ``AnyDataFrame``
        Second dataframe to check
    join_columns : list or str, optional
        Column(s) to join dataframes on. If a string is passed in, that one
        column will be used.
    abs_tol : float, optional
        Absolute tolerance between two values.
    rel_tol : float, optional
        Relative tolerance between two values.
    df1_name : str, optional
        A string name for the first dataframe. This allows the reporting to
        print out an actual name instead of "df1", and allows human users to
        more easily track the dataframes.
    df2_name : str, optional
        A string name for the second dataframe
    ignore_spaces : bool, optional
        Flag to strip whitespace (including newlines) from string columns
        (including any join columns)
    ignore_case : bool, optional
        Flag to ignore the case of string columns
    cast_column_names_lower: bool, optional
        Boolean indicator that controls whether column names will be cast into
        lower case
    parallelism: int, optional
        An integer representing the amount of parallelism. Entering a value for this
        will force to use of Fugue over just vanilla Pandas
    strict_schema: bool, optional
        The schema must match exactly if set to ``True``. This includes the
        names and types. Allows for a fast fail.

    Returns
    -------
    bool
        True if all rows in df1 are in df2 and vice versa (based on
        existence for join option)
    """
    # Keyword arguments shared by the local and the distributed code path.
    shared_kwargs = dict(
        df1=df1,
        df2=df2,
        join_columns=join_columns,
        abs_tol=abs_tol,
        rel_tol=rel_tol,
        df1_name=df1_name,
        df2_name=df2_name,
        ignore_spaces=ignore_spaces,
        ignore_case=ignore_case,
        cast_column_names_lower=cast_column_names_lower,
    )

    both_native_pandas = isinstance(df1, pd.DataFrame) and isinstance(df2, pd.DataFrame)
    # Fast path: two plain pandas frames, no explicit parallelism request,
    # and a single-worker local execution engine -> use the core Compare
    # class directly instead of going through Fugue.
    if both_native_pandas and parallelism is None and fa.get_current_parallelism() == 1:
        return Compare(**shared_kwargs).all_rows_overlap()

    try:
        per_bucket_overlap = _distributed_compare(
            return_obj_func=lambda c: c.all_rows_overlap(),
            parallelism=parallelism,
            strict_schema=strict_schema,
            **shared_kwargs,
        )
    except _StrictSchemaError:
        # With strict_schema enabled, a schema mismatch is an immediate "no".
        return False

    # Every distributed bucket must report full overlap.
    return all(per_bucket_overlap)


def report(
df1: AnyDataFrame,
df2: AnyDataFrame,
Expand All @@ -210,7 +306,7 @@ def report(
column_count=10,
html_file=None,
parallelism: Optional[int] = None,
) -> None:
) -> str:
"""Returns a string representation of a report. The representation can
then be printed or saved to a file.
Expand Down Expand Up @@ -460,7 +556,7 @@ def _distributed_compare(
parallelism: Optional[int] = None,
strict_schema: bool = False,
) -> List[Any]:
"""Compare the data distributedly using the core Compare class
"""Compare the data distributively using the core Compare class
Both df1 and df2 should be dataframes containing all of the join_columns,
with unique column names. Differences between values are compared to
Expand Down Expand Up @@ -541,6 +637,7 @@ def _distributed_compare(

def _serialize(dfs: Iterable[pd.DataFrame], left: bool) -> Iterable[Dict[str, Any]]:
for df in dfs:
df = df.convert_dtypes()
cols = {}
for name in df.columns:
col = df[name]
Expand Down Expand Up @@ -577,8 +674,28 @@ def _deserialize(
arr = [pickle.loads(r["data"]) for r in df if r["left"] == left]
if len(arr) > 0:
return pd.concat(arr).sort_values(schema.names).reset_index(drop=True)
return pd.DataFrame(
{k: pd.Series(dtype=v) for k, v in schema.pandas_dtype.items()}
# The following is how to construct an empty pandas dataframe with
# the correct schema, it avoids pandas schema inference which is wrong.
# This is not needed when upgrading to Fugue >= 0.8.7
sample_row: List[Any] = []
for field in schema.fields:
if pa.types.is_string(field.type):
sample_row.append("x")
elif pa.types.is_integer(field.type):
sample_row.append(1)
elif pa.types.is_floating(field.type):
sample_row.append(1.1)
elif pa.types.is_boolean(field.type):
sample_row.append(True)
elif pa.types.is_timestamp(field.type):
sample_row.append(pd.NaT)
else:
sample_row.append(None)
return (
pd.DataFrame([sample_row], columns=schema.names)
.astype(schema.pandas_dtype)
.convert_dtypes()
.head(0)
)

def _comp(df: List[Dict[str, Any]]) -> List[List[Any]]:
Expand Down
4 changes: 4 additions & 0 deletions docs/source/fugue_usage.rst
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ Basic Usage

The Fugue implementation can be accessed via the:

- ``datacompy.unq_columns``
- ``datacompy.intersect_columns``
- ``datacompy.all_columns_match``
- ``datacompy.all_rows_overlap``
- ``datacompy.is_match``
- and ``datacompy.report`` functions

Expand Down
5 changes: 3 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ dependencies = [
"pandas<=2.0.2,>=0.25.0",
"numpy<=1.26.0,>=1.22.0",
"ordered-set<=4.1.0,>=4.0.2",
"fugue<=0.9.0,>=0.8.6",
"fugue<=0.8.7,>=0.8.7",
]
requires-python = ">=3.8.0"
classifiers = [
Expand Down Expand Up @@ -94,12 +94,13 @@ dev = [
"datacompy[build]",
]

[isort]
[tool.isort]
multi_line_output = 3
include_trailing_comma = true
force_grid_wrap = 0
use_parentheses = true
line_length = 88
profile = "black"

[edgetest.envs.core]
python_version = "3.9"
Expand Down
94 changes: 87 additions & 7 deletions tests/test_fugue.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from datacompy import (
Compare,
all_columns_match,
all_rows_overlap,
intersect_columns,
is_match,
report,
Expand All @@ -50,7 +51,14 @@ def ref_df():
df1_copy = df1.copy()
df2 = df1.copy().drop(columns=["c"])
df3 = df1.copy().drop(columns=["a", "b"])
return [df1, df1_copy, df2, df3]
df4 = pd.DataFrame(
dict(
a=np.random.randint(1, 12, 100), # shift the join col
b=np.random.rand(100),
c=np.random.choice(["aaa", "b_c", "csd"], 100),
)
)
return [df1, df1_copy, df2, df3, df4]


@pytest.fixture
Expand Down Expand Up @@ -80,37 +88,39 @@ def upper_col_df(shuffle_df):

@pytest.fixture
def simple_diff_df1():
return pd.DataFrame(dict(aa=[0, 1, 0], bb=[2.1, 3.1, 4.1]))
return pd.DataFrame(dict(aa=[0, 1, 0], bb=[2.1, 3.1, 4.1])).convert_dtypes()


@pytest.fixture
def simple_diff_df2():
return pd.DataFrame(dict(aa=[1, 0, 1], bb=[3.1, 4.1, 5.1], cc=["a", "b", "c"]))
return pd.DataFrame(
dict(aa=[1, 0, 1], bb=[3.1, 4.1, 5.1], cc=["a", "b", "c"])
).convert_dtypes()


@pytest.fixture
def no_intersection_diff_df1():
np.random.seed(0)
return pd.DataFrame(dict(x=["a"], y=[0.1]))
return pd.DataFrame(dict(x=["a"], y=[0.1])).convert_dtypes()


@pytest.fixture
def no_intersection_diff_df2():
return pd.DataFrame(dict(x=["b"], y=[1.1]))
return pd.DataFrame(dict(x=["b"], y=[1.1])).convert_dtypes()


@pytest.fixture
def large_diff_df1():
np.random.seed(0)
data = np.random.randint(0, 7, size=10000)
return pd.DataFrame({"x": data, "y": np.array([9] * 10000)})
return pd.DataFrame({"x": data, "y": np.array([9] * 10000)}).convert_dtypes()


@pytest.fixture
def large_diff_df2():
np.random.seed(0)
data = np.random.randint(6, 11, size=10000)
return pd.DataFrame({"x": data, "y": np.array([9] * 10000)})
return pd.DataFrame({"x": data, "y": np.array([9] * 10000)}).convert_dtypes()


def test_is_match_native(
Expand Down Expand Up @@ -590,3 +600,73 @@ def test_all_columns_match_duckdb(ref_df):
assert all_columns_match(df1, df3) is False
assert all_columns_match(df1_copy, df1) is True
assert all_columns_match(df3, df2) is False


def test_all_rows_overlap_native(
    ref_df,
    shuffle_df,
):
    base = ref_df[0]
    shifted = ref_df[4]
    # Default path: native pandas inputs route through the core Compare class.
    assert all_rows_overlap(base, base.copy(), join_columns="a")
    assert all_rows_overlap(base, shuffle_df, join_columns="a")
    assert not all_rows_overlap(base, shifted, join_columns="a")
    # Explicit parallelism forces the distributed Fugue code path.
    assert all_rows_overlap(base, shuffle_df, join_columns="a", parallelism=2)
    assert not all_rows_overlap(base, shifted, join_columns="a", parallelism=2)


def test_all_rows_overlap_spark(
    spark_session,
    ref_df,
    shuffle_df,
):
    # pandas 2 removed DataFrame.iteritems, which older Spark versions still
    # call when converting pandas frames; alias it back for compatibility.
    for pandas_df in (ref_df[0], ref_df[4], shuffle_df):
        pandas_df.iteritems = pandas_df.items

    base = spark_session.createDataFrame(ref_df[0])
    base_again = spark_session.createDataFrame(ref_df[0])
    shifted = spark_session.createDataFrame(ref_df[4])
    shuffled = spark_session.createDataFrame(shuffle_df)

    assert all_rows_overlap(base, base_again, join_columns="a")
    assert all_rows_overlap(base, shuffled, join_columns="a")
    assert not all_rows_overlap(base, shifted, join_columns="a")
    one_row = "SELECT 'a' AS a, 'b' AS b"
    assert all_rows_overlap(
        spark_session.sql(one_row),
        spark_session.sql(one_row),
        join_columns="a",
    )


def test_all_rows_overlap_polars(
    ref_df,
    shuffle_df,
):
    # Convert the pandas fixtures into polars frames for this backend.
    base, base_copy, shifted, shuffled = (
        pl.from_pandas(df)
        for df in (ref_df[0], ref_df[0].copy(), ref_df[4], shuffle_df)
    )

    assert all_rows_overlap(base, base_copy, join_columns="a")
    assert all_rows_overlap(base, shuffled, join_columns="a")
    assert not all_rows_overlap(base, shifted, join_columns="a")


def test_all_rows_overlap_duckdb(
    ref_df,
    shuffle_df,
):
    # Run everything inside a scoped duckdb connection.
    with duckdb.connect():
        base = duckdb.from_df(ref_df[0])
        base_copy = duckdb.from_df(ref_df[0].copy())
        shifted = duckdb.from_df(ref_df[4])
        shuffled = duckdb.from_df(shuffle_df)

        assert all_rows_overlap(base, base_copy, join_columns="a")
        assert all_rows_overlap(base, shuffled, join_columns="a")
        assert not all_rows_overlap(base, shifted, join_columns="a")
        one_row = "SELECT 'a' AS a, 'b' AS b"
        assert all_rows_overlap(
            duckdb.sql(one_row),
            duckdb.sql(one_row),
            join_columns="a",
        )

0 comments on commit 77711ee

Please sign in to comment.