diff --git a/go/vt/vtgate/endtoend/main_test.go b/go/vt/vtgate/endtoend/main_test.go index 046af36a3dd..29783035992 100644 --- a/go/vt/vtgate/endtoend/main_test.go +++ b/go/vt/vtgate/endtoend/main_test.go @@ -46,6 +46,12 @@ create table t1( primary key(id1) ) Engine=InnoDB; +create table t1_copy_basic( + id1 bigint, + id2 bigint, + primary key(id1) +) Engine=InnoDB; + create table t1_id2_idx( id2 bigint, keyspace_id varbinary(10), @@ -134,6 +140,12 @@ create table t1_sharded( Name: "t1_id2_vdx", }}, }, + "t1_copy_basic": { + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Column: "id1", + Name: "hash", + }}, + }, "t1_sharded": { ColumnVindexes: []*vschemapb.ColumnVindex{{ Column: "id1", diff --git a/go/vt/vtgate/endtoend/vstream_test.go b/go/vt/vtgate/endtoend/vstream_test.go index 477bb2518b5..2fc6cf2105b 100644 --- a/go/vt/vtgate/endtoend/vstream_test.go +++ b/go/vt/vtgate/endtoend/vstream_test.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "io" + "regexp" "sync" "testing" @@ -98,18 +99,29 @@ func TestVStream(t *testing.T) { // In a real world scenario where every mysql instance hosts only one // keyspace/shard, we should expect only a single event. // The events could come in any order as the scatter insert runs in parallel. - emptyEventSkipped := false for i := 0; i < 2; i++ { events, err := reader.Recv() if err != nil { t.Fatal(err) } fmt.Printf("events: %v\n", events) - // An empty transaction has three events: begin, gtid and commit. - if len(events) == 3 && !emptyEventSkipped { - emptyEventSkipped = true - continue + + // An empty transaction has either: + // - three events: begin, vgtid and commit. + // - two events: vgtid and other + if len(events) == 3 { + if events[0].Type == binlogdatapb.VEventType_BEGIN && + events[1].Type == binlogdatapb.VEventType_VGTID && + events[2].Type == binlogdatapb.VEventType_COMMIT { + continue + } + } else if len(events) == 2 { + if events[0].Type == binlogdatapb.VEventType_VGTID && + events[1].Type == binlogdatapb.VEventType_OTHER { + continue + } } + if len(events) != 5 { t.Errorf("Unexpected event length: %v", events) continue @@ -381,6 +393,149 @@ func TestVStreamSharded(t *testing.T) { } +// TestVStreamCopyTransactions tests that we are properly wrapping +// ROW events in the stream with BEGIN and COMMIT events. +func TestVStreamCopyTransactions(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + keyspace := "ks" + shards := []string{"-80", "80-"} + table := "t1_copy_basic" + beginEventSeen, commitEventSeen := false, false + numResultInTrx := 0 + vgtid := &binlogdatapb.VGtid{ + ShardGtids: []*binlogdatapb.ShardGtid{ + { + Keyspace: keyspace, + Shard: shards[0], + Gtid: "", // Start a vstream copy + }, + { + Keyspace: keyspace, + Shard: shards[1], + Gtid: "", // Start a vstream copy + }, + }, + } + filter := &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{{ + Match: table, + Filter: fmt.Sprintf("select * from %s", table), + }}, + } + + gconn, conn, _, closeConnections := initialize(ctx, t) + defer closeConnections() + + // Clear any existing data. + q := fmt.Sprintf("delete from %s", table) + _, err := conn.ExecuteFetch(q, -1, false) + require.NoError(t, err, "error clearing data: %v", err) + + // Generate some test data. Enough to cross the default + // vstream_packet_size threshold. + for i := 1; i <= 100000; i++ { + values := fmt.Sprintf("(%d, %d)", i, i) + q := fmt.Sprintf("insert into %s (id1, id2) values %s", table, values) + _, err := conn.ExecuteFetch(q, 1, false) + require.NoError(t, err, "error inserting data: %v", err) + } + + // Start a vstream. + reader, err := gconn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, nil) + require.NoError(t, err, "error starting vstream: %v", err) + +recvLoop: + for { + finished := true + + vevents, err := reader.Recv() + numResultInTrx++ + eventCount := len(vevents) + t.Logf("------------------ Received %d events in response #%d for the transaction ------------------\n", + eventCount, numResultInTrx) + switch err { + case nil: + + for _, event := range vevents { + switch event.Type { + case binlogdatapb.VEventType_BEGIN: + require.False(t, beginEventSeen, "received a second BEGIN event within the transaction: numResultInTrx=%d\n", + numResultInTrx) + beginEventSeen = true + t.Logf("Found BEGIN event, beginEventSeen=%t, commitEventSeen=%t, eventType=%v, numResultInTrx=%d\n", + beginEventSeen, commitEventSeen, event.Type, numResultInTrx) + require.False(t, commitEventSeen, "received a BEGIN event when expecting a COMMIT event: numResultInTrx=%d\n", + numResultInTrx) + case binlogdatapb.VEventType_VGTID: + t.Logf("Found VGTID event, beginEventSeen=%t, commitEventSeen=%t, eventType=%v, numResultInTrx=%d, event=%+v\n", + beginEventSeen, commitEventSeen, event.Type, numResultInTrx, event) + + finished = true + for _, shardGtid := range event.Vgtid.ShardGtids { + finished = finished && len(shardGtid.TablePKs) == 0 + } + + case binlogdatapb.VEventType_FIELD: + t.Logf("Found FIELD event, beginEventSeen=%t, commitEventSeen=%t, eventType=%v, numResultInTrx=%d, event=%+v\n", + beginEventSeen, commitEventSeen, event.Type, numResultInTrx, event) + case binlogdatapb.VEventType_ROW: + // Uncomment if you need to do more debugging. + // t.Logf("Found ROW event, beginEventSeen=%t, commitEventSeen=%t, eventType=%v, numResultInTrx=%d, event=%+v\n", + // beginEventSeen, commitEventSeen, event.Type, numResultInTrx, event) + + case binlogdatapb.VEventType_COMMIT: + commitEventSeen = true + t.Logf("Found COMMIT event, beginEventSeen=%t, commitEventSeen=%t, eventType=%v, numResultInTrx=%d, event=%+v\n", + beginEventSeen, commitEventSeen, event.Type, numResultInTrx, event) + require.True(t, beginEventSeen, "received COMMIT event before receiving BEGIN event: numResultInTrx=%d\n", + numResultInTrx) + + if finished { + t.Logf("Finished vstream copy\n") + t.Logf("-------------------------------------------------------------------\n\n") + cancel() + break recvLoop + } + + default: + t.Logf("Found extraneous event: %+v\n", event) + } + if beginEventSeen && commitEventSeen { + t.Logf("Received both BEGIN and COMMIT, so resetting transactional state\n") + beginEventSeen = false + commitEventSeen = false + numResultInTrx = 0 + } + } + case io.EOF: + t.Logf("vstream ended\n") + t.Logf("-------------------------------------------------------------------\n\n") + cancel() + return + default: + require.FailNowf(t, "unexpected error", "encountered error in vstream: %v", err) + return + } + } + // The last response, when the vstream copy completes, does not + // typically contain ROW events. + if beginEventSeen || commitEventSeen { + require.True(t, (beginEventSeen && commitEventSeen), "did not receive both BEGIN and COMMIT events in the final ROW event set") + } +} + +func removeAnyDeprecatedDisplayWidths(orig string) string { + var adjusted string + baseIntType := "int" + intRE := regexp.MustCompile(`(?i)int\(([0-9]*)?\)`) + adjusted = intRE.ReplaceAllString(orig, baseIntType) + baseYearType := "year" + yearRE := regexp.MustCompile(`(?i)year\(([0-9]*)?\)`) + adjusted = yearRE.ReplaceAllString(adjusted, baseYearType) + return adjusted +} + var printMu sync.Mutex func printEvents(evs []*binlogdatapb.VEvent) { diff --git a/go/vt/vttablet/tabletserver/vstreamer/copy.go b/go/vt/vttablet/tabletserver/vstreamer/copy.go index 0065555047d..c4a9f626f6b 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/copy.go +++ b/go/vt/vttablet/tabletserver/vstreamer/copy.go @@ -255,12 +255,26 @@ func (uvs *uvstreamer) copyTable(ctx context.Context, tableName string) error { log.Infof("sendFieldEvent returned error %v", err) return err } + // sendFieldEvent() sends a BEGIN event first. + uvs.inTransaction = true } + if len(rows.Rows) == 0 { log.V(2).Infof("0 rows returned for table %s", tableName) return nil } + // We are about to send ROW events, so we need to ensure + // that we do so within a transaction. The COMMIT event + // will be sent in sendEventsForRows() below. + if !uvs.inTransaction { + evs := []*binlogdatapb.VEvent{{ + Type: binlogdatapb.VEventType_BEGIN, + }} + uvs.send(evs) + uvs.inTransaction = true + } + newLastPK = sqltypes.CustomProto3ToResult(uvs.pkfields, &querypb.QueryResult{ Fields: rows.Fields, Rows: []*querypb.Row{rows.Lastpk}, @@ -271,6 +285,8 @@ func (uvs *uvstreamer) copyTable(ctx context.Context, tableName string) error { log.Infof("sendEventsForRows returned error %v", err) return err } + // sendEventsForRows() sends a COMMIT event last. + uvs.inTransaction = false uvs.setCopyState(tableName, qrLastPK) log.V(2).Infof("NewLastPK: %v", qrLastPK) diff --git a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go index 40bf27dd0cf..ea3885bff64 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go @@ -51,13 +51,17 @@ type uvstreamer struct { cancel func() // input parameters - vse *Engine - send func([]*binlogdatapb.VEvent) error - cp dbconfigs.Connector - se *schema.Engine - startPos string - filter *binlogdatapb.Filter - inTablePKs []*binlogdatapb.TableLastPK + vse *Engine + send func([]*binlogdatapb.VEvent) error + cp dbconfigs.Connector + se *schema.Engine + startPos string + // Are we currently in an explicit transaction? + // If we are not, and we're about to send ROW + // events, then we need to send a BEGIN event first. + inTransaction bool + filter *binlogdatapb.Filter + inTablePKs []*binlogdatapb.TableLastPK vschema *localVSchema