Skip to content

Commit

Permalink
Add Traffic Switching DryRun unit test
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <mattalord@gmail.com>
  • Loading branch information
mattlord committed Jun 7, 2024
1 parent 1955b9a commit 392c8f6
Showing 1 changed file with 200 additions and 46 deletions.
246 changes: 200 additions & 46 deletions go/vt/vtctl/workflow/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -479,13 +479,9 @@ func TestMoveTablesTrafficSwitching(t *testing.T) {
testcases := []struct {
name string
sourceKeyspace, targetKeyspace *testKeyspace
preFunc func(t *testing.T, env *testEnv)
req *vtctldatapb.WorkflowSwitchTrafficRequest
expectedSourceQueries []*queryResult
expectedTargetQueries []*queryResult
want *vtctldatapb.WorkflowSwitchTrafficResponse
wantErr bool
postFunc func(t *testing.T, env *testEnv)
}{
{
name: "basic forward",
Expand Down Expand Up @@ -540,15 +536,6 @@ func TestMoveTablesTrafficSwitching(t *testing.T) {
env := newTestEnv(t, ctx, defaultCellName, tc.sourceKeyspace, tc.targetKeyspace)
defer env.close()
env.tmc.schema = schema
if tc.preFunc != nil {
tc.preFunc(t, env)
}
if tc.expectedSourceQueries != nil {
require.NotNil(t, env.tablets[tc.sourceKeyspace.KeyspaceName])
for _, eq := range tc.expectedSourceQueries {
env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.sourceKeyspace.KeyspaceName, eq)
}
}
if tc.req.Direction == int32(DirectionForward) {
env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.targetKeyspace.KeyspaceName, copyTableQR)
env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.targetKeyspace.KeyspaceName, cutoverQR)
Expand Down Expand Up @@ -601,51 +588,218 @@ func TestMoveTablesTrafficSwitching(t *testing.T) {
env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.targetKeyspace.KeyspaceName, createJournalQR)
env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.sourceKeyspace.KeyspaceName, freezeReverseWFQR)
}
if tc.expectedTargetQueries != nil {
require.NotNil(t, env.tablets[tc.targetKeyspace.KeyspaceName])
for _, eq := range tc.expectedTargetQueries {
env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.targetKeyspace.KeyspaceName, eq)
}
}
got, err := env.ws.WorkflowSwitchTraffic(ctx, tc.req)
if (err != nil) != tc.wantErr {
require.Fail(t, "unexpected error value", "Server.WorkflowSwitchTraffic() error = %v, wantErr %v", err, tc.wantErr)
return
}
require.Equal(t, tc.want.String(), got.String(), "Server.WorkflowSwitchTraffic() = %v, want %v", got, tc.want)
if tc.postFunc != nil {
tc.postFunc(t, env)
} else { // Default post checks
// Confirm that we have the expected routing rules.
rr, err := env.ts.GetRoutingRules(ctx)
require.NoError(t, err)
to := fmt.Sprintf("%s.%s", tc.targetKeyspace.KeyspaceName, tableName)
if tc.req.Direction == int32(DirectionBackward) {
to = fmt.Sprintf("%s.%s", tc.sourceKeyspace.KeyspaceName, tableName)

// Confirm that we have the expected routing rules.
rr, err := env.ts.GetRoutingRules(ctx)
require.NoError(t, err)
to := fmt.Sprintf("%s.%s", tc.targetKeyspace.KeyspaceName, tableName)
if tc.req.Direction == int32(DirectionBackward) {
to = fmt.Sprintf("%s.%s", tc.sourceKeyspace.KeyspaceName, tableName)
}
for _, rr := range rr.Rules {
for _, tt := range rr.ToTables {
require.Equal(t, to, tt)
}
for _, rr := range rr.Rules {
for _, tt := range rr.ToTables {
require.Equal(t, to, tt)
}
// Confirm that we have the expected denied tables entires.
for _, keyspace := range []*testKeyspace{tc.sourceKeyspace, tc.targetKeyspace} {
for _, shardName := range keyspace.ShardNames {
si, err := env.ts.GetShard(ctx, keyspace.KeyspaceName, shardName)
require.NoError(t, err)
switch {
case keyspace == tc.sourceKeyspace && tc.req.Direction == int32(DirectionForward):
require.True(t, hasDeniedTableEntry(si))
case keyspace == tc.sourceKeyspace && tc.req.Direction == int32(DirectionBackward):
require.False(t, hasDeniedTableEntry(si))
case keyspace == tc.targetKeyspace && tc.req.Direction == int32(DirectionForward):
require.False(t, hasDeniedTableEntry(si))
case keyspace == tc.targetKeyspace && tc.req.Direction == int32(DirectionBackward):
require.True(t, hasDeniedTableEntry(si))
}
}
// Confirm that we have the expected denied tables entires.
for _, keyspace := range []*testKeyspace{tc.sourceKeyspace, tc.targetKeyspace} {
for _, shardName := range keyspace.ShardNames {
si, err := env.ts.GetShard(ctx, keyspace.KeyspaceName, shardName)
require.NoError(t, err)
switch {
case keyspace == tc.sourceKeyspace && tc.req.Direction == int32(DirectionForward):
require.True(t, hasDeniedTableEntry(si))
case keyspace == tc.sourceKeyspace && tc.req.Direction == int32(DirectionBackward):
require.False(t, hasDeniedTableEntry(si))
case keyspace == tc.targetKeyspace && tc.req.Direction == int32(DirectionForward):
require.False(t, hasDeniedTableEntry(si))
case keyspace == tc.targetKeyspace && tc.req.Direction == int32(DirectionBackward):
require.True(t, hasDeniedTableEntry(si))
}
}
})
}
}

func TestMoveTablesTrafficSwitchingDryRun(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()

workflowName := "wf1"
tableName := "t1"
sourceKeyspaceName := "sourceks"
targetKeyspaceName := "targetks"
vrID := 1
tabletTypes := []topodatapb.TabletType{
topodatapb.TabletType_PRIMARY,
topodatapb.TabletType_REPLICA,
topodatapb.TabletType_RDONLY,
}
schema := map[string]*tabletmanagerdatapb.SchemaDefinition{
"t1": {
TableDefinitions: []*tabletmanagerdatapb.TableDefinition{
{
Name: tableName,
Schema: fmt.Sprintf("CREATE TABLE %s (id BIGINT, name VARCHAR(64), PRIMARY KEY (id))", tableName),
},
},
},
}
copyTableQR := &queryResult{
query: fmt.Sprintf("select vrepl_id, table_name, lastpk from _vt.copy_state where vrepl_id in (%d) and id in (select max(id) from _vt.copy_state where vrepl_id in (%d) group by vrepl_id, table_name)",
vrID, vrID),
result: &querypb.QueryResult{},
}
journalQR := &queryResult{
query: "/select val from _vt.resharding_journal.*",
result: &querypb.QueryResult{},
}
lockTableQR := &queryResult{
query: fmt.Sprintf("LOCK TABLES `%s` READ", tableName),
result: &querypb.QueryResult{},
}

testcases := []struct {
name string
sourceKeyspace, targetKeyspace *testKeyspace
req *vtctldatapb.WorkflowSwitchTrafficRequest
want []string
}{
{
name: "basic forward",
sourceKeyspace: &testKeyspace{
KeyspaceName: sourceKeyspaceName,
ShardNames: []string{"0"},
},
targetKeyspace: &testKeyspace{
KeyspaceName: targetKeyspaceName,
ShardNames: []string{"-80", "80-"},
},
req: &vtctldatapb.WorkflowSwitchTrafficRequest{
Keyspace: targetKeyspaceName,
Workflow: workflowName,
Direction: int32(DirectionForward),
TabletTypes: tabletTypes,
DryRun: true,
},
want: []string{
fmt.Sprintf("Lock keyspace %s", sourceKeyspaceName),
fmt.Sprintf("Switch reads for tables [%s] to keyspace %s for tablet types [REPLICA,RDONLY]", tableName, targetKeyspaceName),
fmt.Sprintf("Routing rules for tables [%s] will be updated", tableName),
fmt.Sprintf("Unlock keyspace %s", sourceKeyspaceName),
fmt.Sprintf("Lock keyspace %s", sourceKeyspaceName),
fmt.Sprintf("Lock keyspace %s", targetKeyspaceName),
fmt.Sprintf("Stop writes on keyspace %s for tables [%s]: [keyspace:%s;shard:0;position:%s]", sourceKeyspaceName, tableName, sourceKeyspaceName, position),
"Wait for vreplication on stopped streams to catchup for up to 30s",
fmt.Sprintf("Create reverse vreplication workflow %s", ReverseWorkflowName(workflowName)),
"Create journal entries on source databases",
fmt.Sprintf("Enable writes on keyspace %s for tables [%s]", targetKeyspaceName, tableName),
fmt.Sprintf("Switch routing from keyspace %s to keyspace %s", sourceKeyspaceName, targetKeyspaceName),
fmt.Sprintf("Routing rules for tables [%s] will be updated", tableName),
fmt.Sprintf("Switch writes completed, freeze and delete vreplication streams on: [tablet:%d,tablet:%d]", startingTargetTabletUID, startingTargetTabletUID+tabletUIDStep),
fmt.Sprintf("Mark vreplication streams frozen on: [keyspace:%s;shard:-80;tablet:%d;workflow:%s;dbname:vt_%s,keyspace:%s;shard:80-;tablet:%d;workflow:%s;dbname:vt_%s]",
targetKeyspaceName, startingTargetTabletUID, workflowName, targetKeyspaceName, targetKeyspaceName, startingTargetTabletUID+tabletUIDStep, workflowName, targetKeyspaceName),
fmt.Sprintf("Unlock keyspace %s", targetKeyspaceName),
fmt.Sprintf("Unlock keyspace %s", sourceKeyspaceName),
},
},
{
name: "basic backward",
sourceKeyspace: &testKeyspace{
KeyspaceName: sourceKeyspaceName,
ShardNames: []string{"0"},
},
targetKeyspace: &testKeyspace{
KeyspaceName: targetKeyspaceName,
ShardNames: []string{"-80", "80-"},
},
req: &vtctldatapb.WorkflowSwitchTrafficRequest{
Keyspace: targetKeyspaceName,
Workflow: workflowName,
Direction: int32(DirectionBackward),
TabletTypes: tabletTypes,
DryRun: true,
},
want: []string{
fmt.Sprintf("Lock keyspace %s", targetKeyspaceName),
fmt.Sprintf("Switch reads for tables [%s] to keyspace %s for tablet types [REPLICA,RDONLY]", tableName, targetKeyspaceName),
fmt.Sprintf("Routing rules for tables [%s] will be updated", tableName),
fmt.Sprintf("Unlock keyspace %s", targetKeyspaceName),
fmt.Sprintf("Lock keyspace %s", targetKeyspaceName),
fmt.Sprintf("Lock keyspace %s", sourceKeyspaceName),
fmt.Sprintf("Stop writes on keyspace %s for tables [%s]: [keyspace:%s;shard:-80;position:%s,keyspace:%s;shard:80-;position:%s]",
targetKeyspaceName, tableName, targetKeyspaceName, position, targetKeyspaceName, position),
"Wait for vreplication on stopped streams to catchup for up to 30s",
fmt.Sprintf("Create reverse vreplication workflow %s", workflowName),
"Create journal entries on source databases",
fmt.Sprintf("Enable writes on keyspace %s for tables [%s]", sourceKeyspaceName, tableName),
fmt.Sprintf("Switch routing from keyspace %s to keyspace %s", targetKeyspaceName, sourceKeyspaceName),
fmt.Sprintf("Routing rules for tables [%s] will be updated", tableName),
fmt.Sprintf("Switch writes completed, freeze and delete vreplication streams on: [tablet:%d]", startingSourceTabletUID),
fmt.Sprintf("Mark vreplication streams frozen on: [keyspace:%s;shard:0;tablet:%d;workflow:%s;dbname:vt_%s]",
sourceKeyspaceName, startingSourceTabletUID, ReverseWorkflowName(workflowName), sourceKeyspaceName),
fmt.Sprintf("Unlock keyspace %s", sourceKeyspaceName),
fmt.Sprintf("Unlock keyspace %s", targetKeyspaceName),
},
},
}
for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
require.NotNil(t, tc.sourceKeyspace)
require.NotNil(t, tc.targetKeyspace)
require.NotNil(t, tc.req)
env := newTestEnv(t, ctx, defaultCellName, tc.sourceKeyspace, tc.targetKeyspace)
defer env.close()
env.tmc.schema = schema
if tc.req.Direction == int32(DirectionForward) {
env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.targetKeyspace.KeyspaceName, copyTableQR)
for i := 0; i < len(tc.targetKeyspace.ShardNames); i++ { // Per stream
env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.sourceKeyspace.KeyspaceName, journalQR)
}
for i := 0; i < len(tc.targetKeyspace.ShardNames); i++ { // Per stream
env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.sourceKeyspace.KeyspaceName, lockTableQR)
}
} else {
env.tmc.reverse.Store(true)
// Setup the routing rules as they would be after having previously done SwitchTraffic.
ks := env.targetKeyspace.KeyspaceName
toTarget := []string{ks + "." + tableName}
rules := make(map[string][]string)
for _, tabletType := range tabletTypes {
tt := strings.ToLower(tabletType.String())
if tabletType == topodatapb.TabletType_PRIMARY {
rules[tableName] = toTarget
rules[ks+"."+tableName] = toTarget
rules[env.sourceKeyspace.KeyspaceName+"."+tableName] = toTarget
} else {
rules[tableName+"@"+tt] = toTarget
rules[ks+"."+tableName+"@"+tt] = toTarget
rules[env.sourceKeyspace.KeyspaceName+"."+tableName+"@"+tt] = toTarget
}
}
err := topotools.SaveRoutingRules(ctx, env.ts, rules)
require.NoError(t, err)
err = env.ts.RebuildSrvVSchema(ctx, nil)
require.NoError(t, err)
env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.sourceKeyspace.KeyspaceName, copyTableQR)
for i := 0; i < len(tc.targetKeyspace.ShardNames); i++ { // Per stream
env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.targetKeyspace.KeyspaceName, journalQR)
}
for i := 0; i < len(tc.targetKeyspace.ShardNames); i++ { // Per stream
env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.targetKeyspace.KeyspaceName, lockTableQR)
}
}
got, err := env.ws.WorkflowSwitchTraffic(ctx, tc.req)
require.NoError(t, err)

require.EqualValues(t, tc.want, got.DryRunResults, "Server.WorkflowSwitchTraffic(DryRun:true) = %v, want %v", got.DryRunResults, tc.want)
})
}
}

0 comments on commit 392c8f6

Please sign in to comment.