Skip to content

Commit

Permalink
Add handlings to trafficSwitcher
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <mattalord@gmail.com>
  • Loading branch information
mattlord committed Sep 6, 2024
1 parent f12504e commit 675a2d8
Show file tree
Hide file tree
Showing 11 changed files with 2,244 additions and 2,116 deletions.
4,221 changes: 2,118 additions & 2,103 deletions go/vt/proto/vtctldata/vtctldata.pb.go

Large diffs are not rendered by default.

34 changes: 34 additions & 0 deletions go/vt/proto/vtctldata/vtctldata_vtproto.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion go/vt/vtctl/workflow/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func initSrvKeyspace(t *testing.T, topo *topo.Server, keyspace string, sources,
for _, shard := range shards {
keyRange, err := key.ParseShardingSpec(shard)
require.NoError(t, err)
require.Equal(t, 1, len(keyRange))
require.Len(t, keyRange, 1)
partition.ShardReferences = append(partition.ShardReferences, &topodatapb.ShardReference{
Name: shard,
KeyRange: keyRange[0],
Expand Down
23 changes: 18 additions & 5 deletions go/vt/vtctl/workflow/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
"vitess.io/vitess/go/vt/concurrency"
"vitess.io/vitess/go/vt/discovery"
"vitess.io/vitess/go/vt/key"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/logutil"
"vitess.io/vitess/go/vt/mysqlctl/tmutils"
"vitess.io/vitess/go/vt/schema"
Expand Down Expand Up @@ -3078,6 +3079,13 @@ func (s *Server) WorkflowSwitchTraffic(ctx context.Context, req *vtctldatapb.Wor
return nil, err
}

if req.GetForce() {
if ts.options == nil {
ts.options = &vtctldatapb.WorkflowOptions{}
}
ts.options.WarnOnPartialTabletRefresh = true
}

if startState.WorkflowType == TypeMigrate {
return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "invalid action for Migrate workflow: SwitchTraffic")
}
Expand Down Expand Up @@ -3644,10 +3652,15 @@ func (s *Server) canSwitch(ctx context.Context, ts *trafficSwitcher, state *Stat
defer wg.Done()
for _, si := range shards {
if partial, partialDetails, err := topotools.RefreshTabletsByShard(rtbsCtx, s.ts, s.tmc, si, nil, ts.Logger()); err != nil || partial {
m.Lock()
refreshErrors.WriteString(fmt.Sprintf("failed to successfully refresh all tablets in the %s/%s %s shard (%v):\n %v\n",
si.Keyspace(), si.ShardName(), stype, err, partialDetails))
m.Unlock()
msg := fmt.Sprintf("failed to successfully refresh all tablets in the %s/%s %s shard (%v):\n %v\n",
si.Keyspace(), si.ShardName(), stype, err, partialDetails)
if partial && force {
log.Warning(msg)
} else {
m.Lock()
refreshErrors.WriteString(msg)
m.Unlock()
}
}
}
}
Expand All @@ -3656,7 +3669,7 @@ func (s *Server) canSwitch(ctx context.Context, ts *trafficSwitcher, state *Stat
wg.Add(1)
go refreshTablets(ts.TargetShards(), "target")
wg.Wait()
if refreshErrors.Len() > 0 && !force {
if refreshErrors.Len() > 0 {
return fmt.Sprintf(cannotSwitchFailedTabletRefresh, refreshErrors.String()), nil
}
return "", nil
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtctl/workflow/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -942,7 +942,7 @@ func TestMoveTablesTrafficSwitching(t *testing.T) {
require.Equal(t, to, tt)
}
}
// Confirm that we have the expected denied tables entires.
// Confirm that we have the expected denied tables entries.
for _, keyspace := range []*testKeyspace{tc.sourceKeyspace, tc.targetKeyspace} {
for _, shardName := range keyspace.ShardNames {
si, err := env.ts.GetShard(ctx, keyspace.KeyspaceName, shardName)
Expand Down
41 changes: 37 additions & 4 deletions go/vt/vtctl/workflow/traffic_switcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"vitess.io/vitess/go/vt/binlog/binlogplayer"
"vitess.io/vitess/go/vt/concurrency"
"vitess.io/vitess/go/vt/key"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/logutil"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/topo"
Expand Down Expand Up @@ -472,7 +473,17 @@ func (ts *trafficSwitcher) dropSourceDeniedTables(ctx context.Context) error {
}
rtbsCtx, cancel := context.WithTimeout(ctx, shardTabletRefreshTimeout)
defer cancel()
_, _, err := topotools.RefreshTabletsByShard(rtbsCtx, ts.TopoServer(), ts.TabletManagerClient(), source.GetShard(), nil, ts.Logger())
isPartial, partialDetails, err := topotools.RefreshTabletsByShard(rtbsCtx, ts.TopoServer(), ts.TabletManagerClient(), source.GetShard(), nil, ts.Logger())
if isPartial {
msg := fmt.Sprintf("failed to successfully refresh all tablets in the %s/%s source shard (%v):\n %v",
source.GetShard().Keyspace(), source.GetShard().ShardName(), err, partialDetails)
if ts.options != nil && ts.options.WarnOnPartialTabletRefresh {
log.Warning(msg)
return nil
} else {
return errors.New(msg)
}
}
return err
})
}
Expand All @@ -486,7 +497,17 @@ func (ts *trafficSwitcher) dropTargetDeniedTables(ctx context.Context) error {
}
rtbsCtx, cancel := context.WithTimeout(ctx, shardTabletRefreshTimeout)
defer cancel()
_, _, err := topotools.RefreshTabletsByShard(rtbsCtx, ts.TopoServer(), ts.TabletManagerClient(), target.GetShard(), nil, ts.Logger())
isPartial, partialDetails, err := topotools.RefreshTabletsByShard(rtbsCtx, ts.TopoServer(), ts.TabletManagerClient(), target.GetShard(), nil, ts.Logger())
if isPartial {
msg := fmt.Sprintf("failed to successfully refresh all tablets in the %s/%s target shard (%v):\n %v",
target.GetShard().Keyspace(), target.GetShard().ShardName(), err, partialDetails)
if ts.options != nil && ts.options.WarnOnPartialTabletRefresh {
log.Warning(msg)
return nil
} else {
return errors.New(msg)
}
}
return err
})
}
Expand Down Expand Up @@ -1040,8 +1061,14 @@ func (ts *trafficSwitcher) switchDeniedTables(ctx context.Context) error {
defer cancel()
isPartial, partialDetails, err := topotools.RefreshTabletsByShard(rtbsCtx, ts.TopoServer(), ts.TabletManagerClient(), source.GetShard(), nil, ts.Logger())
if isPartial {
err = fmt.Errorf("failed to successfully refresh all tablets in the %s/%s source shard (%v):\n %v",
msg := fmt.Sprintf("failed to successfully refresh all tablets in the %s/%s source shard (%v):\n %v",
source.GetShard().Keyspace(), source.GetShard().ShardName(), err, partialDetails)
if ts.options != nil && ts.options.WarnOnPartialTabletRefresh {
log.Warning(msg)
return nil
} else {
return errors.New(msg)
}
}
return err
})
Expand All @@ -1057,8 +1084,14 @@ func (ts *trafficSwitcher) switchDeniedTables(ctx context.Context) error {
defer cancel()
isPartial, partialDetails, err := topotools.RefreshTabletsByShard(rtbsCtx, ts.TopoServer(), ts.TabletManagerClient(), target.GetShard(), nil, ts.Logger())
if isPartial {
err = fmt.Errorf("failed to successfully refresh all tablets in the %s/%s target shard (%v):\n %v",
msg := fmt.Sprintf("failed to successfully refresh all tablets in the %s/%s target shard (%v):\n %v",
target.GetShard().Keyspace(), target.GetShard().ShardName(), err, partialDetails)
if ts.options != nil && ts.options.WarnOnPartialTabletRefresh {
log.Warning(msg)
return nil
} else {
return errors.New(msg)
}
}
return err
})
Expand Down
3 changes: 2 additions & 1 deletion go/vt/vtctl/workflow/traffic_switcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,11 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
"vitess.io/vitess/go/vt/proto/vschema"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/vtgate/vindexes"

tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
)

type testTrafficSwitcher struct {
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletserver/vstreamer/vstreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ type streamerPlan struct {
// filter: the list of filtering rules. If a rule has a select expression for its filter,
//
// the select list can only reference direct columns. No other expressions are allowed.
// The select expression is allowed to contain the special 'keyspace_id()' function which
// The select expression is allowed to contain the special 'in_keyrange()' function which
// will return the keyspace id of the row. Examples:
// "select * from t", same as an empty Filter,
// "select * from t where in_keyrange('-80')", same as "-80",
Expand Down
3 changes: 3 additions & 0 deletions proto/vtctldata.proto
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,9 @@ message WorkflowOptions {
// Shards on which vreplication streams in the target keyspace are created for this workflow and to which the data
// from the source will be vreplicated.
repeated string shards = 3;
// Should we log a warning on partial tablet refreshes rather than produce
// an error?
bool warn_on_partial_tablet_refresh = 4;
}

// TODO: comment the hell out of this.
Expand Down
6 changes: 6 additions & 0 deletions web/vtadmin/src/proto/vtadmin.d.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

23 changes: 23 additions & 0 deletions web/vtadmin/src/proto/vtadmin.js

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 675a2d8

Please sign in to comment.