Skip to content

Commit

Permalink
Update Aggregate functions to take builder parameters (apache#859)
Browse files Browse the repository at this point in the history
* Add NullTreatment enum wrapper and add filter option to approx_distinct

* Small usability on aggregate

* Adding documentation and additional unit test for approx_median

* Update approx_percentil_cont with builder parameters it uses, which is filter but not distinct

* Update approx_percentil_cont_with_weight with builder parameters it uses, which is filter but not distinct

* Update array_agg to use aggregate options

* Update builder options for avg aggregate function

* move bit_and bit_or to use macro to generaty python fn

* Update builder arguments for bitwise operators

* Use macro for bool_and and bool_or

* Update python wrapper for arguments appropriate to bool operators

* Set corr to use macro for pyfunction

* Update unit test to make it easier to debug

* Update corr python wrapper to expose only builder parameters used

* Update count and count_star to use macro for exposing

* Update count and count_star with approprate aggregation options

* Move covar_pop and covar_samp to use macro for aggregates

* Updateing covar_pop and covar_samp with builder option

* Use macro for last_value and move first_value to be near it

* Update first_value and last_value with the builder parameters that are relevant

* Remove grouping since it is not actually implemented upstream

* Move median to use macro

* Expose builder options for median

* Expose nth value

* Updating linear regression functions to use filter and macro

* Update stddev and stddev_pop to use filter and macro

* Expose string_agg

* Add string_agg to python wrappers and add unit test

* Switch sum to use macro in rust side and expose correct options in python wrapper

* Use macro for exposing var_pop and var_samp

* Add unit tests for filtering on var_pop and var_samp

* Move approximation functions to use macro when possible

* Update user documentation to explain in detail the options for aggregate functions

* Update unit test to handle Python 3.10

* Clean up commented code
  • Loading branch information
timsaucer authored and emgeee committed Sep 9, 2024
1 parent 849bf3a commit 81567ef
Show file tree
Hide file tree
Showing 9 changed files with 1,470 additions and 619 deletions.
206 changes: 184 additions & 22 deletions docs/source/user-guide/common-operations/aggregations.rst
Original file line number Diff line number Diff line change
Expand Up @@ -20,43 +20,205 @@
Aggregation
============

An aggregate or aggregation is a function where the values of multiple rows are processed together to form a single summary value.
For performing an aggregation, DataFusion provides the :py:func:`~datafusion.dataframe.DataFrame.aggregate`
An aggregate or aggregation is a function where the values of multiple rows are processed together
to form a single summary value. For performing an aggregation, DataFusion provides the
:py:func:`~datafusion.dataframe.DataFrame.aggregate`

.. ipython:: python
import urllib.request
from datafusion import SessionContext
from datafusion import column, lit
from datafusion import col, lit
from datafusion import functions as f
import random
ctx = SessionContext()
df = ctx.from_pydict(
{
"a": ["foo", "bar", "foo", "bar", "foo", "bar", "foo", "foo"],
"b": ["one", "one", "two", "three", "two", "two", "one", "three"],
"c": [random.randint(0, 100) for _ in range(8)],
"d": [random.random() for _ in range(8)],
},
name="foo_bar"
urllib.request.urlretrieve(
"https://gist.githubusercontent.com/ritchie46/cac6b337ea52281aa23c049250a4ff03/raw/89a957ff3919d90e6ef2d34235e6bf22304f3366/pokemon.csv",
"pokemon.csv",
)
col_a = column("a")
col_b = column("b")
col_c = column("c")
col_d = column("d")
ctx = SessionContext()
df = ctx.read_csv("pokemon.csv")
col_type_1 = col('"Type 1"')
col_type_2 = col('"Type 2"')
col_speed = col('"Speed"')
col_attack = col('"Attack"')
df.aggregate([], [f.approx_distinct(col_c), f.approx_median(col_d), f.approx_percentile_cont(col_d, lit(0.5))])
df.aggregate([col_type_1], [
f.approx_distinct(col_speed).alias("Count"),
f.approx_median(col_speed).alias("Median Speed"),
f.approx_percentile_cont(col_speed, 0.9).alias("90% Speed")])
When the :code:`group_by` list is empty the aggregation is done over the whole :class:`.DataFrame`. For grouping
the :code:`group_by` list must contain at least one column
When the :code:`group_by` list is empty the aggregation is done over the whole :class:`.DataFrame`.
For grouping the :code:`group_by` list must contain at least one column.

.. ipython:: python
df.aggregate([col_a], [f.sum(col_c), f.max(col_d), f.min(col_d)])
df.aggregate([col_type_1], [
f.max(col_speed).alias("Max Speed"),
f.avg(col_speed).alias("Avg Speed"),
f.min(col_speed).alias("Min Speed")])
More than one column can be used for grouping

.. ipython:: python
df.aggregate([col_a, col_b], [f.sum(col_c), f.max(col_d), f.min(col_d)])
df.aggregate([col_type_1, col_type_2], [
f.max(col_speed).alias("Max Speed"),
f.avg(col_speed).alias("Avg Speed"),
f.min(col_speed).alias("Min Speed")])
Setting Parameters
------------------

Each of the built in aggregate functions provides arguments for the parameters that affect their
operation. These can also be overridden using the builder approach to setting any of the following
parameters. When you use the builder, you must call ``build()`` to finish. For example, these two
expressions are equivalent.

.. ipython:: python
first_1 = f.first_value(col("a"), order_by=[col("a")])
first_2 = f.first_value(col("a")).order_by(col("a")).build()
Ordering
^^^^^^^^

You can control the order in which rows are processed by window functions by providing
a list of ``order_by`` functions for the ``order_by`` parameter. In the following example, we
sort the Pokemon by their attack in increasing order and take the first value, which gives us the
Pokemon with the smallest attack value in each ``Type 1``.

.. ipython:: python
df.aggregate(
[col('"Type 1"')],
[f.first_value(
col('"Name"'),
order_by=[col('"Attack"').sort(ascending=True)]
).alias("Smallest Attack")
])
Distinct
^^^^^^^^

When you set the parameter ``distinct`` to ``True``, then unique values will only be evaluated one
time each. Suppose we want to create an array of all of the ``Type 2`` for each ``Type 1`` of our
Pokemon set. Since there will be many entries of ``Type 2`` we only one each distinct value.

.. ipython:: python
df.aggregate([col_type_1], [f.array_agg(col_type_2, distinct=True).alias("Type 2 List")])
In the output of the above we can see that there are some ``Type 1`` for which the ``Type 2`` entry
is ``null``. In reality, we probably want to filter those out. We can do this in two ways. First,
we can filter DataFrame rows that have no ``Type 2``. If we do this, we might have some ``Type 1``
entries entirely removed. The second is we can use the ``filter`` argument described below.

.. ipython:: python
df.filter(col_type_2.is_not_null()).aggregate([col_type_1], [f.array_agg(col_type_2, distinct=True).alias("Type 2 List")])
df.aggregate([col_type_1], [f.array_agg(col_type_2, distinct=True, filter=col_type_2.is_not_null()).alias("Type 2 List")])
Which approach you take should depend on your use case.

Null Treatment
^^^^^^^^^^^^^^

This option allows you to either respect or ignore null values.

One common usage for handling nulls is the case where you want to find the first value within a
partition. By setting the null treatment to ignore nulls, we can find the first non-null value
in our partition.


.. ipython:: python
from datafusion.common import NullTreatment
df.aggregate([col_type_1], [
f.first_value(
col_type_2,
order_by=[col_attack],
null_treatment=NullTreatment.RESPECT_NULLS
).alias("Lowest Attack Type 2")])
df.aggregate([col_type_1], [
f.first_value(
col_type_2,
order_by=[col_attack],
null_treatment=NullTreatment.IGNORE_NULLS
).alias("Lowest Attack Type 2")])
Filter
^^^^^^

Using the filter option is useful for filtering results to include in the aggregate function. It can
be seen in the example above on how this can be useful to only filter rows evaluated by the
aggregate function without filtering rows from the entire DataFrame.

Filter takes a single expression.

Suppose we want to find the speed values for only Pokemon that have low Attack values.

.. ipython:: python
df.aggregate([col_type_1], [
f.avg(col_speed).alias("Avg Speed All"),
f.avg(col_speed, filter=col_attack < lit(50)).alias("Avg Speed Low Attack")])
Aggregate Functions
-------------------

The available aggregate functions are:

1. Comparison Functions
- :py:func:`datafusion.functions.min`
- :py:func:`datafusion.functions.max`
2. Math Functions
- :py:func:`datafusion.functions.sum`
- :py:func:`datafusion.functions.avg`
- :py:func:`datafusion.functions.median`
3. Array Functions
- :py:func:`datafusion.functions.array_agg`
4. Logical Functions
- :py:func:`datafusion.functions.bit_and`
- :py:func:`datafusion.functions.bit_or`
- :py:func:`datafusion.functions.bit_xor`
- :py:func:`datafusion.functions.bool_and`
- :py:func:`datafusion.functions.bool_or`
5. Statistical Functions
- :py:func:`datafusion.functions.count`
- :py:func:`datafusion.functions.corr`
- :py:func:`datafusion.functions.covar_samp`
- :py:func:`datafusion.functions.covar_pop`
- :py:func:`datafusion.functions.stddev`
- :py:func:`datafusion.functions.stddev_pop`
- :py:func:`datafusion.functions.var_samp`
- :py:func:`datafusion.functions.var_pop`
6. Linear Regression Functions
- :py:func:`datafusion.functions.regr_count`
- :py:func:`datafusion.functions.regr_slope`
- :py:func:`datafusion.functions.regr_intercept`
- :py:func:`datafusion.functions.regr_r2`
- :py:func:`datafusion.functions.regr_avgx`
- :py:func:`datafusion.functions.regr_avgy`
- :py:func:`datafusion.functions.regr_sxx`
- :py:func:`datafusion.functions.regr_syy`
- :py:func:`datafusion.functions.regr_slope`
7. Positional Functions
- :py:func:`datafusion.functions.first_value`
- :py:func:`datafusion.functions.last_value`
- :py:func:`datafusion.functions.nth_value`
8. String Functions
- :py:func:`datafusion.functions.string_agg`
9. Approximation Functions
- :py:func:`datafusion.functions.approx_distinct`
- :py:func:`datafusion.functions.approx_median`
- :py:func:`datafusion.functions.approx_percentile_cont`
- :py:func:`datafusion.functions.approx_percentile_cont_with_weight`

15 changes: 14 additions & 1 deletion python/datafusion/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@
"""Common data types used throughout the DataFusion project."""

from ._internal import common as common_internal
from enum import Enum

# TODO these should all have proper wrapper classes

DFSchema = common_internal.DFSchema
DataType = common_internal.DataType
DataTypeMap = common_internal.DataTypeMap
NullTreatment = common_internal.NullTreatment
PythonType = common_internal.PythonType
RexType = common_internal.RexType
SqlFunction = common_internal.SqlFunction
Expand All @@ -47,3 +47,16 @@
"SqlStatistics",
"SqlFunction",
]


class NullTreatment(Enum):
"""Describe how null values are to be treated by functions.
This is used primarily by aggregate and window functions. It can be set on
these functions using the builder approach described in
ref:`_window_functions` and ref:`_aggregation` in the online documentation.
"""

RESPECT_NULLS = common_internal.NullTreatment.RESPECT_NULLS
IGNORE_NULLS = common_internal.NullTreatment.IGNORE_NULLS
7 changes: 6 additions & 1 deletion python/datafusion/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,9 @@ def with_column_renamed(self, old_name: str, new_name: str) -> DataFrame:
"""
return DataFrame(self.df.with_column_renamed(old_name, new_name))

def aggregate(self, group_by: list[Expr], aggs: list[Expr]) -> DataFrame:
def aggregate(
self, group_by: list[Expr] | Expr, aggs: list[Expr] | Expr
) -> DataFrame:
"""Aggregates the rows of the current DataFrame.
Args:
Expand All @@ -190,6 +192,9 @@ def aggregate(self, group_by: list[Expr], aggs: list[Expr]) -> DataFrame:
Returns:
DataFrame after aggregation.
"""
group_by = group_by if isinstance(group_by, list) else [group_by]
aggs = aggs if isinstance(aggs, list) else [aggs]

group_by = [e.expr for e in group_by]
aggs = [e.expr for e in aggs]
return DataFrame(self.df.aggregate(group_by, aggs))
Expand Down
4 changes: 2 additions & 2 deletions python/datafusion/expr.py
Original file line number Diff line number Diff line change
Expand Up @@ -473,7 +473,7 @@ def null_treatment(self, null_treatment: NullTreatment) -> ExprFuncBuilder:
set parameters for either window or aggregate functions. If used on any other
type of expression, an error will be generated when ``build()`` is called.
"""
return ExprFuncBuilder(self.expr.null_treatment(null_treatment))
return ExprFuncBuilder(self.expr.null_treatment(null_treatment.value))

def partition_by(self, *partition_by: Expr) -> ExprFuncBuilder:
"""Set the partitioning for a window function.
Expand Down Expand Up @@ -518,7 +518,7 @@ def distinct(self) -> ExprFuncBuilder:

def null_treatment(self, null_treatment: NullTreatment) -> ExprFuncBuilder:
"""Set how nulls are treated for either window or aggregate functions."""
return ExprFuncBuilder(self.builder.null_treatment(null_treatment))
return ExprFuncBuilder(self.builder.null_treatment(null_treatment.value))

def partition_by(self, *partition_by: Expr) -> ExprFuncBuilder:
"""Set partitioning for window functions."""
Expand Down
Loading

0 comments on commit 81567ef

Please sign in to comment.