Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
Signed-off-by: Florent Poinsard <florent.poinsard@outlook.fr>
  • Loading branch information
frouioui committed Sep 3, 2024
1 parent 9a03a78 commit 9cfbb29
Show file tree
Hide file tree
Showing 10 changed files with 171 additions and 54 deletions.
16 changes: 0 additions & 16 deletions go/vt/vtgate/planbuilder/operators/apply_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,22 +112,6 @@ func (aj *ApplyJoin) AddPredicate(ctx *plancontext.PlanningContext, expr sqlpars
return AddPredicate(ctx, aj, expr, false, newFilterSinglePredicate)
}

func (aj *ApplyJoin) GetLHS() Operator {
return aj.LHS
}

func (aj *ApplyJoin) GetRHS() Operator {
return aj.RHS
}

func (aj *ApplyJoin) SetLHS(operator Operator) {
aj.LHS = operator
}

func (aj *ApplyJoin) SetRHS(operator Operator) {
aj.RHS = operator
}

func (aj *ApplyJoin) MakeInner() {
if aj.IsInner() {
return
Expand Down
16 changes: 0 additions & 16 deletions go/vt/vtgate/planbuilder/operators/hash_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,22 +229,6 @@ func (hj *HashJoin) GetOrdering(ctx *plancontext.PlanningContext) []OrderBy {
return nil // hash joins will never promise an output order
}

func (hj *HashJoin) GetLHS() Operator {
return hj.LHS
}

func (hj *HashJoin) GetRHS() Operator {
return hj.RHS
}

func (hj *HashJoin) SetLHS(op Operator) {
hj.LHS = op
}

func (hj *HashJoin) SetRHS(op Operator) {
hj.RHS = op
}

func (hj *HashJoin) MakeInner() {
hj.LeftJoin = false
}
Expand Down
16 changes: 0 additions & 16 deletions go/vt/vtgate/planbuilder/operators/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,22 +199,6 @@ func (j *Join) AddPredicate(ctx *plancontext.PlanningContext, expr sqlparser.Exp

var _ JoinOp = (*Join)(nil)

func (j *Join) GetLHS() Operator {
return j.LHS
}

func (j *Join) GetRHS() Operator {
return j.RHS
}

func (j *Join) SetLHS(operator Operator) {
j.LHS = operator
}

func (j *Join) SetRHS(operator Operator) {
j.RHS = operator
}

func (j *Join) MakeInner() {
if j.IsInner() {
return
Expand Down
16 changes: 16 additions & 0 deletions go/vt/vtgate/planbuilder/operators/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,22 @@ func (b *binaryOperator) SetInputs(operators []Operator) {
b.RHS = operators[1]
}

func (b *binaryOperator) GetLHS() Operator {
return b.LHS
}

func (b *binaryOperator) GetRHS() Operator {
return b.RHS
}

func (b *binaryOperator) SetLHS(operator Operator) {
b.LHS = operator
}

func (b *binaryOperator) SetRHS(operator Operator) {
b.RHS = operator
}

// Map takes in a mapping function and applies it to both the expression in OrderBy.
func (ob OrderBy) Map(mappingFunc func(sqlparser.Expr) sqlparser.Expr) OrderBy {
return OrderBy{
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vtgate/planbuilder/operators/phases.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type (

const (
physicalTransform Phase = iota
initialPlanning
horizonPlanning
pullDistinctFromUnion
delegateAggregation
recursiveCTEHorizons
Expand All @@ -48,7 +48,7 @@ func (p Phase) String() string {
switch p {
case physicalTransform:
return "physicalTransform"
case initialPlanning:
case horizonPlanning:
return "initial horizon planning optimization"
case pullDistinctFromUnion:
return "pull distinct from UNION"
Expand Down
31 changes: 30 additions & 1 deletion go/vt/vtgate/planbuilder/operators/query_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,35 @@ func runPhases(ctx *plancontext.PlanningContext, root Operator) Operator {
func runRewriters(ctx *plancontext.PlanningContext, root Operator) Operator {
visitor := func(in Operator, _ semantics.TableSet, isRoot bool) (Operator, *ApplyResult) {
switch in := in.(type) {
case *ApplyJoin:
var r *Route
_ = Visit(in.RHS, func(op Operator) error {
switch op := op.(type) {
case *Route:
opcode := op.Routing.OpCode()
switch opcode {
case engine.EqualUnique, engine.Equal, engine.IN, engine.MultiEqual:
r = op
}
return io.EOF
case *unaryOperator:
return nil
default:
// If we encounter something else than a route or an unary operator
// we will not be able to determine the op code of the RHS, we abort.
return io.EOF
}
})
if r != nil {
r.Routing.Cost()
var vj Operator = newValuesJoin(in.LHS, in.RHS, in.JoinType)
for _, column := range in.JoinPredicates.columns {
vj = vj.AddPredicate(ctx, column.Original)
// column.RHSExpr
}
return vj, Rewrote("ApplyJoin to ValuesJoin")
}
return in, NoRewrite
case *Horizon:
return pushOrExpandHorizon(ctx, in)
case *Join:
Expand Down Expand Up @@ -183,7 +212,7 @@ func pushOrExpandHorizon(ctx *plancontext.PlanningContext, in *Horizon) (Operato
}
}

if !reachedPhase(ctx, initialPlanning) {
if !reachedPhase(ctx, horizonPlanning) {
return in, NoRewrite
}

Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/planbuilder/operators/subquery_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,7 @@ func rewriteColNameToArgument(
}

func pushOrMergeSubQueryContainer(ctx *plancontext.PlanningContext, in *SubQueryContainer) (Operator, *ApplyResult) {
if !reachedPhase(ctx, initialPlanning) {
if !reachedPhase(ctx, horizonPlanning) {
return in, NoRewrite
}

Expand Down
120 changes: 120 additions & 0 deletions go/vt/vtgate/planbuilder/operators/values_join.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/*
Copyright 2024 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package operators

import (
"golang.org/x/exp/maps"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vtgate/planbuilder/plancontext"
)

type ValuesJoin struct {
binaryOperator

JoinType sqlparser.JoinType

LHSExprs sqlparser.Exprs

// Done at offset planning time
// Vars are the arguments that need to be copied from the LHS to the RHS
Vars map[string]int
}

func newValuesJoin(lhs, rhs Operator, joinType sqlparser.JoinType) *ValuesJoin {
return &ValuesJoin{
binaryOperator: newBinaryOp(lhs, rhs),
JoinType: joinType,
}
}

var _ Operator = (*ValuesJoin)(nil)
var _ JoinOp = (*ValuesJoin)(nil)

func (vj *ValuesJoin) Clone(inputs []Operator) Operator {
nvj := *vj
nvj.LHS = inputs[0]
nvj.RHS = inputs[1]
nvj.Vars = maps.Clone(vj.Vars)
return &nvj
}

func (vj *ValuesJoin) AddPredicate(ctx *plancontext.PlanningContext, expr sqlparser.Expr) Operator {
return AddPredicate(ctx, vj, expr, false, newFilterSinglePredicate)
}

func (vj *ValuesJoin) AddColumn(ctx *plancontext.PlanningContext, reuseExisting bool, addToGroupBy bool, expr *sqlparser.AliasedExpr) int {
col := breakExpressionInLHSandRHS(ctx, expr.Expr, TableID(vj.LHS))

for _, exp := range col.LHSExprs {
vj.LHSExprs = append(vj.LHSExprs, exp.Expr)
}
return vj.RHS.AddColumn(ctx, reuseExisting, addToGroupBy, expr)
}

func (vj *ValuesJoin) AddWSColumn(ctx *plancontext.PlanningContext, offset int, underRoute bool) int {
return vj.RHS.AddWSColumn(ctx, offset, underRoute)
}

func (vj *ValuesJoin) FindCol(ctx *plancontext.PlanningContext, expr sqlparser.Expr, underRoute bool) int {
return vj.RHS.FindCol(ctx, expr, underRoute)
}

func (vj *ValuesJoin) GetColumns(ctx *plancontext.PlanningContext) []*sqlparser.AliasedExpr {
return vj.RHS.GetColumns(ctx)
}

func (vj *ValuesJoin) GetSelectExprs(ctx *plancontext.PlanningContext) sqlparser.SelectExprs {
return vj.GetSelectExprs(ctx)
}

func (vj *ValuesJoin) ShortDescription() string {
return ""
}

func (vj *ValuesJoin) GetOrdering(ctx *plancontext.PlanningContext) []OrderBy {
return vj.RHS.GetOrdering(ctx)
}

func (vj *ValuesJoin) MakeInner() {
if vj.IsInner() {
return
}
vj.JoinType = sqlparser.NormalJoinType
}

func (vj *ValuesJoin) IsInner() bool {
return vj.JoinType.IsInner()
}

func (vj *ValuesJoin) AddJoinPredicate(ctx *plancontext.PlanningContext, expr sqlparser.Expr) {
if expr == nil {
return
}
predicates := sqlparser.SplitAndExpression(nil, expr)
for _, pred := range predicates {
col := breakExpressionInLHSandRHS(ctx, pred, TableID(vj.LHS))

if col.IsPureLeft() {
vj.LHS = vj.LHS.AddPredicate(ctx, pred)
} else {
for _, exp := range col.LHSExprs {
vj.LHSExprs = append(vj.LHSExprs, exp.Expr)
}
vj.RHS = vj.RHS.AddPredicate(ctx, pred)
}
}
}
2 changes: 1 addition & 1 deletion go/vt/vtgate/planbuilder/testdata/onecase.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[
{
"comment": "Add your test case here for debugging and run go test -run=One.",
"query": "",
"query": "select u.toto+ue.bar from user u join user_extra ue on u.foo = ue.user_id",
"plan": {
}
}
Expand Down
2 changes: 1 addition & 1 deletion proto/query.proto
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ enum Type {
RAW = 2084;
// ROW_TUPLE represents multiple rows.
// Properties: 37, None.
ROW_TUPLE= 2085;
ROW_TUPLE = 2085;
}

// Value represents a typed value.
Expand Down

0 comments on commit 9cfbb29

Please sign in to comment.