diff --git a/velox/exec/fuzzer/JoinFuzzer.cpp b/velox/exec/fuzzer/JoinFuzzer.cpp index 23b28de70682..7eed6d0a3a65 100644 --- a/velox/exec/fuzzer/JoinFuzzer.cpp +++ b/velox/exec/fuzzer/JoinFuzzer.cpp @@ -155,21 +155,19 @@ class JoinFuzzer { const std::vector& filterList); JoinFuzzer::PlanWithSplits makeMergeJoinPlan( - core::JoinType joinType, - const std::vector& probeKeys, - const std::vector& buildKeys, - const std::vector& probeInput, - const std::vector& buildInput, - const std::vector& outputColumns, - const std::string& filter); + const std::vector& joinTypes, + const std::vector>& probeKeysList, + const std::vector>& buildKeysList, + const std::vector>& inputs, + const std::vector>& outputColumnsList, + const std::vector& filterList); // Returns a PlanWithSplits for NestedLoopJoin with inputs from Values nodes. JoinFuzzer::PlanWithSplits makeNestedLoopJoinPlan( - core::JoinType joinType, - const std::vector& probeInput, - const std::vector& buildInput, - const std::vector& outputColumns, - const std::string& joinCondition); + const std::vector& joinTypes, + const std::vector>& inputs, + const std::vector>& outputColumnsList, + const std::vector& joinConditionList); // Makes the default query plan with table scan as inputs for all of the // inputs. @@ -848,46 +846,65 @@ void addFlippedJoinPlan( } JoinFuzzer::PlanWithSplits JoinFuzzer::makeMergeJoinPlan( - core::JoinType joinType, - const std::vector& probeKeys, - const std::vector& buildKeys, - const std::vector& probeInput, - const std::vector& buildInput, - const std::vector& outputColumns, - const std::string& filter) { + const std::vector& joinTypes, + const std::vector>& probeKeysList, + const std::vector>& buildKeysList, + const std::vector>& inputs, + const std::vector>& outputColumnsList, + const std::vector& filterList) { + VELOX_CHECK(inputs.size() > 1); auto planNodeIdGenerator = std::make_shared(); - return JoinFuzzer::PlanWithSplits{PlanBuilder(planNodeIdGenerator) - .values(probeInput) - .orderBy(probeKeys, false) - .mergeJoin( - probeKeys, - buildKeys, - PlanBuilder(planNodeIdGenerator) - .values(buildInput) - .orderBy(buildKeys, false) - .planNode(), - filter, - outputColumns, - joinType) - .planNode()}; + PlanBuilder plan = PlanBuilder(planNodeIdGenerator) + .values(inputs[0]) + .orderBy(probeKeysList[0], false) + .mergeJoin( + probeKeysList[0], + buildKeysList[0], + PlanBuilder(planNodeIdGenerator) + .values(inputs[1]) + .orderBy(buildKeysList[0], false) + .planNode(), + filterList[0], + outputColumnsList[0], + joinTypes[0]); + for (auto i = 1; i < inputs.size() - 1; i++) { + plan = plan.mergeJoin( + probeKeysList[i], + buildKeysList[i], + PlanBuilder(planNodeIdGenerator) + .values(inputs[i + 1]) + .orderBy(buildKeysList[i], false) + .planNode(), + filterList[i], + outputColumnsList[i], + joinTypes[i]); + } + return PlanWithSplits{plan.planNode()}; } JoinFuzzer::PlanWithSplits JoinFuzzer::makeNestedLoopJoinPlan( - core::JoinType joinType, - const std::vector& probeInput, - const std::vector& buildInput, - const std::vector& outputColumns, - const std::string& joinCondition) { + const std::vector& joinTypes, + const std::vector>& inputs, + const std::vector>& outputColumnsList, + const std::vector& joinConditionList) { + VELOX_CHECK(inputs.size() > 1); auto planNodeIdGenerator = std::make_shared(); - return JoinFuzzer::PlanWithSplits{ + PlanBuilder plan = PlanBuilder(planNodeIdGenerator) - .values(probeInput) + .values(inputs[0]) .nestedLoopJoin( - PlanBuilder(planNodeIdGenerator).values(buildInput).planNode(), - joinCondition, - outputColumns, - joinType) - .planNode()}; + PlanBuilder(planNodeIdGenerator).values(inputs[1]).planNode(), + joinConditionList[0], + outputColumnsList[0], + joinTypes[0]); + for (auto i = 1; i < inputs.size() - 1; i++) { + plan = plan.nestedLoopJoin( + PlanBuilder(planNodeIdGenerator).values(inputs[i + 1]).planNode(), + joinConditionList[i], + outputColumnsList[i], + joinTypes[i]); + } + return PlanWithSplits{plan.planNode()}; } void JoinFuzzer::makeAlternativePlans( @@ -930,13 +947,12 @@ void JoinFuzzer::makeAlternativePlans( // Use OrderBy + MergeJoin if (core::MergeJoinNode::isSupported(joinNode->joinType())) { auto planWithSplits = makeMergeJoinPlan( - joinType, - probeKeys, - buildKeys, - probeInput, - buildInput, - outputColumns, - filter); + {joinType}, + {probeKeys}, + {buildKeys}, + {probeInput, buildInput}, + {outputColumns}, + {filter}); plans.push_back(planWithSplits); addFlippedJoinPlan(planWithSplits, plans); @@ -949,7 +965,7 @@ void JoinFuzzer::makeAlternativePlans( : fmt::format( "{} AND {}", makeJoinFilter(probeKeys, buildKeys), filter); auto planWithSplits = makeNestedLoopJoinPlan( - joinType, probeInput, buildInput, outputColumns, joinCondition); + {joinType}, {probeInput, buildInput}, {outputColumns}, {joinCondition}); plans.push_back(planWithSplits); addFlippedJoinPlan(planWithSplits, plans); @@ -992,11 +1008,10 @@ RowVectorPtr JoinFuzzer::testCrossProduct( ->names(); auto plan = makeNestedLoopJoinPlan( - joinType, - probeInput, - buildInput, - outputColumns, - /*filter=*/""); + {joinType}, + {probeInput, buildInput}, + {outputColumns}, + /*filterList=*/{""}); const auto expected = execute(plan, /*injectSpill=*/false); // If OOM injection is not enabled verify the results against Reference query