Skip to content

Commit

Permalink
Projection Pushdown rule and test changes (#8073)
Browse files Browse the repository at this point in the history
  • Loading branch information
berkaysynnada authored Nov 9, 2023
1 parent 4512805 commit 1c17c47
Show file tree
Hide file tree
Showing 17 changed files with 2,395 additions and 203 deletions.
1 change: 1 addition & 0 deletions datafusion/core/src/physical_optimizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ pub mod join_selection;
pub mod optimizer;
pub mod output_requirements;
pub mod pipeline_checker;
mod projection_pushdown;
pub mod pruning;
pub mod replace_with_order_preserving_variants;
mod sort_pushdown;
Expand Down
8 changes: 8 additions & 0 deletions datafusion/core/src/physical_optimizer/optimizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
use std::sync::Arc;

use super::projection_pushdown::ProjectionPushdown;
use crate::config::ConfigOptions;
use crate::physical_optimizer::aggregate_statistics::AggregateStatistics;
use crate::physical_optimizer::coalesce_batches::CoalesceBatches;
Expand Down Expand Up @@ -107,6 +108,13 @@ impl PhysicalOptimizer {
// into an `order by max(x) limit y`. In this case it will copy the limit value down
// to the aggregation, allowing it to use only y number of accumulators.
Arc::new(TopKAggregation::new()),
// The ProjectionPushdown rule tries to push projections towards
// the sources in the execution plan. As a result of this process,
// a projection can disappear if it reaches the source providers, and
// sequential projections can merge into one. Even if these two cases
// are not present, the load of executors such as join or union will be
// reduced by narrowing their input tables.
Arc::new(ProjectionPushdown::new()),
];

Self::with_rules(rules)
Expand Down
6 changes: 3 additions & 3 deletions datafusion/core/src/physical_optimizer/output_requirements.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,14 +88,14 @@ enum RuleMode {
///
/// See [`OutputRequirements`] for more details
#[derive(Debug)]
struct OutputRequirementExec {
pub(crate) struct OutputRequirementExec {
input: Arc<dyn ExecutionPlan>,
order_requirement: Option<LexRequirement>,
dist_requirement: Distribution,
}

impl OutputRequirementExec {
fn new(
pub(crate) fn new(
input: Arc<dyn ExecutionPlan>,
requirements: Option<LexRequirement>,
dist_requirement: Distribution,
Expand All @@ -107,7 +107,7 @@ impl OutputRequirementExec {
}
}

fn input(&self) -> Arc<dyn ExecutionPlan> {
pub(crate) fn input(&self) -> Arc<dyn ExecutionPlan> {
self.input.clone()
}
}
Expand Down
Loading

0 comments on commit 1c17c47

Please sign in to comment.