diff --git a/go/vt/vttablet/tabletserver/vstreamer/helper_event_test.go b/go/vt/vttablet/tabletserver/vstreamer/helper_event_test.go index 6c54a27996e..8c4ac20cb10 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/helper_event_test.go +++ b/go/vt/vttablet/tabletserver/vstreamer/helper_event_test.go @@ -46,12 +46,14 @@ import ( "vitess.io/vitess/go/mysql/collations" "vitess.io/vitess/go/mysql/collations/colldata" + "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/log" - "vitess.io/vitess/go/vt/proto/binlogdata" "vitess.io/vitess/go/vt/proto/query" "vitess.io/vitess/go/vt/schemadiff" "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/vttablet/tabletserver/vstreamer/testenv" + + binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" ) const ( @@ -69,10 +71,11 @@ var ( // TestColumn has all the attributes of a column required for the test cases. type TestColumn struct { name, dataType, colType string - collationID collations.ID len int64 + collationID collations.ID dataTypeLowered string skip bool + collationName string } // TestFieldEvent has all the attributes of a table required for creating a field event. @@ -81,6 +84,47 @@ type TestFieldEvent struct { cols []*TestColumn } +func (tfe *TestFieldEvent) String() string { + var fe binlogdatapb.FieldEvent + var field *query.Field + fe.TableName = tfe.table + for _, col := range tfe.cols { + if col.skip { + continue + } + if col.name == "keyspace_id" { + field = &query.Field{ + Name: col.name, + Type: getQueryType(col.dataType), + Charset: uint32(col.collationID), + } + } else { + field = &query.Field{ + Name: col.name, + Type: getQueryType(col.dataType), + Table: tfe.table, + OrgTable: tfe.table, + Database: tfe.db, + OrgName: col.name, + ColumnLength: uint32(col.len), + Charset: uint32(col.collationID), + ColumnType: col.colType, + } + } + fe.Fields = append(fe.Fields, field) + + } + if !ignoreKeyspaceShardInFieldAndRowEvents { + fe.Keyspace = testenv.DBName + fe.Shard = testenv.DefaultShard + } + ev := &binlogdatapb.VEvent{ + Type: binlogdatapb.VEventType_FIELD, + FieldEvent: &fe, + } + return ev.String() +} + // TestQuery represents a database query and the expected events it generates. type TestQuery struct { query string @@ -95,19 +139,21 @@ type TestRowChange struct { // TestRowEventSpec is used for defining a custom row event. type TestRowEventSpec struct { - table string - changes []TestRowChange + table string + changes []TestRowChange + keyspace string + shard string } // Generates a string representation for a custom row event. func (s *TestRowEventSpec) String() string { - ev := &binlogdata.RowEvent{ + ev := &binlogdatapb.RowEvent{ TableName: s.table, } - var rowChanges []*binlogdata.RowChange + var rowChanges []*binlogdatapb.RowChange if s.changes != nil && len(s.changes) > 0 { for _, c := range s.changes { - rowChange := binlogdata.RowChange{} + rowChange := binlogdatapb.RowChange{} if c.before != nil && len(c.before) > 0 { rowChange.Before = &query.Row{} for _, val := range c.before { @@ -126,8 +172,18 @@ func (s *TestRowEventSpec) String() string { } ev.RowChanges = rowChanges } - vEvent := &binlogdata.VEvent{ - Type: binlogdata.VEventType_ROW, + if !ignoreKeyspaceShardInFieldAndRowEvents { + ev.Keyspace = testenv.DBName + ev.Shard = "0" // this is the default shard + if s.keyspace != "" { + ev.Keyspace = s.keyspace + } + if s.shard != "" { + ev.Shard = s.shard + } + } + vEvent := &binlogdatapb.VEvent{ + Type: binlogdatapb.VEventType_ROW, RowEvent: ev, } return vEvent.String() @@ -136,15 +192,20 @@ func (s *TestRowEventSpec) String() string { // TestRowEvent is used to define either the actual row event string (the `event` field) or a custom row event // (the `spec` field). Only one should be specified. If a test validates `flags` of a RowEvent then it is set. type TestRowEvent struct { - event string - spec *TestRowEventSpec - flags int + event string + spec *TestRowEventSpec + flags int + restart bool // if set to true, it will start a new group of output events } // TestSpecOptions has any non-standard test-specific options which can modify the event generation behaviour. type TestSpecOptions struct { - noblob bool - filter *binlogdata.Filter + noblob bool // if set to true, it will skip blob and text columns in the row event + // by default the filter will be a "select * from table", set this to specify a custom one + // if filter is set, customFieldEvents need to be specified as well + filter *binlogdatapb.Filter + customFieldEvents bool + position string } // TestSpec is defined one per unit test. @@ -175,11 +236,13 @@ func (ts *TestSpec) setCurrentState(table string, row *query.Row) { } // Init() initializes the test. It creates the tables and sets up the internal state. -func (ts *TestSpec) Init() error { +func (ts *TestSpec) Init() { var err error if ts.inited { - return nil + return } + // setup SrvVschema watcher, if not already done + engine.watcherOnce.Do(engine.setWatch) defer func() { ts.inited = true }() if ts.options == nil { ts.options = &TestSpecOptions{} @@ -194,9 +257,7 @@ func (ts *TestSpec) Init() error { ts.ddls[i] = fmt.Sprintf("%s %s", ts.ddls[i], tableOptions) } ts.schema, err = schemadiff.NewSchemaFromQueries(schemadiff.NewTestEnv(), ts.ddls) - if err != nil { - return err - } + require.NoError(ts.t, err) ts.fieldEvents = make(map[string]*TestFieldEvent) ts.fieldEventsSent = make(map[string]bool) ts.state = make(map[string]*query.Row) @@ -208,7 +269,6 @@ func (ts *TestSpec) Init() error { execStatement(ts.t, ts.ddls[i]) fe := ts.getFieldEvent(t) ts.fieldEvents[t.Name()] = fe - var pkColumns []string var hasPK bool for _, index := range t.TableSpec.Indexes { @@ -229,7 +289,6 @@ func (ts *TestSpec) Init() error { ts.pkColumns[t.Name()] = pkColumns } engine.se.Reload(context.Background()) - return nil } // Close() should be called (via defer) at the end of the test to clean up the tables created in the test. @@ -267,10 +326,7 @@ func (ts *TestSpec) getBindVarsForInsert(stmt sqlparser.Statement) (string, map[ func (ts *TestSpec) getBindVarsForUpdate(stmt sqlparser.Statement) (string, map[string]string) { bv := make(map[string]string) upd := stmt.(*sqlparser.Update) - //buf := sqlparser.NewTrackedBuffer(nil) table := sqlparser.String(upd.TableExprs[0].(*sqlparser.AliasedTableExpr).Expr) - //upd.TableExprs[0].(*sqlparser.AliasedTableExpr).Expr.Format(buf) - //table := buf.String() fe, ok := ts.fieldEvents[table] require.True(ts.t, ok, "field event for table %s not found", table) index := int64(0) @@ -293,7 +349,7 @@ func (ts *TestSpec) getBindVarsForUpdate(stmt sqlparser.Statement) (string, map[ func (ts *TestSpec) Run() { require.NoError(ts.t, engine.se.Reload(context.Background())) if !ts.inited { - require.NoError(ts.t, ts.Init()) + ts.Init() } var testcases []testcase for _, t := range ts.tests { @@ -310,6 +366,10 @@ func (ts *TestSpec) Run() { (len(tq.events) > 0 && !(len(tq.events) == 1 && tq.events[0].event == "" && tq.events[0].spec == nil)): for _, e := range tq.events { + if e.restart { + tc.output = append(tc.output, output) + output = []string{} + } if e.event != "" { output = append(output, e.event) } else if e.spec != nil { @@ -345,27 +405,58 @@ func (ts *TestSpec) Run() { del := stmt.(*sqlparser.Delete) table = del.TableExprs[0].(*sqlparser.AliasedTableExpr).As.String() default: - require.FailNowf(ts.t, "unsupported statement type", "stmt: %s", stmt) + _, ok := stmt.(sqlparser.DDLStatement) + if !ok { + require.FailNowf(ts.t, "unsupported statement type", "stmt: %s", stmt) + } + output = append(output, "gtid") + output = append(output, ts.getDDLEvent(tq.query)) } if isRowEvent { fe := ts.fieldEvents[table] if fe == nil { require.FailNowf(ts.t, "field event for table %s not found", table) } - if !ts.fieldEventsSent[table] { + // for the first row event, we send the field event as well, if a custom field event is not specified + if !ts.options.customFieldEvents && !ts.fieldEventsSent[table] { output = append(output, fe.String()) ts.fieldEventsSent[table] = true } output = append(output, ts.getRowEvent(table, bv, fe, stmt, uint32(flags))) } } - } tc.input = input tc.output = append(tc.output, output) testcases = append(testcases, tc) } - runCases(ts.t, ts.options.filter, testcases, "current", nil) + startPos := "current" + if ts.options.position != "" { + startPos = ts.options.position + } + runCases(ts.t, ts.options.filter, testcases, startPos, nil) +} + +func (ts *TestSpec) getDDLEvent(query string) string { + ddlEvent := &binlogdatapb.VEvent{ + Type: binlogdatapb.VEventType_DDL, + Statement: query, + } + return ddlEvent.String() +} + +func (ts *TestSpec) reloadSchema() { + engine.se.Reload(context.Background()) + var ddls []string + for _, table := range ts.tables { + showCreateTableDDL := fmt.Sprintf("show create table %s", table) + qr, err := env.Mysqld.FetchSuperQuery(context.Background(), showCreateTableDDL) + require.NoError(ts.t, err) + ddls = append(ddls, qr.Rows[0][1].ToString()) + } + var err error + ts.schema, err = schemadiff.NewSchemaFromQueries(schemadiff.NewTestEnv(), ddls) + require.NoError(ts.t, err) } func (ts *TestSpec) getFieldEvent(table *schemadiff.CreateTableEntity) *TestFieldEvent { @@ -407,8 +498,8 @@ func (ts *TestSpec) getFieldEvent(table *schemadiff.CreateTableEntity) *TestFiel tc.colType = fmt.Sprintf("%s(%d)", tc.dataTypeLowered, l) case "blob": tc.len = lengthBlob - tc.colType = "blob" tc.collationID = collations.CollationBinaryID + tc.colType = "blob" case "text": tc.len = lengthText tc.colType = "text" @@ -462,9 +553,9 @@ func (ts *TestSpec) getMetadataMap(table string, col *TestColumn, value string) } func (ts *TestSpec) getRowEvent(table string, bv map[string]string, fe *TestFieldEvent, stmt sqlparser.Statement, flags uint32) string { - ev := &binlogdata.RowEvent{ + ev := &binlogdatapb.RowEvent{ TableName: table, - RowChanges: []*binlogdata.RowChange{ + RowChanges: []*binlogdatapb.RowChange{ { Before: nil, After: nil, @@ -472,6 +563,10 @@ func (ts *TestSpec) getRowEvent(table string, bv map[string]string, fe *TestFiel }, Flags: flags, } + if !ignoreKeyspaceShardInFieldAndRowEvents { + ev.Keyspace = testenv.DBName + ev.Shard = "0" // this is the default shard + } var row query.Row for i, col := range fe.cols { if fe.cols[i].skip { @@ -492,16 +587,16 @@ func (ts *TestSpec) getRowEvent(table string, bv map[string]string, fe *TestFiel row.Lengths = append(row.Lengths, l) } ev.RowChanges = ts.getRowChanges(table, stmt, &row) - vEvent := &binlogdata.VEvent{ - Type: binlogdata.VEventType_ROW, + vEvent := &binlogdatapb.VEvent{ + Type: binlogdatapb.VEventType_ROW, RowEvent: ev, } return vEvent.String() } -func (ts *TestSpec) getRowChanges(table string, stmt sqlparser.Statement, row *query.Row) []*binlogdata.RowChange { - var rowChanges []*binlogdata.RowChange - var rowChange binlogdata.RowChange +func (ts *TestSpec) getRowChanges(table string, stmt sqlparser.Statement, row *query.Row) []*binlogdatapb.RowChange { + var rowChanges []*binlogdatapb.RowChange + var rowChange binlogdatapb.RowChange switch stmt.(type) { case *sqlparser.Insert: rowChange.After = row @@ -517,8 +612,8 @@ func (ts *TestSpec) getRowChanges(table string, stmt sqlparser.Statement, row *q return rowChanges } -func (ts *TestSpec) getRowChangeForUpdate(table string, newState *query.Row) *binlogdata.RowChange { - var rowChange binlogdata.RowChange +func (ts *TestSpec) getRowChangeForUpdate(table string, newState *query.Row) *binlogdatapb.RowChange { + var rowChange binlogdatapb.RowChange var bitmap byte var before, after query.Row @@ -566,7 +661,7 @@ func (ts *TestSpec) getRowChangeForUpdate(table string, newState *query.Row) *bi rowChange.Before = &before rowChange.After = &after if hasSkip { - rowChange.DataColumns = &binlogdata.RowChange_Bitmap{ + rowChange.DataColumns = &binlogdatapb.RowChange_Bitmap{ Count: int64(len(currentState.Lengths)), Cols: []byte{bitmap}, } @@ -597,3 +692,82 @@ func (ts *TestSpec) getBefore(table string) *query.Row { } return &row } + +func (ts *TestSpec) Reset() { + for table := range ts.fieldEvents { + ts.fieldEventsSent[table] = false + } +} + +func (ts *TestSpec) SetStartPosition(pos string) { + ts.options.position = pos +} + +func getRowEvent(ts *TestSpec, fe *TestFieldEvent, query string) string { + stmt, err := sqlparser.NewTestParser().Parse(query) + var bv map[string]string + var table string + switch stmt.(type) { + case *sqlparser.Insert: + table, bv = ts.getBindVarsForInsert(stmt) + default: + panic("unhandled statement type for query " + query) + } + require.NoError(ts.t, err) + return ts.getRowEvent(table, bv, fe, stmt, 0) +} + +func getLastPKEvent(table, colName string, colType query.Type, colValue []sqltypes.Value, collationId, flags uint32) string { + lastPK := getQRFromLastPK([]*query.Field{{Name: colName, + Type: colType, Charset: collationId, + Flags: flags}}, colValue) + ev := &binlogdatapb.VEvent{ + Type: binlogdatapb.VEventType_LASTPK, + LastPKEvent: &binlogdatapb.LastPKEvent{ + TableLastPK: &binlogdatapb.TableLastPK{TableName: table, Lastpk: lastPK}, + }, + } + return ev.String() +} + +func getCopyCompletedEvent(table string) string { + ev := &binlogdatapb.VEvent{ + Type: binlogdatapb.VEventType_LASTPK, + LastPKEvent: &binlogdatapb.LastPKEvent{ + Completed: true, + TableLastPK: &binlogdatapb.TableLastPK{TableName: table}, + }, + } + return ev.String() +} + +func getQueryType(strType string) query.Type { + switch strType { + case "INT32": + return query.Type_INT32 + case "INT64": + return query.Type_INT64 + case "UINT64": + return query.Type_UINT64 + case "UINT32": + return query.Type_UINT32 + case "VARBINARY": + return query.Type_VARBINARY + case "BINARY": + return query.Type_BINARY + case "VARCHAR": + return query.Type_VARCHAR + case "CHAR": + return query.Type_CHAR + case "TEXT": + return query.Type_TEXT + case "BLOB": + return query.Type_BLOB + case "ENUM": + return query.Type_ENUM + case "SET": + return query.Type_SET + default: + panic("unknown type " + strType) + } +} diff --git a/go/vt/vttablet/tabletserver/vstreamer/main_flaky_test.go b/go/vt/vttablet/tabletserver/vstreamer/main_flaky_test.go deleted file mode 100644 index d4b8e62341a..00000000000 --- a/go/vt/vttablet/tabletserver/vstreamer/main_flaky_test.go +++ /dev/null @@ -1,110 +0,0 @@ -/* -Copyright 2019 The Vitess Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package vstreamer - -import ( - "context" - "fmt" - "os" - "testing" - - "github.com/stretchr/testify/require" - - _flag "vitess.io/vitess/go/internal/flag" - "vitess.io/vitess/go/mysql" - "vitess.io/vitess/go/vt/dbconfigs" - "vitess.io/vitess/go/vt/vtenv" - "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" - "vitess.io/vitess/go/vt/vttablet/tabletserver/vstreamer/testenv" -) - -var ( - engine *Engine - env *testenv.Env - - ignoreKeyspaceShardInFieldAndRowEvents bool - testRowEventFlags bool -) - -func TestMain(m *testing.M) { - _flag.ParseFlagsForTest() - ignoreKeyspaceShardInFieldAndRowEvents = true - - exitCode := func() int { - var err error - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - env, err = testenv.Init(ctx) - if err != nil { - fmt.Fprintf(os.Stderr, "%v", err) - return 1 - } - defer env.Close() - - // engine cannot be initialized in testenv because it introduces - // circular dependencies - engine = NewEngine(env.TabletEnv, env.SrvTopo, env.SchemaEngine, nil, env.Cells[0]) - engine.InitDBConfig(env.KeyspaceName, env.ShardName) - engine.Open() - defer engine.Close() - - return m.Run() - }() - os.Exit(exitCode) -} - -func newEngine(t *testing.T, ctx context.Context, binlogRowImage string) { - if engine != nil { - engine.Close() - } - if env != nil { - env.Close() - } - var err error - env, err = testenv.Init(ctx) - require.NoError(t, err) - - setBinlogRowImage(t, binlogRowImage) - - // engine cannot be initialized in testenv because it introduces - // circular dependencies - engine = NewEngine(env.TabletEnv, env.SrvTopo, env.SchemaEngine, nil, env.Cells[0]) - engine.InitDBConfig(env.KeyspaceName, env.ShardName) - engine.Open() -} - -func customEngine(t *testing.T, modifier func(mysql.ConnParams) mysql.ConnParams) *Engine { - original, err := env.Dbcfgs.AppWithDB().MysqlParams() - require.NoError(t, err) - modified := modifier(*original) - cfg := env.TabletEnv.Config().Clone() - cfg.DB = dbconfigs.NewTestDBConfigs(modified, modified, modified.DbName) - - engine := NewEngine(tabletenv.NewEnv(vtenv.NewTestEnv(), cfg, "VStreamerTest"), env.SrvTopo, env.SchemaEngine, nil, env.Cells[0]) - engine.InitDBConfig(env.KeyspaceName, env.ShardName) - engine.Open() - return engine -} - -func setBinlogRowImage(t *testing.T, mode string) { - execStatements(t, []string{ - fmt.Sprintf("set @@binlog_row_image='%s'", mode), - fmt.Sprintf("set @@session.binlog_row_image='%s'", mode), - fmt.Sprintf("set @@global.binlog_row_image='%s'", mode), - }) - -} diff --git a/go/vt/vttablet/tabletserver/vstreamer/main_test.go b/go/vt/vttablet/tabletserver/vstreamer/main_test.go new file mode 100644 index 00000000000..8ec639d47c4 --- /dev/null +++ b/go/vt/vttablet/tabletserver/vstreamer/main_test.go @@ -0,0 +1,368 @@ +/* +Copyright 2019 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package vstreamer + +import ( + "context" + "fmt" + "io" + "os" + "sync" + "testing" + "time" + + "vitess.io/vitess/go/mysql/replication" + "vitess.io/vitess/go/vt/log" + binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" + "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp" + + "github.com/stretchr/testify/require" + + _flag "vitess.io/vitess/go/internal/flag" + "vitess.io/vitess/go/mysql" + "vitess.io/vitess/go/vt/dbconfigs" + "vitess.io/vitess/go/vt/vtenv" + "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" + "vitess.io/vitess/go/vt/vttablet/tabletserver/vstreamer/testenv" +) + +var ( + engine *Engine + env *testenv.Env + + ignoreKeyspaceShardInFieldAndRowEvents bool + testRowEventFlags bool +) + +func TestMain(m *testing.M) { + _flag.ParseFlagsForTest() + ignoreKeyspaceShardInFieldAndRowEvents = true + + exitCode := func() int { + var err error + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + env, err = testenv.Init(ctx) + if err != nil { + fmt.Fprintf(os.Stderr, "%v", err) + return 1 + } + defer env.Close() + + // engine cannot be initialized in testenv because it introduces + // circular dependencies + engine = NewEngine(env.TabletEnv, env.SrvTopo, env.SchemaEngine, nil, env.Cells[0]) + engine.InitDBConfig(env.KeyspaceName, env.ShardName) + engine.Open() + defer engine.Close() + + return m.Run() + }() + os.Exit(exitCode) +} + +func newEngine(t *testing.T, ctx context.Context, binlogRowImage string) { + if engine != nil { + engine.Close() + } + if env != nil { + env.Close() + } + var err error + env, err = testenv.Init(ctx) + require.NoError(t, err) + + setBinlogRowImage(t, binlogRowImage) + + // engine cannot be initialized in testenv because it introduces + // circular dependencies + engine = NewEngine(env.TabletEnv, env.SrvTopo, env.SchemaEngine, nil, env.Cells[0]) + engine.InitDBConfig(env.KeyspaceName, env.ShardName) + engine.Open() +} + +func customEngine(t *testing.T, modifier func(mysql.ConnParams) mysql.ConnParams) *Engine { + original, err := env.Dbcfgs.AppWithDB().MysqlParams() + require.NoError(t, err) + modified := modifier(*original) + cfg := env.TabletEnv.Config().Clone() + cfg.DB = dbconfigs.NewTestDBConfigs(modified, modified, modified.DbName) + + engine := NewEngine(tabletenv.NewEnv(vtenv.NewTestEnv(), cfg, "VStreamerTest"), env.SrvTopo, env.SchemaEngine, nil, env.Cells[0]) + engine.InitDBConfig(env.KeyspaceName, env.ShardName) + engine.Open() + return engine +} + +func setBinlogRowImage(t *testing.T, mode string) { + execStatements(t, []string{ + fmt.Sprintf("set @@binlog_row_image='%s'", mode), + fmt.Sprintf("set @@session.binlog_row_image='%s'", mode), + fmt.Sprintf("set @@global.binlog_row_image='%s'", mode), + }) +} + +func runCases(t *testing.T, filter *binlogdatapb.Filter, testcases []testcase, position string, tablePK []*binlogdatapb.TableLastPK) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + wg, ch := startStream(ctx, t, filter, position, tablePK) + defer wg.Wait() + // If position is 'current', we wait for a heartbeat to be + // sure the vstreamer has started. + if position == "current" { + log.Infof("Starting stream with current position") + expectLog(ctx, t, "current pos", ch, [][]string{{`gtid`, `type:OTHER`}}) + } + log.Infof("Starting to run test cases") + for _, tcase := range testcases { + switch input := tcase.input.(type) { + case []string: + execStatements(t, input) + case string: + execStatement(t, input) + default: + t.Fatalf("unexpected input: %#v", input) + } + engine.se.Reload(ctx) + expectLog(ctx, t, tcase.input, ch, tcase.output) + } + cancel() + if evs, ok := <-ch; ok { + t.Fatalf("unexpected evs: %v", evs) + } + log.Infof("Last line of runCases") +} + +func expectLog(ctx context.Context, t *testing.T, input any, ch <-chan []*binlogdatapb.VEvent, output [][]string) { + timer := time.NewTimer(1 * time.Minute) + defer timer.Stop() + for _, wantset := range output { + var evs []*binlogdatapb.VEvent + for { + select { + case allevs, ok := <-ch: + if !ok { + require.FailNow(t, "expectLog: not ok, stream ended early") + } + for _, ev := range allevs { + // Ignore spurious heartbeats that can happen on slow machines. + if ev.Type == binlogdatapb.VEventType_HEARTBEAT { + continue + } + if ev.Throttled { + continue + } + evs = append(evs, ev) + } + case <-ctx.Done(): + t.Fatalf("expectLog: Done(), stream ended early") + case <-timer.C: + t.Fatalf("expectLog: timed out waiting for events: %v", wantset) + } + if len(evs) != 0 { + break + } + } + + numEventsToMatch := len(evs) + if len(wantset) != len(evs) { + log.Warningf("%v: evs\n%v, want\n%v, >> got length %d, wanted length %d", input, evs, wantset, len(evs), len(wantset)) + if len(wantset) < len(evs) { + numEventsToMatch = len(wantset) + } + } + for i := 0; i < numEventsToMatch; i++ { + want := wantset[i] + // CurrentTime is not testable. + evs[i].CurrentTime = 0 + evs[i].Keyspace = "" + evs[i].Shard = "" + switch want { + case "begin": + if evs[i].Type != binlogdatapb.VEventType_BEGIN { + t.Fatalf("%v (%d): event: %v, want begin", input, i, evs[i]) + } + case "gtid": + if evs[i].Type != binlogdatapb.VEventType_GTID { + t.Fatalf("%v (%d): event: %v, want gtid", input, i, evs[i]) + } + case "lastpk": + if evs[i].Type != binlogdatapb.VEventType_LASTPK { + t.Fatalf("%v (%d): event: %v, want lastpk", input, i, evs[i]) + } + case "commit": + if evs[i].Type != binlogdatapb.VEventType_COMMIT { + t.Fatalf("%v (%d): event: %v, want commit", input, i, evs[i]) + } + case "other": + if evs[i].Type != binlogdatapb.VEventType_OTHER { + t.Fatalf("%v (%d): event: %v, want other", input, i, evs[i]) + } + case "ddl": + if evs[i].Type != binlogdatapb.VEventType_DDL { + t.Fatalf("%v (%d): event: %v, want ddl", input, i, evs[i]) + } + case "copy_completed": + if evs[i].Type != binlogdatapb.VEventType_COPY_COMPLETED { + t.Fatalf("%v (%d): event: %v, want copy_completed", input, i, evs[i]) + } + default: + evs[i].Timestamp = 0 + if evs[i].Type == binlogdatapb.VEventType_FIELD { + for j := range evs[i].FieldEvent.Fields { + evs[i].FieldEvent.Fields[j].Flags = 0 + if ignoreKeyspaceShardInFieldAndRowEvents { + evs[i].FieldEvent.Keyspace = "" + evs[i].FieldEvent.Shard = "" + } + } + } + if ignoreKeyspaceShardInFieldAndRowEvents && evs[i].Type == binlogdatapb.VEventType_ROW { + evs[i].RowEvent.Keyspace = "" + evs[i].RowEvent.Shard = "" + } + if !testRowEventFlags && evs[i].Type == binlogdatapb.VEventType_ROW { + evs[i].RowEvent.Flags = 0 + } + want = env.RemoveAnyDeprecatedDisplayWidths(want) + if got := fmt.Sprintf("%v", evs[i]); got != want { + log.Errorf("%v (%d): event:\n%q, want\n%q", input, i, got, want) + t.Fatalf("%v (%d): event:\n%q, want\n%q", input, i, got, want) + } + } + } + if len(wantset) != len(evs) { + t.Fatalf("%v: evs\n%v, want\n%v, got length %d, wanted length %d", input, evs, wantset, len(evs), len(wantset)) + } + } +} + +func startStream(ctx context.Context, t *testing.T, filter *binlogdatapb.Filter, position string, tablePKs []*binlogdatapb.TableLastPK) (*sync.WaitGroup, <-chan []*binlogdatapb.VEvent) { + switch position { + case "": + position = primaryPosition(t) + case "vscopy": + position = "" + } + + wg := sync.WaitGroup{} + wg.Add(1) + ch := make(chan []*binlogdatapb.VEvent) + + go func() { + defer close(ch) + defer wg.Done() + if vstream(ctx, t, position, tablePKs, filter, ch) != nil { + t.Log("vstream returned error") + } + }() + return &wg, ch +} + +func vstream(ctx context.Context, t *testing.T, pos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, ch chan []*binlogdatapb.VEvent) error { + if filter == nil { + filter = &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{{ + Match: "/.*/", + }}, + } + } + return engine.Stream(ctx, pos, tablePKs, filter, throttlerapp.VStreamerName, func(evs []*binlogdatapb.VEvent) error { + timer := time.NewTimer(2 * time.Second) + defer timer.Stop() + + log.Infof("Received events: %v", evs) + select { + case ch <- evs: + case <-ctx.Done(): + return fmt.Errorf("engine.Stream Done() stream ended early") + case <-timer.C: + t.Log("VStream timed out waiting for events") + return io.EOF + } + return nil + }) +} + +func execStatement(t *testing.T, query string) { + t.Helper() + if err := env.Mysqld.ExecuteSuperQuery(context.Background(), query); err != nil { + t.Fatal(err) + } +} + +func execStatements(t *testing.T, queries []string) { + if err := env.Mysqld.ExecuteSuperQueryList(context.Background(), queries); err != nil { + t.Fatal(err) + } +} + +func primaryPosition(t *testing.T) string { + t.Helper() + // We use the engine's cp because there is one test that overrides + // the flavor to FilePos. If so, we have to obtain the position + // in that flavor format. + connParam, err := engine.env.Config().DB.DbaWithDB().MysqlParams() + if err != nil { + t.Fatal(err) + } + conn, err := mysql.Connect(context.Background(), connParam) + if err != nil { + t.Fatal(err) + } + defer conn.Close() + pos, err := conn.PrimaryPosition() + if err != nil { + t.Fatal(err) + } + return replication.EncodePosition(pos) +} + +func setVSchema(t *testing.T, vschema string) { + t.Helper() + + curCount := engine.vschemaUpdates.Get() + if err := env.SetVSchema(vschema); err != nil { + t.Fatal(err) + } + // Wait for curCount to go up. + updated := false + for i := 0; i < 10; i++ { + if engine.vschemaUpdates.Get() != curCount { + updated = true + break + } + time.Sleep(10 * time.Millisecond) + } + if !updated { + log.Infof("vschema did not get updated") + t.Error("vschema did not get updated") + } +} + +func insertSomeRows(t *testing.T, numRows int) { + var queries []string + for idx, query := range []string{ + "insert into t1 (id11, id12) values", + "insert into t2 (id21, id22) values", + } { + for i := 1; i <= numRows; i++ { + queries = append(queries, fmt.Sprintf("%s (%d, %d)", query, i, i*(idx+1)*10)) + } + } + execStatements(t, queries) +} diff --git a/go/vt/vttablet/tabletserver/vstreamer/testenv/testenv.go b/go/vt/vttablet/tabletserver/vstreamer/testenv/testenv.go index 125fa75416f..35c65cf831d 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/testenv/testenv.go +++ b/go/vt/vttablet/tabletserver/vstreamer/testenv/testenv.go @@ -43,7 +43,11 @@ import ( vttestpb "vitess.io/vitess/go/vt/proto/vttest" ) -const DBName = "vttest" +const ( + DBName = "vttest" + DefaultCollationName = "utf8mb4_0900_ai_ci" + DefaultShard = "0" +) var ( // These are exported to coordinate on version specific diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go index f64c6699217..ffa757d277f 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go +++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go @@ -33,7 +33,7 @@ import ( "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/mysql/collations" - "vitess.io/vitess/go/mysql/replication" + "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp" @@ -57,25 +57,6 @@ func checkIfOptionIsSupported(t *testing.T, variable string) bool { return false } -func (tfe *TestFieldEvent) String() string { - s := fmt.Sprintf("type:FIELD field_event:{table_name:\"%s\"", tfe.table) - fld := "" - for _, col := range tfe.cols { - if col.skip { - continue - } - fld += fmt.Sprintf(" fields:{name:\"%s\" type:%s table:\"%s\" org_table:\"%s\" database:\"%s\" org_name:\"%s\" column_length:%d charset:%d", - col.name, col.dataType, tfe.table, tfe.table, tfe.db, col.name, col.len, col.collationID) - if col.colType != "" { - fld += fmt.Sprintf(" column_type:\"%s\"", col.colType) - } - fld += "}" - } - s += fld - s += "}" - return s -} - // TestPlayerNoBlob sets up a new environment with mysql running with // binlog_row_image as noblob. It confirms that the VEvents created are // correct: that they don't contain the missing columns and that the @@ -114,7 +95,7 @@ func TestNoBlob(t *testing.T) { }, } defer ts.Close() - require.NoError(t, ts.Init()) + ts.Init() ts.tests = [][]*TestQuery{{ {"begin", nil}, {"insert into t1 values (1, 'blob1', 'aaa')", nil}, @@ -141,7 +122,7 @@ func TestSetAndEnum(t *testing.T) { }, } defer ts.Close() - require.NoError(t, ts.Init()) + ts.Init() ts.tests = [][]*TestQuery{{ {"begin", nil}, {"insert into t1 values (1, 'aaa', 'red,blue', 'S')", nil}, @@ -159,11 +140,10 @@ func TestCellValuePadding(t *testing.T) { ddls: []string{ "create table t1(id int, val binary(4), primary key(val))", "create table t2(id int, val char(4), primary key(val))", - "create table t3(id int, val char(4) collate utf8mb4_bin, primary key(val))", - }, + "create table t3(id int, val char(4) collate utf8mb4_bin, primary key(val))"}, } defer ts.Close() - require.NoError(t, ts.Init()) + ts.Init() ts.tests = [][]*TestQuery{{ {"begin", nil}, {"insert into t1 values (1, 'aaa\000')", nil}, @@ -201,7 +181,7 @@ func TestColumnCollationHandling(t *testing.T) { }, } defer ts.Close() - require.NoError(t, ts.Init()) + ts.Init() ts.tests = [][]*TestQuery{{ {"begin", nil}, {"insert into t1 values (1, 'aaa', 'aaa', 1, 'aaa', 'aaa', 'aaa')", nil}, @@ -214,6 +194,8 @@ func TestColumnCollationHandling(t *testing.T) { ts.Run() } +// This test is not ported to the new test framework because it only runs on old deprecated versions of MySQL. +// We leave the test for older flavors until we EOL them. func TestSetStatement(t *testing.T) { if !checkIfOptionIsSupported(t, "log_builtin_as_identified_by_password") { // the combination of setting this option and support for "set password" only works on a few flavors @@ -234,7 +216,7 @@ func TestSetStatement(t *testing.T) { "insert into t1 values (1, 'aaa')", "commit", "set global log_builtin_as_identified_by_password=1", - "SET PASSWORD FOR 'vt_appdebug'@'localhost'='*AA17DA66C7C714557F5485E84BCAFF2C209F2F53'", //select password('vtappdebug_password'); + "SET PASSWORD FOR 'vt_appdebug'@'localhost'='*AA17DA66C7C714557F5485E84BCAFF2C209F2F53'", // select password('vtappdebug_password'); } testcases := []testcase{{ input: queries, @@ -264,7 +246,7 @@ func TestSetForeignKeyCheck(t *testing.T) { }, } defer ts.Close() - require.NoError(t, ts.Init()) + ts.Init() ts.tests = [][]*TestQuery{{ {"begin", nil}, {"insert into t1 values (1, 'aaa')", []TestRowEvent{{flags: 1}}}, @@ -279,45 +261,30 @@ func TestSetForeignKeyCheck(t *testing.T) { } func TestStmtComment(t *testing.T) { - - if testing.Short() { - t.Skip() + ts := &TestSpec{ + t: t, + ddls: []string{ + "create table t1(id int, val varbinary(128), primary key(id))", + }, + options: nil, } + defer ts.Close() - execStatements(t, []string{ - "create table t1(id int, val varbinary(128), primary key(id))", - }) - defer execStatements(t, []string{ - "drop table t1", - }) - engine.se.Reload(context.Background()) - queries := []string{ - "begin", - "insert into t1 values (1, 'aaa')", - "commit", - "/*!40000 ALTER TABLE `t1` DISABLE KEYS */", - } - testcases := []testcase{{ - input: queries, - output: [][]string{{ - `begin`, - `type:FIELD field_event:{table_name:"t1" fields:{name:"id" type:INT32 table:"t1" org_table:"t1" database:"vttest" org_name:"id" column_length:11 charset:63 column_type:"int(11)"} fields:{name:"val" type:VARBINARY table:"t1" org_table:"t1" database:"vttest" org_name:"val" column_length:128 charset:63 column_type:"varbinary(128)"}}`, - `type:ROW row_event:{table_name:"t1" row_changes:{after:{lengths:1 lengths:3 values:"1aaa"}}}`, - `gtid`, - `commit`, - }, { - `gtid`, - `other`, - }}, + ts.Init() + + ts.tests = [][]*TestQuery{{ + {"begin", nil}, + {"insert into t1 values (1, 'aaa')", nil}, + {"commit", nil}, + {"/*!40000 ALTER TABLE `t1` DISABLE KEYS */", []TestRowEvent{ + {restart: true, event: "gtid"}, + {event: "other"}}, + }, }} - runCases(t, nil, testcases, "current", nil) + ts.Run() } func TestVersion(t *testing.T) { - if testing.Short() { - t.Skip() - } - oldEngine := engine defer func() { engine = oldEngine @@ -364,42 +331,19 @@ func TestVersion(t *testing.T) { assert.True(t, proto.Equal(mt, dbSchema.Tables[0])) } -func insertLotsOfData(t *testing.T, numRows int) { - query1 := "insert into t1 (id11, id12) values" - s := "" - for i := 1; i <= numRows; i++ { - if s != "" { - s += "," - } - s += fmt.Sprintf("(%d,%d)", i, i*10) - } - query1 += s - query2 := "insert into t2 (id21, id22) values" - s = "" - for i := 1; i <= numRows; i++ { - if s != "" { - s += "," - } - s += fmt.Sprintf("(%d,%d)", i, i*20) - } - query2 += s - execStatements(t, []string{ - query1, - query2, - }) -} - func TestMissingTables(t *testing.T) { - if testing.Short() { - t.Skip() + ts := &TestSpec{ + t: t, + ddls: []string{ + "create table t1(id11 int, id12 int, primary key(id11))", + }, } - engine.se.Reload(context.Background()) + ts.Init() + defer ts.Close() execStatements(t, []string{ - "create table t1(id11 int, id12 int, primary key(id11))", "create table shortlived(id31 int, id32 int, primary key(id31))", }) defer execStatements(t, []string{ - "drop table t1", "drop table _shortlived", }) startPos := primaryPosition(t) @@ -414,6 +358,9 @@ func TestMissingTables(t *testing.T) { Filter: "select * from t1", }}, } + fe := ts.fieldEvents["t1"] + insert := "insert into t1 values (101, 1010)" + rowEvent := getRowEvent(ts, fe, insert) testcases := []testcase{ { input: []string{}, @@ -421,26 +368,11 @@ func TestMissingTables(t *testing.T) { }, { - input: []string{ - "insert into t1 values (101, 1010)", - }, + input: []string{insert}, output: [][]string{ - { - "begin", - "gtid", - "commit", - }, - { - "gtid", - "type:OTHER", - }, - { - "begin", - "type:FIELD field_event:{table_name:\"t1\" fields:{name:\"id11\" type:INT32 table:\"t1\" org_table:\"t1\" database:\"vttest\" org_name:\"id11\" column_length:11 charset:63 column_type:\"int(11)\"} fields:{name:\"id12\" type:INT32 table:\"t1\" org_table:\"t1\" database:\"vttest\" org_name:\"id12\" column_length:11 charset:63 column_type:\"int(11)\"}}", - "type:ROW row_event:{table_name:\"t1\" row_changes:{after:{lengths:3 lengths:4 values:\"1011010\"}}}", - "gtid", - "commit", - }, + {"begin", "gtid", "commit"}, + {"gtid", "type:OTHER"}, + {"begin", fe.String(), rowEvent, "gtid", "commit"}, }, }, } @@ -448,21 +380,20 @@ func TestMissingTables(t *testing.T) { } func TestVStreamCopySimpleFlow(t *testing.T) { - if testing.Short() { - t.Skip() + ts := &TestSpec{ + t: t, + ddls: []string{ + "create table t1(id11 int, id12 int, primary key(id11))", + "create table t2(id21 int, id22 int, primary key(id21))", + }, } - execStatements(t, []string{ - "create table t1(id11 int, id12 int, primary key(id11))", - "create table t2(id21 int, id22 int, primary key(id21))", - }) + ts.Init() + defer ts.Close() + log.Infof("Pos before bulk insert: %s", primaryPosition(t)) - insertLotsOfData(t, 10) + insertSomeRows(t, 10) log.Infof("Pos after bulk insert: %s", primaryPosition(t)) - defer execStatements(t, []string{ - "drop table t1", - "drop table t2", - }) - engine.se.Reload(context.Background()) + ctx := context.Background() qr, err := env.Mysqld.FetchSuperQuery(ctx, "SELECT count(*) as cnt from t1, t2 where t1.id11 = t2.id21") if err != nil { @@ -483,9 +414,23 @@ func TestVStreamCopySimpleFlow(t *testing.T) { var tablePKs []*binlogdatapb.TableLastPK tablePKs = append(tablePKs, getTablePK("t1", 1)) tablePKs = append(tablePKs, getTablePK("t2", 2)) + t1FieldEvent := &TestFieldEvent{ + table: "t1", + db: testenv.DBName, + cols: []*TestColumn{ + {name: "id11", dataType: "INT32", colType: "int(11)", len: 11, collationID: 63}, + {name: "id12", dataType: "INT32", colType: "int(11)", len: 11, collationID: 63}, + }, + } + t2FieldEvent := &TestFieldEvent{ + table: "t2", + db: testenv.DBName, + cols: []*TestColumn{ + {name: "id21", dataType: "INT32", colType: "int(11)", len: 11, collationID: 63}, + {name: "id22", dataType: "INT32", colType: "int(11)", len: 11, collationID: 63}, + }, + } - t1FieldEvent := []string{"begin", "type:FIELD field_event:{table_name:\"t1\" fields:{name:\"id11\" type:INT32 table:\"t1\" org_table:\"t1\" database:\"vttest\" org_name:\"id11\" column_length:11 charset:63 column_type:\"int(11)\"} fields:{name:\"id12\" type:INT32 table:\"t1\" org_table:\"t1\" database:\"vttest\" org_name:\"id12\" column_length:11 charset:63 column_type:\"int(11)\"}}"} - t2FieldEvent := []string{"begin", "type:FIELD field_event:{table_name:\"t2\" fields:{name:\"id21\" type:INT32 table:\"t2\" org_table:\"t2\" database:\"vttest\" org_name:\"id21\" column_length:11 charset:63 column_type:\"int(11)\"} fields:{name:\"id22\" type:INT32 table:\"t2\" org_table:\"t2\" database:\"vttest\" org_name:\"id22\" column_length:11 charset:63 column_type:\"int(11)\"}}"} t1Events := []string{} t2Events := []string{} for i := 1; i <= 10; i++ { @@ -499,21 +444,23 @@ func TestVStreamCopySimpleFlow(t *testing.T) { insertEvents1 := []string{ "begin", - "type:FIELD field_event:{table_name:\"t1\" fields:{name:\"id11\" type:INT32 table:\"t1\" org_table:\"t1\" database:\"vttest\" org_name:\"id11\" column_length:11 charset:63 column_type:\"int(11)\"} fields:{name:\"id12\" type:INT32 table:\"t1\" org_table:\"t1\" database:\"vttest\" org_name:\"id12\" column_length:11 charset:63 column_type:\"int(11)\"}}", - "type:ROW row_event:{table_name:\"t1\" row_changes:{after:{lengths:3 lengths:4 values:\"1011010\"}}}", + t1FieldEvent.String(), + getRowEvent(ts, t1FieldEvent, "insert into t1 values (101, 1010)"), "gtid", - "commit"} + "commit", + } insertEvents2 := []string{ "begin", - "type:FIELD field_event:{table_name:\"t2\" fields:{name:\"id21\" type:INT32 table:\"t2\" org_table:\"t2\" database:\"vttest\" org_name:\"id21\" column_length:11 charset:63 column_type:\"int(11)\"} fields:{name:\"id22\" type:INT32 table:\"t2\" org_table:\"t2\" database:\"vttest\" org_name:\"id22\" column_length:11 charset:63 column_type:\"int(11)\"}}", - "type:ROW row_event:{table_name:\"t2\" row_changes:{after:{lengths:3 lengths:4 values:\"2022020\"}}}", + t2FieldEvent.String(), + getRowEvent(ts, t2FieldEvent, "insert into t2 values (202, 2020)"), "gtid", - "commit"} + "commit", + } testcases := []testcase{ { input: []string{}, - output: [][]string{t1FieldEvent, {"gtid"}, t1Events, {"begin", "lastpk", "commit"}, t2FieldEvent, t2Events, {"begin", "lastpk", "commit"}, {"copy_completed"}}, + output: [][]string{{"begin", t1FieldEvent.String()}, {"gtid"}, t1Events, {"begin", "lastpk", "commit"}, {"begin", t2FieldEvent.String()}, t2Events, {"begin", "lastpk", "commit"}, {"copy_completed"}}, }, { @@ -535,20 +482,17 @@ func TestVStreamCopySimpleFlow(t *testing.T) { } func TestVStreamCopyWithDifferentFilters(t *testing.T) { - if testing.Short() { - t.Skip() + ts := &TestSpec{ + t: t, + ddls: []string{ + "create table t1(id1 int, id2 int, id3 int, primary key(id1)) charset=utf8mb4", + "create table t2a(id1 int, id2 int, primary key(id1)) charset=utf8mb4", + "create table t2b(id1 varchar(20), id2 int, primary key(id1)) charset=utf8mb4", + }, } - execStatements(t, []string{ - "create table t1(id1 int, id2 int, id3 int, primary key(id1)) ENGINE=InnoDB CHARSET=utf8mb4", - "create table t2a(id1 int, id2 int, primary key(id1)) ENGINE=InnoDB CHARSET=utf8mb4", - "create table t2b(id1 varchar(20), id2 int, primary key(id1)) ENGINE=InnoDB CHARSET=utf8mb4", - }) - defer execStatements(t, []string{ - "drop table t1", - "drop table t2a", - "drop table t2b", - }) - engine.se.Reload(context.Background()) + ts.Init() + defer ts.Close() + ctx, cancel := context.WithCancel(context.Background()) defer cancel() filter := &binlogdatapb.Filter{ @@ -560,6 +504,15 @@ func TestVStreamCopyWithDifferentFilters(t *testing.T) { }}, } + t1FieldEvent := &TestFieldEvent{ + table: "t1", + db: testenv.DBName, + cols: []*TestColumn{ + {name: "id1", dataType: "INT32", colType: "int(11)", len: 11, collationID: 63}, + {name: "id2", dataType: "INT32", colType: "int(11)", len: 11, collationID: 63}, + }, + } + execStatements(t, []string{ "insert into t1(id1, id2, id3) values (1, 2, 3)", "insert into t2a(id1, id2) values (1, 4)", @@ -568,32 +521,32 @@ func TestVStreamCopyWithDifferentFilters(t *testing.T) { }) var expectedEvents = []string{ - "type:BEGIN", - "type:FIELD field_event:{table_name:\"t1\" fields:{name:\"id1\" type:INT32 table:\"t1\" org_table:\"t1\" database:\"vttest\" org_name:\"id1\" column_length:11 charset:63 column_type:\"int(11)\"} fields:{name:\"id2\" type:INT32 table:\"t1\" org_table:\"t1\" database:\"vttest\" org_name:\"id2\" column_length:11 charset:63 column_type:\"int(11)\"}}", - "type:GTID", - "type:ROW row_event:{table_name:\"t1\" row_changes:{after:{lengths:1 lengths:1 values:\"12\"}}}", - "type:LASTPK last_p_k_event:{table_last_p_k:{table_name:\"t1\" lastpk:{fields:{name:\"id1\" type:INT32 charset:63 flags:53251} rows:{lengths:1 values:\"1\"}}}}", - "type:COMMIT", - "type:BEGIN", - "type:LASTPK last_p_k_event:{table_last_p_k:{table_name:\"t1\"} completed:true}", - "type:COMMIT", - "type:BEGIN", - "type:FIELD field_event:{table_name:\"t2a\" fields:{name:\"id1\" type:INT32 table:\"t2a\" org_table:\"t2a\" database:\"vttest\" org_name:\"id1\" column_length:11 charset:63 column_type:\"int(11)\"} fields:{name:\"id2\" type:INT32 table:\"t2a\" org_table:\"t2a\" database:\"vttest\" org_name:\"id2\" column_length:11 charset:63 column_type:\"int(11)\"}}", - "type:ROW row_event:{table_name:\"t2a\" row_changes:{after:{lengths:1 lengths:1 values:\"14\"}}}", - "type:LASTPK last_p_k_event:{table_last_p_k:{table_name:\"t2a\" lastpk:{fields:{name:\"id1\" type:INT32 charset:63 flags:53251} rows:{lengths:1 values:\"1\"}}}}", - "type:COMMIT", - "type:BEGIN", - "type:LASTPK last_p_k_event:{table_last_p_k:{table_name:\"t2a\"} completed:true}", - "type:COMMIT", - "type:BEGIN", - fmt.Sprintf("type:FIELD field_event:{table_name:\"t2b\" fields:{name:\"id1\" type:VARCHAR table:\"t2b\" org_table:\"t2b\" database:\"vttest\" org_name:\"id1\" column_length:80 charset:%d column_type:\"varchar(20)\"} fields:{name:\"id2\" type:INT32 table:\"t2b\" org_table:\"t2b\" database:\"vttest\" org_name:\"id2\" column_length:11 charset:63 column_type:\"int(11)\"}}", testenv.DefaultCollationID), - "type:ROW row_event:{table_name:\"t2b\" row_changes:{after:{lengths:1 lengths:1 values:\"a5\"}}}", - "type:ROW row_event:{table_name:\"t2b\" row_changes:{after:{lengths:1 lengths:1 values:\"b6\"}}}", - fmt.Sprintf("type:LASTPK last_p_k_event:{table_last_p_k:{table_name:\"t2b\" lastpk:{fields:{name:\"id1\" type:VARCHAR charset:%d flags:20483} rows:{lengths:1 values:\"b\"}}}}", testenv.DefaultCollationID), - "type:COMMIT", - "type:BEGIN", - "type:LASTPK last_p_k_event:{table_last_p_k:{table_name:\"t2b\"} completed:true}", - "type:COMMIT", + "begin", + t1FieldEvent.String(), + "gtid", + getRowEvent(ts, t1FieldEvent, "insert into t1 values (1, 2)"), + getLastPKEvent("t1", "id1", sqltypes.Int32, []sqltypes.Value{sqltypes.NewInt32(1)}, collations.CollationBinaryID, uint32(53251)), + "commit", + "begin", + getCopyCompletedEvent("t1"), + "commit", + "begin", + ts.fieldEvents["t2a"].String(), + getRowEvent(ts, ts.fieldEvents["t2a"], "insert into t2a values (1, 4)"), + getLastPKEvent("t2a", "id1", sqltypes.Int32, []sqltypes.Value{sqltypes.NewInt32(1)}, collations.CollationBinaryID, uint32(53251)), + "commit", + "begin", + getCopyCompletedEvent("t2a"), + "commit", + "begin", + ts.fieldEvents["t2b"].String(), + getRowEvent(ts, ts.fieldEvents["t2b"], "insert into t2b values ('a', 5)"), + getRowEvent(ts, ts.fieldEvents["t2b"], "insert into t2b values ('b', 6)"), + getLastPKEvent("t2b", "id1", sqltypes.VarChar, []sqltypes.Value{sqltypes.NewVarChar("b")}, uint32(testenv.DefaultCollationID), uint32(20483)), + "commit", + "begin", + getCopyCompletedEvent("t2b"), + "commit", } var allEvents []*binlogdatapb.VEvent @@ -630,11 +583,16 @@ func TestVStreamCopyWithDifferentFilters(t *testing.T) { ev.RowEvent.Keyspace = "" ev.RowEvent.Shard = "" } + ev.Keyspace = "" + ev.Shard = "" got := ev.String() want := expectedEvents[i] - - want = env.RemoveAnyDeprecatedDisplayWidths(want) - + switch want { + case "begin", "commit", "gtid": + want = fmt.Sprintf("type:%s", strings.ToUpper(want)) + default: + want = env.RemoveAnyDeprecatedDisplayWidths(want) + } if !strings.HasPrefix(got, want) { errGoroutine = fmt.Errorf("event %d did not match, want %s, got %s", i, want, got) return errGoroutine @@ -669,7 +627,7 @@ func TestFilteredVarBinary(t *testing.T) { }, } defer ts.Close() - require.NoError(t, ts.Init()) + ts.Init() ts.tests = [][]*TestQuery{{ {"begin", nil}, {"insert into t1 values (1, 'kepler')", noEvents}, @@ -714,7 +672,7 @@ func TestFilteredInt(t *testing.T) { }, } defer ts.Close() - require.NoError(t, ts.Init()) + ts.Init() ts.fieldEvents["t1"].cols[1].skip = true ts.tests = [][]*TestQuery{{ {"begin", nil}, @@ -752,7 +710,7 @@ func TestSavepoint(t *testing.T) { }, } defer ts.Close() - require.NoError(t, ts.Init()) + ts.Init() ts.tests = [][]*TestQuery{{ {"begin", nil}, {"insert into stream1 values (1, 'aaa')", nil}, @@ -787,7 +745,7 @@ func TestSavepointWithFilter(t *testing.T) { }, } defer ts.Close() - require.NoError(t, ts.Init()) + ts.Init() ts.tests = [][]*TestQuery{{ {"begin", nil}, {"insert into stream1 values (1, 'aaa')", noEvents}, @@ -828,90 +786,68 @@ func TestSavepointWithFilter(t *testing.T) { } func TestStatements(t *testing.T) { - if testing.Short() { - t.Skip() + ts := &TestSpec{ + t: t, + ddls: []string{ + "create table stream1(id int, val varbinary(128), primary key(id))", + "create table stream2(id int, val varbinary(128), primary key(id))", + }, } - - execStatements(t, []string{ - "create table stream1(id int, val varbinary(128), primary key(id))", - "create table stream2(id int, val varbinary(128), primary key(id))", - }) - defer execStatements(t, []string{ - "drop table stream1", - "drop table stream2", - }) - engine.se.Reload(context.Background()) - - testcases := []testcase{{ - input: []string{ - "begin", - "insert into stream1 values (1, 'aaa')", - "update stream1 set val='bbb' where id = 1", - "commit", + defer ts.Close() + ts.Init() + fe := &TestFieldEvent{ + table: "stream1", + db: testenv.DBName, + cols: []*TestColumn{ + {name: "id", dataType: "INT32", colType: "int(11)", len: 11, collationID: 63}, + {name: "val", dataType: "VARBINARY", colType: "varbinary(256)", len: 256, collationID: 63}, }, - output: [][]string{{ - `begin`, - `type:FIELD field_event:{table_name:"stream1" fields:{name:"id" type:INT32 table:"stream1" org_table:"stream1" database:"vttest" org_name:"id" column_length:11 charset:63 column_type:"int(11)"} fields:{name:"val" type:VARBINARY table:"stream1" org_table:"stream1" database:"vttest" org_name:"val" column_length:128 charset:63 column_type:"varbinary(128)"}}`, - `type:ROW row_event:{table_name:"stream1" row_changes:{after:{lengths:1 lengths:3 values:"1aaa"}}}`, - `type:ROW row_event:{table_name:"stream1" row_changes:{before:{lengths:1 lengths:3 values:"1aaa"} after:{lengths:1 lengths:3 values:"1bbb"}}}`, - `gtid`, - `commit`, + } + ddlAlterWithPrefixAndSuffix := "/* prefix */ alter table stream1 change column val val varbinary(256) /* suffix */" + ddlTruncate := "truncate table stream2" + ddlReverseAlter := "/* prefix */ alter table stream1 change column val val varbinary(128) /* suffix */" + ts.tests = [][]*TestQuery{{ + {"begin", nil}, + {"insert into stream1 values (1, 'aaa')", nil}, + {"update stream1 set val='bbb' where id = 1", []TestRowEvent{ + {spec: &TestRowEventSpec{table: "stream1", changes: []TestRowChange{{before: []string{"1", "aaa"}, after: []string{"1", "bbb"}}}}}, }}, - }, { - // Normal DDL. - input: "alter table stream1 change column val val varbinary(128)", - output: [][]string{{ - `gtid`, - `type:DDL statement:"alter table stream1 change column val val varbinary(128)"`, + {"commit", nil}, + }, { // Normal DDL. + {"alter table stream1 change column val val varbinary(128)", nil}, + }, { // DDL padded with comments. + {ddlAlterWithPrefixAndSuffix, []TestRowEvent{ + {event: "gtid"}, + {event: ts.getDDLEvent(ddlAlterWithPrefixAndSuffix)}, }}, - }, { - // DDL padded with comments. - input: " /* prefix */ alter table stream1 change column val val varbinary(256) /* suffix */ ", - output: [][]string{{ - `gtid`, - `type:DDL statement:"/* prefix */ alter table stream1 change column val val varbinary(256) /* suffix */"`, + }, { // Multiple tables, and multiple rows changed per statement. + {"begin", nil}, + {"insert into stream1 values (2, 'bbb')", []TestRowEvent{ + {event: fe.String()}, + {spec: &TestRowEventSpec{table: "stream1", changes: []TestRowChange{{after: []string{"2", "bbb"}}}}}, }}, - }, { - // Multiple tables, and multiple rows changed per statement. - input: []string{ - "begin", - "insert into stream1 values (2, 'bbb')", - "insert into stream2 values (1, 'aaa')", - "update stream1 set val='ccc'", - "delete from stream1", - "commit", - }, - output: [][]string{{ - `begin`, - `type:FIELD field_event:{table_name:"stream1" fields:{name:"id" type:INT32 table:"stream1" org_table:"stream1" database:"vttest" org_name:"id" column_length:11 charset:63 column_type:"int(11)"} fields:{name:"val" type:VARBINARY table:"stream1" org_table:"stream1" database:"vttest" org_name:"val" column_length:256 charset:63 column_type:"varbinary(256)"}}`, - `type:ROW row_event:{table_name:"stream1" row_changes:{after:{lengths:1 lengths:3 values:"2bbb"}}}`, - `type:FIELD field_event:{table_name:"stream2" fields:{name:"id" type:INT32 table:"stream2" org_table:"stream2" database:"vttest" org_name:"id" column_length:11 charset:63 column_type:"int(11)"} fields:{name:"val" type:VARBINARY table:"stream2" org_table:"stream2" database:"vttest" org_name:"val" column_length:128 charset:63 column_type:"varbinary(128)"}}`, - `type:ROW row_event:{table_name:"stream2" row_changes:{after:{lengths:1 lengths:3 values:"1aaa"}}}`, - `type:ROW row_event:{table_name:"stream1" ` + - `row_changes:{before:{lengths:1 lengths:3 values:"1bbb"} after:{lengths:1 lengths:3 values:"1ccc"}} ` + - `row_changes:{before:{lengths:1 lengths:3 values:"2bbb"} after:{lengths:1 lengths:3 values:"2ccc"}}}`, - `type:ROW row_event:{table_name:"stream1" ` + - `row_changes:{before:{lengths:1 lengths:3 values:"1ccc"}} ` + - `row_changes:{before:{lengths:1 lengths:3 values:"2ccc"}}}`, - `gtid`, - `commit`, + {"insert into stream2 values (1, 'aaa')", nil}, + {"update stream1 set val='ccc'", []TestRowEvent{ + {spec: &TestRowEventSpec{table: "stream1", changes: []TestRowChange{{before: []string{"1", "bbb"}, after: []string{"1", "ccc"}}, {before: []string{"2", "bbb"}, after: []string{"2", "ccc"}}}}}, + }}, + {"delete from stream1", []TestRowEvent{ + {spec: &TestRowEventSpec{table: "stream1", changes: []TestRowChange{{before: []string{"1", "ccc"}}, {before: []string{"2", "ccc"}}}}}, }}, + {"commit", nil}, }, { - // truncate is a DDL - input: "truncate table stream2", - output: [][]string{{ - `gtid`, - `type:DDL statement:"truncate table stream2"`, + {ddlTruncate, []TestRowEvent{ + {event: "gtid"}, + {event: ts.getDDLEvent(ddlTruncate)}, }}, }, { - // Reverse alter table, else FilePos tests fail - input: " /* prefix */ alter table stream1 change column val val varbinary(128) /* suffix */ ", - output: [][]string{{ - `gtid`, - `type:DDL statement:"/* prefix */ alter table stream1 change column val val varbinary(128) /* suffix */"`, + {ddlReverseAlter, []TestRowEvent{ + {event: "gtid"}, + {event: ts.getDDLEvent(ddlReverseAlter)}, }}, }} - runCases(t, nil, testcases, "current", nil) + ts.Run() + + ts.Reset() // Test FilePos flavor savedEngine := engine defer func() { engine = savedEngine }() @@ -919,9 +855,8 @@ func TestStatements(t *testing.T) { in.Flavor = "FilePos" return in }) - defer engine.Close() - runCases(t, nil, testcases, "current", nil) + ts.Run() } // TestOther tests "other" and "priv" statements. These statements can @@ -929,19 +864,15 @@ func TestStatements(t *testing.T) { // mariadb. So, we just show that vreplication transmits "OTHER" events // if the binlog is affected by the statement. func TestOther(t *testing.T) { - if testing.Short() { - t.Skip() + ts := &TestSpec{ + t: t, + ddls: []string{ + "create table stream1(id int, val varbinary(128), primary key(id))", + "create table stream2(id int, val varbinary(128), primary key(id))", + }, } - - execStatements(t, []string{ - "create table stream1(id int, val varbinary(128), primary key(id))", - "create table stream2(id int, val varbinary(128), primary key(id))", - }) - defer execStatements(t, []string{ - "drop table stream1", - "drop table stream2", - }) - engine.se.Reload(context.Background()) + ts.Init() + defer ts.Close() testcases := []string{ "repair table stream2", @@ -995,106 +926,81 @@ func TestOther(t *testing.T) { customRun("filePos") } +// TestRegexp tests a filter which has a regexp suffix. func TestRegexp(t *testing.T) { - if testing.Short() { - t.Skip() + ts := &TestSpec{ + t: t, + ddls: []string{ + "create table yes_stream(id int, val varbinary(128), primary key(id))", + "create table no_stream(id int, val varbinary(128), primary key(id))", + }, + options: &TestSpecOptions{ + filter: &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{{ + Match: "/yes.*/", + }}, + }, + }, } + defer ts.Close() - execStatements(t, []string{ - "create table yes_stream(id int, val varbinary(128), primary key(id))", - "create table no_stream(id int, val varbinary(128), primary key(id))", - }) - defer execStatements(t, []string{ - "drop table yes_stream", - "drop table no_stream", - }) - engine.se.Reload(context.Background()) + ts.Init() + + ts.tests = [][]*TestQuery{{ + {"begin", nil}, + {"insert into yes_stream values (1, 'aaa')", nil}, + {"insert into no_stream values (2, 'bbb')", noEvents}, + {"update yes_stream set val='bbb' where id = 1", nil}, + {"update no_stream set val='bbb' where id = 2", noEvents}, + {"commit", nil}, + }} + ts.Run() +} +func TestREKeyRange(t *testing.T) { filter := &binlogdatapb.Filter{ Rules: []*binlogdatapb.Rule{{ - Match: "/yes.*/", + Match: "/.*/", + Filter: "-80", }}, } - - testcases := []testcase{{ - input: []string{ - "begin", - "insert into yes_stream values (1, 'aaa')", - "insert into no_stream values (2, 'bbb')", - "update yes_stream set val='bbb' where id = 1", - "update no_stream set val='bbb' where id = 2", - "commit", + ts := &TestSpec{ + t: t, + ddls: []string{ + "create table t1(id1 int, id2 int, val varbinary(128), primary key(id1))", + }, + options: &TestSpecOptions{ + filter: filter, }, - output: [][]string{{ - `begin`, - `type:FIELD field_event:{table_name:"yes_stream" fields:{name:"id" type:INT32 table:"yes_stream" org_table:"yes_stream" database:"vttest" org_name:"id" column_length:11 charset:63 column_type:"int(11)"} fields:{name:"val" type:VARBINARY table:"yes_stream" org_table:"yes_stream" database:"vttest" org_name:"val" column_length:128 charset:63 column_type:"varbinary(128)"}}`, - `type:ROW row_event:{table_name:"yes_stream" row_changes:{after:{lengths:1 lengths:3 values:"1aaa"}}}`, - `type:ROW row_event:{table_name:"yes_stream" row_changes:{before:{lengths:1 lengths:3 values:"1aaa"} after:{lengths:1 lengths:3 values:"1bbb"}}}`, - `gtid`, - `commit`, - }}, - }} - runCases(t, filter, testcases, "", nil) -} - -func TestREKeyRange(t *testing.T) { - if testing.Short() { - t.Skip() } ignoreKeyspaceShardInFieldAndRowEvents = false defer func() { ignoreKeyspaceShardInFieldAndRowEvents = true }() - // Needed for this test to run if run standalone - engine.watcherOnce.Do(engine.setWatch) - - execStatements(t, []string{ - "create table t1(id1 int, id2 int, val varbinary(128), primary key(id1))", - }) - defer execStatements(t, []string{ - "drop table t1", - }) - engine.se.Reload(context.Background()) + ts.Init() + defer ts.Close() setVSchema(t, shardedVSchema) defer env.SetVSchema("{}") - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - filter := &binlogdatapb.Filter{ - Rules: []*binlogdatapb.Rule{{ - Match: "/.*/", - Filter: "-80", - }}, - } - wg, ch := startStream(ctx, t, filter, "", nil) - defer wg.Wait() // 1, 2, 3 and 5 are in shard -80. // 4 and 6 are in shard 80-. - input := []string{ - "begin", - "insert into t1 values (1, 4, 'aaa')", - "insert into t1 values (4, 1, 'bbb')", - // Stay in shard. - "update t1 set id1 = 2 where id1 = 1", - // Move from -80 to 80-. - "update t1 set id1 = 6 where id1 = 2", - // Move from 80- to -80. - "update t1 set id1 = 3 where id1 = 4", - "commit", - } - execStatements(t, input) - expectLog(ctx, t, input, ch, [][]string{{ - `begin`, - `type:FIELD field_event:{table_name:"t1" fields:{name:"id1" type:INT32 table:"t1" org_table:"t1" database:"vttest" org_name:"id1" column_length:11 charset:63 column_type:"int(11)"} fields:{name:"id2" type:INT32 table:"t1" org_table:"t1" database:"vttest" org_name:"id2" column_length:11 charset:63 column_type:"int(11)"} fields:{name:"val" type:VARBINARY table:"t1" org_table:"t1" database:"vttest" org_name:"val" column_length:128 charset:63 column_type:"varbinary(128)"} keyspace:"vttest" shard:"0"}`, - `type:ROW row_event:{table_name:"t1" row_changes:{after:{lengths:1 lengths:1 lengths:3 values:"14aaa"}} keyspace:"vttest" shard:"0"}`, - `type:ROW row_event:{table_name:"t1" row_changes:{before:{lengths:1 lengths:1 lengths:3 values:"14aaa"} after:{lengths:1 lengths:1 lengths:3 values:"24aaa"}} keyspace:"vttest" shard:"0"}`, - `type:ROW row_event:{table_name:"t1" row_changes:{before:{lengths:1 lengths:1 lengths:3 values:"24aaa"}} keyspace:"vttest" shard:"0"}`, - `type:ROW row_event:{table_name:"t1" row_changes:{after:{lengths:1 lengths:1 lengths:3 values:"31bbb"}} keyspace:"vttest" shard:"0"}`, - `gtid`, - `commit`, - }}) + ts.tests = [][]*TestQuery{{ + {"begin", nil}, + {"insert into t1 values (1, 1, 'aaa')", nil}, + {"insert into t1 values (4, 1, 'bbb')", noEvents}, + {"update t1 set id1 = 2 where id1 = 1", []TestRowEvent{ // Stays in shard. + {spec: &TestRowEventSpec{table: "t1", changes: []TestRowChange{{before: []string{"1", "1", "aaa"}, after: []string{"2", "1", "aaa"}}}}}, + }}, + {"update t1 set id1 = 6 where id1 = 2", []TestRowEvent{ // Moves from -80 to 80-. + {spec: &TestRowEventSpec{table: "t1", changes: []TestRowChange{{before: []string{"2", "1", "aaa"}}}}}, + }}, + {"update t1 set id1 = 3 where id1 = 4", []TestRowEvent{ // Moves from 80- back to -80. + {spec: &TestRowEventSpec{table: "t1", changes: []TestRowChange{{after: []string{"3", "1", "bbb"}}}}}, + }}, + {"commit", nil}, + }} + ts.Run() // Switch the vschema to make id2 the primary vindex. altVSchema := `{ @@ -1116,215 +1022,159 @@ func TestREKeyRange(t *testing.T) { } }` setVSchema(t, altVSchema) - + engine.se.Reload(context.Background()) + ts.Reset() // Only the first insert should be sent. - input = []string{ - "begin", - "insert into t1 values (4, 1, 'aaa')", - "insert into t1 values (1, 4, 'aaa')", - "commit", - } - execStatements(t, input) - expectLog(ctx, t, input, ch, [][]string{{ - `begin`, - `type:ROW row_event:{table_name:"t1" row_changes:{after:{lengths:1 lengths:1 lengths:3 values:"41aaa"}} keyspace:"vttest" shard:"0"}`, - `gtid`, - `commit`, - }}) - cancel() + ts.tests = [][]*TestQuery{{ + {"begin", nil}, + {"insert into t1 values (4, 1, 'aaa')", nil}, + {"insert into t1 values (1, 4, 'aaa')", noEvents}, + {"commit", nil}, + }} + ts.Init() + ts.Run() } func TestInKeyRangeMultiColumn(t *testing.T) { - if testing.Short() { - t.Skip() - } - engine.watcherOnce.Do(engine.setWatch) - engine.se.Reload(context.Background()) - - execStatements(t, []string{ - "create table t1(region int, id int, val varbinary(128), primary key(id))", - }) - defer execStatements(t, []string{ - "drop table t1", - }) - engine.se.Reload(context.Background()) - - setVSchema(t, multicolumnVSchema) - defer env.SetVSchema("{}") - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - filter := &binlogdatapb.Filter{ Rules: []*binlogdatapb.Rule{{ Match: "t1", Filter: "select id, region, val, keyspace_id() from t1 where in_keyrange('-80')", }}, } - wg, ch := startStream(ctx, t, filter, "", nil) - defer wg.Wait() - - // 1, 2, 3 and 5 are in shard -80. - // 4 and 6 are in shard 80-. - input := []string{ - "begin", - "insert into t1 values (1, 1, 'aaa')", - "insert into t1 values (128, 2, 'bbb')", - // Stay in shard. - "update t1 set region = 2 where id = 1", - // Move from -80 to 80-. - "update t1 set region = 128 where id = 1", - // Move from 80- to -80. - "update t1 set region = 1 where id = 2", - "commit", - } - execStatements(t, input) - expectLog(ctx, t, input, ch, [][]string{{ - `begin`, - `type:FIELD field_event:{table_name:"t1" fields:{name:"id" type:INT32 table:"t1" org_table:"t1" database:"vttest" org_name:"id" column_length:11 charset:63 column_type:"int(11)"} fields:{name:"region" type:INT32 table:"t1" org_table:"t1" database:"vttest" org_name:"region" column_length:11 charset:63 column_type:"int(11)"} fields:{name:"val" type:VARBINARY table:"t1" org_table:"t1" database:"vttest" org_name:"val" column_length:128 charset:63 column_type:"varbinary(128)"} fields:{name:"keyspace_id" type:VARBINARY charset:63}}`, - `type:ROW row_event:{table_name:"t1" row_changes:{after:{lengths:1 lengths:1 lengths:3 lengths:9 values:"11aaa\x01\x16k@\xb4J\xbaK\xd6"}}}`, - `type:ROW row_event:{table_name:"t1" row_changes:{before:{lengths:1 lengths:1 lengths:3 lengths:9 values:"11aaa\x01\x16k@\xb4J\xbaK\xd6"} ` + - `after:{lengths:1 lengths:1 lengths:3 lengths:9 values:"12aaa\x02\x16k@\xb4J\xbaK\xd6"}}}`, - `type:ROW row_event:{table_name:"t1" row_changes:{before:{lengths:1 lengths:1 lengths:3 lengths:9 values:"12aaa\x02\x16k@\xb4J\xbaK\xd6"}}}`, - `type:ROW row_event:{table_name:"t1" row_changes:{after:{lengths:1 lengths:1 lengths:3 lengths:9 values:"21bbb\x01\x06\xe7\xea\"Βp\x8f"}}}`, - `gtid`, - `commit`, - }}) - cancel() -} - -func TestREMultiColumnVindex(t *testing.T) { - if testing.Short() { - t.Skip() + ts := &TestSpec{ + t: t, + ddls: []string{ + "create table t1(region int, id int, val varbinary(128), primary key(id))", + }, + options: &TestSpecOptions{ + filter: filter, + }, } - engine.watcherOnce.Do(engine.setWatch) - - execStatements(t, []string{ - "create table t1(region int, id int, val varbinary(128), primary key(id))", - }) - defer execStatements(t, []string{ - "drop table t1", - }) - engine.se.Reload(context.Background()) + ts.Init() + defer ts.Close() setVSchema(t, multicolumnVSchema) defer env.SetVSchema("{}") - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() + fe := &TestFieldEvent{ + table: "t1", + db: testenv.DBName, + cols: []*TestColumn{ + {name: "id", dataType: "INT32", colType: "int(11)", len: 11, collationID: 63}, + {name: "region", dataType: "INT32", colType: "int(11)", len: 11, collationID: 63}, + {name: "val", dataType: "VARBINARY", colType: "varbinary(128)", len: 128, collationID: 63}, + {name: "keyspace_id", dataType: "VARBINARY", colType: "varbinary(256)", len: 256, collationID: 63}, + }, + } + + // 1 and 2 are in shard -80. + // 128 is in shard 80-. + keyspaceId1 := "\x01\x16k@\xb4J\xbaK\xd6" + keyspaceId2 := "\x02\x16k@\xb4J\xbaK\xd6" + keyspaceId3 := "\x01\x06\xe7\xea\"Βp\x8f" + ts.tests = [][]*TestQuery{{ + {"begin", nil}, + {"insert into t1 values (1, 1, 'aaa')", []TestRowEvent{ + {event: fe.String()}, + {spec: &TestRowEventSpec{table: "t1", changes: []TestRowChange{{after: []string{"1", "1", "aaa", keyspaceId1}}}}}, + }}, + {"insert into t1 values (128, 2, 'bbb')", noEvents}, + {"update t1 set region = 2 where id = 1", []TestRowEvent{ + {spec: &TestRowEventSpec{table: "t1", changes: []TestRowChange{{before: []string{"1", "1", "aaa", keyspaceId1}, after: []string{"1", "2", "aaa", keyspaceId2}}}}}, + }}, + {"update t1 set region = 128 where id = 1", []TestRowEvent{ + {spec: &TestRowEventSpec{table: "t1", changes: []TestRowChange{{before: []string{"1", "2", "aaa", keyspaceId2}}}}}, + }}, + {"update t1 set region = 1 where id = 2", []TestRowEvent{ + {spec: &TestRowEventSpec{table: "t1", changes: []TestRowChange{{after: []string{"2", "1", "bbb", keyspaceId3}}}}}, + }}, + {"commit", nil}, + }} + ts.Run() +} +func TestREMultiColumnVindex(t *testing.T) { filter := &binlogdatapb.Filter{ Rules: []*binlogdatapb.Rule{{ Match: "/.*/", Filter: "-80", }}, } - wg, ch := startStream(ctx, t, filter, "", nil) - defer wg.Wait() - - // 1, 2, 3 and 5 are in shard -80. - // 4 and 6 are in shard 80-. - input := []string{ - "begin", - "insert into t1 values (1, 1, 'aaa')", - "insert into t1 values (128, 2, 'bbb')", - // Stay in shard. - "update t1 set region = 2 where id = 1", - // Move from -80 to 80-. - "update t1 set region = 128 where id = 1", - // Move from 80- to -80. - "update t1 set region = 1 where id = 2", - "commit", + ts := &TestSpec{ + t: t, + ddls: []string{ + "create table t1(region int, id int, val varbinary(128), primary key(id))", + }, + options: &TestSpecOptions{ + filter: filter, + }, } - execStatements(t, input) - expectLog(ctx, t, input, ch, [][]string{{ - `begin`, - `type:FIELD field_event:{table_name:"t1" fields:{name:"region" type:INT32 table:"t1" org_table:"t1" database:"vttest" org_name:"region" column_length:11 charset:63 column_type:"int(11)"} fields:{name:"id" type:INT32 table:"t1" org_table:"t1" database:"vttest" org_name:"id" column_length:11 charset:63 column_type:"int(11)"} fields:{name:"val" type:VARBINARY table:"t1" org_table:"t1" database:"vttest" org_name:"val" column_length:128 charset:63 column_type:"varbinary(128)"}}`, - `type:ROW row_event:{table_name:"t1" row_changes:{after:{lengths:1 lengths:1 lengths:3 values:"11aaa"}}}`, - `type:ROW row_event:{table_name:"t1" row_changes:{before:{lengths:1 lengths:1 lengths:3 values:"11aaa"} after:{lengths:1 lengths:1 lengths:3 values:"21aaa"}}}`, - `type:ROW row_event:{table_name:"t1" row_changes:{before:{lengths:1 lengths:1 lengths:3 values:"21aaa"}}}`, - `type:ROW row_event:{table_name:"t1" row_changes:{after:{lengths:1 lengths:1 lengths:3 values:"12bbb"}}}`, - `gtid`, - `commit`, - }}) - cancel() + ts.Init() + defer ts.Close() + + setVSchema(t, multicolumnVSchema) + defer env.SetVSchema("{}") + // (region, id) is the primary vindex. + // (1,1), (1, 2) are in shard -80. + // (128, 1) (128, 2) are in shard 80-. + ts.tests = [][]*TestQuery{{ + {"begin", nil}, + {"insert into t1 values (1, 1, 'aaa')", nil}, + {"insert into t1 values (128, 2, 'bbb')", noEvents}, + {"update t1 set region = 2 where id = 1", nil}, + {"update t1 set region = 128 where id = 1", []TestRowEvent{ + {spec: &TestRowEventSpec{table: "t1", changes: []TestRowChange{{before: []string{"2", "1", "aaa"}}}}}, + }}, + {"update t1 set region = 1 where id = 2", []TestRowEvent{ + {spec: &TestRowEventSpec{table: "t1", changes: []TestRowChange{{after: []string{"1", "2", "bbb"}}}}}, + }}, + {"commit", nil}, + }} + ts.Run() } +// TestSelectFilter tests a filter with an in_keyrange function, used in a sharded keyspace. func TestSelectFilter(t *testing.T) { - if testing.Short() { - t.Skip() + fe := &TestFieldEvent{ + table: "t1", + db: testenv.DBName, + cols: []*TestColumn{ + {name: "id2", dataType: "INT32", colType: "int(11)", len: 11, collationID: 63}, + {name: "val", dataType: "VARBINARY", colType: "varbinary(128)", len: 128, collationID: 63}, + }, } - engine.se.Reload(context.Background()) - - execStatements(t, []string{ - "create table t1(id1 int, id2 int, val varbinary(128), primary key(id1))", - }) - defer execStatements(t, []string{ - "drop table t1", - }) - engine.se.Reload(context.Background()) - - filter := &binlogdatapb.Filter{ - Rules: []*binlogdatapb.Rule{{ - Match: "t1", - Filter: "select id2, val from t1 where in_keyrange(id2, 'hash', '-80')", - }}, + ts := &TestSpec{ + t: t, + ddls: []string{ + "create table t1(id1 int, id2 int, val varbinary(128), primary key(id1))", + }, + options: &TestSpecOptions{ + filter: &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{{ + Match: "t1", + Filter: "select id2, val from t1 where in_keyrange(id2, 'hash', '-80')", + }}, + }, + }, } + defer ts.Close() - testcases := []testcase{{ - input: []string{ - "begin", - "insert into t1 values (4, 1, 'aaa')", - "insert into t1 values (2, 4, 'aaa')", - "commit", - }, - output: [][]string{{ - `begin`, - `type:FIELD field_event:{table_name:"t1" fields:{name:"id2" type:INT32 table:"t1" org_table:"t1" database:"vttest" org_name:"id2" column_length:11 charset:63 column_type:"int(11)"} fields:{name:"val" type:VARBINARY table:"t1" org_table:"t1" database:"vttest" org_name:"val" column_length:128 charset:63 column_type:"varbinary(128)"}}`, - `type:ROW row_event:{table_name:"t1" row_changes:{after:{lengths:1 lengths:3 values:"1aaa"}}}`, - `gtid`, - `commit`, + ts.Init() + + ts.tests = [][]*TestQuery{{ + {"begin", nil}, + {"insert into t1 values (4, 1, 'aaa')", []TestRowEvent{ + {event: fe.String()}, + {spec: &TestRowEventSpec{table: "t1", changes: []TestRowChange{{after: []string{"1", "aaa"}}}}}, }}, + {"insert into t1 values (2, 4, 'aaa')", noEvents}, // not in keyrange + {"commit", nil}, }} - runCases(t, filter, testcases, "", nil) + ts.Run() } func TestDDLAddColumn(t *testing.T) { - if testing.Short() { - t.Skip() - } - - execStatements(t, []string{ - "create table ddl_test1(id int, val1 varbinary(128), primary key(id))", - "create table ddl_test2(id int, val1 varbinary(128), primary key(id))", - }) - defer execStatements(t, []string{ - "drop table ddl_test1", - "drop table ddl_test2", - }) - - // Record position before the next few statements. - pos := primaryPosition(t) - execStatements(t, []string{ - "begin", - "insert into ddl_test1 values(1, 'aaa')", - "insert into ddl_test2 values(1, 'aaa')", - "commit", - // Adding columns is allowed. - "alter table ddl_test1 add column val2 varbinary(128)", - "alter table ddl_test2 add column val2 varbinary(128)", - "begin", - "insert into ddl_test1 values(2, 'bbb', 'ccc')", - "insert into ddl_test2 values(2, 'bbb', 'ccc')", - "commit", - }) - engine.se.Reload(context.Background()) - env.SchemaEngine.Reload(context.Background()) - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - // Test RE as well as select-based filters. filter := &binlogdatapb.Filter{ Rules: []*binlogdatapb.Rule{{ Match: "ddl_test2", @@ -1334,45 +1184,74 @@ func TestDDLAddColumn(t *testing.T) { }}, } - ch := make(chan []*binlogdatapb.VEvent) - go func() { - defer close(ch) - if err := vstream(ctx, t, pos, nil, filter, ch); err != nil { - t.Error(err) - } - }() - expectLog(ctx, t, "ddls", ch, [][]string{{ - // Current schema has 3 columns, but they'll be truncated to match the two columns in the event. - `begin`, - `type:FIELD field_event:{table_name:"ddl_test1" fields:{name:"id" type:INT32 table:"ddl_test1" org_table:"ddl_test1" database:"vttest" org_name:"id" column_length:11 charset:63 column_type:"int(11)"} fields:{name:"val1" type:VARBINARY table:"ddl_test1" org_table:"ddl_test1" database:"vttest" org_name:"val1" column_length:128 charset:63 column_type:"varbinary(128)"}}`, - `type:ROW row_event:{table_name:"ddl_test1" row_changes:{after:{lengths:1 lengths:3 values:"1aaa"}}}`, - `type:FIELD field_event:{table_name:"ddl_test2" fields:{name:"id" type:INT32 table:"ddl_test2" org_table:"ddl_test2" database:"vttest" org_name:"id" column_length:11 charset:63 column_type:"int(11)"} fields:{name:"val1" type:VARBINARY table:"ddl_test2" org_table:"ddl_test2" database:"vttest" org_name:"val1" column_length:128 charset:63 column_type:"varbinary(128)"}}`, - `type:ROW row_event:{table_name:"ddl_test2" row_changes:{after:{lengths:1 lengths:3 values:"1aaa"}}}`, - `gtid`, - `commit`, + ts := &TestSpec{ + t: t, + ddls: []string{ + "create table ddl_test1(id int, val1 varbinary(128), primary key(id))", + "create table ddl_test2(id int, val1 varbinary(128), primary key(id))", + }, + options: &TestSpecOptions{ + // Test RE as well as select-based filters. + filter: filter, + }, + } + defer ts.Close() + // Record position before the next few statements. + ts.Init() + pos := primaryPosition(t) + ts.SetStartPosition(pos) + alterTest1 := "alter table ddl_test1 add column val2 varbinary(128)" + alterTest2 := "alter table ddl_test2 add column val2 varbinary(128)" + fe1 := &TestFieldEvent{ + table: "ddl_test1", + db: testenv.DBName, + cols: []*TestColumn{ + {name: "id", dataType: "INT32", colType: "int(11)", len: 11, collationID: 63}, + {name: "val1", dataType: "VARBINARY", colType: "varbinary(128)", len: 128, collationID: 63}, + {name: "val2", dataType: "VARBINARY", colType: "varbinary(128)", len: 128, collationID: 63}, + }, + } + fe2 := &TestFieldEvent{ + table: "ddl_test2", + db: testenv.DBName, + cols: []*TestColumn{ + {name: "id", dataType: "INT32", colType: "int(11)", len: 11, collationID: 63}, + {name: "val1", dataType: "VARBINARY", colType: "varbinary(128)", len: 128, collationID: 63}, + {name: "val2", dataType: "VARBINARY", colType: "varbinary(128)", len: 128, collationID: 63}, + }, + } + ts.tests = [][]*TestQuery{{ + {"begin", nil}, + {"insert into ddl_test1 values(1, 'aaa')", nil}, + {"insert into ddl_test2 values(1, 'aaa')", nil}, + {"commit", nil}, }, { - `gtid`, - `type:DDL statement:"alter table ddl_test1 add column val2 varbinary(128)"`, + // Adding columns is allowed. + {alterTest1, []TestRowEvent{ + {event: "gtid"}, + {event: ts.getDDLEvent(alterTest1)}, + }}, }, { - `gtid`, - `type:DDL statement:"alter table ddl_test2 add column val2 varbinary(128)"`, + {alterTest2, []TestRowEvent{ + {event: "gtid"}, + {event: ts.getDDLEvent(alterTest2)}, + }}, }, { - // The plan will be updated to now include the third column - // because the new table map will have three columns. - `begin`, - `type:FIELD field_event:{table_name:"ddl_test1" fields:{name:"id" type:INT32 table:"ddl_test1" org_table:"ddl_test1" database:"vttest" org_name:"id" column_length:11 charset:63 column_type:"int(11)"} fields:{name:"val1" type:VARBINARY table:"ddl_test1" org_table:"ddl_test1" database:"vttest" org_name:"val1" column_length:128 charset:63 column_type:"varbinary(128)"} fields:{name:"val2" type:VARBINARY table:"ddl_test1" org_table:"ddl_test1" database:"vttest" org_name:"val2" column_length:128 charset:63 column_type:"varbinary(128)"}}`, - `type:ROW row_event:{table_name:"ddl_test1" row_changes:{after:{lengths:1 lengths:3 lengths:3 values:"2bbbccc"}}}`, - `type:FIELD field_event:{table_name:"ddl_test2" fields:{name:"id" type:INT32 table:"ddl_test2" org_table:"ddl_test2" database:"vttest" org_name:"id" column_length:11 charset:63 column_type:"int(11)"} fields:{name:"val1" type:VARBINARY table:"ddl_test2" org_table:"ddl_test2" database:"vttest" org_name:"val1" column_length:128 charset:63 column_type:"varbinary(128)"} fields:{name:"val2" type:VARBINARY table:"ddl_test2" org_table:"ddl_test2" database:"vttest" org_name:"val2" column_length:128 charset:63 column_type:"varbinary(128)"}}`, - `type:ROW row_event:{table_name:"ddl_test2" row_changes:{after:{lengths:1 lengths:3 lengths:3 values:"2bbbccc"}}}`, - `gtid`, - `commit`, - }}) + {"begin", nil}, + {"insert into ddl_test1 values(2, 'bbb', 'ccc')", []TestRowEvent{ + {event: fe1.String()}, + {spec: &TestRowEventSpec{table: "ddl_test1", changes: []TestRowChange{{after: []string{"2", "bbb", "ccc"}}}}}, + }}, + {"insert into ddl_test2 values(2, 'bbb', 'ccc')", []TestRowEvent{ + {event: fe2.String()}, + {spec: &TestRowEventSpec{table: "ddl_test2", changes: []TestRowChange{{after: []string{"2", "bbb", "ccc"}}}}}, + }}, + {"commit", nil}, + }} + ts.Run() } func TestDDLDropColumn(t *testing.T) { - if testing.Short() { - t.Skip() - } env.SchemaEngine.Reload(context.Background()) execStatement(t, "create table ddl_test2(id int, val1 varbinary(128), val2 varbinary(128), primary key(id))") defer execStatement(t, "drop table ddl_test2") @@ -1405,10 +1284,6 @@ func TestDDLDropColumn(t *testing.T) { } func TestUnsentDDL(t *testing.T) { - if testing.Short() { - t.Skip() - } - execStatement(t, "create table unsent(id int, val varbinary(128), primary key(id))") testcases := []testcase{{ @@ -1431,102 +1306,70 @@ func TestUnsentDDL(t *testing.T) { } func TestBuffering(t *testing.T) { - if testing.Short() { - t.Skip() - } - reset := AdjustPacketSize(10) defer reset() - execStatement(t, "create table packet_test(id int, val varbinary(128), primary key(id))") - defer execStatement(t, "drop table packet_test") - engine.se.Reload(context.Background()) - - testcases := []testcase{{ - // All rows in one packet. - input: []string{ - "begin", - "insert into packet_test values (1, '123')", - "insert into packet_test values (2, '456')", - "commit", + ts := &TestSpec{ + t: t, + ddls: []string{ + "create table packet_test(id int, val varbinary(128), primary key(id))", }, - output: [][]string{{ - `begin`, - `type:FIELD field_event:{table_name:"packet_test" fields:{name:"id" type:INT32 table:"packet_test" org_table:"packet_test" database:"vttest" org_name:"id" column_length:11 charset:63 column_type:"int(11)"} fields:{name:"val" type:VARBINARY table:"packet_test" org_table:"packet_test" database:"vttest" org_name:"val" column_length:128 charset:63 column_type:"varbinary(128)"}}`, - `type:ROW row_event:{table_name:"packet_test" row_changes:{after:{lengths:1 lengths:3 values:"1123"}}}`, - `type:ROW row_event:{table_name:"packet_test" row_changes:{after:{lengths:1 lengths:3 values:"2456"}}}`, - `gtid`, - `commit`, - }}, + } + defer ts.Close() + ts.Init() + ddl := "alter table packet_test change val val varchar(128)" + ts.tests = [][]*TestQuery{{ + // All rows in one packet. + {"begin", nil}, + {"insert into packet_test values (1, '123')", nil}, + {"insert into packet_test values (2, '456')", nil}, + {"commit", nil}, }, { // A new row causes packet size to be exceeded. // Also test deletes - input: []string{ - "begin", - "insert into packet_test values (3, '123456')", - "insert into packet_test values (4, '789012')", - "delete from packet_test where id=3", - "delete from packet_test where id=4", - "commit", - }, - output: [][]string{{ - `begin`, - `type:ROW row_event:{table_name:"packet_test" row_changes:{after:{lengths:1 lengths:6 values:"3123456"}}}`, - }, { - `type:ROW row_event:{table_name:"packet_test" row_changes:{after:{lengths:1 lengths:6 values:"4789012"}}}`, - }, { - `type:ROW row_event:{table_name:"packet_test" row_changes:{before:{lengths:1 lengths:6 values:"3123456"}}}`, - }, { - `type:ROW row_event:{table_name:"packet_test" row_changes:{before:{lengths:1 lengths:6 values:"4789012"}}}`, - `gtid`, - `commit`, + {"begin", nil}, + {"insert into packet_test values (3, '123456')", []TestRowEvent{ + {spec: &TestRowEventSpec{table: "packet_test", changes: []TestRowChange{{after: []string{"3", "123456"}}}}}, + }}, + {"insert into packet_test values (4, '789012')", []TestRowEvent{ + {restart: true, spec: &TestRowEventSpec{table: "packet_test", changes: []TestRowChange{{after: []string{"4", "789012"}}}}}, + }}, + {"delete from packet_test where id=3", []TestRowEvent{ + {restart: true, spec: &TestRowEventSpec{table: "packet_test", changes: []TestRowChange{{before: []string{"3", "123456"}}}}}, }}, + {"delete from packet_test where id=4", []TestRowEvent{ + {restart: true, spec: &TestRowEventSpec{table: "packet_test", changes: []TestRowChange{{before: []string{"4", "789012"}}}}}, + }}, + {"commit", nil}, }, { // A single row is itself bigger than the packet size. - input: []string{ - "begin", - "insert into packet_test values (5, '123456')", - "insert into packet_test values (6, '12345678901')", - "insert into packet_test values (7, '23456')", - "commit", - }, - output: [][]string{{ - `begin`, - `type:ROW row_event:{table_name:"packet_test" row_changes:{after:{lengths:1 lengths:6 values:"5123456"}}}`, - }, { - `type:ROW row_event:{table_name:"packet_test" row_changes:{after:{lengths:1 lengths:11 values:"612345678901"}}}`, - }, { - `type:ROW row_event:{table_name:"packet_test" row_changes:{after:{lengths:1 lengths:5 values:"723456"}}}`, - `gtid`, - `commit`, + {"begin", nil}, + {"insert into packet_test values (5, '123456')", []TestRowEvent{ + {spec: &TestRowEventSpec{table: "packet_test", changes: []TestRowChange{{after: []string{"5", "123456"}}}}}, + }}, + {"insert into packet_test values (6, '12345678901')", []TestRowEvent{ + {restart: true, spec: &TestRowEventSpec{table: "packet_test", changes: []TestRowChange{{after: []string{"6", "12345678901"}}}}}, + }}, + {"insert into packet_test values (7, '23456')", []TestRowEvent{ + {restart: true, spec: &TestRowEventSpec{table: "packet_test", changes: []TestRowChange{{after: []string{"7", "23456"}}}}}, }}, + {"commit", nil}, }, { // An update packet is bigger because it has a before and after image. - input: []string{ - "begin", - "insert into packet_test values (8, '123')", - "update packet_test set val='456' where id=8", - "commit", - }, - output: [][]string{{ - `begin`, - `type:ROW row_event:{table_name:"packet_test" row_changes:{after:{lengths:1 lengths:3 values:"8123"}}}`, - }, { - `type:ROW row_event:{table_name:"packet_test" row_changes:{before:{lengths:1 lengths:3 values:"8123"} after:{lengths:1 lengths:3 values:"8456"}}}`, - `gtid`, - `commit`, + {"begin", nil}, + {"insert into packet_test values (8, '123')", nil}, + {"update packet_test set val='456' where id=8", []TestRowEvent{ + {restart: true, spec: &TestRowEventSpec{table: "packet_test", changes: []TestRowChange{{before: []string{"8", "123"}, after: []string{"8", "456"}}}}}, }}, + {"commit", nil}, }, { - // DDL is in its own packet - input: []string{ - "alter table packet_test change val val varchar(128)", - }, - output: [][]string{{ - `gtid`, - `type:DDL statement:"alter table packet_test change val val varchar(128)"`, + // DDL is in its own packet. + {ddl, []TestRowEvent{ + {event: "gtid"}, + {event: ts.getDDLEvent(ddl)}, }}, }} - runCases(t, nil, testcases, "", nil) + ts.Run() } // TestBestEffortNameInFieldEvent tests that we make a valid best effort @@ -1535,10 +1378,8 @@ func TestBuffering(t *testing.T) { // collation information, however, in the binlog_row_metadata in 8.0 but // not in 5.7. So in 5.7 our best effort uses varchar with its default // collation for text fields. +// todo: migrate to new framework func TestBestEffortNameInFieldEvent(t *testing.T) { - if testing.Short() { - t.Skip() - } bestEffortCollation := collations.ID(collations.CollationBinaryID) if strings.HasPrefix(testenv.MySQLVersion, "5.7") { bestEffortCollation = testenv.DefaultCollationID @@ -1589,14 +1430,12 @@ func TestBestEffortNameInFieldEvent(t *testing.T) { runCases(t, filter, testcases, position, nil) } +// todo: migrate to new framework // test that vstreamer ignores tables created by OnlineDDL func TestInternalTables(t *testing.T) { if version.GoOS == "darwin" { t.Skip("internal online ddl table matching doesn't work on Mac because it is case insensitive") } - if testing.Short() { - t.Skip() - } filter := &binlogdatapb.Filter{ FieldEventMode: binlogdatapb.Filter_BEST_EFFORT, Rules: []*binlogdatapb.Rule{{ @@ -1648,11 +1487,8 @@ func TestInternalTables(t *testing.T) { runCases(t, filter, testcases, position, nil) } +// todo: migrate to new framework func TestTypes(t *testing.T) { - if testing.Short() { - t.Skip() - } - // Modeled after vttablet endtoend compatibility tests. execStatements(t, []string{ "create table vitess_ints(tiny tinyint, tinyu tinyint unsigned, small smallint, smallu smallint unsigned, medium mediumint, mediumu mediumint unsigned, normal int, normalu int unsigned, big bigint, bigu bigint unsigned, y year, primary key(tiny)) ENGINE=InnoDB CHARSET=utf8mb4", @@ -1775,6 +1611,7 @@ func TestTypes(t *testing.T) { runCases(t, nil, testcases, "", nil) } +// todo: migrate to new framework func TestJSON(t *testing.T) { if err := env.Mysqld.ExecuteSuperQuery(context.Background(), "create table vitess_json(id int default 1, val json, primary key(id))"); err != nil { // If it's a syntax error, MySQL is an older version. Skip this test. @@ -1817,11 +1654,8 @@ func TestJSON(t *testing.T) { runCases(t, nil, testcases, "", nil) } +// todo: migrate to new framework func TestExternalTable(t *testing.T) { - if testing.Short() { - t.Skip() - } - execStatements(t, []string{ "create database external", "create table external.ext(id int, val varbinary(128), primary key(id))", @@ -1847,11 +1681,8 @@ func TestExternalTable(t *testing.T) { runCases(t, nil, testcases, "", nil) } +// todo: migrate to new framework func TestJournal(t *testing.T) { - if testing.Short() { - t.Skip() - } - execStatements(t, []string{ "create table if not exists _vt.resharding_journal(id int, db_name varchar(128), val blob, primary key(id))", }) @@ -1886,12 +1717,9 @@ func TestJournal(t *testing.T) { runCases(t, nil, testcases, "", nil) } +// todo: migrate to new framework // TestMinimalMode confirms that we don't support minimal binlog_row_image mode. func TestMinimalMode(t *testing.T) { - if testing.Short() { - t.Skip() - } - ctx, cancel := context.WithCancel(context.Background()) defer cancel() oldEngine := engine @@ -1913,10 +1741,8 @@ func TestMinimalMode(t *testing.T) { require.Error(t, err, "minimal binlog_row_image is not supported by Vitess VReplication") } +// todo: migrate to new framework func TestStatementMode(t *testing.T) { - if testing.Short() { - t.Skip() - } execStatements(t, []string{ "create table stream1(id int, val varbinary(128), primary key(id))", "create table stream2(id int, val varbinary(128), primary key(id))", @@ -1951,11 +1777,8 @@ func TestStatementMode(t *testing.T) { runCases(t, nil, testcases, "", nil) } +// todo: migrate to new framework func TestHeartbeat(t *testing.T) { - if testing.Short() { - t.Skip() - } - ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -1967,11 +1790,8 @@ func TestHeartbeat(t *testing.T) { cancel() } +// todo: migrate to new framework func TestNoFutureGTID(t *testing.T) { - if testing.Short() { - t.Skip() - } - // Execute something to make sure we have ranges in GTIDs. execStatements(t, []string{ "create table stream1(id int, val varbinary(128), primary key(id))", @@ -2008,70 +1828,73 @@ func TestNoFutureGTID(t *testing.T) { } func TestFilteredMultipleWhere(t *testing.T) { - if testing.Short() { - t.Skip() + fe := &TestFieldEvent{ + table: "t1", + db: testenv.DBName, + cols: []*TestColumn{ + {name: "id1", dataType: "INT32", colType: "int(11)", len: 11, collationID: 63}, + {name: "val", dataType: "VARBINARY", colType: "varbinary(128)", len: 128, collationID: 63}, + }, } + ts := &TestSpec{ + t: t, + ddls: []string{ + "create table t1(id1 int, id2 int, id3 int, val varbinary(128), primary key(id1))", + }, + options: &TestSpecOptions{ + filter: &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{{ + Match: "t1", + Filter: "select id1, val from t1 where in_keyrange('-80') and id2 = 200 and id3 = 1000 and val = 'newton'", + }}, + }, + customFieldEvents: true, + }, + } + _ = fe + defer ts.Close() // Ensure clean-up - execStatements(t, []string{ - "create table t1(id1 int, id2 int, id3 int, val varbinary(128), primary key(id1))", - }) - defer execStatements(t, []string{ - "drop table t1", - }) - engine.se.Reload(context.Background()) + ts.Init() setVSchema(t, shardedVSchema) defer env.SetVSchema("{}") - filter := &binlogdatapb.Filter{ - Rules: []*binlogdatapb.Rule{{ - Match: "t1", - Filter: "select id1, val from t1 where in_keyrange('-80') and id2 = 200 and id3 = 1000 and val = 'newton'", + ts.tests = [][]*TestQuery{{ + {"begin", nil}, + {"insert into t1 values (1, 100, 1000, 'kepler')", noEvents}, + {"insert into t1 values (2, 200, 1000, 'newton')", []TestRowEvent{ + {event: fe.String()}, + {spec: &TestRowEventSpec{table: "t1", changes: []TestRowChange{{after: []string{"2", "newton"}}}}}, }}, - } - - testcases := []testcase{{ - input: []string{ - "begin", - "insert into t1 values (1, 100, 1000, 'kepler')", - "insert into t1 values (2, 200, 1000, 'newton')", - "insert into t1 values (3, 100, 2000, 'kepler')", - "insert into t1 values (128, 200, 1000, 'newton')", - "insert into t1 values (5, 200, 2000, 'kepler')", - "insert into t1 values (129, 200, 1000, 'kepler')", - "commit", - }, - output: [][]string{{ - `begin`, - `type:FIELD field_event:{table_name:"t1" fields:{name:"id1" type:INT32 table:"t1" org_table:"t1" database:"vttest" org_name:"id1" column_length:11 charset:63 column_type:"int(11)"} fields:{name:"val" type:VARBINARY table:"t1" org_table:"t1" database:"vttest" org_name:"val" column_length:128 charset:63 column_type:"varbinary(128)"}}`, - `type:ROW row_event:{table_name:"t1" row_changes:{after:{lengths:1 lengths:6 values:"2newton"}}}`, - `type:ROW row_event:{table_name:"t1" row_changes:{after:{lengths:3 lengths:6 values:"128newton"}}}`, - `gtid`, - `commit`, + {"insert into t1 values (3, 100, 2000, 'kepler')", noEvents}, + {"insert into t1 values (128, 200, 1000, 'newton')", []TestRowEvent{ + {spec: &TestRowEventSpec{table: "t1", changes: []TestRowChange{{after: []string{"128", "newton"}}}}}, }}, + {"insert into t1 values (5, 200, 2000, 'kepler')", noEvents}, + {"insert into t1 values (129, 200, 1000, 'kepler')", noEvents}, + {"commit", nil}, }} - runCases(t, filter, testcases, "", nil) + ts.Run() } // TestGeneratedColumns just confirms that generated columns are sent in a vstream as expected func TestGeneratedColumns(t *testing.T) { - execStatements(t, []string{ - "create table t1(id int, val varbinary(6), val2 varbinary(6) as (concat(id, val)), val3 varbinary(6) as (concat(val, id)), id2 int, primary key(id))", - }) - defer execStatements(t, []string{ - "drop table t1", - }) - engine.se.Reload(context.Background()) - queries := []string{ - "begin", - "insert into t1(id, val, id2) values (1, 'aaa', 10)", - "insert into t1(id, val, id2) values (2, 'bbb', 20)", - "commit", + ts := &TestSpec{ + t: t, + ddls: []string{ + "create table t1(id int, val varbinary(6), val2 varbinary(6) as (concat(id, val)), val3 varbinary(6) as (concat(val, id)), id2 int, primary key(id))", + }, + options: &TestSpecOptions{ + customFieldEvents: true, + }, } + defer ts.Close() + + ts.Init() fe := &TestFieldEvent{ table: "t1", - db: "vttest", + db: testenv.DBName, cols: []*TestColumn{ {name: "id", dataType: "INT32", colType: "int(11)", len: 11, collationID: 63}, {name: "val", dataType: "VARBINARY", colType: "varbinary(6)", len: 6, collationID: 63}, @@ -2081,18 +1904,18 @@ func TestGeneratedColumns(t *testing.T) { }, } - testcases := []testcase{{ - input: queries, - output: [][]string{{ - `begin`, - fe.String(), - `type:ROW row_event:{table_name:"t1" row_changes:{after:{lengths:1 lengths:3 lengths:4 lengths:4 lengths:2 values:"1aaa1aaaaaa110"}}}`, - `type:ROW row_event:{table_name:"t1" row_changes:{after:{lengths:1 lengths:3 lengths:4 lengths:4 lengths:2 values:"2bbb2bbbbbb220"}}}`, - `gtid`, - `commit`, + ts.tests = [][]*TestQuery{{ + {"begin", nil}, + {"insert into t1(id, val, id2) values (1, 'aaa', 10)", []TestRowEvent{ + {event: fe.String()}, + {spec: &TestRowEventSpec{table: "t1", changes: []TestRowChange{{after: []string{"1", "aaa", "1aaa", "aaa1", "10"}}}}}, + }}, + {"insert into t1(id, val, id2) values (2, 'bbb', 20)", []TestRowEvent{ + {spec: &TestRowEventSpec{table: "t1", changes: []TestRowChange{{after: []string{"2", "bbb", "2bbb", "bbb2", "20"}}}}}, }}, + {"commit", nil}, }} - runCases(t, nil, testcases, "current", nil) + ts.Run() } // TestGeneratedInvisiblePrimaryKey validates that generated invisible primary keys are sent in row events. @@ -2100,279 +1923,39 @@ func TestGeneratedInvisiblePrimaryKey(t *testing.T) { if !env.HasCapability(testenv.ServerCapabilityGeneratedInvisiblePrimaryKey) { t.Skip("skipping test as server does not support generated invisible primary keys") } - execStatements(t, []string{ - "SET @@session.sql_generate_invisible_primary_key=ON;", - "create table t1(val varbinary(6))", - "SET @@session.sql_generate_invisible_primary_key=OFF;", - }) - defer execStatements(t, []string{ - "drop table t1", - }) - engine.se.Reload(context.Background()) - queries := []string{ - "begin", - "insert into t1 values ('aaa')", - "update t1 set val = 'bbb' where my_row_id = 1", - "commit", + + execStatement(t, "SET @@session.sql_generate_invisible_primary_key=ON") + defer execStatement(t, "SET @@session.sql_generate_invisible_primary_key=OFF") + ts := &TestSpec{ + t: t, + ddls: []string{ + "create table t1(val varbinary(6))", + }, + options: nil, } + defer ts.Close() + + ts.Init() fe := &TestFieldEvent{ table: "t1", - db: "vttest", + db: testenv.DBName, cols: []*TestColumn{ {name: "my_row_id", dataType: "UINT64", colType: "bigint unsigned", len: 20, collationID: 63}, {name: "val", dataType: "VARBINARY", colType: "varbinary(6)", len: 6, collationID: 63}, }, } - testcases := []testcase{{ - input: queries, - output: [][]string{{ - `begin`, - fe.String(), - `type:ROW row_event:{table_name:"t1" row_changes:{after:{lengths:1 lengths:3 values:"1aaa"}}}`, - `type:ROW row_event:{table_name:"t1" row_changes:{before:{lengths:1 lengths:3 values:"1aaa"} after:{lengths:1 lengths:3 values:"1bbb"}}}`, - `gtid`, - `commit`, + ts.tests = [][]*TestQuery{{ + {"begin", nil}, + {"insert into t1 values ('aaa')", []TestRowEvent{ + {event: fe.String()}, + {spec: &TestRowEventSpec{table: "t1", changes: []TestRowChange{{after: []string{"1", "aaa"}}}}}, }}, + {"update t1 set val = 'bbb' where my_row_id = 1", []TestRowEvent{ + {spec: &TestRowEventSpec{table: "t1", changes: []TestRowChange{{before: []string{"1", "aaa"}, after: []string{"1", "bbb"}}}}}, + }}, + {"commit", nil}, }} - runCases(t, nil, testcases, "current", nil) -} - -func runCases(t *testing.T, filter *binlogdatapb.Filter, testcases []testcase, position string, tablePK []*binlogdatapb.TableLastPK) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - wg, ch := startStream(ctx, t, filter, position, tablePK) - defer wg.Wait() - // If position is 'current', we wait for a heartbeat to be - // sure the vstreamer has started. - if position == "current" { - log.Infof("Starting stream with current position") - expectLog(ctx, t, "current pos", ch, [][]string{{`gtid`, `type:OTHER`}}) - } - - log.Infof("Starting to run test cases") - for _, tcase := range testcases { - switch input := tcase.input.(type) { - case []string: - execStatements(t, input) - case string: - execStatement(t, input) - default: - t.Fatalf("unexpected input: %#v", input) - } - engine.se.Reload(ctx) - expectLog(ctx, t, tcase.input, ch, tcase.output) - } - - cancel() - if evs, ok := <-ch; ok { - t.Fatalf("unexpected evs: %v", evs) - } - log.Infof("Last line of runCases") -} - -func expectLog(ctx context.Context, t *testing.T, input any, ch <-chan []*binlogdatapb.VEvent, output [][]string) { - timer := time.NewTimer(1 * time.Minute) - defer timer.Stop() - for _, wantset := range output { - var evs []*binlogdatapb.VEvent - for { - select { - case allevs, ok := <-ch: - if !ok { - t.Fatal("expectLog: not ok, stream ended early") - } - for _, ev := range allevs { - // Ignore spurious heartbeats that can happen on slow machines. - if ev.Type == binlogdatapb.VEventType_HEARTBEAT { - continue - } - if ev.Throttled { - continue - } - evs = append(evs, ev) - } - case <-ctx.Done(): - t.Fatalf("expectLog: Done(), stream ended early") - case <-timer.C: - t.Fatalf("expectLog: timed out waiting for events: %v", wantset) - } - if len(evs) != 0 { - break - } - } - - numEventsToMatch := len(evs) - if len(wantset) != len(evs) { - log.Warningf("%v: evs\n%v, want\n%v, >> got length %d, wanted length %d", input, evs, wantset, len(evs), len(wantset)) - if len(wantset) < len(evs) { - numEventsToMatch = len(wantset) - } - } - for i := 0; i < numEventsToMatch; i++ { - want := wantset[i] - // CurrentTime is not testable. - evs[i].CurrentTime = 0 - evs[i].Keyspace = "" - evs[i].Shard = "" - switch want { - case "begin": - if evs[i].Type != binlogdatapb.VEventType_BEGIN { - t.Fatalf("%v (%d): event: %v, want gtid or begin", input, i, evs[i]) - } - case "gtid": - if evs[i].Type != binlogdatapb.VEventType_GTID { - t.Fatalf("%v (%d): event: %v, want gtid", input, i, evs[i]) - } - case "lastpk": - if evs[i].Type != binlogdatapb.VEventType_LASTPK { - t.Fatalf("%v (%d): event: %v, want lastpk", input, i, evs[i]) - } - case "commit": - if evs[i].Type != binlogdatapb.VEventType_COMMIT { - t.Fatalf("%v (%d): event: %v, want commit", input, i, evs[i]) - } - case "other": - if evs[i].Type != binlogdatapb.VEventType_OTHER { - t.Fatalf("%v (%d): event: %v, want other", input, i, evs[i]) - } - case "ddl": - if evs[i].Type != binlogdatapb.VEventType_DDL { - t.Fatalf("%v (%d): event: %v, want ddl", input, i, evs[i]) - } - case "copy_completed": - if evs[i].Type != binlogdatapb.VEventType_COPY_COMPLETED { - t.Fatalf("%v (%d): event: %v, want copy_completed", input, i, evs[i]) - } - default: - evs[i].Timestamp = 0 - if evs[i].Type == binlogdatapb.VEventType_FIELD { - for j := range evs[i].FieldEvent.Fields { - evs[i].FieldEvent.Fields[j].Flags = 0 - if ignoreKeyspaceShardInFieldAndRowEvents { - evs[i].FieldEvent.Keyspace = "" - evs[i].FieldEvent.Shard = "" - } - } - } - if ignoreKeyspaceShardInFieldAndRowEvents && evs[i].Type == binlogdatapb.VEventType_ROW { - evs[i].RowEvent.Keyspace = "" - evs[i].RowEvent.Shard = "" - } - if !testRowEventFlags && evs[i].Type == binlogdatapb.VEventType_ROW { - evs[i].RowEvent.Flags = 0 - } - want = env.RemoveAnyDeprecatedDisplayWidths(want) - if got := fmt.Sprintf("%v", evs[i]); got != want { - log.Errorf("%v (%d): event:\n%q, want\n%q", input, i, got, want) - t.Fatalf("%v (%d): event:\n%q, want\n%q", input, i, got, want) - } - } - } - if len(wantset) != len(evs) { - t.Fatalf("%v: evs\n%v, want\n%v, got length %d, wanted length %d", input, evs, wantset, len(evs), len(wantset)) - } - } -} - -func startStream(ctx context.Context, t *testing.T, filter *binlogdatapb.Filter, position string, tablePKs []*binlogdatapb.TableLastPK) (*sync.WaitGroup, <-chan []*binlogdatapb.VEvent) { - switch position { - case "": - position = primaryPosition(t) - case "vscopy": - position = "" - } - - wg := sync.WaitGroup{} - wg.Add(1) - ch := make(chan []*binlogdatapb.VEvent) - - go func() { - defer close(ch) - defer wg.Done() - vstream(ctx, t, position, tablePKs, filter, ch) - }() - return &wg, ch -} - -func vstream(ctx context.Context, t *testing.T, pos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, ch chan []*binlogdatapb.VEvent) error { - if filter == nil { - filter = &binlogdatapb.Filter{ - Rules: []*binlogdatapb.Rule{{ - Match: "/.*/", - }}, - } - } - return engine.Stream(ctx, pos, tablePKs, filter, throttlerapp.VStreamerName, func(evs []*binlogdatapb.VEvent) error { - timer := time.NewTimer(2 * time.Second) - defer timer.Stop() - - log.Infof("Received events: %v", evs) - select { - case ch <- evs: - case <-ctx.Done(): - return fmt.Errorf("engine.Stream Done() stream ended early") - case <-timer.C: - t.Log("VStream timed out waiting for events") - return io.EOF - } - return nil - }) -} - -func execStatement(t *testing.T, query string) { - t.Helper() - if err := env.Mysqld.ExecuteSuperQuery(context.Background(), query); err != nil { - t.Fatal(err) - } -} - -func execStatements(t *testing.T, queries []string) { - if err := env.Mysqld.ExecuteSuperQueryList(context.Background(), queries); err != nil { - t.Fatal(err) - } -} - -func primaryPosition(t *testing.T) string { - t.Helper() - // We use the engine's cp because there is one test that overrides - // the flavor to FilePos. If so, we have to obtain the position - // in that flavor format. - connParam, err := engine.env.Config().DB.DbaWithDB().MysqlParams() - if err != nil { - t.Fatal(err) - } - conn, err := mysql.Connect(context.Background(), connParam) - if err != nil { - t.Fatal(err) - } - defer conn.Close() - pos, err := conn.PrimaryPosition() - if err != nil { - t.Fatal(err) - } - return replication.EncodePosition(pos) -} - -func setVSchema(t *testing.T, vschema string) { - t.Helper() - - curCount := engine.vschemaUpdates.Get() - if err := env.SetVSchema(vschema); err != nil { - t.Fatal(err) - } - // Wait for curCount to go up. - updated := false - for i := 0; i < 10; i++ { - if engine.vschemaUpdates.Get() != curCount { - updated = true - break - } - time.Sleep(10 * time.Millisecond) - } - if !updated { - log.Infof("vschema did not get updated") - t.Error("vschema did not get updated") - } + ts.Run() }