Skip to content

Commit

Permalink
feat: fix derived table issue
Browse files Browse the repository at this point in the history
Signed-off-by: Manan Gupta <manan@planetscale.com>
  • Loading branch information
GuptaManan100 committed Apr 11, 2024
1 parent 8a4fba0 commit ec79aa5
Show file tree
Hide file tree
Showing 6 changed files with 126 additions and 12 deletions.
19 changes: 19 additions & 0 deletions go/test/endtoend/vtgate/queries/reference/reference_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,25 @@ func TestReferenceRouting(t *testing.T) {
`[[INT64(0)]]`,
)

t.Run("Complex reference query", func(t *testing.T) {
utils.SkipIfBinaryIsBelowVersion(t, 17, "vtgate")
// Verify a complex query using reference tables with a left join having a derived table with an order by clause works as intended.
utils.AssertMatches(
t,
conn,
`SELECT t.id FROM (
SELECT zd.id, zd.zip_id
FROM `+shardedKeyspaceName+`.zip_detail AS zd
WHERE zd.id IN (2)
ORDER BY zd.discontinued_at
LIMIT 1
) AS t
LEFT JOIN `+shardedKeyspaceName+`.zip_detail AS t0 ON t.zip_id = t0.zip_id
ORDER BY t.id`,
`[[INT64(2)]]`,
)
})

// UPDATE should route an unqualified zip_detail to unsharded keyspace.
utils.Exec(t, conn,
"UPDATE zip_detail SET discontinued_at = NULL WHERE id = 2")
Expand Down
7 changes: 7 additions & 0 deletions go/vt/vtgate/planbuilder/operators/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,13 @@ func (a *Aggregator) isDerived() bool {
return a.TableID != nil
}

func (a *Aggregator) derivedName() string {
if a.TableID == nil {
return ""
}
return a.Alias
}

func (a *Aggregator) AddColumn(ctx *plancontext.PlanningContext, expr *sqlparser.AliasedExpr, _, addToGroupBy bool) (ops.Operator, int, error) {
if addToGroupBy {
return nil, 0, vterrors.VT13001("did not expect to add group by here")
Expand Down
7 changes: 4 additions & 3 deletions go/vt/vtgate/planbuilder/operators/horizon_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -465,12 +465,12 @@ func pushDownProjectionInApplyJoin(
var err error

// Create and update the Projection operators for the left and right children, if needed.
src.LHS, err = createProjectionWithTheseColumns(src.LHS, lhs, p.TableID, p.Alias)
src.LHS, err = createProjectionWithTheseColumns(ctx, src.LHS, lhs, p.TableID, p.Alias)
if err != nil {
return nil, nil, err
}

src.RHS, err = createProjectionWithTheseColumns(src.RHS, rhs, p.TableID, p.Alias)
src.RHS, err = createProjectionWithTheseColumns(ctx, src.RHS, rhs, p.TableID, p.Alias)
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -579,6 +579,7 @@ func prefixColNames(tblName sqlparser.TableName, e sqlparser.Expr) (out sqlparse
}

func createProjectionWithTheseColumns(
ctx *plancontext.PlanningContext,
src ops.Operator,
p *projector,
tableID *semantics.TableSet,
Expand All @@ -587,7 +588,7 @@ func createProjectionWithTheseColumns(
if len(p.cols) == 0 {
return src, nil
}
proj, err := createProjection(src)
proj, err := createProjection(ctx, src, "")
if err != nil {
return nil, err
}
Expand Down
7 changes: 7 additions & 0 deletions go/vt/vtgate/planbuilder/operators/projection.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,13 @@ func (p *Projection) isDerived() bool {
return p.TableID != nil
}

func (p *Projection) derivedName() string {
if p.TableID == nil {
return ""
}
return p.Alias
}

func (p *Projection) AddColumn(ctx *plancontext.PlanningContext, expr *sqlparser.AliasedExpr, _, addToGroupBy bool) (ops.Operator, int, error) {
if offset, found := canReuseColumn(ctx, p.Columns, expr.Expr, extractExpr); found {
return p, offset, nil
Expand Down
39 changes: 30 additions & 9 deletions go/vt/vtgate/planbuilder/operators/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -531,14 +531,26 @@ func (r *Route) AddPredicate(ctx *plancontext.PlanningContext, expr sqlparser.Ex
return r, err
}

func createProjection(src ops.Operator) (*Projection, error) {
func createProjection(ctx *plancontext.PlanningContext, src ops.Operator, derivedName string) (*Projection, error) {
proj := &Projection{Source: src}
cols, err := src.GetColumns()
if err != nil {
return nil, err
}
for _, col := range cols {
proj.addUnexploredExpr(col, col.Expr)
if derivedName == "" {
proj.addUnexploredExpr(col, col.Expr)
continue
}

// for derived tables, we want to use the exposed colname
tableName := sqlparser.TableName{
Name: sqlparser.NewIdentifierCS(derivedName),
}
columnName := col.ColumnName()
colName := sqlparser.NewColNameWithQualifier(columnName, tableName)
ctx.SemTable.CopyDependencies(col.Expr, colName)
proj.addUnexploredExpr(aeWrap(colName), colName)
}
return proj, nil
}
Expand All @@ -560,19 +572,20 @@ func (r *Route) AddColumn(ctx *plancontext.PlanningContext, expr *sqlparser.Alia

// if column is not already present, we check if we can easily find a projection
// or aggregation in our source that we can add to
if ok, offset := addColumnToInput(r.Source, expr, addToGroupBy); ok {
derived, ok, offset := addColumnToInput(r.Source, expr, addToGroupBy)
if ok {
return r, offset, nil
}

// If no-one could be found, we probably don't have one yet, so we add one here
src, err := createProjection(r.Source)
src, err := createProjection(ctx, r.Source, derived)
if err != nil {
return nil, 0, err
}
r.Source = src

// And since we are under the route, we don't need to continue pushing anything further down
offset := src.addColumnWithoutPushing(expr, false)
offset = src.addColumnWithoutPushing(expr, false)
if err != nil {
return nil, 0, err
}
Expand All @@ -582,26 +595,34 @@ func (r *Route) AddColumn(ctx *plancontext.PlanningContext, expr *sqlparser.Alia
type selectExpressions interface {
addColumnWithoutPushing(expr *sqlparser.AliasedExpr, addToGroupBy bool) int
isDerived() bool
derivedName() string
}

func addColumnToInput(operator ops.Operator, expr *sqlparser.AliasedExpr, addToGroupBy bool) (bool, int) {
func addColumnToInput(operator ops.Operator, expr *sqlparser.AliasedExpr, addToGroupBy bool) (
derivedName string, // if we found a derived table, this will contain its name
found bool, // whether a matching op was found or not
offset int, // the offsets the expressions received
) {
switch op := operator.(type) {
case *CorrelatedSubQueryOp:
return addColumnToInput(op.Outer, expr, addToGroupBy)
case *Limit:
return addColumnToInput(op.Source, expr, addToGroupBy)
case *Ordering:
return addColumnToInput(op.Source, expr, addToGroupBy)
case *Derived:
// Get the alias for the derived table. We should use this for creating the projection.
return op.Alias, false, 0
case selectExpressions:
if op.isDerived() {
// if the only thing we can push to is a derived table,
// we have to add a new projection and can't build on this one
return false, 0
return op.derivedName(), false, 0
}
offset := op.addColumnWithoutPushing(expr, addToGroupBy)
return true, offset
return "", true, offset
default:
return false, 0
return "", false, 0
}
}

Expand Down
59 changes: 59 additions & 0 deletions go/vt/vtgate/planbuilder/testdata/reference_cases.json
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,65 @@
]
}
},
{
"comment": "Reference tables using left join with a derived table having a limit clause",
"query": "SELECT u.id FROM ( SELECT a.id, a.u_id FROM user.ref_with_source AS a WHERE a.id IN (3) ORDER BY a.d_at LIMIT 1) as u LEFT JOIN user.ref_with_source AS u0 ON u.u_id = u0.u_uid ORDER BY u.id",
"plan": {
"QueryType": "SELECT",
"Original": "SELECT u.id FROM ( SELECT a.id, a.u_id FROM user.ref_with_source AS a WHERE a.id IN (3) ORDER BY a.d_at LIMIT 1) as u LEFT JOIN user.ref_with_source AS u0 ON u.u_id = u0.u_uid ORDER BY u.id",
"Instructions": {
"OperatorType": "SimpleProjection",
"Columns": [
0
],
"Inputs": [
{
"OperatorType": "Sort",
"Variant": "Memory",
"OrderBy": "(0|1) ASC",
"Inputs": [
{
"OperatorType": "Join",
"Variant": "LeftJoin",
"JoinColumnIndexes": "L:2,L:3",
"JoinVars": {
"u_u_id": 4
},
"TableName": "ref_with_source_ref_with_source",
"Inputs": [
{
"OperatorType": "Route",
"Variant": "Reference",
"Keyspace": {
"Name": "user",
"Sharded": true
},
"FieldQuery": "select u.id, u.u_id, u.id, weight_string(u.id), u.u_id from (select a.id, a.u_id from ref_with_source as a where 1 != 1) as u where 1 != 1",
"Query": "select u.id, u.u_id, u.id, weight_string(u.id), u.u_id from (select a.id, a.u_id from ref_with_source as a where a.id in (3) order by a.d_at asc limit 1) as u",
"Table": "ref_with_source"
},
{
"OperatorType": "Route",
"Variant": "Reference",
"Keyspace": {
"Name": "user",
"Sharded": true
},
"FieldQuery": "select 1 from ref_with_source as u0 where 1 != 1",
"Query": "select 1 from ref_with_source as u0 where u0.u_uid = :u_u_id",
"Table": "ref_with_source"
}
]
}
]
}
]
},
"TablesUsed": [
"user.ref_with_source"
]
}
},
{
"comment": "insert into qualified ambiguous reference table routes v3 to requested keyspace gen4 to source",
"query": "insert into user.ambiguous_ref_with_source(col) values(1)",
Expand Down

0 comments on commit ec79aa5

Please sign in to comment.