♻️ Refactor mapply() to split along opposite axis (#1)
* ♻️ Refactor mapply() to split along opposite axis

* 🥅 Catch bad combination of passing Series and axis=1

* ✅ Add test for Series + axis=1 ValueError
ddelange authored Oct 29, 2020
1 parent d709dac commit 14a4f01
Showing 5 changed files with 106 additions and 44 deletions.
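The gist of the refactor: `df.apply(func, axis=1)` feeds whole rows to `func`, so parallel chunks must be groups of rows, i.e. the frame has to be split along the *opposite* axis (the index), and vice versa for `axis=0`. A minimal sketch of the idea on illustrative data (plain numpy/pandas, not part of the diff):

```py
import numpy as np
import pandas as pd

df = pd.DataFrame(np.arange(12).reshape(3, 4), columns=list("ABCD"))

# func is applied along axis=1 (per row), so split along axis=0 (the index)
chunks = np.array_split(df, 2, axis=0)
parts = [chunk.apply(sum, axis=1) for chunk in chunks]

# stitching the partial results back together matches a plain apply
assert pd.concat(parts).equals(df.apply(sum, axis=1))
```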
8 changes: 5 additions & 3 deletions README.md
@@ -27,7 +27,7 @@ $ pip install mapply

[![readthedocs](https://readthedocs.org/projects/mapply/badge/?version=latest)](https://mapply.readthedocs.io)

-For documentation, see [mapply.readthedocs.io](https://mapply.readthedocs.io).
+For documentation, see [mapply.readthedocs.io](https://mapply.readthedocs.io/en/stable/_code_reference/mapply.html).

```py
import pandas as pd
@@ -36,13 +36,15 @@ import mapply
mapply.init(
    n_workers=-1,
    chunk_size=100,
-    max_chunks_per_worker=10,
+    max_chunks_per_worker=8,
    progressbar=False
)

df = pd.DataFrame({"a": list(range(100))})

-# Avoid unnecessary multiprocessing: due to chunk_size=100, this will act as regular apply
+# avoid unnecessary multiprocessing:
+# due to chunk_size=100, this will act as regular apply.
+# set chunk_size=1 to skip this check and let max_chunks_per_worker decide.
df["squared"] = df.mapply(lambda x: x ** 2)
```

7 changes: 5 additions & 2 deletions src/mapply/__init__.py
@@ -40,8 +40,11 @@ def init(
    Subsequent calls to this function will create/overwrite methods with new settings.

    Args:
-        n_workers: Amount of workers (processes) to spawn.
-        chunk_size: Minimum amount of items per chunk. Determines upper limit for n_chunks.
+        n_workers: Maximum amount of workers (processes) to spawn. Might be lowered
+            depending on chunk_size and max_chunks_per_worker. Will throw a warning if
+            set higher than is sensible (see :meth:`mapply.parallel.sensible_cpu_count`).
+        chunk_size: Minimum amount of columns/rows per chunk. Higher value means a higher
+            threshold to go multi-core. Set to 1 to let max_chunks_per_worker decide.
        max_chunks_per_worker: Upper limit on amount of chunks per worker. Will lower
            n_chunks determined by chunk_size if necessary. Set to 0 to skip this check.
        progressbar: Whether to wrap the chunks in a :meth:`tqdm.auto.tqdm`.
61 changes: 33 additions & 28 deletions src/mapply/mapply.py
@@ -11,7 +11,7 @@
df["squared"] = mapply(df, lambda x: x ** 2, progressbar=False)
"""
from functools import partial
from typing import Any, Callable, Union
from typing import Any, Callable, Tuple, Union

from mapply.parallel import (
DEFAULT_CHUNK_SIZE,
@@ -22,14 +22,15 @@


def _choose_n_chunks(
-    df_or_series: Any,
+    shape: Tuple[int, ...],
+    axis: int,
    n_workers: int,
    chunk_size: int,
    max_chunks_per_worker: int,
):
    """Choose final amount of chunks to be sent to the ProcessPool."""
    # no sense running parallel if data is too small
-    n_chunks = int(len(df_or_series) / chunk_size)
+    n_chunks = int(shape[axis] / chunk_size)

    if max_chunks_per_worker:
        # no sense making too many chunks
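A worked example of the chunk arithmetic above. The capping branch is collapsed in this view, so the `min` below is an assumption about its effect, and the worker count is illustrative:

```py
shape = (2000, 4)          # 2000 rows, 4 columns
axis = 0                   # chunks are split along the index
chunk_size = 100
max_chunks_per_worker = 8
n_workers = 4              # illustrative

n_chunks = int(shape[axis] / chunk_size)  # 2000 / 100 -> 20
if max_chunks_per_worker:
    # assumed effect of the collapsed branch: cap at 8 * 4 = 32 chunks
    n_chunks = min(n_chunks, max_chunks_per_worker * n_workers)

print(n_chunks)  # 20: above the threshold, so work goes multi-core
```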
@@ -62,8 +63,11 @@ def mapply(
        df_or_series: Argument reserved to the class instance, a.k.a. 'self'.
        func: func to apply to each column or row.
        axis: Axis along which func is applied.
-        n_workers: Amount of workers (processes) to spawn.
-        chunk_size: Minimum amount of items per chunk. Determines upper limit for n_chunks.
+        n_workers: Maximum amount of workers (processes) to spawn. Might be lowered
+            depending on chunk_size and max_chunks_per_worker. Will throw a warning if
+            set higher than is sensible (see :meth:`mapply.parallel.sensible_cpu_count`).
+        chunk_size: Minimum amount of columns/rows per chunk. Higher value means a higher
+            threshold to go multi-core. Set to 1 to let max_chunks_per_worker decide.
        max_chunks_per_worker: Upper limit on amount of chunks per worker. Will lower
            n_chunks determined by chunk_size if necessary. Set to 0 to skip this check.
        progressbar: Whether to wrap the chunks in a :meth:`tqdm.auto.tqdm`.
@@ -72,29 +76,38 @@
    Returns:
        Series or DataFrame resulting from applying func along given axis.

+    Raises:
+        ValueError: if a Series is passed in combination with axis=1
    """
    from numpy import array_split
    from pandas import Series, concat

+    if isinstance(axis, str):
+        axis = ["index", "columns"].index(axis)
+
+    isseries = int(isinstance(df_or_series, Series))
+
+    if isseries and axis == 1:
+        raise ValueError("Passing axis=1 is not allowed for Series")
+
+    opposite_axis = 1 - (isseries or axis)
+
    n_chunks = _choose_n_chunks(
-        df_or_series,
+        df_or_series.shape,
+        opposite_axis,
        n_workers,
        chunk_size,
        max_chunks_per_worker,
    )

-    if isinstance(axis, str):
-        axis = ["index", "columns"].index(axis.lower())
+    dfs = array_split(df_or_series, n_chunks, axis=opposite_axis)

-    if axis == 1:
-        # axis argument pre-processing
-        df_or_series = df_or_series.T
+    def run_apply(func, df_or_series, args=(), **kwargs):
+        return df_or_series.apply(func, args=args, **kwargs)

-    dfs = array_split(df_or_series, n_chunks, axis=axis)
-
-    def run_apply(func, df, args=(), **kwargs):
-        # axis argument is handled such that always axis=0 here
-        return df.apply(func, args=args, **kwargs)  # pragma: no cover
+    if not isseries:
+        kwargs["axis"] = axis

    results = multiprocessing_imap(
        partial(run_apply, func, args=args, **kwargs),
@@ -103,15 +116,7 @@ def run_apply(func, df, args=(), **kwargs):
        progressbar=progressbar,
    )

-    if (
-        len(results) > 1
-        and isinstance(results[0], Series)
-        and results[0].index.equals(results[1].index)
-    ):
-        # one more aggregation needed for final df, e.g. df.parallel_apply(sum)
-        return concat(results, axis=1).apply(func, axis=1, args=args, **kwargs)
-
-    if axis == 1:
-        # axis argument pre-processing
-        results = (df.T for df in results)  # type: ignore
-    return concat(results)
+    if not isseries and results[0].index.equals(df_or_series.index):
+        return concat(results, axis=1, copy=False)
+
+    return concat(results, copy=False)
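Why the two `concat` branches above suffice: a standalone sketch of the split/apply/reassemble round trip for a DataFrame with `axis=0` (hence `opposite_axis=1`), on illustrative data:

```py
import numpy as np
import pandas as pd

df = pd.DataFrame(np.arange(12).reshape(3, 4), columns=list("ABCD"))

# DataFrame + axis=0: func gets columns, so split along the columns
chunks = np.array_split(df, 2, axis=1)

# elementwise func: every partial result keeps df's full index,
# so the column chunks are stitched back side by side (concat axis=1)
parts = [c.apply(lambda x: x ** 2) for c in chunks]
assert pd.concat(parts, axis=1).equals(df.apply(lambda x: x ** 2))

# aggregating func: partial results are indexed by each chunk's
# columns instead, so a plain concat restores the full Series
parts = [c.apply(sum) for c in chunks]
assert pd.concat(parts).equals(df.apply(sum))
```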
8 changes: 7 additions & 1 deletion src/mapply/parallel.py
@@ -80,6 +80,7 @@ def multiprocessing_imap(

    if n_chunks == 1 or n_workers == 1:
        # no sense spawning pool
+        pool = None
        stage = map(func, iterable)
    else:
        n_workers = _choose_n_workers(n_chunks, n_workers)
@@ -92,4 +93,9 @@
    if progressbar:
        stage = tqdm(stage, total=n_chunks)

-    return list(stage)
+    try:
+        return list(stage)
+    finally:
+        if pool:
+            logger.debug("Closing ProcessPool")
+            pool.clear()
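The new `try`/`finally` releases the workers even when consuming `stage` raises mid-iteration. The same pattern in isolation, assuming the pathos `ProcessPool` that mapply builds on (worker count and payload are illustrative):

```py
from pathos.multiprocessing import ProcessPool


def square(x):
    return x * x


pool = ProcessPool(nodes=2)  # illustrative worker count
stage = pool.imap(square, range(10))
try:
    print(list(stage))  # consuming the lazy imap may raise
finally:
    # pathos caches pools; clear() drops the cached state and its workers
    pool.clear()
```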
66 changes: 56 additions & 10 deletions tests/test_mapply.py
@@ -1,33 +1,79 @@
+import numpy as np
import pandas as pd
+import pytest

import mapply


-def test_mapply():
-    mapply.init(progressbar=False)
+def test_df_mapply():
+    mapply.init(progressbar=False, chunk_size=1)

+    np.random.seed(1)
    df = pd.DataFrame(
-        pd.np.random.randint(0, 300, size=(int(2000), 4)), columns=list("ABCD")
+        np.random.randint(0, 300, size=(int(2000), 4)), columns=list("ABCD")
    )

    # axis as positional arg
    df["totals"] = df.mapply(lambda x: x.A + x.B, "columns")
-    df.mapply(lambda x: x ** 2)

+    # same output along both axes
+    pd.testing.assert_frame_equal(
+        df.apply(lambda x: x ** 2),
+        df.mapply(lambda x: x ** 2),
+    )
+    pd.testing.assert_frame_equal(
+        df.mapply(lambda x: x ** 2, axis=0),
+        df.mapply(lambda x: x ** 2, axis=1),
+    )

+    # vectorized
    pd.testing.assert_series_equal(
-        df.mapply(sum, max_chunks_per_worker=10),
        df.apply(sum),
+        df.mapply(np.sum, raw=True),
    )
+    pd.testing.assert_series_equal(
+        df.apply(sum, axis=1),
+        df.mapply(np.sum, raw=True, axis=1),
+    )

+    # result_type kwarg
    fn = lambda x: [x.A, x.B]  # noqa:E731
    pd.testing.assert_frame_equal(
-        df.mapply(fn, axis=1, result_type="expand"),
        df.apply(fn, axis=1, result_type="expand"),
+        df.mapply(fn, axis=1, result_type="expand"),
    )

+    # max_chunks_per_worker=0
+    mapply.init(progressbar=False, chunk_size=1, max_chunks_per_worker=0)
+    pd.testing.assert_frame_equal(
+        df.apply(lambda x: x ** 2),
+        df.mapply(lambda x: x ** 2),
+    )

+    # n_workers=1
+    mapply.init(progressbar=False, chunk_size=1, n_workers=1)
+    pd.testing.assert_frame_equal(
+        df.apply(lambda x: x ** 2),
+        df.mapply(lambda x: x ** 2),
+    )


+def test_series_mapply():
+    # chunk_size>1
+    mapply.init(progressbar=False, chunk_size=5)

+    fn = lambda x: x ** 2  # noqa:E731
+    series = pd.Series(range(100))

+    with pytest.raises(ValueError, match="Passing axis=1 is not allowed for Series"):
+        series.mapply(fn, axis=1)

+    # convert_dtype=False
+    pd.testing.assert_series_equal(
+        series.apply(fn, convert_dtype=False),
+        series.mapply(fn, convert_dtype=False),
+    )

-    mapply.init(progressbar=False, max_chunks_per_worker=0)
-    df.mapply(lambda x: x ** 2)
+    series = pd.Series({"a": list(range(100))})

-    mapply.init(progressbar=False, n_workers=1)
-    df.mapply(lambda x: x ** 2)
+    assert isinstance(series.mapply(lambda x: sum(x))[0], np.int64)
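For reference, the pandas flags the new tests exercise, shown in plain pandas (a small sketch, independent of mapply):

```py
import numpy as np
import pandas as pd

df = pd.DataFrame({"A": [1, 2], "B": [3, 4]})

# raw=True hands func a bare ndarray per column/row instead of a Series
assert df.apply(np.sum, raw=True).tolist() == [3, 7]

# result_type="expand" spreads list-like results over columns
expanded = df.apply(lambda x: [x.A, x.B], axis=1, result_type="expand")
assert expanded.shape == (2, 2)

# convert_dtype=False skips inferring a better dtype for Series.apply results
squared = pd.Series([1, 2]).apply(lambda x: x ** 2, convert_dtype=False)
assert squared.dtype == object
```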
