Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

VReplication: Undo vschema changes on LookupVindex workflow creation fail #16810

Merged
merged 11 commits into from
Sep 26, 2024
28 changes: 23 additions & 5 deletions go/vt/vtctl/workflow/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ type testTMClient struct {

mu sync.Mutex
vrQueries map[int][]*queryResult
createVReplicationWorkflowRequests map[uint32]*tabletmanagerdatapb.CreateVReplicationWorkflowRequest
createVReplicationWorkflowRequests map[uint32]*createVReplicationWorkflowRequestResponse
readVReplicationWorkflowRequests map[uint32]*tabletmanagerdatapb.ReadVReplicationWorkflowRequest
primaryPositions map[uint32]string
vdiffRequests map[uint32]*vdiffRequestResponse
Expand All @@ -289,7 +289,7 @@ func newTestTMClient(env *testEnv) *testTMClient {
return &testTMClient{
schema: make(map[string]*tabletmanagerdatapb.SchemaDefinition),
vrQueries: make(map[int][]*queryResult),
createVReplicationWorkflowRequests: make(map[uint32]*tabletmanagerdatapb.CreateVReplicationWorkflowRequest),
createVReplicationWorkflowRequests: make(map[uint32]*createVReplicationWorkflowRequestResponse),
readVReplicationWorkflowRequests: make(map[uint32]*tabletmanagerdatapb.ReadVReplicationWorkflowRequest),
readVReplicationWorkflowsResponses: make(map[string][]*tabletmanagerdatapb.ReadVReplicationWorkflowsResponse),
primaryPositions: make(map[uint32]string),
Expand All @@ -304,9 +304,12 @@ func (tmc *testTMClient) CreateVReplicationWorkflow(ctx context.Context, tablet
defer tmc.mu.Unlock()

if expect := tmc.createVReplicationWorkflowRequests[tablet.Alias.Uid]; expect != nil {
if !proto.Equal(expect, req) {
if expect.req != nil && !proto.Equal(expect.req, req) {
return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "unexpected CreateVReplicationWorkflow request: got %+v, want %+v", req, expect)
}
if expect.res != nil {
return expect.res, expect.err
}
}
res := sqltypes.MakeTestResult(sqltypes.MakeTestFields("rowsaffected", "int64"), "1")
return &tabletmanagerdatapb.CreateVReplicationWorkflowResponse{Result: sqltypes.ResultToProto3(res)}, nil
Expand Down Expand Up @@ -418,20 +421,29 @@ func (tmc *testTMClient) expectVRQueryResultOnKeyspaceTablets(keyspace string, q
}
}

func (tmc *testTMClient) expectCreateVReplicationWorkflowRequest(tabletID uint32, req *tabletmanagerdatapb.CreateVReplicationWorkflowRequest) {
func (tmc *testTMClient) expectCreateVReplicationWorkflowRequest(tabletID uint32, req *createVReplicationWorkflowRequestResponse) {
tmc.mu.Lock()
defer tmc.mu.Unlock()

tmc.createVReplicationWorkflowRequests[tabletID] = req
}

func (tmc *testTMClient) expectCreateVReplicationWorkflowRequestOnTargetTablets(req *createVReplicationWorkflowRequestResponse) {
tmc.mu.Lock()
defer tmc.mu.Unlock()

for _, tablet := range tmc.env.tablets[tmc.env.targetKeyspace.KeyspaceName] {
tmc.createVReplicationWorkflowRequests[tablet.Alias.Uid] = req
}
}

func (tmc *testTMClient) VReplicationExec(ctx context.Context, tablet *topodatapb.Tablet, query string) (*querypb.QueryResult, error) {
tmc.mu.Lock()
defer tmc.mu.Unlock()

qrs := tmc.vrQueries[int(tablet.Alias.Uid)]
if len(qrs) == 0 {
return nil, fmt.Errorf("tablet %v does not expect any more queries: %s", tablet, query)
return nil, fmt.Errorf("tablet %v does not expect any more queries: %q", tablet, query)
}
matched := false
if qrs[0].query[0] == '/' {
Expand Down Expand Up @@ -479,6 +491,12 @@ type vdiffRequestResponse struct {
err error
}

type createVReplicationWorkflowRequestResponse struct {
req *tabletmanagerdatapb.CreateVReplicationWorkflowRequest
res *tabletmanagerdatapb.CreateVReplicationWorkflowResponse
err error
}

func (tmc *testTMClient) expectVDiffRequest(tablet *topodatapb.Tablet, vrr *vdiffRequestResponse) {
tmc.mu.Lock()
defer tmc.mu.Unlock()
Expand Down
10 changes: 6 additions & 4 deletions go/vt/vtctl/workflow/materializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,13 +113,11 @@ func (mz *materializer) createWorkflowStreams(req *tabletmanagerdatapb.CreateVRe
if err := validateNewWorkflow(mz.ctx, mz.ts, mz.tmc, mz.ms.TargetKeyspace, mz.ms.Workflow); err != nil {
return err
}

err := mz.buildMaterializer()
if err != nil {
return err
}
if err := mz.deploySchema(); err != nil {
return err
}

var workflowSubType binlogdatapb.VReplicationWorkflowSubType
workflowSubType, err = mz.getWorkflowSubType()
Expand All @@ -133,6 +131,10 @@ func (mz *materializer) createWorkflowStreams(req *tabletmanagerdatapb.CreateVRe
}
req.Options = optionsJSON

if err := mz.deploySchema(); err != nil {
return err
}

Comment on lines +134 to +137
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

return mz.forAllTargets(func(target *topo.ShardInfo) error {
targetPrimary, err := mz.ts.GetTablet(mz.ctx, target.PrimaryAlias)
if err != nil {
Expand Down Expand Up @@ -304,7 +306,7 @@ func (mz *materializer) deploySchema() error {
continue
}
if ts.CreateDdl == "" {
return fmt.Errorf("target table %v does not exist and there is no create ddl defined", ts.TargetTable)
return fmt.Errorf("target table %s does not exist and there is no create ddl defined", ts.TargetTable)
}

var err error
Expand Down
56 changes: 47 additions & 9 deletions go/vt/vtctl/workflow/materializer_env_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"vitess.io/vitess/go/vt/key"
"vitess.io/vitess/go/vt/logutil"
"vitess.io/vitess/go/vt/mysqlctl/tmutils"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/memorytopo"
"vitess.io/vitess/go/vt/topotools"
Expand Down Expand Up @@ -120,16 +121,39 @@ func newTestMaterializerEnv(t *testing.T, ctx context.Context, ms *vtctldatapb.M
if err == nil {
tableName = table.Name.String()
}
var (
cols []string
fields []*querypb.Field
)
if ts.CreateDdl != "" {
stmt, err := env.venv.Parser().ParseStrictDDL(ts.CreateDdl)
require.NoError(t, err)
ddl, ok := stmt.(*sqlparser.CreateTable)
require.True(t, ok)
cols = make([]string, len(ddl.TableSpec.Columns))
fields = make([]*querypb.Field, len(ddl.TableSpec.Columns))
for i, col := range ddl.TableSpec.Columns {
cols[i] = col.Name.String()
fields[i] = &querypb.Field{
Name: col.Name.String(),
Type: col.Type.SQLType(),
}
}
}
env.tmc.schema[ms.SourceKeyspace+"."+tableName] = &tabletmanagerdatapb.SchemaDefinition{
TableDefinitions: []*tabletmanagerdatapb.TableDefinition{{
Name: tableName,
Schema: fmt.Sprintf("%s_schema", tableName),
Name: tableName,
Schema: ts.CreateDdl,
Columns: cols,
Fields: fields,
}},
}
env.tmc.schema[ms.TargetKeyspace+"."+ts.TargetTable] = &tabletmanagerdatapb.SchemaDefinition{
TableDefinitions: []*tabletmanagerdatapb.TableDefinition{{
Name: ts.TargetTable,
Schema: fmt.Sprintf("%s_schema", ts.TargetTable),
Name: ts.TargetTable,
Schema: ts.CreateDdl,
Columns: cols,
Fields: fields,
}},
}
}
Expand Down Expand Up @@ -199,7 +223,7 @@ type testMaterializerTMClient struct {

mu sync.Mutex
vrQueries map[int][]*queryResult
createVReplicationWorkflowRequests map[uint32]*tabletmanagerdatapb.CreateVReplicationWorkflowRequest
createVReplicationWorkflowRequests map[uint32]*createVReplicationWorkflowRequestResponse

// Used to confirm the number of times WorkflowDelete was called.
workflowDeleteCalls int
Expand All @@ -215,21 +239,29 @@ func newTestMaterializerTMClient(keyspace string, sourceShards []string, tableSe
sourceShards: sourceShards,
tableSettings: tableSettings,
vrQueries: make(map[int][]*queryResult),
createVReplicationWorkflowRequests: make(map[uint32]*tabletmanagerdatapb.CreateVReplicationWorkflowRequest),
createVReplicationWorkflowRequests: make(map[uint32]*createVReplicationWorkflowRequestResponse),
}
}

func (tmc *testMaterializerTMClient) CreateVReplicationWorkflow(ctx context.Context, tablet *topodatapb.Tablet, request *tabletmanagerdatapb.CreateVReplicationWorkflowRequest) (*tabletmanagerdatapb.CreateVReplicationWorkflowResponse, error) {
tmc.mu.Lock()
defer tmc.mu.Unlock()
if expect := tmc.createVReplicationWorkflowRequests[tablet.Alias.Uid]; expect != nil {
if !proto.Equal(expect, request) {
if expect.req != nil && !proto.Equal(expect.req, request) {
return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "unexpected CreateVReplicationWorkflow request: got %+v, want %+v", request, expect)
}
if expect.res != nil {
return expect.res, expect.err
}
}
res := sqltypes.MakeTestResult(sqltypes.MakeTestFields("rowsaffected", "int64"), "1")
return &tabletmanagerdatapb.CreateVReplicationWorkflowResponse{Result: sqltypes.ResultToProto3(res)}, nil
}

func (tmc *testMaterializerTMClient) ReadVReplicationWorkflow(ctx context.Context, tablet *topodatapb.Tablet, request *tabletmanagerdatapb.ReadVReplicationWorkflowRequest) (*tabletmanagerdatapb.ReadVReplicationWorkflowResponse, error) {
tmc.mu.Lock()
defer tmc.mu.Unlock()

if tmc.readVReplicationWorkflow != nil {
return tmc.readVReplicationWorkflow(ctx, tablet, request)
}
Expand Down Expand Up @@ -283,6 +315,9 @@ func (tmc *testMaterializerTMClient) DeleteVReplicationWorkflow(ctx context.Cont
}

func (tmc *testMaterializerTMClient) GetSchema(ctx context.Context, tablet *topodatapb.Tablet, request *tabletmanagerdatapb.GetSchemaRequest) (*tabletmanagerdatapb.SchemaDefinition, error) {
tmc.mu.Lock()
defer tmc.mu.Unlock()

schemaDefn := &tabletmanagerdatapb.SchemaDefinition{}
for _, table := range request.Tables {
if table == "/.*/" {
Expand Down Expand Up @@ -315,7 +350,7 @@ func (tmc *testMaterializerTMClient) expectVRQuery(tabletID int, query string, r
})
}

func (tmc *testMaterializerTMClient) expectCreateVReplicationWorkflowRequest(tabletID uint32, req *tabletmanagerdatapb.CreateVReplicationWorkflowRequest) {
func (tmc *testMaterializerTMClient) expectCreateVReplicationWorkflowRequest(tabletID uint32, req *createVReplicationWorkflowRequestResponse) {
tmc.mu.Lock()
defer tmc.mu.Unlock()

Expand Down Expand Up @@ -344,7 +379,7 @@ func (tmc *testMaterializerTMClient) VReplicationExec(ctx context.Context, table

qrs := tmc.vrQueries[int(tablet.Alias.Uid)]
if len(qrs) == 0 {
return nil, fmt.Errorf("tablet %v does not expect any more queries: %s", tablet, query)
return nil, fmt.Errorf("tablet %v does not expect any more queries: %q", tablet, query)
}
matched := false
if qrs[0].query[0] == '/' {
Expand Down Expand Up @@ -403,6 +438,9 @@ func (tmc *testMaterializerTMClient) HasVReplicationWorkflows(ctx context.Contex
}

func (tmc *testMaterializerTMClient) ReadVReplicationWorkflows(ctx context.Context, tablet *topodatapb.Tablet, req *tabletmanagerdatapb.ReadVReplicationWorkflowsRequest) (*tabletmanagerdatapb.ReadVReplicationWorkflowsResponse, error) {
tmc.mu.Lock()
defer tmc.mu.Unlock()

workflowType := binlogdatapb.VReplicationWorkflowType_MoveTables
if len(req.IncludeWorkflows) > 0 {
for _, wf := range req.IncludeWorkflows {
Expand Down
Loading
Loading