diff --git a/datafusion/optimizer/src/analyzer/subquery.rs b/datafusion/expr/src/logical_plan/invariants.rs similarity index 78% rename from datafusion/optimizer/src/analyzer/subquery.rs rename to datafusion/expr/src/logical_plan/invariants.rs index 7129da85f375..bde4acaae562 100644 --- a/datafusion/optimizer/src/analyzer/subquery.rs +++ b/datafusion/expr/src/logical_plan/invariants.rs @@ -15,14 +15,98 @@ // specific language governing permissions and limitations // under the License. -use crate::analyzer::check_plan; -use crate::utils::collect_subquery_cols; +use datafusion_common::{ + internal_err, plan_err, + tree_node::{TreeNode, TreeNodeRecursion}, + DFSchemaRef, Result, +}; -use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion}; -use datafusion_common::{plan_err, Result}; -use datafusion_expr::expr_rewriter::strip_outer_reference; -use datafusion_expr::utils::split_conjunction; -use datafusion_expr::{Aggregate, Expr, Filter, Join, JoinType, LogicalPlan, Window}; +use crate::{ + expr::{Exists, InSubquery}, + expr_rewriter::strip_outer_reference, + utils::{collect_subquery_cols, split_conjunction}, + Aggregate, Expr, Filter, Join, JoinType, LogicalPlan, Window, +}; + +pub enum InvariantLevel { + /// Invariants that are always true in DataFusion `LogicalPlan`s + /// such as the number of expected children and no duplicated output fields + Always, + /// Invariants that must hold true for the plan to be "executable" + /// such as the type and number of function arguments are correct and + /// that wildcards have been expanded + /// + /// To ensure a LogicalPlan satisfies the `Executable` invariants, run the + /// `Analyzer` + Executable, +} + +pub fn assert_always_invariants(plan: &LogicalPlan) -> Result<()> { + // Refer to + assert_unique_field_names(plan)?; + + Ok(()) +} + +pub fn assert_executable_invariants(plan: &LogicalPlan) -> Result<()> { + assert_always_invariants(plan)?; + assert_valid_semantic_plan(plan)?; + Ok(()) +} + +/// Returns an error if plan, and subplans, do not have unique fields. +/// +/// This invariant is subject to change. +/// refer: +fn assert_unique_field_names(plan: &LogicalPlan) -> Result<()> { + plan.schema().check_names() +} + +/// Returns an error if the plan is not sematically valid. +fn assert_valid_semantic_plan(plan: &LogicalPlan) -> Result<()> { + assert_subqueries_are_valid(plan)?; + + Ok(()) +} + +/// Returns an error if the plan does not have the expected schema. +/// Ignores metadata and nullability. +pub fn assert_expected_schema(schema: &DFSchemaRef, plan: &LogicalPlan) -> Result<()> { + let equivalent = plan.schema().equivalent_names_and_types(schema); + + if !equivalent { + internal_err!( + "Failed due to a difference in schemas, original schema: {:?}, new schema: {:?}", + schema, + plan.schema() + ) + } else { + Ok(()) + } +} + +/// Asserts that the subqueries are structured properly with valid node placement. +/// +/// Refer to [`check_subquery_expr`] for more details. +fn assert_subqueries_are_valid(plan: &LogicalPlan) -> Result<()> { + plan.apply_with_subqueries(|plan: &LogicalPlan| { + plan.apply_expressions(|expr| { + // recursively look for subqueries + expr.apply(|expr| { + match expr { + Expr::Exists(Exists { subquery, .. }) + | Expr::InSubquery(InSubquery { subquery, .. }) + | Expr::ScalarSubquery(subquery) => { + check_subquery_expr(plan, &subquery.subquery, expr)?; + } + _ => {} + }; + Ok(TreeNodeRecursion::Continue) + }) + }) + }) + .map(|_| ()) +} /// Do necessary check on subquery expressions and fail the invalid plan /// 1) Check whether the outer plan is in the allowed outer plans list to use subquery expressions, @@ -36,7 +120,7 @@ pub fn check_subquery_expr( inner_plan: &LogicalPlan, expr: &Expr, ) -> Result<()> { - check_plan(inner_plan)?; + assert_subqueries_are_valid(inner_plan)?; if let Expr::ScalarSubquery(subquery) = expr { // Scalar subquery should only return one column if subquery.subquery.schema().fields().len() > 1 { @@ -108,12 +192,13 @@ pub fn check_subquery_expr( match outer_plan { LogicalPlan::Projection(_) | LogicalPlan::Filter(_) + | LogicalPlan::TableScan(_) | LogicalPlan::Window(_) | LogicalPlan::Aggregate(_) | LogicalPlan::Join(_) => Ok(()), _ => plan_err!( "In/Exist subquery can only be used in \ - Projection, Filter, Window functions, Aggregate and Join plan nodes, \ + Projection, Filter, TableScan, Window functions, Aggregate and Join plan nodes, \ but was used in [{}]", outer_plan.display() ), @@ -285,8 +370,8 @@ mod test { use std::cmp::Ordering; use std::sync::Arc; + use crate::{Extension, UserDefinedLogicalNodeCore}; use datafusion_common::{DFSchema, DFSchemaRef}; - use datafusion_expr::{Extension, UserDefinedLogicalNodeCore}; use super::*; diff --git a/datafusion/expr/src/logical_plan/mod.rs b/datafusion/expr/src/logical_plan/mod.rs index 5d613d4e80db..404941378663 100644 --- a/datafusion/expr/src/logical_plan/mod.rs +++ b/datafusion/expr/src/logical_plan/mod.rs @@ -20,6 +20,8 @@ mod ddl; pub mod display; pub mod dml; mod extension; +pub(crate) mod invariants; +pub use invariants::{assert_expected_schema, check_subquery_expr, InvariantLevel}; mod plan; mod statement; pub mod tree_node; diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 47d9aac3caf2..cc922709c8a9 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -24,6 +24,9 @@ use std::hash::{Hash, Hasher}; use std::sync::{Arc, LazyLock}; use super::dml::CopyTo; +use super::invariants::{ + assert_always_invariants, assert_executable_invariants, InvariantLevel, +}; use super::DdlStatement; use crate::builder::{change_redundant_column, unnest_with_options}; use crate::expr::{Placeholder, Sort as SortExpr, WindowFunction}; @@ -1127,6 +1130,14 @@ impl LogicalPlan { } } + /// checks that the plan conforms to the listed invariant level, returning an Error if not + pub fn check_invariants(&self, check: InvariantLevel) -> Result<()> { + match check { + InvariantLevel::Always => assert_always_invariants(self), + InvariantLevel::Executable => assert_executable_invariants(self), + } + } + /// Helper for [Self::with_new_exprs] to use when no expressions are expected. #[inline] #[allow(clippy::needless_pass_by_value)] // expr is moved intentionally to ensure it's not used again diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs index 9d0a2b5b95f6..b1e36e02925b 100644 --- a/datafusion/expr/src/utils.rs +++ b/datafusion/expr/src/utils.rs @@ -18,7 +18,7 @@ //! Expression utilities use std::cmp::Ordering; -use std::collections::HashSet; +use std::collections::{BTreeSet, HashSet}; use std::ops::Deref; use std::sync::Arc; @@ -1402,6 +1402,24 @@ pub fn format_state_name(name: &str, state_name: &str) -> String { format!("{name}[{state_name}]") } +/// Determine the set of [`Column`]s produced by the subquery. +pub fn collect_subquery_cols( + exprs: &[Expr], + subquery_schema: &DFSchema, +) -> Result> { + exprs.iter().try_fold(BTreeSet::new(), |mut cols, expr| { + let mut using_cols: Vec = vec![]; + for col in expr.column_refs().into_iter() { + if subquery_schema.has_column(col) { + using_cols.push(col.clone()); + } + } + + cols.extend(using_cols); + Result::<_>::Ok(cols) + }) +} + #[cfg(test)] mod tests { use super::*; diff --git a/datafusion/optimizer/src/analyzer/mod.rs b/datafusion/optimizer/src/analyzer/mod.rs index f2fd61dfa806..9d0ac6b54cf4 100644 --- a/datafusion/optimizer/src/analyzer/mod.rs +++ b/datafusion/optimizer/src/analyzer/mod.rs @@ -24,18 +24,14 @@ use log::debug; use datafusion_common::config::ConfigOptions; use datafusion_common::instant::Instant; -use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion}; -use datafusion_common::{DataFusionError, Result}; -use datafusion_expr::expr::Exists; -use datafusion_expr::expr::InSubquery; +use datafusion_common::Result; use datafusion_expr::expr_rewriter::FunctionRewrite; -use datafusion_expr::{Expr, LogicalPlan}; +use datafusion_expr::{InvariantLevel, LogicalPlan}; use crate::analyzer::count_wildcard_rule::CountWildcardRule; use crate::analyzer::expand_wildcard_rule::ExpandWildcardRule; use crate::analyzer::inline_table_scan::InlineTableScan; use crate::analyzer::resolve_grouping_function::ResolveGroupingFunction; -use crate::analyzer::subquery::check_subquery_expr; use crate::analyzer::type_coercion::TypeCoercion; use crate::utils::log_plan; @@ -46,9 +42,16 @@ pub mod expand_wildcard_rule; pub mod function_rewrite; pub mod inline_table_scan; pub mod resolve_grouping_function; -pub mod subquery; pub mod type_coercion; +pub mod subquery { + #[deprecated( + since = "44.0.0", + note = "please use `datafusion_expr::check_subquery_expr` instead" + )] + pub use datafusion_expr::check_subquery_expr; +} + /// [`AnalyzerRule`]s transform [`LogicalPlan`]s in some way to make /// the plan valid prior to the rest of the DataFusion optimization process. /// @@ -56,7 +59,7 @@ pub mod type_coercion; /// which must preserve the semantics of the `LogicalPlan`, while computing /// results in a more optimal way. /// -/// For example, an `AnalyzerRule` may resolve [`Expr`]s into more specific +/// For example, an `AnalyzerRule` may resolve [`Expr`](datafusion_expr::Expr)s into more specific /// forms such as a subquery reference, or do type coercion to ensure the types /// of operands are correct. /// @@ -140,6 +143,10 @@ impl Analyzer { where F: FnMut(&LogicalPlan, &dyn AnalyzerRule), { + // verify the logical plan required invariants at the start, before analyzer + plan.check_invariants(InvariantLevel::Always) + .map_err(|e| e.context("Invalid input plan passed to Analyzer"))?; + let start_time = Instant::now(); let mut new_plan = plan; @@ -161,39 +168,20 @@ impl Analyzer { // TODO add common rule executor for Analyzer and Optimizer for rule in rules { - new_plan = rule.analyze(new_plan, config).map_err(|e| { - DataFusionError::Context(rule.name().to_string(), Box::new(e)) - })?; + new_plan = rule + .analyze(new_plan, config) + .map_err(|e| e.context(rule.name()))?; log_plan(rule.name(), &new_plan); observer(&new_plan, rule.as_ref()); } - // for easier display in explain output - check_plan(&new_plan).map_err(|e| { - DataFusionError::Context("check_analyzed_plan".to_string(), Box::new(e)) - })?; + + // verify at the end, after the last LP analyzer pass, that the plan is executable. + new_plan + .check_invariants(InvariantLevel::Executable) + .map_err(|e| e.context("Invalid (non-executable) plan after Analyzer"))?; + log_plan("Final analyzed plan", &new_plan); debug!("Analyzer took {} ms", start_time.elapsed().as_millis()); Ok(new_plan) } } - -/// Do necessary check and fail the invalid plan -fn check_plan(plan: &LogicalPlan) -> Result<()> { - plan.apply_with_subqueries(|plan: &LogicalPlan| { - plan.apply_expressions(|expr| { - // recursively look for subqueries - expr.apply(|expr| { - match expr { - Expr::Exists(Exists { subquery, .. }) - | Expr::InSubquery(InSubquery { subquery, .. }) - | Expr::ScalarSubquery(subquery) => { - check_subquery_expr(plan, &subquery.subquery, expr)?; - } - _ => {} - }; - Ok(TreeNodeRecursion::Continue) - }) - }) - }) - .map(|_| ()) -} diff --git a/datafusion/optimizer/src/decorrelate.rs b/datafusion/optimizer/src/decorrelate.rs index b5726d999137..ee6ea08b43bf 100644 --- a/datafusion/optimizer/src/decorrelate.rs +++ b/datafusion/optimizer/src/decorrelate.rs @@ -22,7 +22,6 @@ use std::ops::Deref; use std::sync::Arc; use crate::simplify_expressions::ExprSimplifier; -use crate::utils::collect_subquery_cols; use datafusion_common::tree_node::{ Transformed, TransformedResult, TreeNode, TreeNodeRecursion, TreeNodeRewriter, @@ -30,7 +29,9 @@ use datafusion_common::tree_node::{ use datafusion_common::{plan_err, Column, DFSchemaRef, HashMap, Result, ScalarValue}; use datafusion_expr::expr::Alias; use datafusion_expr::simplify::SimplifyContext; -use datafusion_expr::utils::{conjunction, find_join_exprs, split_conjunction}; +use datafusion_expr::utils::{ + collect_subquery_cols, conjunction, find_join_exprs, split_conjunction, +}; use datafusion_expr::{ expr, lit, BinaryExpr, Cast, EmptyRelation, Expr, FetchType, LogicalPlan, LogicalPlanBuilder, Operator, diff --git a/datafusion/optimizer/src/decorrelate_predicate_subquery.rs b/datafusion/optimizer/src/decorrelate_predicate_subquery.rs index 3e5a85ea02db..a87688c1a317 100644 --- a/datafusion/optimizer/src/decorrelate_predicate_subquery.rs +++ b/datafusion/optimizer/src/decorrelate_predicate_subquery.rs @@ -835,7 +835,7 @@ mod tests { .build()?; // Maybe okay if the table only has a single column? - let expected = "check_analyzed_plan\ + let expected = "Invalid (non-executable) plan after Analyzer\ \ncaused by\ \nError during planning: InSubquery should only return one column, but found 4"; assert_analyzer_check_err(vec![], plan, expected); @@ -930,7 +930,7 @@ mod tests { .project(vec![col("customer.c_custkey")])? .build()?; - let expected = "check_analyzed_plan\ + let expected = "Invalid (non-executable) plan after Analyzer\ \ncaused by\ \nError during planning: InSubquery should only return one column"; assert_analyzer_check_err(vec![], plan, expected); diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs index dfdd0c110c22..49bce3c1ce82 100644 --- a/datafusion/optimizer/src/optimizer.rs +++ b/datafusion/optimizer/src/optimizer.rs @@ -22,6 +22,7 @@ use std::sync::Arc; use chrono::{DateTime, Utc}; use datafusion_expr::registry::FunctionRegistry; +use datafusion_expr::{assert_expected_schema, InvariantLevel}; use log::{debug, warn}; use datafusion_common::alias::AliasGenerator; @@ -355,6 +356,10 @@ impl Optimizer { where F: FnMut(&LogicalPlan, &dyn OptimizerRule), { + // verify LP is valid, before the first LP optimizer pass. + plan.check_invariants(InvariantLevel::Executable) + .map_err(|e| e.context("Invalid input plan before LP Optimizers"))?; + let start_time = Instant::now(); let options = config.options(); let mut new_plan = plan; @@ -362,6 +367,8 @@ impl Optimizer { let mut previous_plans = HashSet::with_capacity(16); previous_plans.insert(LogicalPlanSignature::new(&new_plan)); + let starting_schema = Arc::clone(new_plan.schema()); + let mut i = 0; while i < options.optimizer.max_passes { log_plan(&format!("Optimizer input (pass {i})"), &new_plan); @@ -384,9 +391,16 @@ impl Optimizer { // rule handles recursion itself None => optimize_plan_node(new_plan, rule.as_ref(), config), } - // verify the rule didn't change the schema .and_then(|tnr| { - assert_schema_is_the_same(rule.name(), &starting_schema, &tnr.data)?; + // run checks optimizer invariant checks, per optimizer rule applied + assert_valid_optimization(&tnr.data, &starting_schema) + .map_err(|e| e.context(format!("Check optimizer-specific invariants after optimizer rule: {}", rule.name())))?; + + // run LP invariant checks only in debug mode for performance reasons + #[cfg(debug_assertions)] + tnr.data.check_invariants(InvariantLevel::Executable) + .map_err(|e| e.context(format!("Invalid (non-executable) plan after Optimizer rule: {}", rule.name())))?; + Ok(tnr) }); @@ -445,35 +459,38 @@ impl Optimizer { } i += 1; } + + // verify that the optimizer passes only mutated what was permitted. + assert_valid_optimization(&new_plan, &starting_schema).map_err(|e| { + e.context("Check optimizer-specific invariants after all passes") + })?; + + // verify LP is valid, after the last optimizer pass. + new_plan + .check_invariants(InvariantLevel::Executable) + .map_err(|e| { + e.context("Invalid (non-executable) plan after LP Optimizers") + })?; + log_plan("Final optimized plan", &new_plan); debug!("Optimizer took {} ms", start_time.elapsed().as_millis()); Ok(new_plan) } } -/// Returns an error if `new_plan`'s schema is different than `prev_schema` +/// These are invariants which should hold true before and after [`LogicalPlan`] optimization. /// -/// It ignores metadata and nullability. -pub(crate) fn assert_schema_is_the_same( - rule_name: &str, - prev_schema: &DFSchema, - new_plan: &LogicalPlan, +/// This differs from [`LogicalPlan::check_invariants`], which addresses if a singular +/// LogicalPlan is valid. Instead this address if the optimization was valid based upon permitted changes. +fn assert_valid_optimization( + plan: &LogicalPlan, + prev_schema: &Arc, ) -> Result<()> { - let equivalent = new_plan.schema().equivalent_names_and_types(prev_schema); + // verify invariant: optimizer passes should not change the schema + // Refer to + assert_expected_schema(prev_schema, plan)?; - if !equivalent { - let e = DataFusionError::Internal(format!( - "Failed due to a difference in schemas, original schema: {:?}, new schema: {:?}", - prev_schema, - new_plan.schema() - )); - Err(DataFusionError::Context( - String::from(rule_name), - Box::new(e), - )) - } else { - Ok(()) - } + Ok(()) } #[cfg(test)] @@ -527,9 +544,11 @@ mod tests { schema: Arc::new(DFSchema::empty()), }); let err = opt.optimize(plan, &config, &observe).unwrap_err(); - assert_eq!( + assert!(err.strip_backtrace().starts_with( "Optimizer rule 'get table_scan rule' failed\n\ - caused by\nget table_scan rule\ncaused by\n\ + caused by\n\ + Check optimizer-specific invariants after optimizer rule: get table_scan rule\n\ + caused by\n\ Internal error: Failed due to a difference in schemas, \ original schema: DFSchema { inner: Schema { \ fields: [], \ @@ -545,10 +564,8 @@ mod tests { ], \ metadata: {} }, \ field_qualifiers: [Some(Bare { table: \"test\" }), Some(Bare { table: \"test\" }), Some(Bare { table: \"test\" })], \ - functional_dependencies: FunctionalDependencies { deps: [] } }.\n\ - This was likely caused by a bug in DataFusion's code and we would welcome that you file an bug report in our issue tracker", - err.strip_backtrace() - ); + functional_dependencies: FunctionalDependencies { deps: [] } }", + )); } #[test] diff --git a/datafusion/optimizer/src/scalar_subquery_to_join.rs b/datafusion/optimizer/src/scalar_subquery_to_join.rs index 9e7f8eed8a25..3a8aef267be5 100644 --- a/datafusion/optimizer/src/scalar_subquery_to_join.rs +++ b/datafusion/optimizer/src/scalar_subquery_to_join.rs @@ -731,7 +731,7 @@ mod tests { .project(vec![col("customer.c_custkey")])? .build()?; - let expected = "check_analyzed_plan\ + let expected = "Invalid (non-executable) plan after Analyzer\ \ncaused by\ \nError during planning: Scalar subquery should only return one column"; assert_analyzer_check_err(vec![], plan, expected); @@ -793,7 +793,7 @@ mod tests { .project(vec![col("customer.c_custkey")])? .build()?; - let expected = "check_analyzed_plan\ + let expected = "Invalid (non-executable) plan after Analyzer\ \ncaused by\ \nError during planning: Scalar subquery should only return one column"; assert_analyzer_check_err(vec![], plan, expected); diff --git a/datafusion/optimizer/src/utils.rs b/datafusion/optimizer/src/utils.rs index 9f325bc01b1d..39f8cf285d17 100644 --- a/datafusion/optimizer/src/utils.rs +++ b/datafusion/optimizer/src/utils.rs @@ -87,23 +87,6 @@ pub(crate) fn has_all_column_refs(expr: &Expr, schema_cols: &HashSet) -> == column_refs.len() } -pub(crate) fn collect_subquery_cols( - exprs: &[Expr], - subquery_schema: &DFSchema, -) -> Result> { - exprs.iter().try_fold(BTreeSet::new(), |mut cols, expr| { - let mut using_cols: Vec = vec![]; - for col in expr.column_refs().into_iter() { - if subquery_schema.has_column(col) { - using_cols.push(col.clone()); - } - } - - cols.extend(using_cols); - Result::<_>::Ok(cols) - }) -} - pub(crate) fn replace_qualified_name( expr: Expr, cols: &BTreeSet, diff --git a/datafusion/sqllogictest/test_files/subquery.slt b/datafusion/sqllogictest/test_files/subquery.slt index 027b5ca8dcfb..25fe4c7b0390 100644 --- a/datafusion/sqllogictest/test_files/subquery.slt +++ b/datafusion/sqllogictest/test_files/subquery.slt @@ -433,16 +433,16 @@ logical_plan 08)----------TableScan: t1 projection=[t1_int] #invalid_scalar_subquery -statement error DataFusion error: check_analyzed_plan\ncaused by\nError during planning: Scalar subquery should only return one column, but found 2: t2.t2_id, t2.t2_name +statement error DataFusion error: Invalid \(non-executable\) plan after Analyzer\ncaused by\nError during planning: Scalar subquery should only return one column, but found 2: t2.t2_id, t2.t2_name SELECT t1_id, t1_name, t1_int, (select t2_id, t2_name FROM t2 WHERE t2.t2_id = t1.t1_int) FROM t1 #subquery_not_allowed #In/Exist Subquery is not allowed in ORDER BY clause. -statement error DataFusion error: check_analyzed_plan\ncaused by\nError during planning: In/Exist subquery can only be used in Projection, Filter, Window functions, Aggregate and Join plan nodes, but was used in \[Sort: t1.t1_int IN \(\) ASC NULLS LAST\] +statement error DataFusion error: Invalid \(non-executable\) plan after Analyzer\ncaused by\nError during planning: In/Exist subquery can only be used in Projection, Filter, TableScan, Window functions, Aggregate and Join plan nodes, but was used in \[Sort: t1.t1_int IN \(\) ASC NULLS LAST\] SELECT t1_id, t1_name, t1_int FROM t1 order by t1_int in (SELECT t2_int FROM t2 WHERE t1.t1_id > t1.t1_int) #non_aggregated_correlated_scalar_subquery -statement error DataFusion error: check_analyzed_plan\ncaused by\nError during planning: Correlated scalar subquery must be aggregated to return at most one row +statement error DataFusion error: Invalid \(non-executable\) plan after Analyzer\ncaused by\nError during planning: Correlated scalar subquery must be aggregated to return at most one row SELECT t1_id, (SELECT t2_int FROM t2 WHERE t2.t2_int = t1.t1_int) as t2_int from t1 #non_aggregated_correlated_scalar_subquery_unique @@ -456,11 +456,11 @@ SELECT t1_id, (SELECT t3_int FROM t3 WHERE t3.t3_id = t1.t1_id) as t3_int from t #non_aggregated_correlated_scalar_subquery -statement error DataFusion error: check_analyzed_plan\ncaused by\nError during planning: Correlated scalar subquery must be aggregated to return at most one row +statement error DataFusion error: Invalid \(non-executable\) plan after Analyzer\ncaused by\nError during planning: Correlated scalar subquery must be aggregated to return at most one row SELECT t1_id, (SELECT t2_int FROM t2 WHERE t2.t2_int = t1_int group by t2_int) as t2_int from t1 #non_aggregated_correlated_scalar_subquery_with_limit -statement error DataFusion error: check_analyzed_plan\ncaused by\nError during planning: Correlated scalar subquery must be aggregated to return at most one row +statement error DataFusion error: Invalid \(non-executable\) plan after Analyzer\ncaused by\nError during planning: Correlated scalar subquery must be aggregated to return at most one row SELECT t1_id, (SELECT t2_int FROM t2 WHERE t2.t2_int = t1.t1_int limit 2) as t2_int from t1 #non_aggregated_correlated_scalar_subquery_with_single_row @@ -523,7 +523,7 @@ logical_plan 07)--TableScan: t1 projection=[t1_id] #aggregated_correlated_scalar_subquery_with_extra_group_by_columns -statement error DataFusion error: check_analyzed_plan\ncaused by\nError during planning: A GROUP BY clause in a scalar correlated subquery cannot contain non-correlated columns +statement error DataFusion error: Invalid \(non-executable\) plan after Analyzer\ncaused by\nError during planning: A GROUP BY clause in a scalar correlated subquery cannot contain non-correlated columns SELECT t1_id, (SELECT sum(t2_int) FROM t2 WHERE t2.t2_id = t1.t1_id group by t2_name) as t2_sum from t1 #support_agg_correlated_columns