Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce LogicalPlan invariants, begin automatically checking them #13651

Merged
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
6d43dc2
minor(13525): perform LP validation before and after each possible mu…
wiedld Dec 4, 2024
a855811
minor(13525): validate unique field names on query and subquery schem…
wiedld Dec 4, 2024
0163a40
minor(13525): validate union after each optimizer passes
wiedld Dec 4, 2024
bee7e92
refactor: make explicit what is an invariant of the logical plan, ver…
wiedld Dec 16, 2024
4eee9c4
chore: add link to invariant docs
wiedld Dec 16, 2024
a7d9770
fix: add new invariants module
wiedld Dec 16, 2024
72718ad
Merge branch 'main' into 13525/invariant-checking-for-implicit-LP-cha…
wiedld Dec 17, 2024
2002b1a
refactor: move all LP invariant checking into LP, delineate executabl…
wiedld Dec 17, 2024
fbc9c46
test: update test for slight error message change
wiedld Dec 17, 2024
e52187e
fix: push_down_filter optimization pass can push a IN(<subquery>) int…
wiedld Dec 17, 2024
ba26f13
Merge branch 'main' into 13525/invariant-checking-for-implicit-LP-cha…
wiedld Dec 23, 2024
ad1a1f8
refactor: move collect_subquery_cols() to common utils crate
wiedld Dec 23, 2024
1164a7b
refactor: clarify the purpose of assert_valid_optimization(), runs af…
wiedld Dec 23, 2024
7ad0b74
refactor: based upon performance tests, run the maximum number of che…
wiedld Dec 24, 2024
911d4b8
chore: update error naming and terminology used in code comments
wiedld Dec 24, 2024
810246d
refactor: use proper error methods
wiedld Dec 24, 2024
9842d19
chore: more cleanup of error messages
wiedld Dec 24, 2024
00700ae
Merge branch 'main' into 13525/invariant-checking-for-implicit-LP-cha…
wiedld Dec 25, 2024
9bca470
chore: handle option trailer to error message
wiedld Dec 25, 2024
529ac3e
test: update sqllogictests tests to not use multiline
wiedld Dec 26, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions datafusion/expr/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ mod ddl;
pub mod display;
pub mod dml;
mod extension;
pub(crate) mod invariants;
wiedld marked this conversation as resolved.
Show resolved Hide resolved
pub use invariants::assert_expected_schema;
mod plan;
mod statement;
pub mod tree_node;
Expand Down
8 changes: 8 additions & 0 deletions datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use std::hash::{Hash, Hasher};
use std::sync::{Arc, OnceLock};

use super::dml::CopyTo;
use super::invariants::assert_unique_field_names;
use super::DdlStatement;
use crate::builder::{change_redundant_column, unnest_with_options};
use crate::expr::{Placeholder, Sort as SortExpr, WindowFunction};
Expand Down Expand Up @@ -1125,6 +1126,13 @@ impl LogicalPlan {
}
}

/// These are invariants to hold true for each logical plan.
wiedld marked this conversation as resolved.
Show resolved Hide resolved
pub fn assert_invariants(&self) -> Result<()> {
assert_unique_field_names(self)?;

wiedld marked this conversation as resolved.
Show resolved Hide resolved
Ok(())
}

/// Helper for [Self::with_new_exprs] to use when no expressions are expected.
#[inline]
#[allow(clippy::needless_pass_by_value)] // expr is moved intentionally to ensure it's not used again
Expand Down
36 changes: 32 additions & 4 deletions datafusion/optimizer/src/analyzer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,14 @@ impl Analyzer {
where
F: FnMut(&LogicalPlan, &dyn AnalyzerRule),
{
// verify at the start, before the first LP analyzer pass.
assert_valid_semantic_plan(&plan).map_err(|e| {
DataFusionError::Context(
"check_plan_before_analyzers".to_string(),
Box::new(e),
)
})?;

let start_time = Instant::now();
let mut new_plan = plan;

Expand Down Expand Up @@ -164,18 +172,38 @@ impl Analyzer {
log_plan(rule.name(), &new_plan);
observer(&new_plan, rule.as_ref());
}
// for easier display in explain output
check_plan(&new_plan).map_err(|e| {

// verify at the end, after the last LP analyzer pass.
assert_valid_semantic_plan(&new_plan).map_err(|e| {
DataFusionError::Context("check_analyzed_plan".to_string(), Box::new(e))
})?;

log_plan("Final analyzed plan", &new_plan);
debug!("Analyzer took {} ms", start_time.elapsed().as_millis());
Ok(new_plan)
}
}

/// Do necessary check and fail the invalid plan
fn check_plan(plan: &LogicalPlan) -> Result<()> {
/// These are invariants which should hold true before and after each analyzer.
///
/// This differs from [`LogicalPlan::assert_invariants`], which addresses if a singular
/// LogicalPlan is valid. Instead this address if the analyzer (before and after)
/// is valid based upon permitted changes.
///
/// Does not check elements which are mutated by the analyzers (e.g. the schema).
fn assert_valid_semantic_plan(plan: &LogicalPlan) -> Result<()> {
plan.assert_invariants()?;

// TODO: should this be moved to LogicalPlan::assert_invariants?
assert_subqueries_are_valid(plan)?;
wiedld marked this conversation as resolved.
Show resolved Hide resolved

Ok(())
}

/// Asserts that the subqueries are structured properly with valid node placement.
///
/// Refer to [`check_subquery_expr`] for more details.
fn assert_subqueries_are_valid(plan: &LogicalPlan) -> Result<()> {
plan.apply_with_subqueries(|plan: &LogicalPlan| {
plan.apply_expressions(|expr| {
// recursively look for subqueries
Expand Down
4 changes: 2 additions & 2 deletions datafusion/optimizer/src/analyzer/subquery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use crate::analyzer::check_plan;
use crate::analyzer::assert_subqueries_are_valid;
use crate::utils::collect_subquery_cols;
use recursive::recursive;

Expand All @@ -37,7 +37,7 @@ pub fn check_subquery_expr(
inner_plan: &LogicalPlan,
expr: &Expr,
) -> Result<()> {
check_plan(inner_plan)?;
assert_subqueries_are_valid(inner_plan)?;
if let Expr::ScalarSubquery(subquery) = expr {
// Scalar subquery should only return one column
if subquery.subquery.schema().fields().len() > 1 {
Expand Down
60 changes: 38 additions & 22 deletions datafusion/optimizer/src/optimizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use std::fmt::Debug;
use std::sync::Arc;

use chrono::{DateTime, Utc};
use datafusion_expr::assert_expected_schema;
use datafusion_expr::registry::FunctionRegistry;
use log::{debug, warn};

Expand Down Expand Up @@ -355,6 +356,14 @@ impl Optimizer {
where
F: FnMut(&LogicalPlan, &dyn OptimizerRule),
{
// verify LP is valid, before the first LP optimizer pass.
plan.assert_invariants().map_err(|e| {
DataFusionError::Context(
"check_plan_before_optimizers".to_string(),
Box::new(e),
)
})?;

let start_time = Instant::now();
let options = config.options();
let mut new_plan = plan;
Expand Down Expand Up @@ -384,9 +393,16 @@ impl Optimizer {
// rule handles recursion itself
None => optimize_plan_node(new_plan, rule.as_ref(), config),
}
// verify the rule didn't change the schema
.and_then(|tnr| {
assert_schema_is_the_same(rule.name(), &starting_schema, &tnr.data)?;
// verify after each optimizer pass.
assert_valid_optimization(rule.name(), &tnr.data, &starting_schema)
wiedld marked this conversation as resolved.
Show resolved Hide resolved
.map_err(|e| {
DataFusionError::Context(
"check_optimized_plan".to_string(),
Box::new(e),
)
})?;

Ok(tnr)
});

Expand Down Expand Up @@ -445,35 +461,35 @@ impl Optimizer {
}
i += 1;
}

// verify LP is valid, after the last optimizer pass.
wiedld marked this conversation as resolved.
Show resolved Hide resolved
new_plan.assert_invariants().map_err(|e| {
DataFusionError::Context(
"check_plan_after_optimizers".to_string(),
Box::new(e),
)
})?;

log_plan("Final optimized plan", &new_plan);
debug!("Optimizer took {} ms", start_time.elapsed().as_millis());
Ok(new_plan)
}
}

/// Returns an error if `new_plan`'s schema is different than `prev_schema`
/// These are invariants which should hold true before and after each optimization.
///
/// It ignores metadata and nullability.
pub(crate) fn assert_schema_is_the_same(
/// This differs from [`LogicalPlan::assert_invariants`], which addresses if a singular
/// LogicalPlan is valid. Instead this address if the optimization (before and after)
/// is valid based upon permitted changes.
fn assert_valid_optimization(
wiedld marked this conversation as resolved.
Show resolved Hide resolved
rule_name: &str,
prev_schema: &DFSchema,
new_plan: &LogicalPlan,
plan: &LogicalPlan,
prev_schema: &Arc<DFSchema>,
) -> Result<()> {
let equivalent = new_plan.schema().equivalent_names_and_types(prev_schema);

if !equivalent {
let e = DataFusionError::Internal(format!(
"Failed due to a difference in schemas, original schema: {:?}, new schema: {:?}",
prev_schema,
new_plan.schema()
));
Err(DataFusionError::Context(
String::from(rule_name),
Box::new(e),
))
} else {
Ok(())
}
// verify invariant: optimizer rule didn't change the schema
assert_expected_schema(rule_name, prev_schema, plan)?;

Ok(())
}

#[cfg(test)]
Expand Down
Loading