Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[minor] make recursive package dependency optional #13778

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@ Default features:
- `parquet`: support for reading the [Apache Parquet] format
- `regex_expressions`: regular expression functions, such as `regexp_match`
- `unicode_expressions`: Include unicode aware functions such as `character_length`
- `unparser` : enables support to reverse LogicalPlans back into SQL
- `unparser`: enables support to reverse LogicalPlans back into SQL
- `recursive-protection`: uses [recursive](https://docs.rs/recursive/latest/recursive/) for stack overflow protection.

Optional features:

Expand Down
1 change: 0 additions & 1 deletion datafusion-cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion datafusion/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,12 @@ name = "datafusion_common"
path = "src/lib.rs"

[features]
default = ["recursive-protection"]
avro = ["apache-avro"]
backtrace = []
pyarrow = ["pyo3", "arrow/pyarrow", "parquet"]
force_hash_collisions = []
recursive-protection = ["dep:recursive"]

[dependencies]
ahash = { workspace = true }
Expand All @@ -61,7 +63,7 @@ object_store = { workspace = true, optional = true }
parquet = { workspace = true, optional = true, default-features = true }
paste = "1.0.15"
pyo3 = { version = "0.22.0", optional = true }
recursive = { workspace = true }
recursive = { workspace = true, optional = true }
sqlparser = { workspace = true }
tokio = { workspace = true }

Expand Down
14 changes: 7 additions & 7 deletions datafusion/common/src/tree_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
//! [`TreeNode`] for visiting and rewriting expression and plan trees

use crate::Result;
use recursive::recursive;
use std::collections::HashMap;
use std::hash::Hash;
use std::sync::Arc;
Expand Down Expand Up @@ -125,7 +124,7 @@ pub trait TreeNode: Sized {
/// TreeNodeVisitor::f_up(ChildNode2)
/// TreeNodeVisitor::f_up(ParentNode)
/// ```
#[recursive]
#[cfg_attr(feature = "recursive-protection", recursive::recursive)]
fn visit<'n, V: TreeNodeVisitor<'n, Node = Self>>(
&'n self,
visitor: &mut V,
Expand Down Expand Up @@ -175,7 +174,7 @@ pub trait TreeNode: Sized {
/// TreeNodeRewriter::f_up(ChildNode2)
/// TreeNodeRewriter::f_up(ParentNode)
/// ```
#[recursive]
#[cfg_attr(feature = "recursive-protection", recursive::recursive)]
fn rewrite<R: TreeNodeRewriter<Node = Self>>(
self,
rewriter: &mut R,
Expand All @@ -198,7 +197,7 @@ pub trait TreeNode: Sized {
&'n self,
mut f: F,
) -> Result<TreeNodeRecursion> {
#[recursive]
#[cfg_attr(feature = "recursive-protection", recursive::recursive)]
fn apply_impl<'n, N: TreeNode, F: FnMut(&'n N) -> Result<TreeNodeRecursion>>(
node: &'n N,
f: &mut F,
Expand Down Expand Up @@ -233,7 +232,7 @@ pub trait TreeNode: Sized {
self,
mut f: F,
) -> Result<Transformed<Self>> {
#[recursive]
#[cfg_attr(feature = "recursive-protection", recursive::recursive)]
fn transform_down_impl<N: TreeNode, F: FnMut(N) -> Result<Transformed<N>>>(
node: N,
f: &mut F,
Expand All @@ -257,7 +256,7 @@ pub trait TreeNode: Sized {
self,
mut f: F,
) -> Result<Transformed<Self>> {
#[recursive]
#[cfg_attr(feature = "recursive-protection", recursive::recursive)]
fn transform_up_impl<N: TreeNode, F: FnMut(N) -> Result<Transformed<N>>>(
node: N,
f: &mut F,
Expand Down Expand Up @@ -372,7 +371,7 @@ pub trait TreeNode: Sized {
mut f_down: FD,
mut f_up: FU,
) -> Result<Transformed<Self>> {
#[recursive]
#[cfg_attr(feature = "recursive-protection", recursive::recursive)]
fn transform_down_up_impl<
N: TreeNode,
FD: FnMut(N) -> Result<Transformed<N>>,
Expand Down Expand Up @@ -2350,6 +2349,7 @@ pub(crate) mod tests {
Ok(())
}

#[cfg(feature = "recursive-protection")]
#[test]
fn test_large_tree() {
let mut item = TestTreeNode::new_leaf("initial".to_string());
Expand Down
4 changes: 3 additions & 1 deletion datafusion/expr/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ name = "datafusion_expr"
path = "src/lib.rs"

[features]
default = ["recursive-protection"]
recursive-protection = ["dep:recursive"]

[dependencies]
arrow = { workspace = true }
Expand All @@ -48,7 +50,7 @@ datafusion-functions-window-common = { workspace = true }
datafusion-physical-expr-common = { workspace = true }
indexmap = { workspace = true }
paste = "^1.0"
recursive = { workspace = true }
recursive = { workspace = true, optional = true }
serde_json = { workspace = true }
sqlparser = { workspace = true }

Expand Down
3 changes: 1 addition & 2 deletions datafusion/expr/src/expr_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ use datafusion_common::{
TableReference,
};
use datafusion_functions_window_common::field::WindowUDFFieldArgs;
use recursive::recursive;
use std::collections::HashMap;
use std::sync::Arc;

Expand Down Expand Up @@ -100,7 +99,7 @@ impl ExprSchemable for Expr {
/// expression refers to a column that does not exist in the
/// schema, or when the expression is incorrectly typed
/// (e.g. `[utf8] + [bool]`).
#[recursive]
#[cfg_attr(feature = "recursive-protection", recursive::recursive)]
fn get_type(&self, schema: &dyn ExprSchema) -> Result<DataType> {
match self {
Expr::Alias(Alias { expr, name, .. }) => match &**expr {
Expand Down
13 changes: 6 additions & 7 deletions datafusion/expr/src/logical_plan/tree_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ use crate::{
UserDefinedLogicalNode, Values, Window,
};
use datafusion_common::tree_node::TreeNodeRefContainer;
use recursive::recursive;

use crate::expr::{Exists, InSubquery};
use datafusion_common::tree_node::{
Expand Down Expand Up @@ -669,7 +668,7 @@ impl LogicalPlan {

/// Visits a plan similarly to [`Self::visit`], including subqueries that
/// may appear in expressions such as `IN (SELECT ...)`.
#[recursive]
#[cfg_attr(feature = "recursive-protection", recursive::recursive)]
pub fn visit_with_subqueries<V: for<'n> TreeNodeVisitor<'n, Node = Self>>(
&self,
visitor: &mut V,
Expand All @@ -688,7 +687,7 @@ impl LogicalPlan {
/// Similarly to [`Self::rewrite`], rewrites this node and its inputs using `f`,
/// including subqueries that may appear in expressions such as `IN (SELECT
/// ...)`.
#[recursive]
#[cfg_attr(feature = "recursive-protection", recursive::recursive)]
pub fn rewrite_with_subqueries<R: TreeNodeRewriter<Node = Self>>(
self,
rewriter: &mut R,
Expand All @@ -707,7 +706,7 @@ impl LogicalPlan {
&self,
mut f: F,
) -> Result<TreeNodeRecursion> {
#[recursive]
#[cfg_attr(feature = "recursive-protection", recursive::recursive)]
fn apply_with_subqueries_impl<
F: FnMut(&LogicalPlan) -> Result<TreeNodeRecursion>,
>(
Expand Down Expand Up @@ -742,7 +741,7 @@ impl LogicalPlan {
self,
mut f: F,
) -> Result<Transformed<Self>> {
#[recursive]
#[cfg_attr(feature = "recursive-protection", recursive::recursive)]
fn transform_down_with_subqueries_impl<
F: FnMut(LogicalPlan) -> Result<Transformed<LogicalPlan>>,
>(
Expand All @@ -767,7 +766,7 @@ impl LogicalPlan {
self,
mut f: F,
) -> Result<Transformed<Self>> {
#[recursive]
#[cfg_attr(feature = "recursive-protection", recursive::recursive)]
fn transform_up_with_subqueries_impl<
F: FnMut(LogicalPlan) -> Result<Transformed<LogicalPlan>>,
>(
Expand Down Expand Up @@ -795,7 +794,7 @@ impl LogicalPlan {
mut f_down: FD,
mut f_up: FU,
) -> Result<Transformed<Self>> {
#[recursive]
#[cfg_attr(feature = "recursive-protection", recursive::recursive)]
fn transform_down_up_with_subqueries_impl<
FD: FnMut(LogicalPlan) -> Result<Transformed<LogicalPlan>>,
FU: FnMut(LogicalPlan) -> Result<Transformed<LogicalPlan>>,
Expand Down
6 changes: 5 additions & 1 deletion datafusion/optimizer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ workspace = true
name = "datafusion_optimizer"
path = "src/lib.rs"

[features]
default = ["recursive-protection"]
recursive-protection = ["dep:recursive"]

[dependencies]
arrow = { workspace = true }
chrono = { workspace = true }
Expand All @@ -44,7 +48,7 @@ datafusion-physical-expr = { workspace = true }
indexmap = { workspace = true }
itertools = { workspace = true }
log = { workspace = true }
recursive = { workspace = true }
recursive = { workspace = true, optional = true }
regex = { workspace = true }
regex-syntax = "0.8.0"

Expand Down
7 changes: 3 additions & 4 deletions datafusion/optimizer/src/analyzer/subquery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

use crate::analyzer::check_plan;
use crate::utils::collect_subquery_cols;
use recursive::recursive;

use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
use datafusion_common::{plan_err, Result};
Expand Down Expand Up @@ -79,7 +78,7 @@ pub fn check_subquery_expr(
match outer_plan {
LogicalPlan::Projection(_)
| LogicalPlan::Filter(_) => Ok(()),
LogicalPlan::Aggregate(Aggregate {group_expr, aggr_expr,..}) => {
LogicalPlan::Aggregate(Aggregate { group_expr, aggr_expr, .. }) => {
if group_expr.contains(expr) && !aggr_expr.contains(expr) {
// TODO revisit this validation logic
plan_err!(
Expand All @@ -88,7 +87,7 @@ pub fn check_subquery_expr(
} else {
Ok(())
}
},
}
_ => plan_err!(
"Correlated scalar subquery can only be used in Projection, Filter, Aggregate plan nodes"
)
Expand Down Expand Up @@ -129,7 +128,7 @@ fn check_correlations_in_subquery(inner_plan: &LogicalPlan) -> Result<()> {
}

// Recursively check the unsupported outer references in the sub query plan.
#[recursive]
#[cfg_attr(feature = "recursive-protection", recursive::recursive)]
fn check_inner_plan(inner_plan: &LogicalPlan, can_contain_outer_ref: bool) -> Result<()> {
if !can_contain_outer_ref && inner_plan.contains_outer_reference() {
return plan_err!("Accessing outer reference columns is not allowed in the plan");
Expand Down
5 changes: 2 additions & 3 deletions datafusion/optimizer/src/common_subexpr_eliminate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ use std::fmt::Debug;
use std::sync::Arc;

use crate::{OptimizerConfig, OptimizerRule};
use recursive::recursive;

use crate::optimizer::ApplyOrder;
use crate::utils::NamePreserver;
Expand Down Expand Up @@ -532,7 +531,7 @@ impl OptimizerRule for CommonSubexprEliminate {
None
}

#[recursive]
#[cfg_attr(feature = "recursive-protection", recursive::recursive)]
fn rewrite(
&self,
plan: LogicalPlan,
Expand Down Expand Up @@ -951,7 +950,7 @@ mod test {
)?
.build()?;

let expected ="Aggregate: groupBy=[[]], aggr=[[avg(__common_expr_1) AS col1, my_agg(__common_expr_1) AS col2]]\
let expected = "Aggregate: groupBy=[[]], aggr=[[avg(__common_expr_1) AS col1, my_agg(__common_expr_1) AS col2]]\
\n Projection: UInt32(1) + test.a AS __common_expr_1, test.a, test.b, test.c\
\n TableScan: test";

Expand Down
21 changes: 10 additions & 11 deletions datafusion/optimizer/src/eliminate_cross_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

//! [`EliminateCrossJoin`] converts `CROSS JOIN` to `INNER JOIN` if join predicates are available.
use crate::{OptimizerConfig, OptimizerRule};
use recursive::recursive;
use std::sync::Arc;

use crate::join_key_set::JoinKeySet;
Expand Down Expand Up @@ -80,7 +79,7 @@ impl OptimizerRule for EliminateCrossJoin {
true
}

#[recursive]
#[cfg_attr(feature = "recursive-protection", recursive::recursive)]
fn rewrite(
&self,
plan: LogicalPlan,
Expand Down Expand Up @@ -651,7 +650,7 @@ mod tests {
" Inner Join: t1.a = t2.a [a:UInt32, b:UInt32, c:UInt32, a:UInt32, b:UInt32, c:UInt32, a:UInt32, b:UInt32, c:UInt32]",
" Inner Join: t1.a = t3.a [a:UInt32, b:UInt32, c:UInt32, a:UInt32, b:UInt32, c:UInt32]",
" TableScan: t1 [a:UInt32, b:UInt32, c:UInt32]",
" TableScan: t3 [a:UInt32, b:UInt32, c:UInt32]",
" TableScan: t3 [a:UInt32, b:UInt32, c:UInt32]",
" TableScan: t2 [a:UInt32, b:UInt32, c:UInt32]"
];

Expand Down Expand Up @@ -1237,10 +1236,10 @@ mod tests {
.build()?;

let expected = vec![
"Filter: t1.a + UInt32(100) = t2.a * UInt32(2) OR t2.b = t1.a [a:UInt32, b:UInt32, c:UInt32, a:UInt32, b:UInt32, c:UInt32]",
" Cross Join: [a:UInt32, b:UInt32, c:UInt32, a:UInt32, b:UInt32, c:UInt32]",
" TableScan: t1 [a:UInt32, b:UInt32, c:UInt32]",
" TableScan: t2 [a:UInt32, b:UInt32, c:UInt32]",
"Filter: t1.a + UInt32(100) = t2.a * UInt32(2) OR t2.b = t1.a [a:UInt32, b:UInt32, c:UInt32, a:UInt32, b:UInt32, c:UInt32]",
" Cross Join: [a:UInt32, b:UInt32, c:UInt32, a:UInt32, b:UInt32, c:UInt32]",
" TableScan: t1 [a:UInt32, b:UInt32, c:UInt32]",
" TableScan: t2 [a:UInt32, b:UInt32, c:UInt32]",
];

assert_optimized_plan_eq(plan, expected);
Expand Down Expand Up @@ -1293,10 +1292,10 @@ mod tests {
.build()?;

let expected = vec![
"Filter: t2.c < UInt32(15) OR t2.c = UInt32(688) [a:UInt32, b:UInt32, c:UInt32, a:UInt32, b:UInt32, c:UInt32]",
" Inner Join: t1.a + UInt32(100) = t2.a * UInt32(2) [a:UInt32, b:UInt32, c:UInt32, a:UInt32, b:UInt32, c:UInt32]",
" TableScan: t1 [a:UInt32, b:UInt32, c:UInt32]",
" TableScan: t2 [a:UInt32, b:UInt32, c:UInt32]",
"Filter: t2.c < UInt32(15) OR t2.c = UInt32(688) [a:UInt32, b:UInt32, c:UInt32, a:UInt32, b:UInt32, c:UInt32]",
" Inner Join: t1.a + UInt32(100) = t2.a * UInt32(2) [a:UInt32, b:UInt32, c:UInt32, a:UInt32, b:UInt32, c:UInt32]",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unrelated change?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this was caused by cargo fmt when I've added feature above

" TableScan: t1 [a:UInt32, b:UInt32, c:UInt32]",
" TableScan: t2 [a:UInt32, b:UInt32, c:UInt32]",
];

assert_optimized_plan_eq(plan, expected);
Expand Down
3 changes: 1 addition & 2 deletions datafusion/optimizer/src/optimize_projections/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ mod required_indices;

use crate::optimizer::ApplyOrder;
use crate::{OptimizerConfig, OptimizerRule};
use recursive::recursive;
use std::collections::HashSet;
use std::sync::Arc;

Expand Down Expand Up @@ -110,7 +109,7 @@ impl OptimizerRule for OptimizeProjections {
/// columns.
/// - `Ok(None)`: Signal that the given logical plan did not require any change.
/// - `Err(error)`: An error occurred during the optimization process.
#[recursive]
#[cfg_attr(feature = "recursive-protection", recursive::recursive)]
fn optimize_projections(
plan: LogicalPlan,
config: &dyn OptimizerConfig,
Expand Down
6 changes: 5 additions & 1 deletion datafusion/physical-optimizer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ rust-version = { workspace = true }
[lints]
workspace = true

[features]
default = ["recursive-protection"]
recursive-protection = ["dep:recursive"]

[dependencies]
arrow = { workspace = true }
datafusion-common = { workspace = true, default-features = true }
Expand All @@ -40,7 +44,7 @@ datafusion-physical-expr = { workspace = true }
datafusion-physical-plan = { workspace = true }
itertools = { workspace = true }
log = { workspace = true }
recursive = { workspace = true }
recursive = { workspace = true, optional = true }

[dev-dependencies]
datafusion-expr = { workspace = true }
Expand Down
3 changes: 1 addition & 2 deletions datafusion/physical-optimizer/src/aggregate_statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ use datafusion_physical_plan::placeholder_row::PlaceholderRowExec;
use datafusion_physical_plan::projection::ProjectionExec;
use datafusion_physical_plan::udaf::{AggregateFunctionExpr, StatisticsArgs};
use datafusion_physical_plan::{expressions, ExecutionPlan};
use recursive::recursive;
use std::sync::Arc;

use crate::PhysicalOptimizerRule;
Expand All @@ -42,7 +41,7 @@ impl AggregateStatistics {
}

impl PhysicalOptimizerRule for AggregateStatistics {
#[recursive]
#[cfg_attr(feature = "recursive-protection", recursive::recursive)]
fn optimize(
&self,
plan: Arc<dyn ExecutionPlan>,
Expand Down
Loading
Loading