From 740ef56b7d98776e3339ae9d3abcaf96e24813cf Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Fri, 6 Sep 2024 22:28:58 -0400 Subject: [PATCH] Improve force handling Signed-off-by: Matt Lord --- go/vt/vtctl/workflow/server.go | 23 ++++++++++++----------- go/vt/vtctl/workflow/traffic_switcher.go | 2 +- 2 files changed, 13 insertions(+), 12 deletions(-) diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index 18fa0feb388..bbf71944cc0 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -2983,7 +2983,7 @@ func (s *Server) updateShardRecords(ctx context.Context, keyspace string, shards } // refreshPrimaryTablets will just RPC-ping all the primary tablets with RefreshState -func (s *Server) refreshPrimaryTablets(ctx context.Context, shards []*topo.ShardInfo) error { +func (s *Server) refreshPrimaryTablets(ctx context.Context, shards []*topo.ShardInfo, force bool) error { wg := sync.WaitGroup{} rec := concurrency.AllErrorRecorder{} for _, si := range shards { @@ -2996,10 +2996,10 @@ func (s *Server) refreshPrimaryTablets(ctx context.Context, shards []*topo.Shard return } - if err := s.tmc.RefreshState(ctx, ti.Tablet); err != nil { + if err := s.tmc.RefreshState(ctx, ti.Tablet); err != nil && !force { rec.RecordError(err) } else { - s.Logger().Infof("%v responded", topoproto.TabletAliasString(si.PrimaryAlias)) + s.Logger().Infof("%v responded (err: %v)", topoproto.TabletAliasString(si.PrimaryAlias), err) } }(si) } @@ -3089,13 +3089,6 @@ func (s *Server) WorkflowSwitchTraffic(ctx context.Context, req *vtctldatapb.Wor return nil, err } - if req.GetForce() { - if ts.options == nil { - ts.options = &vtctldatapb.WorkflowOptions{} - } - ts.options.WarnOnPartialTabletRefresh = true - } - if startState.WorkflowType == TypeMigrate { return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "invalid action for Migrate workflow: SwitchTraffic") } @@ -3121,6 +3114,14 @@ func (s *Server) WorkflowSwitchTraffic(ctx context.Context, req *vtctldatapb.Wor return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "cannot reverse traffic for multi-tenant migrations") } } + + if req.GetForce() { + if ts.options == nil { + ts.options = &vtctldatapb.WorkflowOptions{} + } + ts.options.WarnOnPartialTabletRefresh = true + } + reason, err := s.canSwitch(ctx, ts, startState, direction, int64(maxReplicationLagAllowed.Seconds()), req.GetShards(), req.GetForce()) if err != nil { return nil, err @@ -3275,7 +3276,6 @@ func (s *Server) switchReads(ctx context.Context, req *vtctldatapb.WorkflowSwitc } } - // If journals exist notify user and fail. journalsExist, _, err := ts.checkJournals(ctx) if err != nil && !req.GetForce() { return defaultErrorHandler(ts.Logger(), fmt.Sprintf("failed to read journal in the %s keyspace", ts.SourceKeyspaceName()), err) @@ -3283,6 +3283,7 @@ func (s *Server) switchReads(ctx context.Context, req *vtctldatapb.WorkflowSwitc if journalsExist { s.Logger().Infof("Found a previous journal entry for %d", ts.id) } + var sw iswitcher if req.DryRun { sw = &switcherDryRun{ts: ts, drLog: NewLogRecorder()} diff --git a/go/vt/vtctl/workflow/traffic_switcher.go b/go/vt/vtctl/workflow/traffic_switcher.go index abe24d6c3d2..a87bd011a6d 100644 --- a/go/vt/vtctl/workflow/traffic_switcher.go +++ b/go/vt/vtctl/workflow/traffic_switcher.go @@ -727,7 +727,7 @@ func (ts *trafficSwitcher) changeShardsAccess(ctx context.Context, keyspace stri if err := ts.TopoServer().UpdateDisableQueryService(ctx, keyspace, shards, topodatapb.TabletType_PRIMARY, nil, access == disallowWrites /* disable */); err != nil { return err } - return ts.ws.refreshPrimaryTablets(ctx, shards) + return ts.ws.refreshPrimaryTablets(ctx, shards, ts.options != nil && ts.options.WarnOnPartialTabletRefresh) } func (ts *trafficSwitcher) allowTargetWrites(ctx context.Context) error {