Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

VStream: When doing a full copy, first send table schemas as DDL statement events #17150

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/local/vstream_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import (
*/
func main() {
ctx := context.Background()
streamCustomer := true
streamCustomer := false
var vgtid *binlogdatapb.VGtid
if streamCustomer {
vgtid = &binlogdatapb.VGtid{
Expand Down
41 changes: 41 additions & 0 deletions go/vt/vtgate/vstream_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ const tabletPickerContextTimeout = 90 * time.Second
// ending the stream from the tablet.
const stopOnReshardDelay = 500 * time.Millisecond

type globalTableName struct {
Keyspace string
Table string
}

// vstream contains the metadata for one VStream request.
type vstream struct {
// mu protects parts of vgtid, the semantics of a send, and journaler.
Expand Down Expand Up @@ -123,6 +128,9 @@ type vstream struct {
// the shard map tracking the copy completion, keyed by streamId. streamId is of the form <keyspace>.<shard>
copyCompletedShard map[string]struct{}

// A map of initial table schemas (CREATE TABLE) sent to the client if we're copying tables.
copySchemaSent map[globalTableName]struct{}

vsm *vstreamManager

eventCh chan []*binlogdatapb.VEvent
Expand Down Expand Up @@ -595,6 +603,39 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
}
}

if sgtid.Gtid == "" {
// We're copying the tables, so let's first send the table schemas in the
// stream so that the client can create the tables if they like and follow
// the full lifecycle of the tables.
if vs.copySchemaSent == nil {
vs.copySchemaSent = make(map[globalTableName]struct{}, 5)
}
ddlevents := make([]*binlogdatapb.VEvent, 0, 5)
if err := tabletConn.GetSchema(ctx, target, querypb.SchemaTableType_TABLES, nil, func(res *querypb.GetSchemaResponse) error {
for tableName, schema := range res.TableDefinition {
key := globalTableName{Keyspace: sgtid.Keyspace, Table: tableName}
func() {
vs.mu.Lock()
defer vs.mu.Unlock()
if _, ok := vs.copySchemaSent[key]; !ok {
ev := &binlogdatapb.VEvent{
Type: binlogdatapb.VEventType_DDL,
Statement: schema,
}
ddlevents = append(ddlevents, ev)
vs.copySchemaSent[key] = struct{}{}
}
}()
}
return nil
}); err != nil {
return vterrors.Wrapf(err, "failed to get schema for keyspace %s", sgtid.Keyspace)
}
if err := vs.send(ddlevents); err != nil {
return vterrors.Wrapf(err, "failed to send schema DDL events for keyspace %s", sgtid.Keyspace)
}
}

// Safe to access sgtid.Gtid here (because it can't change until streaming begins).
req := &binlogdatapb.VStreamRequest{
Target: target,
Expand Down
Loading