From 2357125e091f4e393f8f3425a0e9cee1a3222b29 Mon Sep 17 00:00:00 2001 From: Gijs Burghoorn Date: Thu, 23 May 2024 17:21:17 +0200 Subject: [PATCH] fix(python): add cluster_with_columns optimization toggle in python (#16446) --- py-polars/polars/functions/lazy.py | 12 +++++++++++ py-polars/polars/lazyframe/frame.py | 33 +++++++++++++++++++++++++++++ py-polars/src/lazyframe/mod.rs | 2 ++ py-polars/tests/unit/test_cwc.py | 15 +++++++++++++ 4 files changed, 62 insertions(+) diff --git a/py-polars/polars/functions/lazy.py b/py-polars/polars/functions/lazy.py index ecbb5fbb7bee..302614a7ac19 100644 --- a/py-polars/polars/functions/lazy.py +++ b/py-polars/polars/functions/lazy.py @@ -1747,6 +1747,7 @@ def collect_all( slice_pushdown: bool = True, comm_subplan_elim: bool = True, comm_subexpr_elim: bool = True, + cluster_with_columns: bool = True, streaming: bool = False, ) -> list[DataFrame]: """ @@ -1774,6 +1775,8 @@ def collect_all( Will try to cache branching subplans that occur on self-joins or unions. comm_subexpr_elim Common subexpressions will be cached and reused. + cluster_with_columns + Combine sequential independent calls to with_columns streaming Process the query in batches to handle larger-than-memory data. If set to `False` (default), the entire query is processed in a single @@ -1798,6 +1801,7 @@ def collect_all( slice_pushdown = False comm_subplan_elim = False comm_subexpr_elim = False + cluster_with_columns = False if streaming: issue_unstable_warning("Streaming mode is considered unstable.") @@ -1814,6 +1818,7 @@ def collect_all( slice_pushdown, comm_subplan_elim, comm_subexpr_elim, + cluster_with_columns, streaming, _eager=False, ) @@ -1840,6 +1845,7 @@ def collect_all_async( slice_pushdown: bool = True, comm_subplan_elim: bool = True, comm_subexpr_elim: bool = True, + cluster_with_columns: bool = True, streaming: bool = True, ) -> _GeventDataFrameResult[list[DataFrame]]: ... @@ -1857,6 +1863,7 @@ def collect_all_async( slice_pushdown: bool = True, comm_subplan_elim: bool = True, comm_subexpr_elim: bool = True, + cluster_with_columns: bool = True, streaming: bool = False, ) -> Awaitable[list[DataFrame]]: ... @@ -1874,6 +1881,7 @@ def collect_all_async( slice_pushdown: bool = True, comm_subplan_elim: bool = True, comm_subexpr_elim: bool = True, + cluster_with_columns: bool = True, streaming: bool = False, ) -> Awaitable[list[DataFrame]] | _GeventDataFrameResult[list[DataFrame]]: """ @@ -1912,6 +1920,8 @@ def collect_all_async( Will try to cache branching subplans that occur on self-joins or unions. comm_subexpr_elim Common subexpressions will be cached and reused. + cluster_with_columns + Combine sequential independent calls to with_columns streaming Process the query in batches to handle larger-than-memory data. If set to `False` (default), the entire query is processed in a single @@ -1948,6 +1958,7 @@ def collect_all_async( slice_pushdown = False comm_subplan_elim = False comm_subexpr_elim = False + cluster_with_columns = False if streaming: issue_unstable_warning("Streaming mode is considered unstable.") @@ -1964,6 +1975,7 @@ def collect_all_async( slice_pushdown, comm_subplan_elim, comm_subexpr_elim, + cluster_with_columns, streaming, _eager=False, ) diff --git a/py-polars/polars/lazyframe/frame.py b/py-polars/polars/lazyframe/frame.py index abfd869c5792..055b954a979d 100644 --- a/py-polars/polars/lazyframe/frame.py +++ b/py-polars/polars/lazyframe/frame.py @@ -889,6 +889,7 @@ def explain( slice_pushdown: bool = True, comm_subplan_elim: bool = True, comm_subexpr_elim: bool = True, + cluster_with_columns: bool = True, streaming: bool = False, tree_format: bool = False, ) -> str: @@ -918,6 +919,8 @@ def explain( Will try to cache branching subplans that occur on self-joins or unions. comm_subexpr_elim Common subexpressions will be cached and reused. + cluster_with_columns + Combine sequential independent calls to with_columns streaming Run parts of the query in a streaming fashion (this is in an alpha state) tree_format @@ -945,6 +948,7 @@ def explain( slice_pushdown, comm_subplan_elim, comm_subexpr_elim, + cluster_with_columns, streaming, _eager=False, ) @@ -971,6 +975,7 @@ def show_graph( slice_pushdown: bool = True, comm_subplan_elim: bool = True, comm_subexpr_elim: bool = True, + cluster_with_columns: bool = True, streaming: bool = False, ) -> str | None: """ @@ -1005,6 +1010,8 @@ def show_graph( Will try to cache branching subplans that occur on self-joins or unions. comm_subexpr_elim Common subexpressions will be cached and reused. + cluster_with_columns + Combine sequential independent calls to with_columns streaming Run parts of the query in a streaming fashion (this is in an alpha state) @@ -1029,6 +1036,7 @@ def show_graph( slice_pushdown, comm_subplan_elim, comm_subexpr_elim, + cluster_with_columns, streaming, _eager=False, ) @@ -1513,6 +1521,7 @@ def profile( slice_pushdown: bool = True, comm_subplan_elim: bool = True, comm_subexpr_elim: bool = True, + cluster_with_columns: bool = True, show_plot: bool = False, truncate_nodes: int = 0, figsize: tuple[int, int] = (18, 8), @@ -1545,6 +1554,8 @@ def profile( Will try to cache branching subplans that occur on self-joins or unions. comm_subexpr_elim Common subexpressions will be cached and reused. + cluster_with_columns + Combine sequential independent calls to with_columns show_plot Show a gantt chart of the profiling result truncate_nodes @@ -1593,6 +1604,7 @@ def profile( projection_pushdown = False comm_subplan_elim = False comm_subexpr_elim = False + cluster_with_columns = False ldf = self._ldf.optimization_toggle( type_coercion, @@ -1602,6 +1614,7 @@ def profile( slice_pushdown, comm_subplan_elim, comm_subexpr_elim, + cluster_with_columns, streaming, _eager=False, ) @@ -1658,6 +1671,7 @@ def collect( slice_pushdown: bool = True, comm_subplan_elim: bool = True, comm_subexpr_elim: bool = True, + cluster_with_columns: bool = True, no_optimization: bool = False, streaming: bool = False, background: Literal[True], @@ -1675,6 +1689,7 @@ def collect( slice_pushdown: bool = True, comm_subplan_elim: bool = True, comm_subexpr_elim: bool = True, + cluster_with_columns: bool = True, no_optimization: bool = False, streaming: bool = False, background: Literal[False] = False, @@ -1691,6 +1706,7 @@ def collect( slice_pushdown: bool = True, comm_subplan_elim: bool = True, comm_subexpr_elim: bool = True, + cluster_with_columns: bool = True, no_optimization: bool = False, streaming: bool = False, background: bool = False, @@ -1719,6 +1735,8 @@ def collect( Will try to cache branching subplans that occur on self-joins or unions. comm_subexpr_elim Common subexpressions will be cached and reused. + cluster_with_columns + Combine sequential independent calls to with_columns no_optimization Turn off (certain) optimizations. streaming @@ -1792,6 +1810,7 @@ def collect( slice_pushdown = False comm_subplan_elim = False comm_subexpr_elim = False + cluster_with_columns = False if streaming: issue_unstable_warning("Streaming mode is considered unstable.") @@ -1805,6 +1824,7 @@ def collect( slice_pushdown, comm_subplan_elim, comm_subexpr_elim, + cluster_with_columns, streaming, _eager, ) @@ -1829,6 +1849,7 @@ def collect_async( slice_pushdown: bool = True, comm_subplan_elim: bool = True, comm_subexpr_elim: bool = True, + cluster_with_columns: bool = True, streaming: bool = True, ) -> _GeventDataFrameResult[DataFrame]: ... @@ -1845,6 +1866,7 @@ def collect_async( slice_pushdown: bool = True, comm_subplan_elim: bool = True, comm_subexpr_elim: bool = True, + cluster_with_columns: bool = True, streaming: bool = True, ) -> Awaitable[DataFrame]: ... @@ -1860,6 +1882,7 @@ def collect_async( slice_pushdown: bool = True, comm_subplan_elim: bool = True, comm_subexpr_elim: bool = True, + cluster_with_columns: bool = True, streaming: bool = False, ) -> Awaitable[DataFrame] | _GeventDataFrameResult[DataFrame]: """ @@ -1896,6 +1919,8 @@ def collect_async( Will try to cache branching subplans that occur on self-joins or unions. comm_subexpr_elim Common subexpressions will be cached and reused. + cluster_with_columns + Combine sequential independent calls to with_columns streaming Process the query in batches to handle larger-than-memory data. If set to `False` (default), the entire query is processed in a single @@ -1960,6 +1985,7 @@ def collect_async( slice_pushdown = False comm_subplan_elim = False comm_subexpr_elim = False + cluster_with_columns = False if streaming: issue_unstable_warning("Streaming mode is considered unstable.") @@ -1973,6 +1999,7 @@ def collect_async( slice_pushdown, comm_subplan_elim, comm_subexpr_elim, + cluster_with_columns, streaming, _eager=False, ) @@ -2379,6 +2406,7 @@ def _set_sink_optimizations( slice_pushdown, comm_subplan_elim=False, comm_subexpr_elim=False, + cluster_with_columns=False, streaming=True, _eager=False, ) @@ -2395,6 +2423,7 @@ def fetch( slice_pushdown: bool = True, comm_subplan_elim: bool = True, comm_subexpr_elim: bool = True, + cluster_with_columns: bool = True, streaming: bool = False, ) -> DataFrame: """ @@ -2420,6 +2449,8 @@ def fetch( Will try to cache branching subplans that occur on self-joins or unions. comm_subexpr_elim Common subexpressions will be cached and reused. + cluster_with_columns + Combine sequential independent calls to with_columns streaming Run parts of the query in a streaming fashion (this is in an alpha state) @@ -2467,6 +2498,7 @@ def fetch( slice_pushdown = False comm_subplan_elim = False comm_subexpr_elim = False + cluster_with_columns = False lf = self._ldf.optimization_toggle( type_coercion, @@ -2476,6 +2508,7 @@ def fetch( slice_pushdown, comm_subplan_elim, comm_subexpr_elim, + cluster_with_columns, streaming, _eager=False, ) diff --git a/py-polars/src/lazyframe/mod.rs b/py-polars/src/lazyframe/mod.rs index 072b02a6cff1..42d831a315c0 100644 --- a/py-polars/src/lazyframe/mod.rs +++ b/py-polars/src/lazyframe/mod.rs @@ -457,6 +457,7 @@ impl PyLazyFrame { slice_pushdown: bool, comm_subplan_elim: bool, comm_subexpr_elim: bool, + cluster_with_columns: bool, streaming: bool, _eager: bool, ) -> Self { @@ -466,6 +467,7 @@ impl PyLazyFrame { .with_predicate_pushdown(predicate_pushdown) .with_simplify_expr(simplify_expr) .with_slice_pushdown(slice_pushdown) + .with_cluster_with_columns(cluster_with_columns) .with_streaming(streaming) ._with_eager(_eager) .with_projection_pushdown(projection_pushdown); diff --git a/py-polars/tests/unit/test_cwc.py b/py-polars/tests/unit/test_cwc.py index fe27f0652e99..19d59292fca7 100644 --- a/py-polars/tests/unit/test_cwc.py +++ b/py-polars/tests/unit/test_cwc.py @@ -18,6 +18,21 @@ def test_basic_cwc() -> None: ) +def test_disable_cwc() -> None: + df = ( + pl.LazyFrame({"a": [1, 2]}) + .with_columns(pl.col("a").alias("b") * 2) + .with_columns(pl.col("a").alias("c") * 3) + .with_columns(pl.col("a").alias("d") * 4) + ) + + explain = df.explain(cluster_with_columns=False) + + assert """[[(col("a")) * (2)].alias("b")]""" in explain + assert """[[(col("a")) * (3)].alias("c")]""" in explain + assert """[[(col("a")) * (4)].alias("d")]""" in explain + + def test_refuse_with_deps() -> None: df = ( pl.LazyFrame({"a": [1, 2]})