Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce LogicalPlan invariants, begin automatically checking them #13651

Merged
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
6d43dc2
minor(13525): perform LP validation before and after each possible mu…
wiedld Dec 4, 2024
a855811
minor(13525): validate unique field names on query and subquery schem…
wiedld Dec 4, 2024
0163a40
minor(13525): validate union after each optimizer passes
wiedld Dec 4, 2024
bee7e92
refactor: make explicit what is an invariant of the logical plan, ver…
wiedld Dec 16, 2024
4eee9c4
chore: add link to invariant docs
wiedld Dec 16, 2024
a7d9770
fix: add new invariants module
wiedld Dec 16, 2024
72718ad
Merge branch 'main' into 13525/invariant-checking-for-implicit-LP-cha…
wiedld Dec 17, 2024
2002b1a
refactor: move all LP invariant checking into LP, delineate executabl…
wiedld Dec 17, 2024
fbc9c46
test: update test for slight error message change
wiedld Dec 17, 2024
e52187e
fix: push_down_filter optimization pass can push a IN(<subquery>) int…
wiedld Dec 17, 2024
ba26f13
Merge branch 'main' into 13525/invariant-checking-for-implicit-LP-cha…
wiedld Dec 23, 2024
ad1a1f8
refactor: move collect_subquery_cols() to common utils crate
wiedld Dec 23, 2024
1164a7b
refactor: clarify the purpose of assert_valid_optimization(), runs af…
wiedld Dec 23, 2024
7ad0b74
refactor: based upon performance tests, run the maximum number of che…
wiedld Dec 24, 2024
911d4b8
chore: update error naming and terminology used in code comments
wiedld Dec 24, 2024
810246d
refactor: use proper error methods
wiedld Dec 24, 2024
9842d19
chore: more cleanup of error messages
wiedld Dec 24, 2024
00700ae
Merge branch 'main' into 13525/invariant-checking-for-implicit-LP-cha…
wiedld Dec 25, 2024
9bca470
chore: handle option trailer to error message
wiedld Dec 25, 2024
529ac3e
test: update sqllogictests tests to not use multiline
wiedld Dec 26, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 18 additions & 1 deletion datafusion/optimizer/src/analyzer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,19 @@ impl Analyzer {
where
F: FnMut(&LogicalPlan, &dyn AnalyzerRule),
{
/* current test failure:
External error: statement is expected to fail with error:
(regex) DataFusion error: check_analyzed_plan\ncaused by\nError during planning: Scalar subquery should only return one column, but found 2: t2.t2_id, t2.t2_name
but got error:
DataFusion error: Error during planning: Scalar subquery should only return one column, but found 2: t2.t2_id, t2.t2_name
[SQL] SELECT t1_id, t1_name, t1_int, (select t2_id, t2_name FROM t2 WHERE t2.t2_id = t1.t1_int) FROM t1
at test_files/subquery.slt:436
*/
// verify at the start, before the first LP analyzer pass.
// check_plan(&plan).map_err(|e| {
// DataFusionError::Context("check_plan_before_analyzers".to_string(), Box::new(e))
// })?;
wiedld marked this conversation as resolved.
Show resolved Hide resolved

let start_time = Instant::now();
let mut new_plan = plan;

Expand Down Expand Up @@ -174,7 +187,11 @@ impl Analyzer {
}
}

/// Do necessary check and fail the invalid plan
/// These are invariants to hold true for each logical plan.
/// Do necessary check and fail the invalid plan.
///
/// Checks for elements which are immutable across analyzer passes.
/// Does not check elements which are mutated by the analyzers (e.g. the schema).
fn check_plan(plan: &LogicalPlan) -> Result<()> {
plan.apply_with_subqueries(|plan: &LogicalPlan| {
plan.apply_expressions(|expr| {
Expand Down
82 changes: 79 additions & 3 deletions datafusion/optimizer/src/optimizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,13 @@ use std::sync::Arc;

use chrono::{DateTime, Utc};
use datafusion_expr::registry::FunctionRegistry;
use datafusion_expr::Union;
use log::{debug, warn};

use datafusion_common::alias::AliasGenerator;
use datafusion_common::config::ConfigOptions;
use datafusion_common::instant::Instant;
use datafusion_common::tree_node::{Transformed, TreeNodeRewriter};
use datafusion_common::tree_node::{Transformed, TreeNodeRecursion, TreeNodeRewriter};
use datafusion_common::{internal_err, DFSchema, DataFusionError, HashSet, Result};
use datafusion_expr::logical_plan::LogicalPlan;

Expand Down Expand Up @@ -355,6 +356,16 @@ impl Optimizer {
where
F: FnMut(&LogicalPlan, &dyn OptimizerRule),
{
// verify at the start, before the first LP optimizer pass.
check_plan("before_optimizers", &plan, Arc::clone(plan.schema())).map_err(
|e| {
DataFusionError::Context(
"check_plan_before_optimizers".to_string(),
Box::new(e),
)
},
)?;

let start_time = Instant::now();
let options = config.options();
let mut new_plan = plan;
Expand Down Expand Up @@ -384,9 +395,15 @@ impl Optimizer {
// rule handles recursion itself
None => optimize_plan_node(new_plan, rule.as_ref(), config),
}
// verify the rule didn't change the schema
.and_then(|tnr| {
assert_schema_is_the_same(rule.name(), &starting_schema, &tnr.data)?;
// verify after each optimizer pass.
check_plan(rule.name(), &tnr.data, starting_schema).map_err(|e| {
wiedld marked this conversation as resolved.
Show resolved Hide resolved
DataFusionError::Context(
"check_optimized_plan".to_string(),
Box::new(e),
)
})?;

Ok(tnr)
});

Expand Down Expand Up @@ -451,6 +468,33 @@ impl Optimizer {
}
}

/// These are invariants to hold true for each logical plan.
/// Do necessary check and fail the invalid plan.
///
/// Checks for elements which are immutable across optimizer passes.
fn check_plan(
check_name: &str,
plan: &LogicalPlan,
prev_schema: Arc<DFSchema>,
) -> Result<()> {
// verify invariant: optimizer rule didn't change the schema
assert_schema_is_the_same(check_name, &prev_schema, plan)?;

// verify invariant: fields must have unique names
assert_unique_field_names(plan)?;

/* This current fails for:
- execution::context::tests::cross_catalog_access
- at test_files/string/string.slt:46
External error: query failed: DataFusion error: Optimizer rule 'eliminate_nested_union' failed
*/
// verify invariant: equivalent schema across union inputs
// assert_unions_are_valid(check_name, plan)?;
wiedld marked this conversation as resolved.
Show resolved Hide resolved

// TODO: trait API and provide extension on the Optimizer to define own validations?
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here is a mention of the extensibility of invariants. Options include:

  • for general invariants:
    • defined as being checked before/after each OptimizerRule, and applied here in check_plan() (or equivalent code)
    • we could provide Optimizer.invariants = Vec<Arc<dyn InvariantCheck>> for user-defined invariants
  • for invariants specific for a given OptimizerRule:
    • we could provide OptimizerRule::check_invariants() such that certain invariants are only checked for a given rule (instead of all rules)
    • for a user-defined OptimizerRule, users can also check their own invariants

Ditto for the AnalyzerRule passes. Altho I wasn't sure about how much is added complexity and planning time overhead - as @Omega359 mentions we could make it configurable (e.g. run for CI and debugging in downstream projects).

This WIP is about proposing different ideas of what we could do. 🤔

Copy link
Member

@jonahgao jonahgao Dec 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe it can be controlled through environment variables, similar to RUST_LOG or RUST_BACKTRACE. Enable it for debugging when problems are encountered or during an upgrade.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you have example use-case for user-defined plan invariants?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have some special invariants for our SortPreservingMerge replacement, ProgressiveEval (related to time ranges of parquet files) that would be great to be able to encode

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe it can be controlled through environment variables, similar to RUST_LOG or RUST_BACKTRACE. Enable it for debugging when problems are encountered or during an upgrade.

We could also add it as a debug_assert! after each optimizer pass and call the real validation

  1. After analyze
  2. After all the optimizer passes

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have some special invariants for our SortPreservingMerge replacement, ProgressiveEval (related to time ranges of parquet files) that would be great to be able to encode

is this about LogicalPlan::Extension? I agree it makes sense to support validation of these if we validate the overall plan.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added this to the followup items list: #13652 (comment)

Ok(())
}

/// Returns an error if `new_plan`'s schema is different than `prev_schema`
///
/// It ignores metadata and nullability.
Expand All @@ -476,6 +520,38 @@ pub(crate) fn assert_schema_is_the_same(
}
}

/// Returns an error if plan, and subplans, do not have unique fields.
///
/// This invariant is subject to change.
/// refer: <https://github.com/apache/datafusion/issues/13525#issuecomment-2494046463>
fn assert_unique_field_names(plan: &LogicalPlan) -> Result<()> {
plan.schema().check_names()?;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is check_names also called whenever creating new DFSchema?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, on every creation. But not after every merge. Which should be ok if no bug is introduced in the merge -- altho I would prefer to add the check there.


plan.apply_with_subqueries(|plan: &LogicalPlan| {
plan.schema().check_names()?;
Ok(TreeNodeRecursion::Continue)
})
.map(|_| ())
}

/// Returns an error if any union nodes are invalid.
#[allow(dead_code)]
wiedld marked this conversation as resolved.
Show resolved Hide resolved
fn assert_unions_are_valid(rule_name: &str, plan: &LogicalPlan) -> Result<()> {
plan.apply_with_subqueries(|plan: &LogicalPlan| {
if let LogicalPlan::Union(Union { schema, inputs }) = plan {
inputs.iter().try_for_each(|subplan| {
assert_schema_is_the_same(
format!("{rule_name}:union_check").as_str(),
schema,
subplan,
)
})?;
}
Ok(TreeNodeRecursion::Continue)
})
.map(|_| ())
}

#[cfg(test)]
mod tests {
use std::sync::{Arc, Mutex};
Expand Down
Loading