Skip to content

Commit

Permalink
Working e2e with insert, update and delete
Browse files Browse the repository at this point in the history
Signed-off-by: Rohit Nayak <rohit@planetscale.com>
  • Loading branch information
rohit-nayak-ps committed Jun 4, 2024
1 parent 0071734 commit c9baeb6
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 23 deletions.
50 changes: 44 additions & 6 deletions go/test/endtoend/vreplication/materialize_view_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,11 @@ package vreplication

import (
"fmt"
"strings"
"testing"
"time"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqltypes"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"

Expand Down Expand Up @@ -45,7 +49,7 @@ func TestMaterializeView(t *testing.T) {
workflow := "wf1"
sourceKeyspace := "product"
targetKeyspace := "product"
viewQuery := "select /*vt+ view=orders_view */ o.oid, c.cid, p.pid, o.mname, o.qty, o.price, c.name, p.description from orders o join customer c on o.cid = c.cid join product p on p.pid = o.pid;"
viewQuery := "select /*vt+ view=orders_view */ orders.oid, customer.cid, product.pid, orders.mname, orders.qty, orders.price, customer.name, product.description from orders join customer on orders.cid = customer.cid join product on product.pid = orders.pid;"
tableSettings := fmt.Sprintf("[ {\"target_table\": \"orders_view\", \"source_expression\": \"%s\" }]", viewQuery)

output, err := vc.VtctldClient.ExecuteCommandWithOutput("materialize", "--workflow", workflow, "--target-keyspace", targetKeyspace,
Expand All @@ -54,18 +58,52 @@ func TestMaterializeView(t *testing.T) {
require.NoError(t, err, "Materialize")

waitForWorkflowState(t, vc, fmt.Sprintf("%s.%s", targetKeyspace, workflow), binlogdatapb.VReplicationWorkflowState_Running.String())
diffViews(t, vtgateConn, viewQuery, "After copy phase")

queries := []string{
"insert into orders (oid, cid, pid, mname, qty, price) values (11, 1, 1, 'mname1', 1, 1100)",
"update orders set qty = 2 where oid = 11",
"insert into orders (oid, cid, pid, mname, qty, price) values (12, 1, 1, 'mname1', 1, 1200)",
}
for _, query := range queries {
execVtgateQuery(t, vtgateConn, "product", query)
}
time.Sleep(3 * time.Second)
diffViews(t, vtgateConn, viewQuery, "After DMLs")

rsMaterializedView := execVtgateQuery(t, vtgateConn, "product", "select * from orders_view")
rsViewQuery := execVtgateQuery(t, vtgateConn, "product", viewQuery)
diffResults(t, rsMaterializedView, rsViewQuery)
queries = []string{
"delete from orders where oid = 11",
}
for _, query := range queries {
execVtgateQuery(t, vtgateConn, "product", query)
}
time.Sleep(3 * time.Second)
diffViews(t, vtgateConn, viewQuery, "After Delete")

queries = []string{
"update orders set qty = 3 where oid = 12",
}
for _, query := range queries {
execVtgateQuery(t, vtgateConn, "product", query)
}
time.Sleep(3 * time.Second)
diffViews(t, vtgateConn, viewQuery, "After Update")
}

func diffResults(t *testing.T, rs1, rs2 *sqltypes.Result) {
func diffViews(t *testing.T, vtgateConn *mysql.Conn, viewQuery, message string) {
rsMaterializedView := execVtgateQuery(t, vtgateConn, "product", "select * from orders_view order by oid")
rsViewQuery := execVtgateQuery(t, vtgateConn, "product", strings.Replace(viewQuery, ";", " order by oid;", 1))
diffResults(t, rsViewQuery, rsMaterializedView, message)
}

func diffResults(t *testing.T, rs1, rs2 *sqltypes.Result, message string) {
if len(rs1.Rows) != len(rs2.Rows) {
t.Fatalf("%s:: Row count mismatch: %d != %d", message, len(rs1.Rows), len(rs2.Rows))
}
for i := range rs1.Rows {
for j := range rs1.Rows[i] {
if rs1.Rows[i][j].RawStr() != rs2.Rows[i][j].RawStr() {
t.Fatalf("Row %d, Column %d: %v != %v", i, j, rs1.Rows[i][j].RawStr(), rs2.Rows[i][j].RawStr())
t.Fatalf("%s:: Row %d, Column %d: %v != %v", message, i, j, rs1.Rows[i][j].RawStr(), rs2.Rows[i][j].RawStr())
}
}
}
Expand Down
18 changes: 12 additions & 6 deletions go/vt/vttablet/tabletmanager/vreplication/replicator_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,16 +398,22 @@ func (tp *TablePlan) applyChangeForJoin(eventTableName string, eventTablePlan *T
log.Infof("Ignoring non-main table insert for %v", eventTableName)
return nil, nil
}
clear(bindvars)
log.Infof("Inserting into main table %v: %s", tp.JoinPlan.MainTable, tp.JoinPlan.Insert.Query)
log.Infof("Inserting into main table %v: %s, bindvars %+q", tp.JoinPlan.MainTable, tp.JoinPlan.Insert.Query, bindvars)
return execParsedQuery(tp.JoinPlan.Insert, bindvars, executor)
case before && after:
update := tp.JoinPlan.Updates[eventTableName]
if update == nil {
upd := tp.JoinPlan.Updates[eventTableName]
if upd == nil {
return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "update query not found for %v", eventTableName)
}
log.Infof("Updating %v: %s with bindvars %+q", eventTableName, update.Query, bindvars)
return execParsedQuery(update, bindvars, executor)
log.Infof("Updating %v: %s with bindvars %+q", eventTableName, upd.Query, bindvars)
return execParsedQuery(upd, bindvars, executor)
case before && !after:
del := tp.JoinPlan.Deletes[eventTableName]
if del == nil {
return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "delete query not found for %v", eventTableName)
}
log.Infof("Deleting from %v: %s with bindvars %+q", eventTableName, del.Query, bindvars)
return execParsedQuery(del, bindvars, executor)
default:
return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "applyChangeForJoin called with before or without after")
}
Expand Down
23 changes: 17 additions & 6 deletions go/vt/vttablet/tabletmanager/vreplication/table_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ func buildReplicatorPlanForJoin(source *binlogdatapb.BinlogSource, colInfoMap ma
joinPlan.TableName = view
joinPlan.MainTableName = joinTables[0]
query := source.Filter.Rules[0].Filter
log.Infof("View, query %s %s", view, query)
log.Infof("View %s, query %s", view, query)
tablePlan := &TablePlan{
TargetName: view,
SendRule: source.Filter.Rules[0],
Expand Down Expand Up @@ -270,7 +270,14 @@ func buildReplicatorPlanForJoin(source *binlogdatapb.BinlogSource, colInfoMap ma
log.Infof("table %s, column %v", table, cols)
}

qr, err := dbClient.ExecuteFetch("select * from "+view, 1)
if view == "" {
return nil, fmt.Errorf("view name is empty")
}
qr, err := dbClient.ExecuteFetch(fmt.Sprintf("select * from %s limit 1", view), 1)
if err != nil {
return nil, err

}
tablePlan.Fields = qr.Fields
insert := generateInsertForJoin(view, query, tablePlan.Fields)
log.Infof("Insert for Join is %+q", insert)
Expand Down Expand Up @@ -310,7 +317,9 @@ type ViewColumn struct {
}

func generateInsertForJoin(view string, viewQuery string, fields []*querypb.Field) *sqlparser.ParsedQuery {
buf := sqlparser.NewTrackedBuffer(nil)
bvf := &bindvarFormatter{}
buf := sqlparser.NewTrackedBuffer(bvf.formatter)
bvf.mode = bvAfter
buf.Myprintf("insert into %v (", sqlparser.NewIdentifierCS(view))
separator := ""
for _, field := range fields {
Expand All @@ -319,8 +328,9 @@ func generateInsertForJoin(view string, viewQuery string, fields []*querypb.Fiel
}
buf.Myprintf(") ")
buf.Myprintf("%s", viewQuery)
buf.Myprintf(" where oid = ")
buf.Myprintf("%v", sqlparser.NewColName("oid"))
return buf.ParsedQuery()

}

func generateUpdatesForJoin(view string, viewColumns map[string][]*ViewColumn) map[string]*sqlparser.ParsedQuery {
Expand All @@ -340,6 +350,7 @@ func generateUpdatesForJoin(view string, viewColumns map[string][]*ViewColumn) m
separator = ", "
}
col := cols[0]
bvf.mode = bvBefore
buf.Myprintf(" where %s = ", sqlparser.NewIdentifierCI(col.ViewColumnName).String())
buf.Myprintf("%v", sqlparser.NewColName(col.ViewColumnName))
updates[table] = buf.ParsedQuery()
Expand All @@ -353,7 +364,7 @@ func generateDeletesForJoin(view string, viewColumns map[string][]*ViewColumn) m
for table, cols := range viewColumns {
bvf := &bindvarFormatter{}
buf := sqlparser.NewTrackedBuffer(bvf.formatter)
bvf.mode = bvAfter
bvf.mode = bvBefore
buf.Myprintf("delete from %v where ", sqlparser.NewIdentifierCS(view))
separator := ""
for i, col := range cols {
Expand All @@ -362,7 +373,7 @@ func generateDeletesForJoin(view string, viewColumns map[string][]*ViewColumn) m
}
buf.Myprintf("%s%s = ", separator, sqlparser.NewIdentifierCI(col.ViewColumnName).CompliantName())
buf.Myprintf("%v", sqlparser.NewColName(col.ViewColumnName))
separator = ", "
separator = " and "
}
deletes[table] = buf.ParsedQuery()
log.Infof("Delete for table %s is %s, bindLocations %d", table, buf.String(), len(deletes[table].BindLocations()))
Expand Down
16 changes: 11 additions & 5 deletions go/vt/vttablet/tabletmanager/vreplication/vplayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,13 +187,16 @@ func (vp *vplayer) play(ctx context.Context) error {
}
log.Infof("Starting VReplication player id: %v, startPos: %v, stop: %v, filter: %v", vp.vr.id, vp.startPos, vp.stopPos, vp.vr.source)
plan, err := buildReplicatorPlanForJoin(vp.vr.source, vp.vr.colInfoMap, vp.copyState, vp.vr.stats, vp.vr.vre.env.CollationEnv(), vp.vr.vre.env.Parser(), vp.vr.dbClient, false)
for tableName, tablePlan := range plan.TablePlans {
vp.tablePlans[tableName] = tablePlan
}
if err != nil {
vp.vr.stats.ErrorCounts.Add([]string{"Plan"}, 1)
return err
}
if plan == nil {
return errors.New("no plan generated")
}
for tableName, tablePlan := range plan.TablePlans {
vp.tablePlans[tableName] = tablePlan
}
vp.replicatorPlan = plan

// We can't run in statement mode if there are filters defined.
Expand Down Expand Up @@ -342,9 +345,12 @@ func (vp *vplayer) applyRowEventForJoin(ctx context.Context, rowEvent *binlogdat
applyFunc := func(sql string) (*sqltypes.Result, error) {
stats := NewVrLogStats("ROWCHANGE")
start := time.Now()
log.Infof("Applying row change for table %s, sql is %s", rowEvent.TableName, sql)
log.Flush()
qr, err := vp.query(ctx, sql)
log.Infof("Applying row change for table %s, sql is %s, updated %d rows, error %v",
rowEvent.TableName, sql, len(qr.Rows), err)
if err != nil {
log.Errorf("Error applying row change for table %s, sql is %s: %v", rowEvent.TableName, sql, err)
}
vp.vr.stats.QueryCount.Add(vp.phase, 1)
vp.vr.stats.QueryTimings.Record(vp.phase, start)
stats.Send(sql)
Expand Down

0 comments on commit c9baeb6

Please sign in to comment.