Skip to content

Commit

Permalink
Sort all the things in dry run switcher
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <mattalord@gmail.com>
  • Loading branch information
mattlord committed Jun 7, 2024
1 parent 0c8a9ab commit 1955b9a
Showing 1 changed file with 108 additions and 19 deletions.
127 changes: 108 additions & 19 deletions go/vt/vtctl/workflow/switcher_dry_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ func (dr *switcherDryRun) switchShardReads(ctx context.Context, cells []string,
for _, target := range dr.ts.Targets() {
targetShards = append(targetShards, target.GetShard().ShardName())
}
// Sort the slices for deterministic output.
sort.Strings(sourceShards)
sort.Strings(targetShards)
if direction == DirectionForward {
Expand All @@ -103,6 +104,7 @@ func (dr *switcherDryRun) switchTableReads(ctx context.Context, cells []string,
for _, servedType := range servedTypes {
tabletTypes = append(tabletTypes, servedType.String())
}
sort.Strings(dr.ts.Tables()) // For deterministic output
tables := strings.Join(dr.ts.Tables(), ",")
dr.drLog.Logf("Switch reads for tables [%s] to keyspace %s for tablet types [%s]", tables, ks, strings.Join(tabletTypes, ","))
dr.drLog.Logf("Routing rules for tables [%s] will be updated", tables)
Expand All @@ -114,13 +116,15 @@ func (dr *switcherDryRun) switchTableReads(ctx context.Context, cells []string,

func (dr *switcherDryRun) createJournals(ctx context.Context, sourceWorkflows []string) error {
dr.drLog.Log("Create journal entries on source databases")
sort.Strings(sourceWorkflows) // For deterministic output
if len(sourceWorkflows) > 0 {
dr.drLog.Logf("Source workflows found: [%s]", strings.Join(sourceWorkflows, ","))
}
return nil
}

func (dr *switcherDryRun) allowTargetWrites(ctx context.Context) error {
sort.Strings(dr.ts.Tables()) // For deterministic output
dr.drLog.Logf("Enable writes on keyspace %s for tables [%s]", dr.ts.TargetKeyspaceName(), strings.Join(dr.ts.Tables(), ","))
return nil
}
Expand All @@ -129,6 +133,7 @@ func (dr *switcherDryRun) changeRouting(ctx context.Context) error {
dr.drLog.Logf("Switch routing from keyspace %s to keyspace %s", dr.ts.SourceKeyspaceName(), dr.ts.TargetKeyspaceName())
var deleteLogs, addLogs []string
if dr.ts.MigrationType() == binlogdatapb.MigrationType_TABLES {
sort.Strings(dr.ts.Tables()) // For deterministic output
tables := strings.Join(dr.ts.Tables(), ",")
dr.drLog.Logf("Routing rules for tables [%s] will be updated", tables)
return nil
Expand Down Expand Up @@ -174,7 +179,12 @@ func (dr *switcherDryRun) streamMigraterfinalize(ctx context.Context, ts *traffi

func (dr *switcherDryRun) startReverseVReplication(ctx context.Context) error {
logs := make([]string, 0)
for _, t := range dr.ts.Sources() {
sources := maps.Values(dr.ts.Sources())
// Sort the slice for deterministic output.
sort.Slice(sources, func(i, j int) bool {
return sources[i].GetPrimary().Alias.Uid < sources[j].GetPrimary().Alias.Uid
})
for _, t := range sources {
logs = append(logs, fmt.Sprintf("tablet:%d", t.GetPrimary().Alias.Uid))
}
dr.drLog.Logf("Start reverse vreplication streams on: [%s]", strings.Join(logs, ","))
Expand All @@ -195,17 +205,33 @@ func (dr *switcherDryRun) migrateStreams(ctx context.Context, sm *StreamMigrator
logs := make([]string, 0)

dr.drLog.Logf("Migrate streams to %s:", dr.ts.TargetKeyspaceName())
for key, streams := range sm.Streams() {
for _, stream := range streams {
logs = append(logs, fmt.Sprintf("shard:%s;id:%d;workflow:%s;position:%s;binlogsource:%v", key, stream.ID, stream.Workflow, replication.EncodePosition(stream.Position), stream.BinlogSource))
allStreams := sm.Streams()
// Sort the keys and slices for deterministic output.
shards := maps.Keys(sm.Streams())
sort.Strings(shards)
for _, shard := range shards {
shardStreams := allStreams[shard]
sort.Slice(shardStreams, func(i, j int) bool {
return shardStreams[i].ID < shardStreams[j].ID
})
for _, stream := range shardStreams {
logs = append(logs, fmt.Sprintf("shard:%s;id:%d;workflow:%s;position:%s;binlogsource:%v", shard, stream.ID, stream.Workflow, replication.EncodePosition(stream.Position), stream.BinlogSource))
}
}
if len(logs) > 0 {
dr.drLog.Logf("Migrate source streams: [%s]", strings.Join(logs, ","))
logs = nil
}
for _, target := range dr.ts.Targets() {
// Sort the keys and slices for deterministic output.
targets := maps.Values(dr.ts.Targets())
sort.Slice(targets, func(i, j int) bool {
return targets[i].GetPrimary().Alias.Uid < targets[j].GetPrimary().Alias.Uid
})
for _, target := range targets {
tabletStreams := templates
sort.Slice(tabletStreams, func(i, j int) bool {
return tabletStreams[i].ID < tabletStreams[j].ID
})
for _, vrs := range tabletStreams {
logs = append(logs, fmt.Sprintf("keyspace:%s;shard:%s;tablet:%d;workflow:%s;id:%d,position:%v;binlogsource:%s",
vrs.BinlogSource.Keyspace, vrs.BinlogSource.Shard, target.GetPrimary().Alias.Uid, vrs.Workflow, vrs.ID, replication.EncodePosition(vrs.Position), vrs.BinlogSource))
Expand All @@ -224,10 +250,16 @@ func (dr *switcherDryRun) waitForCatchup(ctx context.Context, filteredReplicatio

func (dr *switcherDryRun) stopSourceWrites(ctx context.Context) error {
logs := make([]string, 0)
for _, source := range dr.ts.Sources() {
sources := maps.Values(dr.ts.Sources())
// Sort the slice for deterministic output.
sort.Slice(sources, func(i, j int) bool {
return sources[i].GetPrimary().Alias.Uid < sources[j].GetPrimary().Alias.Uid
})
for _, source := range sources {
position, _ := dr.ts.TabletManagerClient().PrimaryPosition(ctx, source.GetPrimary().Tablet)
logs = append(logs, fmt.Sprintf("keyspace:%s;shard:%s;position:%s", dr.ts.SourceKeyspaceName(), source.GetShard().ShardName(), position))
}
sort.Strings(dr.ts.Tables()) // For deterministic output
if len(logs) > 0 {
dr.drLog.Logf("Stop writes on keyspace %s for tables [%s]: [%s]", dr.ts.SourceKeyspaceName(),
strings.Join(dr.ts.Tables(), ","), strings.Join(logs, ","))
Expand All @@ -237,8 +269,16 @@ func (dr *switcherDryRun) stopSourceWrites(ctx context.Context) error {

func (dr *switcherDryRun) stopStreams(ctx context.Context, sm *StreamMigrator) ([]string, error) {
logs := make([]string, 0)
for _, streams := range sm.Streams() {
for _, stream := range streams {
allStreams := sm.Streams()
// Sort the keys and slices for deterministic output.
shards := maps.Keys(sm.Streams())
sort.Strings(shards)
for _, shard := range shards {
shardStreams := allStreams[shard]
sort.Slice(shardStreams, func(i, j int) bool {
return shardStreams[i].ID < shardStreams[j].ID
})
for _, stream := range shardStreams {
logs = append(logs, fmt.Sprintf("id:%d;keyspace:%s;shard:%s;rules:%s;position:%v",
stream.ID, stream.BinlogSource.Keyspace, stream.BinlogSource.Shard, stream.BinlogSource.Filter, stream.Position))
}
Expand All @@ -262,7 +302,13 @@ func (dr *switcherDryRun) lockKeyspace(ctx context.Context, keyspace, _ string)

func (dr *switcherDryRun) removeSourceTables(ctx context.Context, removalType TableRemovalType) error {
logs := make([]string, 0)
for _, source := range dr.ts.Sources() {
sort.Strings(dr.ts.Tables()) // For deterministic output
sources := maps.Values(dr.ts.Sources())
// Sort the slice for deterministic output.
sort.Slice(sources, func(i, j int) bool {
return sources[i].GetPrimary().Alias.Uid < sources[j].GetPrimary().Alias.Uid
})
for _, source := range sources {
for _, tableName := range dr.ts.Tables() {
logs = append(logs, fmt.Sprintf("keyspace:%s;shard:%s;dbname:%s;tablet:%d;table:%s",
source.GetPrimary().Keyspace, source.GetPrimary().Shard, source.GetPrimary().DbName(), source.GetPrimary().Alias.Uid, tableName))
Expand All @@ -282,7 +328,12 @@ func (dr *switcherDryRun) removeSourceTables(ctx context.Context, removalType Ta
func (dr *switcherDryRun) dropSourceShards(ctx context.Context) error {
logs := make([]string, 0)
tabletsList := make(map[string][]string)
for _, si := range dr.ts.SourceShards() {
// Sort the slice for deterministic output.
sourceShards := dr.ts.SourceShards()
sort.Slice(sourceShards, func(i, j int) bool {
return sourceShards[i].PrimaryAlias.Uid < sourceShards[j].PrimaryAlias.Uid
})
for _, si := range sourceShards {
tabletAliases, err := dr.ts.TopoServer().FindAllTabletAliasesInShard(ctx, si.Keyspace(), si.ShardName())
if err != nil {
return err
Expand All @@ -291,7 +342,7 @@ func (dr *switcherDryRun) dropSourceShards(ctx context.Context) error {
for _, t := range tabletAliases {
tabletsList[si.ShardName()] = append(tabletsList[si.ShardName()], fmt.Sprintf("%d", t.Uid))
}
sort.Strings(tabletsList[si.ShardName()])
sort.Strings(tabletsList[si.ShardName()]) // For deterministic output
logs = append(logs, fmt.Sprintf("cell:%s;keyspace:%s;shards:[%s]",
si.Shard.PrimaryAlias.Cell, si.Keyspace(), si.ShardName()), strings.Join(tabletsList[si.ShardName()], ","))
}
Expand All @@ -308,7 +359,12 @@ func (dr *switcherDryRun) validateWorkflowHasCompleted(ctx context.Context) erro

func (dr *switcherDryRun) dropTargetVReplicationStreams(ctx context.Context) error {
logs := make([]string, 0)
for _, t := range dr.ts.Targets() {
// Sort the keys and slices for deterministic output.
targets := maps.Values(dr.ts.Targets())
sort.Slice(targets, func(i, j int) bool {
return targets[i].GetPrimary().Alias.Uid < targets[j].GetPrimary().Alias.Uid
})
for _, t := range targets {
logs = append(logs, fmt.Sprintf("keyspace:%s;shard:%s;workflow:%s;dbname:%s;tablet:%d",
t.GetShard().Keyspace(), t.GetShard().ShardName(), dr.ts.WorkflowName(), t.GetPrimary().DbName(), t.GetPrimary().Alias.Uid))
}
Expand All @@ -318,7 +374,12 @@ func (dr *switcherDryRun) dropTargetVReplicationStreams(ctx context.Context) err

func (dr *switcherDryRun) dropSourceReverseVReplicationStreams(ctx context.Context) error {
logs := make([]string, 0)
for _, t := range dr.ts.Sources() {
sources := maps.Values(dr.ts.Sources())
// Sort the slice for deterministic output.
sort.Slice(sources, func(i, j int) bool {
return sources[i].GetPrimary().Alias.Uid < sources[j].GetPrimary().Alias.Uid
})
for _, t := range sources {
logs = append(logs, fmt.Sprintf("keyspace:%s;shard:%s;workflow:%s;dbname:%s;tablet:%d",
t.GetShard().Keyspace(), t.GetShard().ShardName(), ReverseWorkflowName(dr.ts.WorkflowName()), t.GetPrimary().DbName(), t.GetPrimary().Alias.Uid))
}
Expand All @@ -328,7 +389,12 @@ func (dr *switcherDryRun) dropSourceReverseVReplicationStreams(ctx context.Conte

func (dr *switcherDryRun) freezeTargetVReplication(ctx context.Context) error {
logs := make([]string, 0)
for _, target := range dr.ts.Targets() {
// Sort the keys and slices for deterministic output.
targets := maps.Values(dr.ts.Targets())
sort.Slice(targets, func(i, j int) bool {
return targets[i].GetPrimary().Alias.Uid < targets[j].GetPrimary().Alias.Uid
})
for _, target := range targets {
logs = append(logs, fmt.Sprintf("keyspace:%s;shard:%s;tablet:%d;workflow:%s;dbname:%s",
target.GetPrimary().Keyspace, target.GetPrimary().Shard, target.GetPrimary().Alias.Uid, dr.ts.WorkflowName(), target.GetPrimary().DbName()))
}
Expand All @@ -340,7 +406,12 @@ func (dr *switcherDryRun) freezeTargetVReplication(ctx context.Context) error {

func (dr *switcherDryRun) dropSourceDeniedTables(ctx context.Context) error {
logs := make([]string, 0)
for _, si := range dr.ts.SourceShards() {
// Sort the slice for deterministic output.
sourceShards := dr.ts.SourceShards()
sort.Slice(sourceShards, func(i, j int) bool {
return sourceShards[i].PrimaryAlias.Uid < sourceShards[j].PrimaryAlias.Uid
})
for _, si := range sourceShards {
logs = append(logs, fmt.Sprintf("keyspace:%s;shard:%s;tablet:%d", si.Keyspace(), si.ShardName(), si.PrimaryAlias.Uid))
}
if len(logs) > 0 {
Expand All @@ -351,7 +422,12 @@ func (dr *switcherDryRun) dropSourceDeniedTables(ctx context.Context) error {

func (dr *switcherDryRun) dropTargetDeniedTables(ctx context.Context) error {
logs := make([]string, 0)
for _, si := range dr.ts.TargetShards() {
// Sort the slice for deterministic output.
targetShards := dr.ts.TargetShards()
sort.Slice(targetShards, func(i, j int) bool {
return targetShards[i].PrimaryAlias.Uid < targetShards[j].PrimaryAlias.Uid
})
for _, si := range targetShards {
logs = append(logs, fmt.Sprintf("keyspace:%s;shard:%s;tablet:%d", si.Keyspace(), si.ShardName(), si.PrimaryAlias.Uid))
}
if len(logs) > 0 {
Expand All @@ -366,7 +442,13 @@ func (dr *switcherDryRun) logs() *[]string {

func (dr *switcherDryRun) removeTargetTables(ctx context.Context) error {
logs := make([]string, 0)
for _, target := range dr.ts.Targets() {
sort.Strings(dr.ts.Tables()) // For deterministic output
// Sort the keys and slices for deterministic output.
targets := maps.Values(dr.ts.Targets())
sort.Slice(targets, func(i, j int) bool {
return targets[i].GetPrimary().Alias.Uid < targets[j].GetPrimary().Alias.Uid
})
for _, target := range targets {
for _, tableName := range dr.ts.Tables() {
logs = append(logs, fmt.Sprintf("keyspace:%s;shard:%s;dbname:%s;tablet:%d;table:%s",
target.GetPrimary().Keyspace, target.GetPrimary().Shard, target.GetPrimary().DbName(), target.GetPrimary().Alias.Uid, tableName))
Expand All @@ -382,7 +464,13 @@ func (dr *switcherDryRun) removeTargetTables(ctx context.Context) error {
func (dr *switcherDryRun) dropTargetShards(ctx context.Context) error {
logs := make([]string, 0)
tabletsList := make(map[string][]string)
for _, si := range dr.ts.TargetShards() {
sort.Strings(dr.ts.Tables()) // For deterministic output
// Sort the slice for deterministic output.
targetShards := dr.ts.TargetShards()
sort.Slice(targetShards, func(i, j int) bool {
return targetShards[i].PrimaryAlias.Uid < targetShards[j].PrimaryAlias.Uid
})
for _, si := range targetShards {
tabletAliases, err := dr.ts.TopoServer().FindAllTabletAliasesInShard(ctx, si.Keyspace(), si.ShardName())
if err != nil {
return err
Expand All @@ -391,7 +479,7 @@ func (dr *switcherDryRun) dropTargetShards(ctx context.Context) error {
for _, t := range tabletAliases {
tabletsList[si.ShardName()] = append(tabletsList[si.ShardName()], fmt.Sprintf("%d", t.Uid))
}
sort.Strings(tabletsList[si.ShardName()])
sort.Strings(tabletsList[si.ShardName()]) // For deterministic output
logs = append(logs, fmt.Sprintf("cell:%s;keyspace:%s;shards:[%s]",
si.Shard.PrimaryAlias.Cell, si.Keyspace(), si.ShardName()), strings.Join(tabletsList[si.ShardName()], ","))
}
Expand All @@ -416,6 +504,7 @@ func (dr *switcherDryRun) resetSequences(ctx context.Context) error {
}

func (dr *switcherDryRun) initializeTargetSequences(ctx context.Context, sequencesByBackingTable map[string]*sequenceMetadata) error {
// Sort keys for deterministic output.
sortedBackingTableNames := maps.Keys(sequencesByBackingTable)
slices.Sort(sortedBackingTableNames)
dr.drLog.Log(fmt.Sprintf("The following sequence backing tables used by tables being moved will be initialized: %s",
Expand Down

0 comments on commit 1955b9a

Please sign in to comment.