Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

VReplication: Force flag for traffic switching #16709

Merged
merged 21 commits into from
Sep 17, 2024
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions go/cmd/vtctldclient/command/vreplication/common/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,7 @@ var SwitchTrafficOptions = struct {
Direction workflow.TrafficSwitchDirection
InitializeTargetSequences bool
Shards []string
Force bool
}{}

func AddCommonSwitchTrafficFlags(cmd *cobra.Command, initializeTargetSequences bool) {
Expand All @@ -259,6 +260,7 @@ func AddCommonSwitchTrafficFlags(cmd *cobra.Command, initializeTargetSequences b
cmd.Flags().DurationVar(&SwitchTrafficOptions.MaxReplicationLagAllowed, "max-replication-lag-allowed", MaxReplicationLagDefault, "Allow traffic to be switched only if VReplication lag is below this.")
cmd.Flags().BoolVar(&SwitchTrafficOptions.EnableReverseReplication, "enable-reverse-replication", true, "Setup replication going back to the original source keyspace to support rolling back the traffic cutover.")
cmd.Flags().BoolVar(&SwitchTrafficOptions.DryRun, "dry-run", false, "Print the actions that would be taken and report any known errors that would have occurred.")
cmd.Flags().BoolVar(&SwitchTrafficOptions.Force, "force", false, "Force the traffic switch even if some potentially non-critical actions cannot be performed; for example the tablet refresh fails on some tablets in the keyspace. WARNING: this should be used with extreme caution and only in emergency situations!")
if initializeTargetSequences {
cmd.Flags().BoolVar(&SwitchTrafficOptions.InitializeTargetSequences, "initialize-target-sequences", false, "When moving tables from an unsharded keyspace to a sharded keyspace, initialize any sequences that are being used on the target when switching writes.")
}
Expand Down
151 changes: 80 additions & 71 deletions go/vt/proto/vtctldata/vtctldata.pb.go

Large diffs are not rendered by default.

34 changes: 34 additions & 0 deletions go/vt/proto/vtctldata/vtctldata_vtproto.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions go/vt/vtctl/grpcvtctldserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -5158,6 +5158,9 @@ func (s *VtctldServer) WorkflowDelete(ctx context.Context, req *vtctldatapb.Work

span.Annotate("keyspace", req.Keyspace)
span.Annotate("workflow", req.Workflow)
span.Annotate("keep_data", req.KeepData)
span.Annotate("keep_routing_rules", req.KeepRoutingRules)
span.Annotate("shards", req.Shards)

resp, err = s.ws.WorkflowDelete(ctx, req)
return resp, err
Expand Down Expand Up @@ -5189,6 +5192,7 @@ func (s *VtctldServer) WorkflowSwitchTraffic(ctx context.Context, req *vtctldata
span.Annotate("tablet-types", req.TabletTypes)
span.Annotate("direction", req.Direction)
span.Annotate("enable-reverse-replication", req.EnableReverseReplication)
span.Annotate("force", req.Force)

resp, err = s.ws.WorkflowSwitchTraffic(ctx, req)
return resp, err
Expand Down
26 changes: 25 additions & 1 deletion go/vt/vtctl/workflow/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func initSrvKeyspace(t *testing.T, topo *topo.Server, keyspace string, sources,
for _, shard := range shards {
keyRange, err := key.ParseShardingSpec(shard)
require.NoError(t, err)
require.Equal(t, 1, len(keyRange))
require.Len(t, keyRange, 1)
partition.ShardReferences = append(partition.ShardReferences, &topodatapb.ShardReference{
Name: shard,
KeyRange: keyRange[0],
Expand Down Expand Up @@ -177,6 +177,7 @@ func (env *testEnv) addTablet(t *testing.T, ctx context.Context, id int, keyspac
Shard: shard,
KeyRange: &topodatapb.KeyRange{},
Type: tabletType,
Hostname: "localhost", // Without a hostname the RefreshState call is skipped
PortMap: map[string]int32{
"test": int32(id),
},
Expand Down Expand Up @@ -256,6 +257,7 @@ type testTMClient struct {
readVReplicationWorkflowRequests map[uint32]*tabletmanagerdatapb.ReadVReplicationWorkflowRequest
primaryPositions map[uint32]string
vdiffRequests map[uint32]*vdiffRequestResponse
refreshStateErrors map[uint32]error

// Stack of ReadVReplicationWorkflowsResponse to return, in order, for each shard
readVReplicationWorkflowsResponses map[string][]*tabletmanagerdatapb.ReadVReplicationWorkflowsResponse
Expand All @@ -277,6 +279,8 @@ func newTestTMClient(env *testEnv) *testTMClient {
readVReplicationWorkflowRequests: make(map[uint32]*tabletmanagerdatapb.ReadVReplicationWorkflowRequest),
readVReplicationWorkflowsResponses: make(map[string][]*tabletmanagerdatapb.ReadVReplicationWorkflowsResponse),
primaryPositions: make(map[uint32]string),
vdiffRequests: make(map[uint32]*vdiffRequestResponse),
refreshStateErrors: make(map[uint32]error),
env: env,
}
}
Expand Down Expand Up @@ -604,6 +608,26 @@ func (tmc *testTMClient) VReplicationWaitForPos(ctx context.Context, tablet *top
return nil
}

// SetRefreshStateError registers err as the result that the fake RefreshState
// call returns for the given tablet. Entries are keyed by the tablet alias
// UID; passing a nil err stores a nil entry for that UID.
func (tmc *testTMClient) SetRefreshStateError(tablet *topodatapb.Tablet, err error) {
	tmc.mu.Lock()
	defer tmc.mu.Unlock()

	// The map may be missing if the client was not built through the
	// constructor, so allocate it on first write.
	errsByUID := tmc.refreshStateErrors
	if errsByUID == nil {
		errsByUID = make(map[uint32]error)
		tmc.refreshStateErrors = errsByUID
	}

	uid := tablet.Alias.Uid
	errsByUID[uid] = err
}

// RefreshState is the test double for the tablet manager RefreshState call.
// It returns the error previously registered for the tablet's alias UID via
// SetRefreshStateError, or nil when nothing has been registered. The ctx
// argument is accepted to match the call signature but is not used.
func (tmc *testTMClient) RefreshState(ctx context.Context, tablet *topodatapb.Tablet) error {
	tmc.mu.Lock()
	defer tmc.mu.Unlock()

	// Indexing a nil map is safe in Go and yields the zero value (a nil
	// error), so this read-only path does not need to allocate the map.
	return tmc.refreshStateErrors[tablet.Alias.Uid]
}

func (tmc *testTMClient) AddVReplicationWorkflowsResponse(key string, resp *tabletmanagerdatapb.ReadVReplicationWorkflowsResponse) {
tmc.mu.Lock()
defer tmc.mu.Unlock()
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtctl/workflow/materializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ func (mz *materializer) deploySchema() error {
removeAutoInc := false
if mz.workflowType == binlogdatapb.VReplicationWorkflowType_MoveTables &&
(mz.targetVSchema != nil && mz.targetVSchema.Keyspace != nil && mz.targetVSchema.Keyspace.Sharded) &&
(mz.ms != nil && mz.ms.GetWorkflowOptions().GetStripShardedAutoIncrement()) {
(mz.ms.GetWorkflowOptions() != nil && mz.ms.GetWorkflowOptions().StripShardedAutoIncrement) {
removeAutoInc = true
}

Expand Down
69 changes: 50 additions & 19 deletions go/vt/vtctl/workflow/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
"vitess.io/vitess/go/vt/concurrency"
"vitess.io/vitess/go/vt/discovery"
"vitess.io/vitess/go/vt/key"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/logutil"
"vitess.io/vitess/go/vt/mysqlctl/tmutils"
"vitess.io/vitess/go/vt/schema"
Expand Down Expand Up @@ -3009,7 +3010,7 @@ func (s *Server) updateShardRecords(ctx context.Context, keyspace string, shards
}

// refreshPrimaryTablets will just RPC-ping all the primary tablets with RefreshState
func (s *Server) refreshPrimaryTablets(ctx context.Context, shards []*topo.ShardInfo) error {
func (s *Server) refreshPrimaryTablets(ctx context.Context, shards []*topo.ShardInfo, force bool) error {
wg := sync.WaitGroup{}
rec := concurrency.AllErrorRecorder{}
for _, si := range shards {
Expand All @@ -3023,9 +3024,11 @@ func (s *Server) refreshPrimaryTablets(ctx context.Context, shards []*topo.Shard
}

if err := s.tmc.RefreshState(ctx, ti.Tablet); err != nil {
rec.RecordError(err)
} else {
s.Logger().Infof("%v responded", topoproto.TabletAliasString(si.PrimaryAlias))
if !force {
rec.RecordError(err)
return
}
s.Logger().Warningf("%v encountered error on tablet refresh: %v", topoproto.TabletAliasString(si.PrimaryAlias), err)
}
}(si)
}
Expand Down Expand Up @@ -3081,6 +3084,16 @@ func (s *Server) finalizeMigrateWorkflow(ctx context.Context, ts *trafficSwitche

// WorkflowSwitchTraffic switches traffic in the direction passed for specified tablet types.
func (s *Server) WorkflowSwitchTraffic(ctx context.Context, req *vtctldatapb.WorkflowSwitchTrafficRequest) (*vtctldatapb.WorkflowSwitchTrafficResponse, error) {
span, ctx := trace.NewSpan(ctx, "workflow.Server.WorkflowSwitchTraffic")
defer span.Finish()

span.Annotate("keyspace", req.Keyspace)
span.Annotate("workflow", req.Workflow)
span.Annotate("tablet-types", req.TabletTypes)
span.Annotate("direction", req.Direction)
span.Annotate("enable-reverse-replication", req.EnableReverseReplication)
span.Annotate("force", req.Force)

var (
dryRunResults []string
rdDryRunResults, wrDryRunResults *[]string
Expand Down Expand Up @@ -3130,12 +3143,16 @@ func (s *Server) WorkflowSwitchTraffic(ctx context.Context, req *vtctldatapb.Wor
return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "cannot reverse traffic for multi-tenant migrations")
}
}
reason, err := s.canSwitch(ctx, ts, startState, direction, int64(maxReplicationLagAllowed.Seconds()), req.Shards)

ts.force = req.GetForce()

reason, err := s.canSwitch(ctx, ts, startState, direction, int64(maxReplicationLagAllowed.Seconds()), req.GetShards(), req.GetForce())
mattlord marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, err
}
if reason != "" {
return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "cannot switch traffic for workflow %s at this time: %s", startState.Workflow, reason)
return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "cannot switch traffic for workflow %s at this time: %s",
startState.Workflow, reason)
}
hasReplica, hasRdonly, hasPrimary, err = parseTabletTypes(req.TabletTypes)
if err != nil {
Expand Down Expand Up @@ -3172,7 +3189,8 @@ func (s *Server) WorkflowSwitchTraffic(ctx context.Context, req *vtctldatapb.Wor
s.Logger().Infof("%s done for workflow %s.%s", cmd, req.Keyspace, req.Workflow)
resp := &vtctldatapb.WorkflowSwitchTrafficResponse{}
if req.DryRun {
resp.Summary = fmt.Sprintf("%s dry run results for workflow %s.%s at %v", cmd, req.Keyspace, req.Workflow, time.Now().UTC().Format(time.RFC822))
resp.Summary = fmt.Sprintf("%s dry run results for workflow %s.%s at %v",
cmd, req.Keyspace, req.Workflow, time.Now().UTC().Format(time.RFC822))
resp.DryRunResults = dryRunResults
} else {
s.Logger().Infof("%s done for workflow %s.%s", cmd, req.Keyspace, req.Workflow)
Expand Down Expand Up @@ -3222,10 +3240,12 @@ func (s *Server) switchReads(ctx context.Context, req *vtctldatapb.WorkflowSwitc

cellsStr := strings.Join(req.Cells, ",")

s.Logger().Infof("Switching reads: %s.%s tablet types: %s, cells: %s, workflow state: %s", ts.targetKeyspace, ts.workflow, roTypesToSwitchStr, cellsStr, state.String())
s.Logger().Infof("Switching reads: %s.%s tablet types: %s, cells: %s, workflow state: %s",
ts.targetKeyspace, ts.workflow, roTypesToSwitchStr, cellsStr, state.String())
if !switchReplica && !switchRdonly {
return defaultErrorHandler(ts.Logger(), "invalid tablet types",
vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "tablet types must be REPLICA or RDONLY: %s", roTypesToSwitchStr))
vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "tablet types must be REPLICA or RDONLY: %s",
roTypesToSwitchStr))
}
// For partial (shard-by-shard migrations) or multi-tenant migrations, traffic for all tablet types
// is expected to be switched at once. For other MoveTables migrations where we use table routing rules
Expand All @@ -3245,9 +3265,15 @@ func (s *Server) switchReads(ctx context.Context, req *vtctldatapb.WorkflowSwitc
if err != nil {
return nil, err
}
if len(req.GetCells()) != 0 && len(req.GetCells()) != len(allCells) {
return defaultErrorHandler(ts.Logger(), "invalid request", vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT,
"requesting read traffic for multi-tenant migrations must include all cells"))

if len(req.GetCells()) > 0 {
slices.Sort(req.GetCells())
slices.Sort(allCells)
if !slices.Equal(req.GetCells(), allCells) {
return defaultErrorHandler(ts.Logger(), "invalid request", vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT,
"requesting switch of read traffic for multi-tenant migrations must include all cells; all cells: %v, requested cells: %v",
strings.Join(allCells, ","), strings.Join(req.GetCells(), ",")))
}
}
}

Expand Down Expand Up @@ -3277,14 +3303,14 @@ func (s *Server) switchReads(ctx context.Context, req *vtctldatapb.WorkflowSwitc
}
}

// If journals exist notify user and fail.
journalsExist, _, err := ts.checkJournals(ctx)
if err != nil {
if err != nil && !req.GetForce() {
return defaultErrorHandler(ts.Logger(), fmt.Sprintf("failed to read journal in the %s keyspace", ts.SourceKeyspaceName()), err)
}
if journalsExist {
s.Logger().Infof("Found a previous journal entry for %d", ts.id)
}

var sw iswitcher
if req.DryRun {
sw = &switcherDryRun{ts: ts, drLog: NewLogRecorder()}
Expand Down Expand Up @@ -3625,7 +3651,7 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit
}

func (s *Server) canSwitch(ctx context.Context, ts *trafficSwitcher, state *State, direction TrafficSwitchDirection,
maxAllowedReplLagSecs int64, shards []string) (reason string, err error) {
maxAllowedReplLagSecs int64, shards []string, force bool) (reason string, err error) {
if direction == DirectionForward && state.WritesSwitched ||
direction == DirectionBackward && !state.WritesSwitched {
s.Logger().Infof("writes already switched no need to check lag")
Expand Down Expand Up @@ -3664,10 +3690,15 @@ func (s *Server) canSwitch(ctx context.Context, ts *trafficSwitcher, state *Stat
defer wg.Done()
for _, si := range shards {
if partial, partialDetails, err := topotools.RefreshTabletsByShard(rtbsCtx, s.ts, s.tmc, si, nil, ts.Logger()); err != nil || partial {
m.Lock()
refreshErrors.WriteString(fmt.Sprintf("failed to successfully refresh all tablets in the %s/%s %s shard (%v):\n %v\n",
si.Keyspace(), si.ShardName(), stype, err, partialDetails))
m.Unlock()
msg := fmt.Sprintf("failed to successfully refresh all tablets in the %s/%s %s shard (%v):\n %v\n",
si.Keyspace(), si.ShardName(), stype, err, partialDetails)
if partial && force {
log.Warning(msg)
} else {
m.Lock()
refreshErrors.WriteString(msg)
m.Unlock()
}
}
}
}
Expand Down
Loading
Loading