Skip to content

Commit

Permalink
refactor: remove projection and semiJoin logicalPlan
Browse files Browse the repository at this point in the history
Signed-off-by: Andres Taylor <andres@planetscale.com>
  • Loading branch information
systay committed May 24, 2024
1 parent d8cbde7 commit ae5caf9
Show file tree
Hide file tree
Showing 7 changed files with 16 additions and 173 deletions.
6 changes: 1 addition & 5 deletions go/vt/vtgate/engine/cached_size.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

48 changes: 5 additions & 43 deletions go/vt/vtgate/engine/semi_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ package engine

import (
"context"
"fmt"
"strings"

"vitess.io/vitess/go/sqltypes"
querypb "vitess.io/vitess/go/vt/proto/query"
Expand All @@ -33,14 +31,6 @@ type SemiJoin struct {
// of the SemiJoin. They can be any primitive.
Left, Right Primitive `json:",omitempty"`

// Cols defines which columns from the left
// results should be used to build the
// return result. For results coming from the
// left query, the index values go as -1, -2, etc.
// If Cols is {-1, -2}, it means that
// the returned result will be {Left0, Left1}.
Cols []int `json:",omitempty"`

// Vars defines the list of SemiJoinVars that need to
// be built from the LHS result before invoking
// the RHS subquery.
Expand All @@ -54,7 +44,7 @@ func (jn *SemiJoin) TryExecute(ctx context.Context, vcursor VCursor, bindVars ma
if err != nil {
return nil, err
}
result := &sqltypes.Result{Fields: projectFields(lresult.Fields, jn.Cols)}
result := &sqltypes.Result{Fields: lresult.Fields}
for _, lrow := range lresult.Rows {
for k, col := range jn.Vars {
joinVars[k] = sqltypes.ValueBindVariable(lrow[col])
Expand All @@ -64,7 +54,7 @@ func (jn *SemiJoin) TryExecute(ctx context.Context, vcursor VCursor, bindVars ma
return nil, err
}
if len(rresult.Rows) > 0 {
result.Rows = append(result.Rows, projectRows(lrow, jn.Cols))
result.Rows = append(result.Rows, lrow)
}
}
return result, nil
Expand All @@ -74,15 +64,15 @@ func (jn *SemiJoin) TryExecute(ctx context.Context, vcursor VCursor, bindVars ma
func (jn *SemiJoin) TryStreamExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool, callback func(*sqltypes.Result) error) error {
joinVars := make(map[string]*querypb.BindVariable)
err := vcursor.StreamExecutePrimitive(ctx, jn.Left, bindVars, wantfields, func(lresult *sqltypes.Result) error {
result := &sqltypes.Result{Fields: projectFields(lresult.Fields, jn.Cols)}
result := &sqltypes.Result{Fields: lresult.Fields}
for _, lrow := range lresult.Rows {
for k, col := range jn.Vars {
joinVars[k] = sqltypes.ValueBindVariable(lrow[col])
}
rowAdded := false
err := vcursor.StreamExecutePrimitive(ctx, jn.Right, combineVars(bindVars, joinVars), false, func(rresult *sqltypes.Result) error {
if len(rresult.Rows) > 0 && !rowAdded {
result.Rows = append(result.Rows, projectRows(lrow, jn.Cols))
result.Rows = append(result.Rows, lrow)
rowAdded = true
}
return nil
Expand Down Expand Up @@ -135,8 +125,7 @@ func (jn *SemiJoin) NeedsTransaction() bool {

func (jn *SemiJoin) description() PrimitiveDescription {
other := map[string]any{
"TableName": jn.GetTableName(),
"ProjectedIndexes": strings.Trim(strings.Join(strings.Fields(fmt.Sprint(jn.Cols)), ","), "[]"),
"TableName": jn.GetTableName(),
}
if len(jn.Vars) > 0 {
other["JoinVars"] = orderedStringIntMap(jn.Vars)
Expand All @@ -146,30 +135,3 @@ func (jn *SemiJoin) description() PrimitiveDescription {
Other: other,
}
}

func projectFields(lfields []*querypb.Field, cols []int) []*querypb.Field {
if lfields == nil {
return nil
}
if len(cols) == 0 {
return lfields
}
fields := make([]*querypb.Field, len(cols))
for i, index := range cols {
fields[i] = lfields[-index-1]
}
return fields
}

func projectRows(lrow []sqltypes.Value, cols []int) []sqltypes.Value {
if len(cols) == 0 {
return lrow
}
row := make([]sqltypes.Value, len(cols))
for i, index := range cols {
if index < 0 {
row[i] = lrow[-index-1]
}
}
return row
}
2 changes: 0 additions & 2 deletions go/vt/vtgate/engine/semi_join_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ func TestSemiJoinExecute(t *testing.T) {
Vars: map[string]int{
"bv": 1,
},
Cols: []int{-1, -2, -3},
}
r, err := jn.TryExecute(context.Background(), &noopVCursor{}, bv, true)
require.NoError(t, err)
Expand Down Expand Up @@ -139,7 +138,6 @@ func TestSemiJoinStreamExecute(t *testing.T) {
Vars: map[string]int{
"bv": 1,
},
Cols: []int{-1, -2, -3},
}
r, err := wrapStreamExecute(jn, &noopVCursor{}, map[string]*querypb.BindVariable{}, true)
require.NoError(t, err)
Expand Down
16 changes: 0 additions & 16 deletions go/vt/vtgate/planbuilder/logical_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,19 +43,3 @@ func newBuilderCommon(input logicalPlan) logicalPlanCommon {
func (bc *logicalPlanCommon) Order() int {
return bc.order
}

// -------------------------------------------------------------------------

// resultsBuilder is a superset of logicalPlanCommon. It also handles
// resultsColumn functionality.
type resultsBuilder struct {
logicalPlanCommon
truncater truncater
}

func newResultsBuilder(input logicalPlan, truncater truncater) resultsBuilder {
return resultsBuilder{
logicalPlanCommon: newBuilderCommon(input),
truncater: truncater,
}
}
21 changes: 10 additions & 11 deletions go/vt/vtgate/planbuilder/operator_transformers.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,8 +256,11 @@ func transformSubQuery(ctx *plancontext.PlanningContext, op *operators.SubQuery)
return newUncorrelatedSubquery(op.FilterType, op.SubqueryValueName, op.HasValuesName, inner, outer), nil
}

lhsCols := op.OuterExpressionsNeeded(ctx, op.Outer)
return newSemiJoin(outer, inner, op.Vars, lhsCols), nil
return &primitiveWrapper{prim: &engine.SemiJoin{
Left: outer.Primitive(),
Right: inner.Primitive(),
Vars: op.Vars,
}}, nil
}

// transformFkVerify transforms a FkVerify operator into a logical plan.
Expand Down Expand Up @@ -407,26 +410,22 @@ func transformProjection(ctx *plancontext.PlanningContext, op *operators.Project
var evalengineExprs []evalengine.Expr
var columnNames []string
for _, pe := range ap {
ee, err := getEvalEngingeExpr(ctx, pe)
ee, err := getEvalEngineExpr(ctx, pe)
if err != nil {
return nil, err
}
evalengineExprs = append(evalengineExprs, ee)
columnNames = append(columnNames, pe.Original.ColumnName())
}

primitive := &engine.Projection{
return &primitiveWrapper{prim: &engine.Projection{
Input: src.Primitive(),
Cols: columnNames,
Exprs: evalengineExprs,
}

return &projection{
source: src,
primitive: primitive,
}, nil
}}, nil
}

func getEvalEngingeExpr(ctx *plancontext.PlanningContext, pe *operators.ProjExpr) (evalengine.Expr, error) {
func getEvalEngineExpr(ctx *plancontext.PlanningContext, pe *operators.ProjExpr) (evalengine.Expr, error) {
switch e := pe.Info.(type) {
case *operators.EvalEngine:
return e.EExpr, nil
Expand Down
37 changes: 0 additions & 37 deletions go/vt/vtgate/planbuilder/projection.go

This file was deleted.

59 changes: 0 additions & 59 deletions go/vt/vtgate/planbuilder/semi_join.go

This file was deleted.

0 comments on commit ae5caf9

Please sign in to comment.