diff --git a/go/cmd/vtctldclient/command/vreplication/reshard/create.go b/go/cmd/vtctldclient/command/vreplication/reshard/create.go index b8506ae61d0..05700dbb9fe 100644 --- a/go/cmd/vtctldclient/command/vreplication/reshard/create.go +++ b/go/cmd/vtctldclient/command/vreplication/reshard/create.go @@ -60,9 +60,8 @@ func commandReshardCreate(cmd *cobra.Command, args []string) error { cli.FinishedParsing(cmd) req := &vtctldatapb.ReshardCreateRequest{ - Workflow: common.BaseOptions.Workflow, - Keyspace: common.BaseOptions.TargetKeyspace, - + Workflow: common.BaseOptions.Workflow, + Keyspace: common.BaseOptions.TargetKeyspace, TabletTypes: common.CreateOptions.TabletTypes, TabletSelectionPreference: tsp, Cells: common.CreateOptions.Cells, @@ -70,10 +69,9 @@ func commandReshardCreate(cmd *cobra.Command, args []string) error { DeferSecondaryKeys: common.CreateOptions.DeferSecondaryKeys, AutoStart: common.CreateOptions.AutoStart, StopAfterCopy: common.CreateOptions.StopAfterCopy, - - SourceShards: reshardCreateOptions.sourceShards, - TargetShards: reshardCreateOptions.targetShards, - SkipSchemaCopy: reshardCreateOptions.skipSchemaCopy, + SourceShards: reshardCreateOptions.sourceShards, + TargetShards: reshardCreateOptions.targetShards, + SkipSchemaCopy: reshardCreateOptions.skipSchemaCopy, } resp, err := common.GetClient().ReshardCreate(common.GetCommandCtx(), req) if err != nil { diff --git a/go/test/endtoend/vreplication/vreplication_vtctldclient_cli_test.go b/go/test/endtoend/vreplication/vreplication_vtctldclient_cli_test.go index bfe0c81ebce..30d9e2b2eb5 100644 --- a/go/test/endtoend/vreplication/vreplication_vtctldclient_cli_test.go +++ b/go/test/endtoend/vreplication/vreplication_vtctldclient_cli_test.go @@ -27,7 +27,6 @@ import ( "google.golang.org/protobuf/encoding/protojson" "vitess.io/vitess/go/test/endtoend/cluster" - binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" topodatapb "vitess.io/vitess/go/vt/proto/topodata" @@ -54,20 +53,33 @@ func TestVtctldclientCLI(t *testing.T) { require.NotNil(t, zone2) defer vc.TearDown() - sourceKeyspace := "product" - targetKeyspace := "customer" + sourceKeyspaceName := "product" + targetKeyspaceName := "customer" var mt iMoveTables workflowName := "wf1" targetTabs := setupMinimalCustomerKeyspace(t) t.Run("MoveTablesCreateFlags1", func(t *testing.T) { - testMoveTablesFlags1(t, &mt, sourceKeyspace, targetKeyspace, workflowName, targetTabs) + testMoveTablesFlags1(t, &mt, sourceKeyspaceName, targetKeyspaceName, workflowName, targetTabs) }) t.Run("MoveTablesCreateFlags2", func(t *testing.T) { - testMoveTablesFlags2(t, &mt, sourceKeyspace, targetKeyspace, workflowName, targetTabs) + testMoveTablesFlags2(t, &mt, sourceKeyspaceName, targetKeyspaceName, workflowName, targetTabs) + }) + t.Run("MoveTablesCompleteFlags3", func(t *testing.T) { + testMoveTablesFlags3(t, sourceKeyspaceName, targetKeyspaceName, targetTabs) }) - t.Run("MoveTablesCompleteFlags", func(t *testing.T) { - testMoveTablesFlags3(t, sourceKeyspace, targetKeyspace, targetTabs) + t.Run("Reshard", func(t *testing.T) { + cell := vc.Cells["zone1"] + targetKeyspace := cell.Keyspaces[targetKeyspaceName] + sourceShard := "-80" + newShards := "-40,40-80" + require.NoError(t, vc.AddShards(t, []*Cell{cell}, targetKeyspace, newShards, 1, 0, 400, nil)) + reshardWorkflowName := "reshard" + tablets := map[string]*cluster.VttabletProcess{ + "-40": targetKeyspace.Shards["-40"].Tablets["zone1-400"].Vttablet, + "40-80": targetKeyspace.Shards["40-80"].Tablets["zone1-500"].Vttablet, + } + splitShard(t, targetKeyspaceName, reshardWorkflowName, sourceShard, newShards, tablets) }) } @@ -81,34 +93,31 @@ func testMoveTablesFlags1(t *testing.T, mt *iMoveTables, sourceKeyspace, targetK } completeFlags := []string{"--keep-routing-rules", "--keep-data"} switchFlags := []string{} + // Test one set of MoveTable flags. *mt = createMoveTables(t, sourceKeyspace, targetKeyspace, workflowName, tables, createFlags, completeFlags, switchFlags) (*mt).Show() - moveTablesOutput := (*mt).GetLastOutput() - // Test one set of MoveTable flags. + moveTablesResponse := getMoveTablesShowResponse(mt) + workflowResponse := getWorkflow(targetKeyspace, workflowName) - workflowOutput, err := vc.VtctldClient.ExecuteCommandWithOutput("Workflow", "--keyspace", "customer", "show", "--workflow", "wf1") - require.NoError(t, err) - var moveTablesResponse vtctldatapb.GetWorkflowsResponse - err = protojson.Unmarshal([]byte(moveTablesOutput), &moveTablesResponse) - require.NoError(t, err) - - var workflowResponse vtctldatapb.GetWorkflowsResponse - err = protojson.Unmarshal([]byte(workflowOutput), &workflowResponse) - require.NoError(t, err) - - moveTablesResponse.Workflows[0].MaxVReplicationTransactionLag = 0 - moveTablesResponse.Workflows[0].MaxVReplicationLag = 0 - workflowResponse.Workflows[0].MaxVReplicationTransactionLag = 0 - workflowResponse.Workflows[0].MaxVReplicationLag = 0 // also validates that MoveTables Show and Workflow Show return the same output. - require.EqualValues(t, moveTablesResponse.CloneVT(), workflowResponse.CloneVT()) + require.EqualValues(t, moveTablesResponse.CloneVT(), workflowResponse) // Validate that the flags are set correctly in the database. - validateWorkflow1(t, workflowResponse.Workflows) + validateMoveTablesWorkflow(t, workflowResponse.Workflows) // Since we used --no-routing-rules, there should be no routing rules. confirmNoRoutingRules(t) } +func getMoveTablesShowResponse(mt *iMoveTables) *vtctldatapb.GetWorkflowsResponse { + moveTablesOutput := (*mt).GetLastOutput() + var moveTablesResponse vtctldatapb.GetWorkflowsResponse + err := protojson.Unmarshal([]byte(moveTablesOutput), &moveTablesResponse) + require.NoError(vc.t, err) + moveTablesResponse.Workflows[0].MaxVReplicationTransactionLag = 0 + moveTablesResponse.Workflows[0].MaxVReplicationLag = 0 + return moveTablesResponse.CloneVT() +} + // Validates some of the flags created from the previous test. func testMoveTablesFlags2(t *testing.T, mt *iMoveTables, sourceKeyspace, targetKeyspace, workflowName string, targetTabs map[string]*cluster.VttabletProcess) { ksWorkflow := fmt.Sprintf("%s.%s", targetKeyspace, workflowName) @@ -184,6 +193,135 @@ func createMoveTables(t *testing.T, sourceKeyspace, targetKeyspace, workflowName return mt } +// reshard helpers + +func splitShard(t *testing.T, keyspace, workflowName, sourceShards, targetShards string, targetTabs map[string]*cluster.VttabletProcess) { + createFlags := []string{"--auto-start=false", "--defer-secondary-keys=false", "--stop-after-copy", + "--on-ddl", "STOP", "--tablet-types", "primary,rdonly", "--tablet-types-in-preference-order=true", + "--all-cells", "--format=json", + } + rs := newReshard(vc, &reshardWorkflow{ + workflowInfo: &workflowInfo{ + vc: vc, + workflowName: workflowName, + targetKeyspace: keyspace, + }, + sourceShards: sourceShards, + targetShards: targetShards, + createFlags: createFlags, + }, workflowFlavorVtctld) + + ksWorkflow := fmt.Sprintf("%s.%s", keyspace, workflowName) + rs.Create() + validateReshardResponse(rs) + workflowResponse := getWorkflow(keyspace, workflowName) + reshardShowResponse := getReshardShowResponse(&rs) + require.EqualValues(t, reshardShowResponse, workflowResponse) + validateReshardWorkflow(t, workflowResponse.Workflows) + waitForWorkflowState(t, vc, fmt.Sprintf("%s.%s", keyspace, workflowName), binlogdatapb.VReplicationWorkflowState_Stopped.String()) + rs.Start() + waitForWorkflowState(t, vc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Stopped.String()) + for _, tab := range targetTabs { + alias := fmt.Sprintf("zone1-%d", tab.TabletUID) + query := "update _vt.vreplication set source := replace(source, 'stop_after_copy:true', 'stop_after_copy:false') where db_name = 'vt_customer' and workflow = '" + workflowName + "'" + output, err := vc.VtctlClient.ExecuteCommandWithOutput("ExecuteFetchAsDba", alias, query) + require.NoError(t, err, output) + } + rs.Start() + waitForWorkflowState(t, vc, fmt.Sprintf("%s.%s", keyspace, workflowName), binlogdatapb.VReplicationWorkflowState_Running.String()) + rs.Stop() + waitForWorkflowState(t, vc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Stopped.String()) + rs.Start() + waitForWorkflowState(t, vc, fmt.Sprintf("%s.%s", keyspace, workflowName), binlogdatapb.VReplicationWorkflowState_Running.String()) + for _, targetTab := range targetTabs { + catchup(t, targetTab, workflowName, "Reshard") + } + vdiff(t, keyspace, workflowName, "zone1", false, true, nil) + + rs.SwitchReadsAndWrites() + waitForLowLag(t, keyspace, workflowName+"_reverse") + vdiff(t, keyspace, workflowName+"_reverse", "zone1", true, false, nil) + + rs.ReverseReadsAndWrites() + waitForLowLag(t, keyspace, workflowName) + vdiff(t, keyspace, workflowName, "zone1", false, true, nil) + rs.SwitchReadsAndWrites() + rs.Complete() +} + +func getReshardShowResponse(rs *iReshard) *vtctldatapb.GetWorkflowsResponse { + (*rs).Show() + reshardOutput := (*rs).GetLastOutput() + var reshardResponse vtctldatapb.GetWorkflowsResponse + err := protojson.Unmarshal([]byte(reshardOutput), &reshardResponse) + require.NoError(vc.t, err) + reshardResponse.Workflows[0].MaxVReplicationTransactionLag = 0 + reshardResponse.Workflows[0].MaxVReplicationLag = 0 + return reshardResponse.CloneVT() +} + +func validateReshardResponse(rs iReshard) { + resp := getReshardResponse(rs) + require.NotNil(vc.t, resp) + require.NotNil(vc.t, resp.ShardStreams) + require.Equal(vc.t, len(resp.ShardStreams), 2) + keyspace := "customer" + for _, shard := range []string{"-40", "40-80"} { + streams := resp.ShardStreams[fmt.Sprintf("%s/%s", keyspace, shard)] + require.Equal(vc.t, 1, len(streams.Streams)) + require.Equal(vc.t, binlogdatapb.VReplicationWorkflowState_Stopped.String(), streams.Streams[0].Status) + } +} + +func validateReshardWorkflow(t *testing.T, workflows []*vtctldatapb.Workflow) { + require.Equal(t, 1, len(workflows)) + wf := workflows[0] + require.Equal(t, "reshard", wf.Name) + require.Equal(t, binlogdatapb.VReplicationWorkflowType_Reshard.String(), wf.WorkflowType) + require.Equal(t, "None", wf.WorkflowSubType) + require.Equal(t, "customer", wf.Target.Keyspace) + require.Equal(t, 2, len(wf.Target.Shards)) + require.Equal(t, "customer", wf.Source.Keyspace) + require.Equal(t, 1, len(wf.Source.Shards)) + require.False(t, wf.DeferSecondaryKeys) + + require.GreaterOrEqual(t, len(wf.ShardStreams), int(1)) + oneStream := maps.Values(wf.ShardStreams)[0] + require.NotNil(t, oneStream) + + stream := oneStream.Streams[0] + require.Equal(t, binlogdatapb.VReplicationWorkflowState_Stopped.String(), stream.State) + require.Equal(t, stream.TabletSelectionPreference, tabletmanagerdatapb.TabletSelectionPreference_INORDER) + require.True(t, slices.Equal([]topodatapb.TabletType{topodatapb.TabletType_PRIMARY, topodatapb.TabletType_RDONLY}, stream.TabletTypes)) + require.True(t, slices.Equal([]string{"zone1", "zone2"}, stream.Cells)) + + bls := stream.BinlogSource + require.Equal(t, binlogdatapb.OnDDLAction_STOP, bls.OnDdl) + require.True(t, bls.StopAfterCopy) + +} + +func getReshardResponse(rs iReshard) *vtctldatapb.WorkflowStatusResponse { + reshardOutput := rs.GetLastOutput() + var reshardResponse vtctldatapb.WorkflowStatusResponse + err := protojson.Unmarshal([]byte(reshardOutput), &reshardResponse) + require.NoError(vc.t, err) + return reshardResponse.CloneVT() +} + +// helper functions + +func getWorkflow(targetKeyspace, workflow string) *vtctldatapb.GetWorkflowsResponse { + workflowOutput, err := vc.VtctldClient.ExecuteCommandWithOutput("Workflow", "--keyspace", targetKeyspace, "show", "--workflow", workflow) + require.NoError(vc.t, err) + var workflowResponse vtctldatapb.GetWorkflowsResponse + err = protojson.Unmarshal([]byte(workflowOutput), &workflowResponse) + require.NoError(vc.t, err) + workflowResponse.Workflows[0].MaxVReplicationTransactionLag = 0 + workflowResponse.Workflows[0].MaxVReplicationLag = 0 + return workflowResponse.CloneVT() +} + func checkTablesExist(t *testing.T, tabletAlias string, tables []string) bool { tablesResponse, err := vc.VtctldClient.ExecuteCommandWithOutput("GetSchema", tabletAlias, "--tables", strings.Join(tables, ","), "--table-names-only") require.NoError(t, err) @@ -211,6 +349,7 @@ func getRoutingRules(t *testing.T) *vschemapb.RoutingRules { require.NoError(t, err) return &routingRulesResponse } + func confirmNoRoutingRules(t *testing.T) { routingRulesResponse := getRoutingRules(t) require.Zero(t, len(routingRulesResponse.Rules)) @@ -223,7 +362,7 @@ func confirmRoutingRulesExist(t *testing.T) { // We only want to validate non-standard attributes that are set by the CLI. The other end-to-end tests validate the rest. // We also check some of the standard attributes to make sure they are set correctly. -func validateWorkflow1(t *testing.T, workflows []*vtctldatapb.Workflow) { +func validateMoveTablesWorkflow(t *testing.T, workflows []*vtctldatapb.Workflow) { require.Equal(t, 1, len(workflows)) wf := workflows[0] require.Equal(t, "wf1", wf.Name) diff --git a/go/test/endtoend/vreplication/wrappers_test.go b/go/test/endtoend/vreplication/wrappers_test.go index 5470aeb2bd5..2d4949b60dc 100644 --- a/go/test/endtoend/vreplication/wrappers_test.go +++ b/go/test/endtoend/vreplication/wrappers_test.go @@ -74,9 +74,10 @@ type moveTablesWorkflow struct { tables string atomicCopy bool sourceShards string - createFlags []string // currently only used by vtctld + // currently only used by vtctld lastOutput string + createFlags []string completeFlags []string switchFlags []string } @@ -270,7 +271,12 @@ type reshardWorkflow struct { targetShards string skipSchemaCopy bool - lastOutput string + // currently only used by vtctld + lastOutput string + createFlags []string + completeFlags []string + cancelFlags []string + switchFlags []string } type iReshard interface { @@ -379,8 +385,9 @@ func (v VtctldReshard) Flavor() string { func (v VtctldReshard) exec(args ...string) { args2 := []string{"Reshard", "--workflow=" + v.workflowName, "--target-keyspace=" + v.targetKeyspace} args2 = append(args2, args...) - if err := vc.VtctldClient.ExecuteCommand(args2...); err != nil { - v.vc.t.Fatalf("failed to create Reshard workflow: %v", err) + var err error + if v.lastOutput, err = vc.VtctldClient.ExecuteCommandWithOutput(args2...); err != nil { + v.vc.t.Fatalf("failed to create Reshard workflow: %v: %s", err, v.lastOutput) } } @@ -395,11 +402,14 @@ func (v VtctldReshard) Create() { if v.skipSchemaCopy { args = append(args, "--skip-schema-copy="+strconv.FormatBool(v.skipSchemaCopy)) } + args = append(args, v.createFlags...) v.exec(args...) } func (v VtctldReshard) SwitchReadsAndWrites() { - v.exec("SwitchTraffic") + args := []string{"SwitchTraffic"} + args = append(args, v.switchFlags...) + v.exec(args...) } func (v VtctldReshard) ReverseReadsAndWrites() { @@ -407,8 +417,7 @@ func (v VtctldReshard) ReverseReadsAndWrites() { } func (v VtctldReshard) Show() { - //TODO implement me - panic("implement me") + v.exec("Show") } func (v VtctldReshard) SwitchReads() { @@ -422,11 +431,15 @@ func (v VtctldReshard) SwitchWrites() { } func (v VtctldReshard) Cancel() { - v.exec("Cancel") + args := []string{"Cancel"} + args = append(args, v.cancelFlags...) + v.exec(args...) } func (v VtctldReshard) Complete() { - v.exec("Complete") + args := []string{"Complete"} + args = append(args, v.completeFlags...) + v.exec(args...) } func (v VtctldReshard) GetLastOutput() string { diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index 5e9f3fc9300..fc215b84c22 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -1672,7 +1672,11 @@ func (s *Server) ReshardCreate(ctx context.Context, req *vtctldatapb.ReshardCrea log.Errorf("%w", err2) return nil, err } - rs, err := s.buildResharder(ctx, keyspace, req.Workflow, req.SourceShards, req.TargetShards, strings.Join(cells, ","), "") + tabletTypesStr := topoproto.MakeStringTypeCSV(req.TabletTypes) + if req.TabletSelectionPreference == tabletmanagerdatapb.TabletSelectionPreference_INORDER { + tabletTypesStr = discovery.InOrderHint + tabletTypesStr + } + rs, err := s.buildResharder(ctx, keyspace, req.Workflow, req.SourceShards, req.TargetShards, strings.Join(cells, ","), tabletTypesStr) if err != nil { return nil, vterrors.Wrap(err, "buildResharder") } @@ -1695,7 +1699,10 @@ func (s *Server) ReshardCreate(ctx context.Context, req *vtctldatapb.ReshardCrea } else { log.Warningf("Streams will not be started since --auto-start is set to false") } - return nil, nil + return s.WorkflowStatus(ctx, &vtctldatapb.WorkflowStatusRequest{ + Keyspace: keyspace, + Workflow: req.Workflow, + }) } // VDiffCreate is part of the vtctlservicepb.VtctldServer interface.