Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor Dask cuDF legacy code #17205

Merged
merged 22 commits into from
Nov 4, 2024
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
da9b6fb
initial refactor
rjzamora Oct 29, 2024
196c0c0
rename _collection.py to core.py
rjzamora Oct 29, 2024
5be712b
keep _collection name
rjzamora Oct 29, 2024
957299b
move legacy io
rjzamora Oct 29, 2024
86b3fa6
split out _expr.py logic
rjzamora Oct 29, 2024
60853f6
fix patching
rjzamora Oct 29, 2024
56644e8
Merge branch 'branch-24.12' into refactor-dask-cudf
rjzamora Oct 29, 2024
db4556f
tweak _patch_dask_expr
rjzamora Oct 29, 2024
8dec940
Merge remote-tracking branch 'upstream/branch-24.12' into refactor-da…
rjzamora Oct 29, 2024
e54807f
Merge branch 'refactor-dask-cudf' of github.com:rjzamora/cudf into re…
rjzamora Oct 29, 2024
ec139d2
copy upstream changes
rjzamora Nov 1, 2024
b5f92c9
Merge remote-tracking branch 'upstream/branch-24.12' into refactor-da…
rjzamora Nov 1, 2024
8fb4657
use Mads' suggestion
rjzamora Nov 1, 2024
1d7c84e
remove core - not worth the tech debt
rjzamora Nov 1, 2024
ab56534
Merge remote-tracking branch 'upstream/branch-24.12' into refactor-da…
rjzamora Nov 1, 2024
788cb24
add back core.py
rjzamora Nov 1, 2024
1f54219
add deprecation warnings for various IO methods
rjzamora Nov 1, 2024
95a6697
Merge remote-tracking branch 'upstream/branch-24.12' into refactor-da…
rjzamora Nov 4, 2024
1b36183
add test coverage for _deprecated_api usage
rjzamora Nov 4, 2024
bc9897b
fix parquet test mistake
rjzamora Nov 4, 2024
a84128a
Merge remote-tracking branch 'upstream/branch-24.12' into refactor-da…
rjzamora Nov 4, 2024
ba3032a
Merge branch 'branch-24.12' into refactor-dask-cudf
rjzamora Nov 4, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 10 additions & 12 deletions python/dask_cudf/dask_cudf/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,17 @@

from dask import config

# For dask>2024.2.0, we can silence the loud deprecation
# warning before importing `dask.dataframe` (this won't
# do anything for dask==2024.2.0)
config.set({"dataframe.query-planning-warning": False})
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No longer relevant.


import dask.dataframe as dd # noqa: E402
from dask.dataframe import from_delayed # noqa: E402

import cudf # noqa: E402

from . import backends # noqa: E402, F401
from ._version import __git_commit__, __version__ # noqa: E402, F401
from .core import concat, from_cudf, from_dask_dataframe # noqa: E402
from .expr import QUERY_PLANNING_ON # noqa: E402
from .legacy.core import concat, from_cudf # noqa: F401


QUERY_PLANNING_ON = dd.DASK_EXPR_ENABLED


def read_csv(*args, **kwargs):
Expand Down Expand Up @@ -48,24 +45,25 @@ def inner_func(*args, **kwargs):


if QUERY_PLANNING_ON:
from .expr._collection import DataFrame, Index, Series
from ._collection import DataFrame, Index, Series # noqa: E402
from ._expr import _patch_dask_expr

groupby_agg = raise_not_implemented_error("groupby_agg")
read_text = DataFrame.read_text
to_orc = raise_not_implemented_error("to_orc")
_patch_dask_expr()

else:
from .core import DataFrame, Index, Series # noqa: F401
from .groupby import groupby_agg # noqa: F401
from .io import read_text, to_orc # noqa: F401
from .legacy.core import DataFrame, Index, Series # noqa: F401
from .legacy.groupby import groupby_agg # noqa: F401
from .legacy.io import read_text, to_orc # noqa: F401


__all__ = [
"DataFrame",
"Series",
"Index",
"from_cudf",
"from_dask_dataframe",
"concat",
"from_delayed",
]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,22 +34,6 @@


class CudfFrameBase(FrameBase):
def to_dask_dataframe(self, **kwargs):
"""Create a dask.dataframe object from a dask_cudf object

WARNING: This API is deprecated, and may not work properly.
Please use `*.to_backend("pandas")` to convert the
underlying data to pandas.
"""

warnings.warn(
"The `to_dask_dataframe` API is now deprecated. "
"Please use `*.to_backend('pandas')` instead.",
FutureWarning,
)

return self.to_backend("pandas", **kwargs)

def _prepare_cov_corr(self, min_periods, numeric_only):
# Upstream version of this method sets min_periods
# to 2 by default (which is not supported by cudf)
Expand Down Expand Up @@ -94,7 +78,7 @@ def var(
def rename_axis(
self, mapper=no_default, index=no_default, columns=no_default, axis=0
):
from dask_cudf.expr._expr import RenameAxisCudf
from dask_cudf._expr import RenameAxisCudf

return new_collection(
RenameAxisCudf(
Expand Down Expand Up @@ -136,7 +120,7 @@ def groupby(
dropna=None,
**kwargs,
):
from dask_cudf.expr._groupby import GroupBy
from dask_cudf._groupby import GroupBy

if isinstance(by, FrameBase) and not isinstance(by, DXSeries):
raise ValueError(
Expand Down Expand Up @@ -175,27 +159,27 @@ def to_orc(self, *args, **kwargs):
def read_text(*args, **kwargs):
from dask_expr import from_legacy_dataframe

from dask_cudf.io.text import read_text as legacy_read_text
from dask_cudf.legacy.io.text import read_text as legacy_read_text

ddf = legacy_read_text(*args, **kwargs)
return from_legacy_dataframe(ddf)


class Series(DXSeries, CudfFrameBase):
def groupby(self, by, **kwargs):
from dask_cudf.expr._groupby import SeriesGroupBy
from dask_cudf._groupby import SeriesGroupBy

return SeriesGroupBy(self, by, **kwargs)

@cached_property
def list(self):
from dask_cudf.accessors import ListMethods
from dask_cudf._accessors import ListMethods

return ListMethods(self)

@cached_property
def struct(self):
from dask_cudf.accessors import StructMethods
from dask_cudf._accessors import StructMethods

return StructMethods(self)

Expand Down
210 changes: 210 additions & 0 deletions python/dask_cudf/dask_cudf/_expr.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
# Copyright (c) 2024, NVIDIA CORPORATION.
import functools

import dask_expr._shuffle as _shuffle_module
from dask_expr import new_collection
from dask_expr._cumulative import CumulativeBlockwise
from dask_expr._expr import Elemwise, Expr, RenameAxis, VarColumns
from dask_expr._reductions import Reduction, Var

from dask.dataframe.core import (
is_dataframe_like,
make_meta,
meta_nonempty,
)
from dask.dataframe.dispatch import is_categorical_dtype
from dask.typing import no_default

import cudf

##
## Custom expressions
##


class RenameAxisCudf(RenameAxis):
    # TODO: Remove this after rename_axis is supported in cudf
    # (See: https://github.com/rapidsai/cudf/issues/16895)
    @staticmethod
    def operation(df, index=no_default, **kwargs):
        """Rename the index of ``df`` in place and return it.

        Only the ``index`` argument is supported for the cudf backend;
        any other ``rename_axis`` usage raises ``NotImplementedError``.
        """
        # ``no_default`` is a sentinel, so compare by identity rather than
        # equality (``!=`` would invoke ``__eq__`` on an arbitrary value).
        if index is not no_default:
            df.index.name = index
            return df
        raise NotImplementedError(
            "Only `index` is supported for the cudf backend"
        )


class ToCudfBackend(Elemwise):
    """Elemwise expression that converts partitions to the cudf backend.

    TODO: Inherit from ``ToBackend`` once rapids-dask-dependency
    is pinned to ``dask>=2024.8.1``.
    """

    _parameters = ["frame", "options"]
    _projection_passthrough = True
    _filter_passthrough = True
    _preserves_partitioning_information = True

    @staticmethod
    def operation(df, options):
        """Convert a single (pandas-backed) partition to cudf."""
        from dask_cudf.backends import to_cudf_dispatch

        return to_cudf_dispatch(df, **options)

    def _simplify_down(self):
        """Elide the conversion when the data is already cudf-backed."""
        cudf_types = (cudf.DataFrame, cudf.Series, cudf.Index)
        if isinstance(self.frame._meta, cudf_types):
            # We already have cudf data
            return self.frame


##
## Custom expression patching
##


# This can be removed after cudf#15176 is addressed.
# See: https://github.com/rapidsai/cudf/issues/15176
class PatchCumulativeBlockwise(CumulativeBlockwise):
    """Argument-handling override for cumulative ops on cudf data."""

    @property
    def _args(self) -> list:
        # Only the frame operand is passed positionally.
        return self.operands[:1]

    @property
    def _kwargs(self) -> dict:
        # Must pass axis and skipna as kwargs in cudf
        return dict(axis=self.axis, skipna=self.skipna)


# The upstream Var code uses `Series.values`, and relies on numpy
# for most of the logic. Unfortunately, cudf -> cupy conversion
# is not supported for data containing null values. Therefore,
# we must implement our own version of Var for now. This logic
# is mostly copied from dask-cudf.


class VarCudf(Reduction):
    """Null-safe variance reduction for cudf-backed collections.

    The upstream ``Var`` relies on ``Series.values`` (numpy/cupy), but
    cudf -> cupy conversion fails for data containing nulls, so this
    expression computes the variance with cudf operations instead.
    """

    # Uses the parallel version of Welford's online algorithm (Chan '79)
    # (http://i.stanford.edu/pub/cstr/reports/cs/tr/79/773/CS-TR-79-773.pdf)
    _parameters = [
        "frame",
        "skipna",
        "ddof",
        "numeric_only",
        "split_every",
    ]
    _defaults = {
        "skipna": True,
        "ddof": 1,
        "numeric_only": False,
        "split_every": False,
    }

    @functools.cached_property
    def _meta(self):
        # Derive output metadata by running ``var`` on a non-empty
        # stand-in for the frame's meta.
        return make_meta(
            meta_nonempty(self.frame._meta).var(
                skipna=self.skipna, numeric_only=self.numeric_only
            )
        )

    @property
    def chunk_kwargs(self):
        # Keyword arguments forwarded to ``reduction_chunk``.
        return dict(skipna=self.skipna, numeric_only=self.numeric_only)

    @property
    def combine_kwargs(self):
        # ``reduction_combine`` takes no extra keyword arguments.
        return {}

    @property
    def aggregate_kwargs(self):
        # Keyword arguments forwarded to ``reduction_aggregate``.
        return dict(ddof=self.ddof)

    @classmethod
    def reduction_chunk(cls, x, skipna=True, numeric_only=False):
        """Per-partition step: return ``(count, mean, m2)`` for ``x``.

        ``m2`` is the sum of squared deviations from the partition mean,
        as required by Chan's combine formula.
        """
        kwargs = {"numeric_only": numeric_only} if is_dataframe_like(x) else {}
        if skipna or numeric_only:
            n = x.count(**kwargs)
            kwargs["skipna"] = skipna
            avg = x.mean(**kwargs)
        else:
            # Not skipping nulls, so might as well
            # avoid the full `count` operation
            n = len(x)
            kwargs["skipna"] = skipna
            avg = x.sum(**kwargs) / n
        if numeric_only:
            # Workaround for cudf bug
            # (see: https://github.com/rapidsai/cudf/issues/13731)
            x = x[n.index]
        m2 = ((x - avg) ** 2).sum(**kwargs)
        return n, avg, m2

    @classmethod
    def reduction_combine(cls, parts):
        """Merge per-partition ``(n, avg, m2)`` triples pairwise.

        Implements the pairwise-merge step of Chan's parallel algorithm;
        the accumulation order is significant for numerical behavior.
        """
        n, avg, m2 = parts[0]
        for i in range(1, len(parts)):
            n_a, avg_a, m2_a = n, avg, m2
            n_b, avg_b, m2_b = parts[i]
            n = n_a + n_b
            avg = (n_a * avg_a + n_b * avg_b) / n
            delta = avg_b - avg_a
            m2 = m2_a + m2_b + delta**2 * n_a * n_b / n
        return n, avg, m2

    @classmethod
    def reduction_aggregate(cls, vals, ddof=1):
        """Final step: combine all triples and return ``m2 / (n - ddof)``."""
        vals = cls.reduction_combine(vals)
        n, _, m2 = vals
        return m2 / (n - ddof)


def _patched_var(
    self,
    axis=0,
    skipna=True,
    ddof=1,
    numeric_only=False,
    split_every=False,
):
    """Replacement for ``Expr.var`` that routes cudf data to ``VarCudf``.

    Raises ``ValueError`` for any ``axis`` other than 0 or 1.
    """
    if axis == 1:
        return VarColumns(self, skipna, ddof, numeric_only)
    if axis != 0:
        raise ValueError(f"axis={axis} not supported. Please specify 0 or 1")
    # cudf-backed objects expose ``to_pandas``; use the null-safe
    # VarCudf implementation for them, the upstream Var otherwise.
    var_cls = VarCudf if hasattr(self._meta, "to_pandas") else Var
    return var_cls(self, skipna, ddof, numeric_only, split_every)


# Temporary work-around for missing cudf + categorical support
# See: https://github.com/rapidsai/cudf/issues/11795
# TODO: Fix RepartitionQuantiles and remove this in cudf>24.06

# Keep a reference to the unpatched function so we can delegate to it.
_original_get_divisions = _shuffle_module._get_divisions


def _patched_get_divisions(frame, other, *args, **kwargs):
    """Wrapper for ``_shuffle_module._get_divisions``.

    The "patch": when the partitioning column is categorical and
    cudf-backed, convert it to pandas first (cudf lacks categorical
    support here), then delegate to the original implementation.
    """
    if is_categorical_dtype(other._meta.dtype) and hasattr(
        other.frame._meta, "to_pandas"
    ):
        other = new_collection(other).to_backend("pandas")._expr

    # Defer to the unpatched function for the real work
    return _original_get_divisions(frame, other, *args, **kwargs)


# Module-level guard so the dask-expr monkey-patches are applied only once.
_PATCHED = False


def _patch_dask_expr():
    """Apply cudf-specific monkey-patches to dask-expr, exactly once.

    Patching used to happen implicitly when the ``expr`` module was
    imported; this explicit, idempotent entry point replaces that.
    """
    # NOTE: review-comment text from the PR page was spliced into this
    # function's body in the scraped source; it has been removed here.
    global _PATCHED

    if not _PATCHED:
        # Pass axis/skipna as kwargs for cumulative ops (cudf#15176)
        CumulativeBlockwise._args = PatchCumulativeBlockwise._args
        CumulativeBlockwise._kwargs = PatchCumulativeBlockwise._kwargs
        # Null-safe variance for cudf-backed data
        Expr.var = _patched_var
        # Categorical division work-around (cudf#11795)
        _shuffle_module._get_divisions = _patched_get_divisions
        _PATCHED = True
Loading
Loading