From 76d68732b643f8cdb3df9f0020eb4a7f42365ce9 Mon Sep 17 00:00:00 2001 From: berkaysynnada Date: Tue, 24 Dec 2024 14:21:46 +0300 Subject: [PATCH] Update join_selection.rs --- .../src/physical_optimizer/join_selection.rs | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/datafusion/core/src/physical_optimizer/join_selection.rs b/datafusion/core/src/physical_optimizer/join_selection.rs index 009757f3a938..7b977899b05c 100644 --- a/datafusion/core/src/physical_optimizer/join_selection.rs +++ b/datafusion/core/src/physical_optimizer/join_selection.rs @@ -61,7 +61,7 @@ impl JoinSelection { // TODO: We need some performance test for Right Semi/Right Join swap to Left Semi/Left Join in case that the right side is smaller but not much smaller. // TODO: In PrestoSQL, the optimizer flips join sides only if one side is much smaller than the other by more than SIZE_DIFFERENCE_THRESHOLD times, by default is 8 times. /// Checks statistics for join swap. -fn should_swap_join_order( +pub(crate) fn should_swap_join_order( left: &dyn ExecutionPlan, right: &dyn ExecutionPlan, ) -> Result { @@ -108,7 +108,7 @@ fn supports_collect_by_thresholds( } /// Predicate that checks whether the given join type supports input swapping. -fn supports_swap(join_type: JoinType) -> bool { +pub(crate) fn supports_swap(join_type: JoinType) -> bool { matches!( join_type, JoinType::Inner @@ -176,7 +176,7 @@ fn swap_join_projection( /// This function swaps the inputs of the given join operator. /// This function is public so other downstream projects can use it /// to construct `HashJoinExec` with right side as the build side. -pub fn swap_hash_join( +pub(crate) fn swap_hash_join( hash_join: &HashJoinExec, partition_mode: PartitionMode, ) -> Result> { @@ -222,7 +222,7 @@ pub fn swap_hash_join( } /// Swaps inputs of `NestedLoopJoinExec` and wraps it into `ProjectionExec` is required -fn swap_nl_join(join: &NestedLoopJoinExec) -> Result> { +pub(crate) fn swap_nl_join(join: &NestedLoopJoinExec) -> Result> { let new_filter = swap_join_filter(join.filter()); let new_join_type = &swap_join_type(*join.join_type()); @@ -359,7 +359,7 @@ impl PhysicalOptimizerRule for JoinSelection { /// `CollectLeft` mode is applicable. Otherwise, it will try to swap the join sides. /// When the `ignore_threshold` is false, this function will also check left /// and right sizes in bytes or rows. -fn try_collect_left( +pub(crate) fn try_collect_left( hash_join: &HashJoinExec, ignore_threshold: bool, threshold_byte_size: usize, @@ -421,7 +421,14 @@ fn try_collect_left( } } -fn partitioned_hash_join(hash_join: &HashJoinExec) -> Result> { +/// Creates a partitioned hash join execution plan, swapping inputs if beneficial. +/// +/// Checks if the join order should be swapped based on the join type and input statistics. +/// If swapping is optimal and supported, creates a swapped partitioned hash join; otherwise, +/// creates a standard partitioned hash join. +pub(crate) fn partitioned_hash_join( + hash_join: &HashJoinExec, +) -> Result> { let left = hash_join.left(); let right = hash_join.right(); if supports_swap(*hash_join.join_type()) && should_swap_join_order(&**left, &**right)?