Skip to content

Commit

Permalink
0.6.3 (#91)
Browse files Browse the repository at this point in the history
* ✨ Allow `base.c()` to handle groupby data

* 🚑 Allow `base.diff()` to work with groupby data

* ✨ Allow `forcats.fct_inorder()` to work with groupby data

* Add SeriesGroupBy as available type for forcats verbs

* 🚑 Fix `base.diff()` not keeping empty groups
✨Allow `base.rep()`'s arguments `length` and `each` to work with grouped data
✨Allow `base.c()` to work with grouped data
🐛 Fix recycling non-ordered grouped data
🐛 Force `&/|` operators to return boolean data
🚑 Make `dplyr.n()` return grouped data
🩹 Fix `dplyr.count()/tally()`'s warning about the new name
🐛 Make `dplyr.slice()` work better with rows/indices from grouped data

* ✨ Add `datar.attrgetter()`, `datar.pd_str()`, `datar.pd_cat()` and `datar.pd_dt()`

* 🚑 Fix `base.c()` with grouped data

* 📝 Update docs for `datar.datar`

* 🔖 0.6.3

* Update readme.ipynb
  • Loading branch information
pwwang authored Mar 15, 2022
1 parent 4d8d3d5 commit 4db4f0a
Show file tree
Hide file tree
Showing 32 changed files with 1,339 additions and 303 deletions.
2 changes: 1 addition & 1 deletion datar/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
)

__all__ = ("f", "get_versions")
__version__ = "0.6.2"
__version__ = "0.6.3"


def get_versions(prnt: bool = True) -> _VersionsTuple:
Expand Down
2 changes: 1 addition & 1 deletion datar/base/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,12 @@
from .na import NA, NaN, any_na, is_na, Inf, is_finite, is_infinite, is_nan
from .null import NULL, as_null, is_null
from .random import set_seed
from .rep import rep
from .seq import (
c,
length,
lengths,
order,
rep,
rev,
sample,
seq,
Expand Down
25 changes: 22 additions & 3 deletions datar/base/funs.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@
import itertools

import numpy as np
import pandas
import pandas as pd
from pandas.api.types import is_scalar
from pandas.core.groupby import SeriesGroupBy
from pipda import register_func

from ..core.middlewares import WithDataEnv
Expand Down Expand Up @@ -56,7 +57,7 @@ def cut(
if labels is None:
ordered_result = True

return pandas.cut(
return pd.cut(
x,
breaks,
labels=labels,
Expand All @@ -67,7 +68,7 @@ def cut(
)


@func_factory("agg", "x")
@func_factory("apply", "x")
def diff(x, lag: int = 1, differences: int = 1):
"""Calculates suitably lagged and iterated differences.
Expand All @@ -94,11 +95,29 @@ def diff(x, lag: int = 1, differences: int = 1):
If `differences > 1`, the rule applies `differences` times on `x`
"""
x = x.values
if lag * differences >= x.size:
return np.array([], dtype=x.dtype)

for _ in range(differences):
x = x[lag:] - x[:-lag]
return x


def _diff_sgb_post(out, x, lag=1, differences=1):
"""Post process diff on SeriesGroupBy object"""
non_na_out = out[out.transform(len) > 0]
non_na_out = non_na_out.explode()
grouping = pd.Categorical(non_na_out.index, categories=out.index.unique())
return (
non_na_out.explode()
.reset_index(drop=True)
.groupby(grouping, observed=False)
)


# Grouped input: keep the per-group `diff` results and re-group them via
# `_diff_sgb_post` so empty groups are preserved.
# NOTE(review): `func=None` presumably reuses the default kernel applied by
# `func_factory` — confirm against `..core.factory.func_factory`.
diff.register(SeriesGroupBy, func=None, post=_diff_sgb_post)


@register_func(None, context=Context.EVAL)
def identity(x):
"""Return whatever passed in
Expand Down
165 changes: 165 additions & 0 deletions datar/base/rep.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
from functools import singledispatch

import numpy as np
import pandas as pd
from pandas import DataFrame, Series, Categorical
from pandas.api.types import is_scalar, is_integer
from pandas.core.groupby import SeriesGroupBy
from pipda import register_func

from ..core.contexts import Context
from ..core.tibble import TibbleGrouped, reconstruct_tibble
from ..core.utils import ensure_nparray, logger


def _rep(x, times, length, each):
    """Repeat sequence `x` the way R's `rep()` does.

    Args:
        x: The values to replicate (converted to an ndarray).
        times: Scalar (repeat the whole vector) or a vector of the same
            size as `x` (per-element repeat counts).
        length: Desired length of the output; the result is tiled and
            truncated to this length. `None` means no length adjustment.
        each: Each element of `x` is repeated `each` times (only valid as
            non-1 when `times` is a scalar).

    Returns:
        The replicated ndarray.
    """
    x = ensure_nparray(x)
    times = ensure_nparray(times)
    length = ensure_nparray(length)
    each = ensure_nparray(each)
    # Unwrap 1-element arrays back to scalars so the scalar paths apply.
    if times.size == 1:
        times = times[0]
    if length.size >= 1:
        if length.size > 1:
            # Mirror R, which uses only the first element of `length`.
            logger.warning(
                "In rep(...) : first element used of 'length' argument"
            )
        length = length[0]
    if each.size == 1:
        each = each[0]

    if not is_scalar(times):
        if times.size != x.size:
            raise ValueError(
                "Invalid times argument, expect length "
                f"{x.size}, got {times.size}"
            )

        if not is_integer(each) or each != 1:
            raise ValueError(
                "Unexpected each argument when times is an iterable."
            )

    if is_integer(times) and is_scalar(times):
        # Whole-vector repetition: apply `each` first, then tile.
        x = np.tile(np.repeat(x, each), times)
    else:
        # Per-element repetition counts.
        x = np.repeat(x, times)

    # NOTE(review): a `None` length appears to survive the
    # `ensure_nparray`/`length[0]` round trip above as `None` — confirm
    # against `..core.utils.ensure_nparray`.
    if length is None:
        return x

    # Tile enough whole copies, then truncate to the requested length.
    repeats = length // x.size + 1
    x = np.tile(x, repeats)

    return x[:length]


@singledispatch
def _rep_dispatched(x, times, length, each):
    """Repeat sequence `x`, dispatching on the type of `x`.

    This base implementation handles plain (ungrouped) `x`. When any of
    `times`/`length`/`each` is a SeriesGroupBy, the replication is done
    per group and a grouped result is returned; otherwise it falls
    through to the plain `_rep`.

    Returns:
        An ndarray, or a SeriesGroupBy when any argument is grouped.
    """
    # Collect the grouped arguments, keeping the times/length/each order
    # so the tibble columns match the original implementation.
    grouped_args = {
        name: val
        for name, val in (("times", times), ("length", length), ("each", each))
        if isinstance(val, SeriesGroupBy)
    }
    if not grouped_args:
        return _rep(x, times, length, each)

    from ..tibble import tibble

    df = tibble(**grouped_args)
    out = df._datar["grouped"].apply(
        lambda subdf: _rep(
            x,
            times=subdf["times"] if "times" in grouped_args else times,
            length=subdf["length"] if "length" in grouped_args else length,
            each=subdf["each"] if "each" in grouped_args else each,
        )
    )
    # Drop groups that produced nothing BEFORE exploding (an empty array
    # explodes to NaN), flatten, then re-group with a categorical grouper
    # over all original groups so empty groups are preserved.
    # The original code exploded a second time after this; values are
    # already scalar, so a single explode suffices.
    non_empty = out[out.transform(len) > 0]
    flat = non_empty.explode()
    grouping = Categorical(flat.index, categories=out.index.unique())
    return flat.reset_index(drop=True).groupby(grouping, observed=False)


@_rep_dispatched.register(Series)
def _(x, times, length, each):
    # A Series repeats via its underlying values: strip the index and
    # delegate to the base (object) implementation.
    return _rep_dispatched.dispatch(object)(x.values, times, length, each)


@_rep_dispatched.register(SeriesGroupBy)
def _(x, times, length, each):
    """Repeat a grouped series group-wise.

    Aligns `x` (and any grouped `times`/`length`/`each`) in one grouped
    tibble, applies the plain `_rep` within each group, then rebuilds a
    SeriesGroupBy keyed by the original group index.
    """
    from ..tibble import tibble
    df = tibble(x=x)
    times_sgb = isinstance(times, SeriesGroupBy)
    length_sgb = isinstance(length, SeriesGroupBy)
    each_sgb = isinstance(each, SeriesGroupBy)
    # Only grouped arguments join the frame; plain values are passed
    # straight through to `_rep` inside the per-group lambda.
    if times_sgb:
        df["times"] = times
    if length_sgb:
        df["length"] = length
    if each_sgb:
        df["each"] = each

    # explode() upcasts to object dtype; astype restores the original.
    # NOTE(review): unlike `_diff_sgb_post`, empty per-group results are
    # not filtered before the explode — an empty group would explode to
    # NaN and the astype could fail for integer dtypes; confirm whether
    # empty groups can occur here.
    out = df._datar["grouped"].apply(
        lambda subdf: _rep(
            subdf["x"],
            times=subdf["times"] if times_sgb else times,
            length=subdf["length"] if length_sgb else length,
            each=subdf["each"] if each_sgb else each,
        )
    ).explode().astype(x.obj.dtype)
    grouping = out.index
    return out.reset_index(drop=True).groupby(grouping)


@_rep_dispatched.register(DataFrame)
def _(x, times, length, each):
    """Replicate the rows of a data frame.

    Only whole-frame repetition is supported: `each` must be 1.
    `length`, when given, truncates the stacked result to that many rows.
    """
    if is_integer(each) and each == 1:
        stacked = pd.concat([x] * times, ignore_index=True)
        return stacked if length is None else stacked.iloc[:length, :]

    raise ValueError(
        "`each` has to be 1 to replicate a data frame."
    )


@_rep_dispatched.register(TibbleGrouped)
def _(x, times, length, each):
    # Replicate as a plain DataFrame, then restore the grouping structure.
    out = _rep_dispatched.dispatch(DataFrame)(x, times, length, each)
    return reconstruct_tibble(x, out)


@register_func(None, context=Context.EVAL)
def rep(
    x,
    times=1,
    length=None,
    each=1,
):
    """Replicates the values in x.

    Args:
        x: A vector or scalar.
        times: Number of times to repeat each element if of length len(x),
            or to repeat the whole vector if of length 1.
        length: Non-negative integer. The desired length of the output vector.
        each: Non-negative integer. Each element of x is repeated `each` times.

    Returns:
        An array of repeated elements in x (a grouped result when the
        inputs are grouped; see the `_rep_dispatched` registrations).
    """
    return _rep_dispatched(x, times, length, each)
122 changes: 47 additions & 75 deletions datar/base/seq.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
import numpy as np
from pandas import Series
from pandas.api.types import is_scalar, is_integer
from pandas import DataFrame, Series
from pandas.api.types import is_scalar
from pandas.core.groupby import SeriesGroupBy, GroupBy
from pipda import register_func

from ..core.utils import ensure_nparray, logger, regcall
from ..core.utils import logger, regcall
from ..core.factory import func_factory
from ..core.contexts import Context
from ..core.collections import Collection
from ..core.tibble import TibbleGrouped, reconstruct_tibble
from ..core.tibble import TibbleGrouped


@register_func(None, context=Context.EVAL)
Expand Down Expand Up @@ -80,77 +80,6 @@ def seq(
return np.array([from_ + n * by for n in range(int(length_out))])


@register_func(None, context=Context.UNSET)
def c(*elems):
    """Mimic R's concatenation. Named one is not supported yet.

    All elements passed in will be flattened.

    Args:
        *elems: The elements

    Returns:
        A collection of elements
    """
    return Collection(*elems)


@func_factory("apply", "x")
def rep(
    x,
    times=1,
    length=None,
    each=1
):
    """Replicates the values in x.

    Args:
        x: A vector or scalar.
        times: Number of times to repeat each element if of length len(x),
            or to repeat the whole vector if of length 1.
        length: Non-negative integer. The desired length of the output vector.
        each: Non-negative integer. Each element of x is repeated `each` times.

    Returns:
        A list of repeated elements in x.
    """
    x = ensure_nparray(x)
    if not is_scalar(times):
        if len(times) != len(x):
            raise ValueError(
                "Invalid times argument, expect length "
                f"{len(times)}, got {len(x)}"
            )
        if each != 1:
            raise ValueError(
                "Unexpected each argument when times is an iterable."
            )

    if is_integer(times) and is_scalar(times):
        # Whole-vector repetition: apply `each` first, then tile.
        x = np.tile(x.repeat(each), times)
    else:
        # Per-element repetition counts.
        x = x.repeat(times)
    if length is None:
        return x

    # Tile enough whole copies, then truncate to the requested length.
    repeats = length // len(x) + 1
    x = np.tile(x, repeats)
    return x[:length]


# Grouped series: flatten the per-group arrays back into one Series,
# restoring the original dtype after the (object-upcasting) explode.
rep.register(
    SeriesGroupBy,
    func=None,
    post=lambda out, x, *args, **kwargs: out.explode().astype(x.obj.dtype)
)


# Grouped tibbles: rebuild the grouping structure on the result.
rep.register(
    TibbleGrouped,
    func=None,
    post=lambda out, x, *args, **kwargs: reconstruct_tibble(x, out)
)


@func_factory("agg", "x")
def length(x):
"""Get length of elements"""
Expand Down Expand Up @@ -321,3 +250,46 @@ def match_dummy(xx, tab):
return Series(match_dummy(x, table), index=x.index)

return match_dummy(x, table)


@register_func(None, context=Context.UNSET)
def c(*elems):
    """Mimic R's concatenation. Named one is not supported yet.

    All elements passed in will be flattened.

    Args:
        *elems: The elements

    Returns:
        A collection of elements (a grouped Series when any element is a
        SeriesGroupBy)
    """
    # Fast path: nothing grouped — a flat Collection suffices.
    if not any(isinstance(elem, SeriesGroupBy) for elem in elems):
        return Collection(*elems)

    from ..tibble import tibble

    # With grouped input, align every element group-wise: grouped
    # elements collapse to one list per group; scalars and iterables are
    # broadcast/recycled by `tibble()` below.
    values = []
    for elem in elems:
        if isinstance(elem, SeriesGroupBy):
            values.append(elem.agg(list))
        elif is_scalar(elem):
            values.append(elem)
        else:
            values.extend(elem)

    df = tibble(*values)
    # pandas 1.3.0 expand list into columns after aggregation
    # pandas 1.3.2 has this fixed
    # https://github.com/pandas-dev/pandas/issues/42727
    out = df.agg(
        lambda row: Collection(*row),  # concatenate each group's row-wise values
        axis=1,
    )
    if isinstance(out, DataFrame):
        # pandas < 1.3.2
        out = Series(out.values.tolist(), index=out.index, dtype=object)

    # Flatten the per-group collections and re-group by the original index.
    out = out.explode().convert_dtypes()
    grouping = out.index
    out = out.reset_index(drop=True).groupby(grouping)
    return out
Loading

0 comments on commit 4db4f0a

Please sign in to comment.