From 9cfbb2949a838eee3de612ae4d0b108aabe94804 Mon Sep 17 00:00:00 2001 From: Florent Poinsard Date: Tue, 3 Sep 2024 09:59:01 -0600 Subject: [PATCH] wip Signed-off-by: Florent Poinsard --- .../planbuilder/operators/apply_join.go | 16 --- .../vtgate/planbuilder/operators/hash_join.go | 16 --- go/vt/vtgate/planbuilder/operators/join.go | 16 --- .../vtgate/planbuilder/operators/operator.go | 16 +++ go/vt/vtgate/planbuilder/operators/phases.go | 4 +- .../planbuilder/operators/query_planning.go | 31 ++++- .../operators/subquery_planning.go | 2 +- .../planbuilder/operators/values_join.go | 120 ++++++++++++++++++ .../vtgate/planbuilder/testdata/onecase.json | 2 +- proto/query.proto | 2 +- 10 files changed, 171 insertions(+), 54 deletions(-) create mode 100644 go/vt/vtgate/planbuilder/operators/values_join.go diff --git a/go/vt/vtgate/planbuilder/operators/apply_join.go b/go/vt/vtgate/planbuilder/operators/apply_join.go index 80bf74708a8..c9be092ce89 100644 --- a/go/vt/vtgate/planbuilder/operators/apply_join.go +++ b/go/vt/vtgate/planbuilder/operators/apply_join.go @@ -112,22 +112,6 @@ func (aj *ApplyJoin) AddPredicate(ctx *plancontext.PlanningContext, expr sqlpars return AddPredicate(ctx, aj, expr, false, newFilterSinglePredicate) } -func (aj *ApplyJoin) GetLHS() Operator { - return aj.LHS -} - -func (aj *ApplyJoin) GetRHS() Operator { - return aj.RHS -} - -func (aj *ApplyJoin) SetLHS(operator Operator) { - aj.LHS = operator -} - -func (aj *ApplyJoin) SetRHS(operator Operator) { - aj.RHS = operator -} - func (aj *ApplyJoin) MakeInner() { if aj.IsInner() { return diff --git a/go/vt/vtgate/planbuilder/operators/hash_join.go b/go/vt/vtgate/planbuilder/operators/hash_join.go index 3761c4b87a6..3e26fa2b00c 100644 --- a/go/vt/vtgate/planbuilder/operators/hash_join.go +++ b/go/vt/vtgate/planbuilder/operators/hash_join.go @@ -229,22 +229,6 @@ func (hj *HashJoin) GetOrdering(ctx *plancontext.PlanningContext) []OrderBy { return nil // hash joins will never promise an output order } -func (hj *HashJoin) GetLHS() Operator { - return hj.LHS -} - -func (hj *HashJoin) GetRHS() Operator { - return hj.RHS -} - -func (hj *HashJoin) SetLHS(op Operator) { - hj.LHS = op -} - -func (hj *HashJoin) SetRHS(op Operator) { - hj.RHS = op -} - func (hj *HashJoin) MakeInner() { hj.LeftJoin = false } diff --git a/go/vt/vtgate/planbuilder/operators/join.go b/go/vt/vtgate/planbuilder/operators/join.go index ff4915527a7..8061f05f1ed 100644 --- a/go/vt/vtgate/planbuilder/operators/join.go +++ b/go/vt/vtgate/planbuilder/operators/join.go @@ -199,22 +199,6 @@ func (j *Join) AddPredicate(ctx *plancontext.PlanningContext, expr sqlparser.Exp var _ JoinOp = (*Join)(nil) -func (j *Join) GetLHS() Operator { - return j.LHS -} - -func (j *Join) GetRHS() Operator { - return j.RHS -} - -func (j *Join) SetLHS(operator Operator) { - j.LHS = operator -} - -func (j *Join) SetRHS(operator Operator) { - j.RHS = operator -} - func (j *Join) MakeInner() { if j.IsInner() { return diff --git a/go/vt/vtgate/planbuilder/operators/operator.go b/go/vt/vtgate/planbuilder/operators/operator.go index 42658e4c52e..d48e8e353c2 100644 --- a/go/vt/vtgate/planbuilder/operators/operator.go +++ b/go/vt/vtgate/planbuilder/operators/operator.go @@ -126,6 +126,22 @@ func (b *binaryOperator) SetInputs(operators []Operator) { b.RHS = operators[1] } +func (b *binaryOperator) GetLHS() Operator { + return b.LHS +} + +func (b *binaryOperator) GetRHS() Operator { + return b.RHS +} + +func (b *binaryOperator) SetLHS(operator Operator) { + b.LHS = operator +} + +func (b *binaryOperator) SetRHS(operator Operator) { + b.RHS = operator +} + // Map takes in a mapping function and applies it to both the expression in OrderBy. func (ob OrderBy) Map(mappingFunc func(sqlparser.Expr) sqlparser.Expr) OrderBy { return OrderBy{ diff --git a/go/vt/vtgate/planbuilder/operators/phases.go b/go/vt/vtgate/planbuilder/operators/phases.go index eb6c42b8724..6b843a5330a 100644 --- a/go/vt/vtgate/planbuilder/operators/phases.go +++ b/go/vt/vtgate/planbuilder/operators/phases.go @@ -33,7 +33,7 @@ type ( const ( physicalTransform Phase = iota - initialPlanning + horizonPlanning pullDistinctFromUnion delegateAggregation recursiveCTEHorizons @@ -48,7 +48,7 @@ func (p Phase) String() string { switch p { case physicalTransform: return "physicalTransform" - case initialPlanning: + case horizonPlanning: return "initial horizon planning optimization" case pullDistinctFromUnion: return "pull distinct from UNION" diff --git a/go/vt/vtgate/planbuilder/operators/query_planning.go b/go/vt/vtgate/planbuilder/operators/query_planning.go index 5fe0c7773c1..791fad815e1 100644 --- a/go/vt/vtgate/planbuilder/operators/query_planning.go +++ b/go/vt/vtgate/planbuilder/operators/query_planning.go @@ -74,6 +74,35 @@ func runPhases(ctx *plancontext.PlanningContext, root Operator) Operator { func runRewriters(ctx *plancontext.PlanningContext, root Operator) Operator { visitor := func(in Operator, _ semantics.TableSet, isRoot bool) (Operator, *ApplyResult) { switch in := in.(type) { + case *ApplyJoin: + var r *Route + _ = Visit(in.RHS, func(op Operator) error { + switch op := op.(type) { + case *Route: + opcode := op.Routing.OpCode() + switch opcode { + case engine.EqualUnique, engine.Equal, engine.IN, engine.MultiEqual: + r = op + } + return io.EOF + case *unaryOperator: + return nil + default: + // If we encounter something else than a route or an unary operator + // we will not be able to determine the op code of the RHS, we abort. + return io.EOF + } + }) + if r != nil { + r.Routing.Cost() + var vj Operator = newValuesJoin(in.LHS, in.RHS, in.JoinType) + for _, column := range in.JoinPredicates.columns { + vj = vj.AddPredicate(ctx, column.Original) + // column.RHSExpr + } + return vj, Rewrote("ApplyJoin to ValuesJoin") + } + return in, NoRewrite case *Horizon: return pushOrExpandHorizon(ctx, in) case *Join: @@ -183,7 +212,7 @@ func pushOrExpandHorizon(ctx *plancontext.PlanningContext, in *Horizon) (Operato } } - if !reachedPhase(ctx, initialPlanning) { + if !reachedPhase(ctx, horizonPlanning) { return in, NoRewrite } diff --git a/go/vt/vtgate/planbuilder/operators/subquery_planning.go b/go/vt/vtgate/planbuilder/operators/subquery_planning.go index a2aca74fb6e..74114063e36 100644 --- a/go/vt/vtgate/planbuilder/operators/subquery_planning.go +++ b/go/vt/vtgate/planbuilder/operators/subquery_planning.go @@ -449,7 +449,7 @@ func rewriteColNameToArgument( } func pushOrMergeSubQueryContainer(ctx *plancontext.PlanningContext, in *SubQueryContainer) (Operator, *ApplyResult) { - if !reachedPhase(ctx, initialPlanning) { + if !reachedPhase(ctx, horizonPlanning) { return in, NoRewrite } diff --git a/go/vt/vtgate/planbuilder/operators/values_join.go b/go/vt/vtgate/planbuilder/operators/values_join.go new file mode 100644 index 00000000000..28d14d7401a --- /dev/null +++ b/go/vt/vtgate/planbuilder/operators/values_join.go @@ -0,0 +1,120 @@ +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package operators + +import ( + "golang.org/x/exp/maps" + "vitess.io/vitess/go/vt/sqlparser" + "vitess.io/vitess/go/vt/vtgate/planbuilder/plancontext" +) + +type ValuesJoin struct { + binaryOperator + + JoinType sqlparser.JoinType + + LHSExprs sqlparser.Exprs + + // Done at offset planning time + // Vars are the arguments that need to be copied from the LHS to the RHS + Vars map[string]int +} + +func newValuesJoin(lhs, rhs Operator, joinType sqlparser.JoinType) *ValuesJoin { + return &ValuesJoin{ + binaryOperator: newBinaryOp(lhs, rhs), + JoinType: joinType, + } +} + +var _ Operator = (*ValuesJoin)(nil) +var _ JoinOp = (*ValuesJoin)(nil) + +func (vj *ValuesJoin) Clone(inputs []Operator) Operator { + nvj := *vj + nvj.LHS = inputs[0] + nvj.RHS = inputs[1] + nvj.Vars = maps.Clone(vj.Vars) + return &nvj +} + +func (vj *ValuesJoin) AddPredicate(ctx *plancontext.PlanningContext, expr sqlparser.Expr) Operator { + return AddPredicate(ctx, vj, expr, false, newFilterSinglePredicate) +} + +func (vj *ValuesJoin) AddColumn(ctx *plancontext.PlanningContext, reuseExisting bool, addToGroupBy bool, expr *sqlparser.AliasedExpr) int { + col := breakExpressionInLHSandRHS(ctx, expr.Expr, TableID(vj.LHS)) + + for _, exp := range col.LHSExprs { + vj.LHSExprs = append(vj.LHSExprs, exp.Expr) + } + return vj.RHS.AddColumn(ctx, reuseExisting, addToGroupBy, expr) +} + +func (vj *ValuesJoin) AddWSColumn(ctx *plancontext.PlanningContext, offset int, underRoute bool) int { + return vj.RHS.AddWSColumn(ctx, offset, underRoute) +} + +func (vj *ValuesJoin) FindCol(ctx *plancontext.PlanningContext, expr sqlparser.Expr, underRoute bool) int { + return vj.RHS.FindCol(ctx, expr, underRoute) +} + +func (vj *ValuesJoin) GetColumns(ctx *plancontext.PlanningContext) []*sqlparser.AliasedExpr { + return vj.RHS.GetColumns(ctx) +} + +func (vj *ValuesJoin) GetSelectExprs(ctx *plancontext.PlanningContext) sqlparser.SelectExprs { + return vj.GetSelectExprs(ctx) +} + +func (vj *ValuesJoin) ShortDescription() string { + return "" +} + +func (vj *ValuesJoin) GetOrdering(ctx *plancontext.PlanningContext) []OrderBy { + return vj.RHS.GetOrdering(ctx) +} + +func (vj *ValuesJoin) MakeInner() { + if vj.IsInner() { + return + } + vj.JoinType = sqlparser.NormalJoinType +} + +func (vj *ValuesJoin) IsInner() bool { + return vj.JoinType.IsInner() +} + +func (vj *ValuesJoin) AddJoinPredicate(ctx *plancontext.PlanningContext, expr sqlparser.Expr) { + if expr == nil { + return + } + predicates := sqlparser.SplitAndExpression(nil, expr) + for _, pred := range predicates { + col := breakExpressionInLHSandRHS(ctx, pred, TableID(vj.LHS)) + + if col.IsPureLeft() { + vj.LHS = vj.LHS.AddPredicate(ctx, pred) + } else { + for _, exp := range col.LHSExprs { + vj.LHSExprs = append(vj.LHSExprs, exp.Expr) + } + vj.RHS = vj.RHS.AddPredicate(ctx, pred) + } + } +} diff --git a/go/vt/vtgate/planbuilder/testdata/onecase.json b/go/vt/vtgate/planbuilder/testdata/onecase.json index 9d653b2f6e9..5482eb9f0d3 100644 --- a/go/vt/vtgate/planbuilder/testdata/onecase.json +++ b/go/vt/vtgate/planbuilder/testdata/onecase.json @@ -1,7 +1,7 @@ [ { "comment": "Add your test case here for debugging and run go test -run=One.", - "query": "", + "query": "select u.toto+ue.bar from user u join user_extra ue on u.foo = ue.user_id", "plan": { } } diff --git a/proto/query.proto b/proto/query.proto index cdf5c368578..ba137434e40 100644 --- a/proto/query.proto +++ b/proto/query.proto @@ -223,7 +223,7 @@ enum Type { RAW = 2084; // ROW_TUPLE represents multiple rows. // Properties: 37, None. - ROW_TUPLE= 2085; + ROW_TUPLE = 2085; } // Value represents a typed value.