Skip to content

Commit

Permalink
VReplication: Force flag for traffic switching (#16709)
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <mattalord@gmail.com>
  • Loading branch information
mattlord committed Sep 17, 2024
1 parent 8816a2d commit 069651a
Show file tree
Hide file tree
Showing 14 changed files with 330 additions and 103 deletions.
2 changes: 2 additions & 0 deletions go/cmd/vtctldclient/command/vreplication/common/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,7 @@ var SwitchTrafficOptions = struct {
Direction workflow.TrafficSwitchDirection
InitializeTargetSequences bool
Shards []string
Force bool
}{}

func AddCommonSwitchTrafficFlags(cmd *cobra.Command, initializeTargetSequences bool) {
Expand All @@ -259,6 +260,7 @@ func AddCommonSwitchTrafficFlags(cmd *cobra.Command, initializeTargetSequences b
cmd.Flags().DurationVar(&SwitchTrafficOptions.MaxReplicationLagAllowed, "max-replication-lag-allowed", MaxReplicationLagDefault, "Allow traffic to be switched only if VReplication lag is below this.")
cmd.Flags().BoolVar(&SwitchTrafficOptions.EnableReverseReplication, "enable-reverse-replication", true, "Setup replication going back to the original source keyspace to support rolling back the traffic cutover.")
cmd.Flags().BoolVar(&SwitchTrafficOptions.DryRun, "dry-run", false, "Print the actions that would be taken and report any known errors that would have occurred.")
cmd.Flags().BoolVar(&SwitchTrafficOptions.Force, "force", false, "Force the traffic switch even if some potentially non-critical actions cannot be performed; for example the tablet refresh fails on some tablets in the keyspace. WARNING: this should be used with extreme caution and only in emergency situations!")
if initializeTargetSequences {
cmd.Flags().BoolVar(&SwitchTrafficOptions.InitializeTargetSequences, "initialize-target-sequences", false, "When moving tables from an unsharded keyspace to a sharded keyspace, initialize any sequences that are being used on the target when switching writes.")
}
Expand Down
152 changes: 81 additions & 71 deletions go/vt/proto/vtctldata/vtctldata.pb.go

Large diffs are not rendered by default.

34 changes: 34 additions & 0 deletions go/vt/proto/vtctldata/vtctldata_vtproto.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions go/vt/vtctl/grpcvtctldserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -5212,6 +5212,9 @@ func (s *VtctldServer) WorkflowDelete(ctx context.Context, req *vtctldatapb.Work

span.Annotate("keyspace", req.Keyspace)
span.Annotate("workflow", req.Workflow)
span.Annotate("keep_data", req.KeepData)
span.Annotate("keep_routing_rules", req.KeepRoutingRules)
span.Annotate("shards", req.Shards)

resp, err = s.ws.WorkflowDelete(ctx, req)
return resp, err
Expand Down Expand Up @@ -5243,6 +5246,7 @@ func (s *VtctldServer) WorkflowSwitchTraffic(ctx context.Context, req *vtctldata
span.Annotate("tablet-types", req.TabletTypes)
span.Annotate("direction", req.Direction)
span.Annotate("enable-reverse-replication", req.EnableReverseReplication)
span.Annotate("force", req.Force)

resp, err = s.ws.WorkflowSwitchTraffic(ctx, req)
return resp, err
Expand Down
26 changes: 25 additions & 1 deletion go/vt/vtctl/workflow/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func initSrvKeyspace(t *testing.T, topo *topo.Server, keyspace string, sources,
for _, shard := range shards {
keyRange, err := key.ParseShardingSpec(shard)
require.NoError(t, err)
require.Equal(t, 1, len(keyRange))
require.Len(t, keyRange, 1)
partition.ShardReferences = append(partition.ShardReferences, &topodatapb.ShardReference{
Name: shard,
KeyRange: keyRange[0],
Expand Down Expand Up @@ -177,6 +177,7 @@ func (env *testEnv) addTablet(t *testing.T, ctx context.Context, id int, keyspac
Shard: shard,
KeyRange: &topodatapb.KeyRange{},
Type: tabletType,
Hostname: "localhost", // Without a hostname the RefreshState call is skipped
PortMap: map[string]int32{
"test": int32(id),
},
Expand Down Expand Up @@ -256,6 +257,7 @@ type testTMClient struct {
readVReplicationWorkflowRequests map[uint32]*tabletmanagerdatapb.ReadVReplicationWorkflowRequest
primaryPositions map[uint32]string
vdiffRequests map[uint32]*vdiffRequestResponse
refreshStateErrors map[uint32]error

// Stack of ReadVReplicationWorkflowsResponse to return, in order, for each shard
readVReplicationWorkflowsResponses map[string][]*tabletmanagerdatapb.ReadVReplicationWorkflowsResponse
Expand All @@ -277,6 +279,8 @@ func newTestTMClient(env *testEnv) *testTMClient {
readVReplicationWorkflowRequests: make(map[uint32]*tabletmanagerdatapb.ReadVReplicationWorkflowRequest),
readVReplicationWorkflowsResponses: make(map[string][]*tabletmanagerdatapb.ReadVReplicationWorkflowsResponse),
primaryPositions: make(map[uint32]string),
vdiffRequests: make(map[uint32]*vdiffRequestResponse),
refreshStateErrors: make(map[uint32]error),
env: env,
}
}
Expand Down Expand Up @@ -604,6 +608,26 @@ func (tmc *testTMClient) VReplicationWaitForPos(ctx context.Context, tablet *top
return nil
}

func (tmc *testTMClient) SetRefreshStateError(tablet *topodatapb.Tablet, err error) {
tmc.mu.Lock()
defer tmc.mu.Unlock()

if tmc.refreshStateErrors == nil {
tmc.refreshStateErrors = make(map[uint32]error)
}
tmc.refreshStateErrors[tablet.Alias.Uid] = err
}

func (tmc *testTMClient) RefreshState(ctx context.Context, tablet *topodatapb.Tablet) error {
tmc.mu.Lock()
defer tmc.mu.Unlock()

if tmc.refreshStateErrors == nil {
tmc.refreshStateErrors = make(map[uint32]error)
}
return tmc.refreshStateErrors[tablet.Alias.Uid]
}

func (tmc *testTMClient) AddVReplicationWorkflowsResponse(key string, resp *tabletmanagerdatapb.ReadVReplicationWorkflowsResponse) {
tmc.mu.Lock()
defer tmc.mu.Unlock()
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtctl/workflow/materializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ func (mz *materializer) deploySchema() error {
removeAutoInc := false
if mz.workflowType == binlogdatapb.VReplicationWorkflowType_MoveTables &&
(mz.targetVSchema != nil && mz.targetVSchema.Keyspace != nil && mz.targetVSchema.Keyspace.Sharded) &&
(mz.ms != nil && mz.ms.GetWorkflowOptions().GetStripShardedAutoIncrement()) {
(mz.ms.GetWorkflowOptions() != nil && mz.ms.GetWorkflowOptions().StripShardedAutoIncrement) {
removeAutoInc = true
}

Expand Down
67 changes: 49 additions & 18 deletions go/vt/vtctl/workflow/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
"vitess.io/vitess/go/vt/concurrency"
"vitess.io/vitess/go/vt/discovery"
"vitess.io/vitess/go/vt/key"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/logutil"
"vitess.io/vitess/go/vt/mysqlctl/tmutils"
"vitess.io/vitess/go/vt/schema"
Expand Down Expand Up @@ -3009,7 +3010,7 @@ func (s *Server) updateShardRecords(ctx context.Context, keyspace string, shards
}

// refreshPrimaryTablets will just RPC-ping all the primary tablets with RefreshState
func (s *Server) refreshPrimaryTablets(ctx context.Context, shards []*topo.ShardInfo) error {
func (s *Server) refreshPrimaryTablets(ctx context.Context, shards []*topo.ShardInfo, force bool) error {
wg := sync.WaitGroup{}
rec := concurrency.AllErrorRecorder{}
for _, si := range shards {
Expand All @@ -3023,9 +3024,11 @@ func (s *Server) refreshPrimaryTablets(ctx context.Context, shards []*topo.Shard
}

if err := s.tmc.RefreshState(ctx, ti.Tablet); err != nil {
rec.RecordError(err)
} else {
s.Logger().Infof("%v responded", topoproto.TabletAliasString(si.PrimaryAlias))
if !force {
rec.RecordError(err)
return
}
s.Logger().Warningf("%v encountered error on tablet refresh: %v", topoproto.TabletAliasString(si.PrimaryAlias), err)
}
}(si)
}
Expand Down Expand Up @@ -3081,6 +3084,16 @@ func (s *Server) finalizeMigrateWorkflow(ctx context.Context, ts *trafficSwitche

// WorkflowSwitchTraffic switches traffic in the direction passed for specified tablet types.
func (s *Server) WorkflowSwitchTraffic(ctx context.Context, req *vtctldatapb.WorkflowSwitchTrafficRequest) (*vtctldatapb.WorkflowSwitchTrafficResponse, error) {
span, ctx := trace.NewSpan(ctx, "workflow.Server.WorkflowSwitchTraffic")
defer span.Finish()

span.Annotate("keyspace", req.Keyspace)
span.Annotate("workflow", req.Workflow)
span.Annotate("tablet-types", req.TabletTypes)
span.Annotate("direction", req.Direction)
span.Annotate("enable-reverse-replication", req.EnableReverseReplication)
span.Annotate("force", req.Force)

var (
dryRunResults []string
rdDryRunResults, wrDryRunResults *[]string
Expand Down Expand Up @@ -3130,12 +3143,16 @@ func (s *Server) WorkflowSwitchTraffic(ctx context.Context, req *vtctldatapb.Wor
return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "cannot reverse traffic for multi-tenant migrations")
}
}
reason, err := s.canSwitch(ctx, ts, startState, direction, int64(maxReplicationLagAllowed.Seconds()), req.Shards)

ts.force = req.GetForce()

reason, err := s.canSwitch(ctx, ts, startState, direction, int64(maxReplicationLagAllowed.Seconds()), req.GetShards())
if err != nil {
return nil, err
}
if reason != "" {
return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "cannot switch traffic for workflow %s at this time: %s", startState.Workflow, reason)
return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "cannot switch traffic for workflow %s at this time: %s",
startState.Workflow, reason)
}
hasReplica, hasRdonly, hasPrimary, err = parseTabletTypes(req.TabletTypes)
if err != nil {
Expand Down Expand Up @@ -3172,7 +3189,8 @@ func (s *Server) WorkflowSwitchTraffic(ctx context.Context, req *vtctldatapb.Wor
s.Logger().Infof("%s done for workflow %s.%s", cmd, req.Keyspace, req.Workflow)
resp := &vtctldatapb.WorkflowSwitchTrafficResponse{}
if req.DryRun {
resp.Summary = fmt.Sprintf("%s dry run results for workflow %s.%s at %v", cmd, req.Keyspace, req.Workflow, time.Now().UTC().Format(time.RFC822))
resp.Summary = fmt.Sprintf("%s dry run results for workflow %s.%s at %v",
cmd, req.Keyspace, req.Workflow, time.Now().UTC().Format(time.RFC822))
resp.DryRunResults = dryRunResults
} else {
s.Logger().Infof("%s done for workflow %s.%s", cmd, req.Keyspace, req.Workflow)
Expand Down Expand Up @@ -3222,10 +3240,12 @@ func (s *Server) switchReads(ctx context.Context, req *vtctldatapb.WorkflowSwitc

cellsStr := strings.Join(req.Cells, ",")

s.Logger().Infof("Switching reads: %s.%s tablet types: %s, cells: %s, workflow state: %s", ts.targetKeyspace, ts.workflow, roTypesToSwitchStr, cellsStr, state.String())
s.Logger().Infof("Switching reads: %s.%s tablet types: %s, cells: %s, workflow state: %s",
ts.targetKeyspace, ts.workflow, roTypesToSwitchStr, cellsStr, state.String())
if !switchReplica && !switchRdonly {
return defaultErrorHandler(ts.Logger(), "invalid tablet types",
vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "tablet types must be REPLICA or RDONLY: %s", roTypesToSwitchStr))
vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "tablet types must be REPLICA or RDONLY: %s",
roTypesToSwitchStr))
}
// For partial (shard-by-shard migrations) or multi-tenant migrations, traffic for all tablet types
// is expected to be switched at once. For other MoveTables migrations where we use table routing rules
Expand All @@ -3245,9 +3265,15 @@ func (s *Server) switchReads(ctx context.Context, req *vtctldatapb.WorkflowSwitc
if err != nil {
return nil, err
}
if len(req.GetCells()) != 0 && len(req.GetCells()) != len(allCells) {
return defaultErrorHandler(ts.Logger(), "invalid request", vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT,
"requesting read traffic for multi-tenant migrations must include all cells"))

if len(req.GetCells()) > 0 {
slices.Sort(req.GetCells())
slices.Sort(allCells)
if !slices.Equal(req.GetCells(), allCells) {
return defaultErrorHandler(ts.Logger(), "invalid request", vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT,
"requesting switch of read traffic for multi-tenant migrations must include all cells; all cells: %v, requested cells: %v",
strings.Join(allCells, ","), strings.Join(req.GetCells(), ",")))
}
}
}

Expand Down Expand Up @@ -3277,14 +3303,14 @@ func (s *Server) switchReads(ctx context.Context, req *vtctldatapb.WorkflowSwitc
}
}

// If journals exist notify user and fail.
journalsExist, _, err := ts.checkJournals(ctx)
if err != nil {
if err != nil && !req.GetForce() {
return defaultErrorHandler(ts.Logger(), fmt.Sprintf("failed to read journal in the %s keyspace", ts.SourceKeyspaceName()), err)
}
if journalsExist {
s.Logger().Infof("Found a previous journal entry for %d", ts.id)
}

var sw iswitcher
if req.DryRun {
sw = &switcherDryRun{ts: ts, drLog: NewLogRecorder()}
Expand Down Expand Up @@ -3664,10 +3690,15 @@ func (s *Server) canSwitch(ctx context.Context, ts *trafficSwitcher, state *Stat
defer wg.Done()
for _, si := range shards {
if partial, partialDetails, err := topotools.RefreshTabletsByShard(rtbsCtx, s.ts, s.tmc, si, nil, ts.Logger()); err != nil || partial {
m.Lock()
refreshErrors.WriteString(fmt.Sprintf("failed to successfully refresh all tablets in the %s/%s %s shard (%v):\n %v\n",
si.Keyspace(), si.ShardName(), stype, err, partialDetails))
m.Unlock()
msg := fmt.Sprintf("failed to successfully refresh all tablets in the %s/%s %s shard (%v):\n %v\n",
si.Keyspace(), si.ShardName(), stype, err, partialDetails)
if partial && ts.force {
log.Warning(msg)
} else {
m.Lock()
refreshErrors.WriteString(msg)
m.Unlock()
}
}
}
}
Expand Down
Loading

0 comments on commit 069651a

Please sign in to comment.