Skip to content

Commit

Permalink
Improve target_shards handling
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <mattalord@gmail.com>
  • Loading branch information
mattlord committed Aug 27, 2024
1 parent 747d454 commit b358fda
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 10 deletions.
4 changes: 2 additions & 2 deletions go/cmd/vtctldclient/command/vreplication/vdiff/vdiff.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ vtctldclient --server localhost:15999 vdiff --workflow commerce2customer --targe

for _, shard := range resumeOptions.TargetShards {
if !key.IsValidKeyRange(shard) {
return fmt.Errorf("invalid target shard provided: %s", shard)
return fmt.Errorf("invalid target shard provided: %q", shard)
}
}

Expand Down Expand Up @@ -250,7 +250,7 @@ vtctldclient --server localhost:15999 vdiff --workflow commerce2customer --targe

for _, shard := range stopOptions.TargetShards {
if !key.IsValidKeyRange(shard) {
return fmt.Errorf("invalid target shard provided: %s", shard)
return fmt.Errorf("invalid target shard provided: %q", shard)
}
}

Expand Down
12 changes: 4 additions & 8 deletions go/vt/vtctl/workflow/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1927,10 +1927,8 @@ func (s *Server) VDiffResume(ctx context.Context, req *vtctldatapb.VDiffResumeRe
}

if len(targetShards) > 0 {
for key, target := range ts.targets {
if !slices.Contains(targetShards, target.GetShard().ShardName()) {
delete(ts.targets, key)
}
if err := applyTargetShards(ts, targetShards); err != nil {
return nil, err
}
}

Expand Down Expand Up @@ -2012,10 +2010,8 @@ func (s *Server) VDiffStop(ctx context.Context, req *vtctldatapb.VDiffStopReques
}

if len(targetShards) > 0 {
for key, target := range ts.targets {
if !slices.Contains(targetShards, target.GetShard().ShardName()) {
delete(ts.targets, key)
}
if err := applyTargetShards(ts, targetShards); err != nil {
return nil, err
}
}

Expand Down
26 changes: 26 additions & 0 deletions go/vt/vtctl/workflow/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"fmt"
"hash/fnv"
"math"
"slices"
"sort"
"strconv"
"strings"
Expand Down Expand Up @@ -967,3 +968,28 @@ func defaultErrorHandler(logger logutil.Logger, message string, err error) (*[]s
logger.Error(werr)
return nil, werr
}

// applyTargetShards applies the targetShards, coming from a command, to the trafficSwitcher.
// It will return an error if the targetShards list contains a shard that does not exist in
// the target keyspace.
// It will remove any target shards in the trafficSwitcher that are not in the targetShards list.
func applyTargetShards(ts *trafficSwitcher, targetShards []string) error {
if ts == nil {
return nil
}
if ts.targets == nil {
return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "no targets found for workflow %s", ts.workflow)
}
for _, targetShard := range targetShards {
if _, ok := ts.targets[targetShard]; !ok {
return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "specified target shard %s not a valid target for workflow %s",
targetShard, ts.workflow)
}
}
for key, target := range ts.targets {
if !slices.Contains(targetShards, target.GetShard().ShardName()) {
delete(ts.targets, key)
}
}
return nil
}

0 comments on commit b358fda

Please sign in to comment.