
WIP: Proposed interface for physical plan invariant checking. #13986

Draft · wants to merge 1 commit into main
Conversation

@wiedld wiedld commented Jan 2, 2025

Which issue does this PR close?

Rationale for this change

The original discussion mentioned implicit changes that can cause problems when upgrading DataFusion (DF). These implicit changes are often the result of how DF core components interact with user-defined extensions that add and mutate plan nodes.

We previously introduced the concept of invariants as a way to more quickly isolate when an implicit change may conflict with user-defined plan extensions. A previous PR introduced the logical plan invariants; this PR introduces the physical plan invariants.

What changes are included in this PR?

This WIP proposes the interface for the execution plan invariant checks. It is done a bit differently from the logical plan (LP) invariants.

The LP is a common enum with a single invokable function for checking invariants (although the level of validation may vary). In contrast, each ExecutionPlan node is its own implementation. Therefore, the chosen approach is to define invariant checking on the implementations, with a default set of invariants provided on the trait.

As with the LP invariants, the physical plan invariants are checked as part of the default planner. Also as with the LP, the more costly checks run only in debug mode.
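The shape described above could be sketched roughly as follows. This is a hypothetical, simplified stand-in for the real `ExecutionPlan` trait: a default `check_node_invariants` on the trait, a tree walk invoking it on every node, and the costlier per-pass check gated to debug builds.

```rust
// Hypothetical sketch only; `PlanNode` stands in for the real
// `datafusion::physical_plan::ExecutionPlan` trait.
type Result<T> = std::result::Result<T, String>;

trait PlanNode {
    fn children(&self) -> Vec<&dyn PlanNode>;

    /// Default set of invariants; extension nodes can override this.
    fn check_node_invariants(&self) -> Result<()> {
        Ok(())
    }
}

/// Depth-first traversal that runs every node's invariant check.
fn check_plan_invariants(node: &dyn PlanNode) -> Result<()> {
    node.check_node_invariants()?;
    for child in node.children() {
        check_plan_invariants(child)?;
    }
    Ok(())
}

/// The costlier whole-plan check only runs in debug builds.
fn check_after_optimizer_pass(node: &dyn PlanNode) -> Result<()> {
    if cfg!(debug_assertions) {
        check_plan_invariants(node)?;
    }
    Ok(())
}

struct Leaf;
impl PlanNode for Leaf {
    fn children(&self) -> Vec<&dyn PlanNode> {
        vec![]
    }
}

fn main() {
    assert!(check_plan_invariants(&Leaf).is_ok());
    assert!(check_after_optimizer_pass(&Leaf).is_ok());
}
```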

Are these changes tested?

Yes

Are there any user-facing changes?

User-defined ExecutionPlan extensions can define their own set of invariants. When a DF upgrade is failing, users can run in debug mode and have their ExecutionPlan::check_node_invariants run after each optimizer pass. For example, this can isolate whether an upstream DF optimizer change has produced inputs that fail for the user's ExecutionPlan extensions.

@github-actions bot added the physical-expr (Physical Expressions) and core (Core DataFusion crate) labels on Jan 2, 2025
/// A default set of invariants is provided in the default implementation.
/// Extension nodes can provide their own invariants.
fn check_node_invariants(&self) -> Result<()> {
// TODO
@wiedld (author) commented Jan 2, 2025

I wasn't sure what the default set should be. The SanityCheckPlan rule does exactly what I had been thinking:

/// The SanityCheckPlan rule rejects the following query plans:
/// 1. Invalid plans containing nodes whose order and/or distribution requirements
/// are not satisfied by their children.
/// 2. Plans that use pipeline-breaking operators on infinite input(s),
/// it is impossible to execute such queries (they will never generate output nor finish)
#[derive(Default, Debug)]
pub struct SanityCheckPlan {}

Also, I think this optimizer pass does not mutate anything; it only validates?

@wiedld (author) commented Jan 2, 2025

If we change the SanityCheckPlan to be an invariant checker instead, and then run it (a) after all the other optimizer rules are applied (current behavior) as well as (b) after each optimizer rule in debug mode -- would this be useful?

The added debug mode check could help isolate when a user-defined optimizer rule extension, or a user defined ExecutionPlan node, does not work well with the DF upgrade (e.g. changes in DF plan nodes or optimizer rules).

@ozankabak (Contributor) commented Jan 2, 2025

Conceptually, sanity checking is a "more general" process -- it verifies that any two operators that exchange data (i.e. one's output feeds the other's input) are compatible. So I don't think we can "change" it to be an invariant checker, but we can extend it to also check "invariants" of each individual operator (however they are defined by an ExecutionPlan) as it traverses the plan tree.

However, we cannot blindly run sanity checking after every rule. Why? Because rules fall into the following types regarding their input/output plan validity:

  • Some rules only take in valid plans and output valid plans (e.g. ProjectionPushdown). These are typically applied at later stages in the optimization/plan construction process.
  • Some take in invalid or valid plans, and always create valid plans (e.g. EnforceSorting and EnforceDistribution). These can be applied any time, but are typically applied in the middle of the optimization/plan construction process.
  • Some take invalid plans and yield still invalid plans (IIRC JoinSelection is this way). These are typically applied early in the optimization/plan construction process.

As of this writing, we don't have a formal cut-off point in our list of rules whereafter plans remain valid, but I suspect they do after EnforceSorting. In debug/upgrade mode, we can apply SanityCheckPlan after every rule after that point.

A contributor commented:

In the logical planner we have a split between

  • AnalyzerRules that make plans Executable (e.g. by coercing types, etc)
  • OptimizerRules that don't change the plan semantics (e.g. output types are the same, etc)

It seems like maybe we could make the same separation for physical optimizer rules as well ("not yet executable" and "ready to execute").

Some take invalid plans and yield still invalid plans (IIRC JoinSelection is this way). These are typically applied early in the optimization/plan construction process.

This was surprising to me (I am not doubting it). I looked at the other passes, and it seems there are a few others:

Arc::new(OutputRequirements::new_add_mode()),
Arc::new(AggregateStatistics::new()),
// Statistics-based join selection will change the Auto mode to a real join implementation,
// like collect left, or hash join, or future sort merge join, which will influence the
// EnforceDistribution and EnforceSorting rules as they decide whether to add additional
// repartitioning and local sorting steps to meet distribution and ordering requirements.
// Therefore, it should run before EnforceDistribution and EnforceSorting.
Arc::new(JoinSelection::new()),
// The LimitedDistinctAggregation rule should be applied before the EnforceDistribution rule,
// as that rule may inject other operations in between the different AggregateExecs.
// Applying the rule early means only directly-connected AggregateExecs must be examined.
Arc::new(LimitedDistinctAggregation::new()),
// The EnforceDistribution rule is for adding essential repartitioning to satisfy distribution
// requirements. Please make sure that the whole plan tree is determined before this rule.
// This rule increases parallelism if doing so is beneficial to the physical plan; i.e. at
// least one of the operators in the plan benefits from increased parallelism.
Arc::new(EnforceDistribution::new()),

🤔

A contributor commented:

Conceptually, sanity checking is a "more general" process -- it verifies that any two operators that exchange data (i.e. one's output feeds the other's input) are compatible. So I don't think we can "change" it to be an invariant checker, but we can extend it to also check "invariants" of each individual operator (however they are defined by an ExecutionPlan) as it traverses the plan tree.

I agree with this sentiment. It seems to me that the "SanityChecker" is verifying invariants that should be true for all nodes (regardless of what they do -- for example that the declared required input sort is the same as the produced output sort)

Thus, focusing on ExecutionPlan specific invariants might be a good first step.

Some simple invariants to start with I could imagine are:

  1. Number of inputs (e.g. that unions have more than zero inputs, for example)
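That first suggested invariant could look something like this. `UnionLike` is a toy stand-in for a union node (not the real `UnionExec`), showing a node-specific `check_node_invariants` that rejects a union with zero inputs.

```rust
// Toy stand-in for a union execution plan node.
struct UnionLike {
    // Stand-in for child plans.
    inputs: Vec<String>,
}

impl UnionLike {
    /// Node-specific invariant: a union must have at least one input.
    fn check_node_invariants(&self) -> Result<(), String> {
        if self.inputs.is_empty() {
            return Err("union must have at least one input".to_string());
        }
        Ok(())
    }
}

fn main() {
    let empty = UnionLike { inputs: vec![] };
    assert!(empty.check_node_invariants().is_err());

    let ok = UnionLike { inputs: vec!["child".to_string()] };
    assert!(ok.check_node_invariants().is_ok());
}
```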

…, and perform check as part of the default physical planner
pub fn check(
&mut self,
plan: &Arc<dyn ExecutionPlan>,
rule: &Arc<dyn PhysicalOptimizerRule + Send + Sync>,
A contributor commented:

I think for API design we should not pass the rule to the invariant checker, as the checker shouldn't logically depend on the rule. Perhaps just the rule name could be passed in, to help with debug messages.
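The suggestion could be sketched like this: the checker takes only the rule's name for use in error messages, so it has no dependency on the rule object itself. The function and its signature here are hypothetical.

```rust
// Hypothetical checker signature: only the rule *name* is passed in,
// purely for debug/error messages.
fn check_invariants(plan_is_valid: bool, rule_name: &str) -> Result<(), String> {
    if plan_is_valid {
        Ok(())
    } else {
        Err(format!("invariant violated after optimizer rule '{rule_name}'"))
    }
}

fn main() {
    assert!(check_invariants(true, "EnforceSorting").is_ok());

    // The rule name shows up in the error, aiding debugging,
    // without the checker depending on the rule itself.
    let err = check_invariants(false, "EnforceSorting").unwrap_err();
    assert!(err.contains("EnforceSorting"));
}
```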


@@ -110,6 +110,16 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
/// trait, which is implemented for all `ExecutionPlan`s.
fn properties(&self) -> &PlanProperties;

/// Returns an error if this individual node does not conform to its invariants.
A contributor commented:

Perhaps, to take into account the different kinds of "executableness", we can use an enum similar to the one we used for LogicalPlans:

Then the signature might look like

    fn check_node_invariants(&self, invariant_level: InvariantLevel) -> Result<()> {
        Ok(())
    }
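A minimal sketch of that suggestion, assuming an `InvariantLevel` enum along the lines of the logical-plan one (the variant names and default body here are illustrative, not the final API):

```rust
// Illustrative levels: baseline invariants that must always hold,
// vs. stricter ones required for a plan to actually execute.
#[derive(Clone, Copy)]
enum InvariantLevel {
    Always,
    Executable,
}

trait PlanNode {
    /// Default: no node-specific invariants at either level.
    fn check_node_invariants(&self, level: InvariantLevel) -> Result<(), String> {
        let _ = level;
        Ok(())
    }
}

struct Noop;
impl PlanNode for Noop {}

fn main() {
    assert!(Noop.check_node_invariants(InvariantLevel::Always).is_ok());
    assert!(Noop.check_node_invariants(InvariantLevel::Executable).is_ok());
}
```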
