fugue optional dependency (#360)
* adding in logging and updating deps

* updating docs

* updating fugue extras
fdosani authored Dec 10, 2024
1 parent e0e839b commit b84a1c9
Showing 16 changed files with 137 additions and 69 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test-package.yml
@@ -134,7 +134,7 @@ jobs:
       - name: Install datacompy
         run: |
           python -m pip install --upgrade pip
-          python -m pip install .[tests,duckdb,polars,dask,ray]
+          python -m pip install .[tests,fugue]
       - name: Test with pytest
         run: |
           python -m pytest tests/ --ignore=tests/test_snowflake.py
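The updated CI step can be reproduced locally with the same extras (a sketch; the editable `-e` install is a local convention, not part of the workflow):

```shell
python -m pip install --upgrade pip
python -m pip install -e ".[tests,fugue]"
python -m pytest tests/ --ignore=tests/test_snowflake.py
```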
4 changes: 1 addition & 3 deletions README.md
@@ -31,9 +31,7 @@ If you would like to use Spark or any other backends please make sure you instal
 
 ```shell
 pip install datacompy[spark]
-pip install datacompy[dask]
-pip install datacompy[duckdb]
-pip install datacompy[ray]
+pip install datacompy[fugue]
 pip install datacompy[snowflake]
 
 ```
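Where several backends are needed at once, pip accepts a combined extras list (a usage sketch, not a line from the README itself):

```shell
pip install "datacompy[spark,fugue,snowflake]"
```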
5 changes: 3 additions & 2 deletions datacompy/base.py
@@ -21,13 +21,14 @@
 two dataframes.
 """
 
-import logging
 from abc import ABC, abstractmethod
 from typing import Any, Optional
 
 from ordered_set import OrderedSet
 
-LOG = logging.getLogger(__name__)
+from datacompy.logger import INFO, get_logger
+
+LOG = get_logger(__name__, INFO)
 
 
 class BaseCompare(ABC):
4 changes: 2 additions & 2 deletions datacompy/core.py
@@ -21,7 +21,6 @@
 two dataframes.
 """
 
-import logging
 import os
 from typing import Any, Dict, List, Optional, Union, cast
 
@@ -30,8 +29,9 @@
 from ordered_set import OrderedSet
 
 from datacompy.base import BaseCompare, temp_column_name
+from datacompy.logger import INFO, get_logger
 
-LOG = logging.getLogger(__name__)
+LOG = get_logger(__name__, INFO)
 
 
 class Compare(BaseCompare):
46 changes: 27 additions & 19 deletions datacompy/fugue.py
@@ -15,25 +15,33 @@
 
 """Compare two DataFrames that are supported by Fugue."""
 
-import logging
 import pickle
 from collections import defaultdict
 from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, Union, cast
 
-import fugue.api as fa
 import pandas as pd
-import pyarrow as pa
-from fugue import AnyDataFrame
 from ordered_set import OrderedSet
-from triad import Schema
 
 from datacompy.core import Compare, render
+from datacompy.logger import INFO, get_logger
 
-LOG = logging.getLogger(__name__)
+LOG = get_logger(__name__, INFO)
 HASH_COL = "__datacompy__hash__"
 
 
-def unq_columns(df1: AnyDataFrame, df2: AnyDataFrame) -> OrderedSet[str]:
+try:
+    import fugue.api as fa
+    import pyarrow as pa
+    from fugue import AnyDataFrame
+    from triad import Schema
+except ImportError:
+    LOG.warning(
+        "Please note that you are missing the optional dependency: fugue. "
+        "If you need to use this functionality it must be installed."
+    )
+
+
+def unq_columns(df1: "AnyDataFrame", df2: "AnyDataFrame") -> OrderedSet[str]:
     """Get columns that are unique to df1.
 
     Parameters
@@ -54,7 +62,7 @@ def unq_columns(df1: AnyDataFrame, df2: AnyDataFrame) -> OrderedSet[str]:
     return cast(OrderedSet[str], OrderedSet(col1) - OrderedSet(col2))
 
 
-def intersect_columns(df1: AnyDataFrame, df2: AnyDataFrame) -> OrderedSet[str]:
+def intersect_columns(df1: "AnyDataFrame", df2: "AnyDataFrame") -> OrderedSet[str]:
     """Get columns that are shared between the two dataframes.
 
     Parameters
@@ -75,7 +83,7 @@ def intersect_columns(df1: AnyDataFrame, df2: AnyDataFrame) -> OrderedSet[str]:
     return OrderedSet(col1) & OrderedSet(col2)
 
 
-def all_columns_match(df1: AnyDataFrame, df2: AnyDataFrame) -> bool:
+def all_columns_match(df1: "AnyDataFrame", df2: "AnyDataFrame") -> bool:
     """Whether the columns all match in the dataframes.
 
     Parameters
@@ -95,8 +103,8 @@ def all_columns_match(df1: AnyDataFrame, df2: AnyDataFrame) -> bool:
 
 
 def is_match(
-    df1: AnyDataFrame,
-    df2: AnyDataFrame,
+    df1: "AnyDataFrame",
+    df2: "AnyDataFrame",
     join_columns: Union[str, List[str]],
     abs_tol: float = 0,
     rel_tol: float = 0,
@@ -194,8 +202,8 @@ def is_match(
 
 
 def all_rows_overlap(
-    df1: AnyDataFrame,
-    df2: AnyDataFrame,
+    df1: "AnyDataFrame",
+    df2: "AnyDataFrame",
     join_columns: Union[str, List[str]],
     abs_tol: float = 0,
     rel_tol: float = 0,
@@ -290,8 +298,8 @@ def all_rows_overlap(
 
 
 def count_matching_rows(
-    df1: AnyDataFrame,
-    df2: AnyDataFrame,
+    df1: "AnyDataFrame",
+    df2: "AnyDataFrame",
     join_columns: Union[str, List[str]],
     abs_tol: float = 0,
     rel_tol: float = 0,
@@ -385,8 +393,8 @@ def count_matching_rows(
 
 
 def report(
-    df1: AnyDataFrame,
-    df2: AnyDataFrame,
+    df1: "AnyDataFrame",
+    df2: "AnyDataFrame",
     join_columns: Union[str, List[str]],
     abs_tol: float = 0,
     rel_tol: float = 0,
@@ -638,8 +646,8 @@ def _any(col: str) -> int:
 
 
 def _distributed_compare(
-    df1: AnyDataFrame,
-    df2: AnyDataFrame,
+    df1: "AnyDataFrame",
+    df2: "AnyDataFrame",
     join_columns: Union[str, List[str]],
     return_obj_func: Callable[[Compare], Any],
     abs_tol: float = 0,
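The guarded import plus string-quoted annotations shown above is what lets `datacompy.fugue` be imported without fugue installed: the optional names only need to resolve when a function is actually called. A minimal self-contained sketch of the same pattern (the `ensure_fugue` helper is illustrative, not part of this commit):

```python
import logging

LOG = logging.getLogger(__name__)

try:
    import fugue.api as fa  # optional dependency
except ImportError:
    fa = None
    LOG.warning("Optional dependency 'fugue' is missing; install datacompy[fugue] to use it.")


def ensure_fugue() -> None:
    """Raise a clear error only when the optional feature is actually used."""
    if fa is None:
        raise ImportError("This function requires the 'fugue' extra: pip install datacompy[fugue]")
```

Quoting a hint as `"AnyDataFrame"` defers its evaluation, so function signatures never touch the missing class at import time.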
61 changes: 61 additions & 0 deletions datacompy/logger.py
@@ -0,0 +1,61 @@
# SPDX-Copyright: Copyright (c) Capital One Services, LLC
# SPDX-License-Identifier: Apache-2.0
# Copyright 2024 Capital One Services, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Logging Module.
Module which sets up the basic logging infrustrcuture for the application.
"""

import logging
import sys

# logger formatting
BRIEF_FORMAT = "%(levelname)s %(asctime)s - %(name)s: %(message)s"
VERBOSE_FORMAT = (
"%(levelname)s|%(asctime)s|%(name)s|%(filename)s|"
"%(funcName)s|%(lineno)d: %(message)s"
)
FORMAT_TO_USE = VERBOSE_FORMAT

# logger levels
DEBUG = logging.DEBUG
INFO = logging.INFO
WARN = logging.WARNING
ERROR = logging.ERROR
CRITICAL = logging.CRITICAL


def get_logger(name=None, log_level=logging.DEBUG):
"""Set the basic logging features for the application.
Parameters
----------
name : str, optional
The name of the logger. Defaults to ``None``
log_level : int, optional
The logging level. Defaults to ``logging.INFO``
Returns
-------
logging.Logger
Returns a Logger obejct which is set with the passed in paramters.
Please see the following for more details:
https://docs.python.org/2/library/logging.html
"""
logging.basicConfig(format=FORMAT_TO_USE, stream=sys.stdout, level=log_level)
logging.captureWarnings(True)
logger = logging.getLogger(name)
return logger
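A quick usage sketch of the new helper, mirroring the module-level pattern the other files in this commit adopt:

```python
from datacompy.logger import INFO, get_logger

LOG = get_logger(__name__, INFO)

LOG.info("visible: at or above the configured level")
LOG.debug("hidden: below INFO")
```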
12 changes: 4 additions & 8 deletions datacompy/polars.py
@@ -21,23 +21,19 @@
 two dataframes.
 """
 
-import logging
 import os
 from copy import deepcopy
 from typing import Any, Dict, List, Optional, Union, cast
 
 import numpy as np
+import polars as pl
 from ordered_set import OrderedSet
+from polars.exceptions import ComputeError, InvalidOperationError
 
 from datacompy.base import BaseCompare, temp_column_name
+from datacompy.logger import INFO, get_logger
 
-try:
-    import polars as pl
-    from polars.exceptions import ComputeError, InvalidOperationError
-except ImportError:
-    pass  # Let non-Polars people at least enjoy the loveliness of the pandas datacompy functionality
-
-LOG = logging.getLogger(__name__)
+LOG = get_logger(__name__, INFO)
 
 STRING_TYPE = ["String", "Utf8"]
 DATE_TYPE = ["Date", "Datetime"]
16 changes: 10 additions & 6 deletions datacompy/snowflake.py
@@ -21,7 +21,6 @@
 two dataframes.
 """
 
-import logging
 import os
 from concurrent.futures import ThreadPoolExecutor, as_completed
 from copy import deepcopy
@@ -30,6 +29,12 @@
 import pandas as pd
 from ordered_set import OrderedSet
 
+from datacompy.base import BaseCompare
+from datacompy.logger import INFO, get_logger
+from datacompy.spark.sql import decimal_comparator
+
+LOG = get_logger(__name__, INFO)
+
 try:
     import snowflake.snowpark as sp
     from snowflake.connector.errors import DatabaseError, ProgrammingError
@@ -48,11 +53,10 @@
     )
 
 except ImportError:
-    pass  # for non-snowflake users
-from datacompy.base import BaseCompare
-from datacompy.spark.sql import decimal_comparator
-
-LOG = logging.getLogger(__name__)
+    LOG.warning(
+        "Please note that you are missing the optional dependency: snowflake. "
+        "If you need to use this functionality it must be installed."
+    )
 
 
 NUMERIC_SNOWPARK_TYPES = [
12 changes: 7 additions & 5 deletions datacompy/spark/sql.py
@@ -21,7 +21,6 @@
 two dataframes.
 """
 
-import logging
 import os
 from copy import deepcopy
 from typing import List, Optional, Tuple, Union
@@ -30,6 +29,9 @@
 from ordered_set import OrderedSet
 
 from datacompy.base import BaseCompare, temp_column_name
+from datacompy.logger import INFO, get_logger
+
+LOG = get_logger(__name__, INFO)
 
 try:
     import pyspark.sql
@@ -49,10 +51,10 @@
     when,
 )
 except ImportError:
-    pass  # Let non-Spark people at least enjoy the loveliness of the spark sql datacompy functionality
-
-
-LOG = logging.getLogger(__name__)
+    LOG.warning(
+        "Please note that you are missing the optional dependency: spark. "
+        "If you need to use this functionality it must be installed."
+    )
 
 
 def decimal_comparator():
24 changes: 9 additions & 15 deletions docs/source/fugue_usage.rst
@@ -5,6 +5,15 @@ Fugue Detail
 for data processing on Pandas, DuckDB, Polars, Arrow, Spark, Dask, Ray, and many other backends.
 DataComPy integrates with Fugue to provide a simple way to compare data across these backends.
 
+
+Installation
+------------
+
+::
+
+    pip install datacompy[fugue]
+
+
 Basic Usage
 -----------
 
@@ -90,13 +99,6 @@ to compare a Pandas dataframe with a Spark dataframe:
         join_columns='acct_id',
     )
 
-Notice that in order to use a specific backend, you need to have the corresponding library installed.
-For example, if you want compare Ray datasets, you must do
-
-::
-
-    pip install datacompy[ray]
-
 How it works
 ------------
 
@@ -106,11 +108,3 @@ using the Pandas-based ``Compare``. The comparison results are then aggregated t
 Different from the join operation used in ``SparkCompare``, the Fugue version uses the ``cogroup -> map``
 like semantic (not exactly the same, Fugue adopts a coarse version to achieve great performance), which
 guarantees full data comparison with consistent result compared to Pandas-based ``Compare``.
-
-
-Future releases
----------------
-
-We are hoping to pilot Fugue for the community in future releases (0.10+) and gather feedback. With Fugue we get the
-benefits of not having to maintain Framework specific code, and also having cross-framework compatibility. We may in
-future depending on feedback deprecate ``SparkCompare`` in favour of just using Fugue to manage non-Pandas use cases.
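For reference, the documented entry point can be exercised end-to-end with plain pandas inputs once the fugue extra is installed (a sketch; the column names and tolerance are illustrative):

```python
import pandas as pd
from datacompy.fugue import is_match

df1 = pd.DataFrame({"acct_id": [1, 2, 3], "balance": [10.0, 20.0, 30.00]})
df2 = pd.DataFrame({"acct_id": [1, 2, 3], "balance": [10.0, 20.0, 30.05]})

# True: 30.05 falls within the 1% relative tolerance of 30.00
print(is_match(df1, df2, join_columns="acct_id", rel_tol=0.01))
```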