Skip to content

Commit

Permalink
feat: run move tables in stress test
Browse files Browse the repository at this point in the history
Signed-off-by: Manan Gupta <manan@planetscale.com>
  • Loading branch information
GuptaManan100 committed Aug 28, 2024
1 parent 844c62f commit 073ad7c
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 12 deletions.
2 changes: 1 addition & 1 deletion go/test/endtoend/transaction/twopc/stress/fuzzer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,7 @@ func onlineDDLFuzzer(t *testing.T) {
return
}
fmt.Println("Running online DDL with uuid: ", output)
WaitForMigrationStatus(t, &vtParams, clusterInstance.Keyspaces[0].Shards, strings.TrimSpace(output), 2*time.Minute, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed)
waitForMigrationStatus(t, &vtParams, clusterInstance.Keyspaces[0].Shards, strings.TrimSpace(output), 2*time.Minute, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed)
}

func mysqlRestarts(t *testing.T) {
Expand Down
28 changes: 19 additions & 9 deletions go/test/endtoend/transaction/twopc/stress/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,13 @@ import (
)

var (
clusterInstance *cluster.LocalProcessCluster
vtParams mysql.ConnParams
vtgateGrpcAddress string
keyspaceName = "ks"
cell = "zone1"
hostname = "localhost"
sidecarDBName = "vt_ks"
clusterInstance *cluster.LocalProcessCluster
vtParams mysql.ConnParams
vtgateGrpcAddress string
keyspaceName = "ks"
unshardedKeyspaceName = "uks"
cell = "zone1"
hostname = "localhost"

//go:embed schema.sql
SchemaSQL string
Expand Down Expand Up @@ -79,18 +79,28 @@ func TestMain(m *testing.M) {
Name: keyspaceName,
SchemaSQL: SchemaSQL,
VSchema: VSchema,
SidecarDBName: sidecarDBName,
DurabilityPolicy: "semi_sync",
}
if err := clusterInstance.StartKeyspace(*keyspace, []string{"-40", "40-80", "80-"}, 2, false); err != nil {
return 1
}

// Start an unsharded keyspace
unshardedKeyspace := &cluster.Keyspace{
Name: unshardedKeyspaceName,
SchemaSQL: "",
VSchema: "{}",
DurabilityPolicy: "semi_sync",
}
if err := clusterInstance.StartUnshardedKeyspace(*unshardedKeyspace, 2, false); err != nil {
return 1
}

// Start Vtgate
if err := clusterInstance.StartVtgate(); err != nil {
return 1
}
vtParams = clusterInstance.GetVTParams(keyspaceName)
vtParams = clusterInstance.GetVTParams("")
vtgateGrpcAddress = fmt.Sprintf("%s:%d", clusterInstance.Hostname, clusterInstance.VtgateGrpcPort)

return m.Run()
Expand Down
37 changes: 35 additions & 2 deletions go/test/endtoend/transaction/twopc/stress/stress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"vitess.io/vitess/go/test/endtoend/onlineddl"
twopcutil "vitess.io/vitess/go/test/endtoend/transaction/twopc/utils"
"vitess.io/vitess/go/test/endtoend/utils"
"vitess.io/vitess/go/test/endtoend/vreplication"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/schema"
)
Expand Down Expand Up @@ -76,6 +77,11 @@ func TestDisruptions(t *testing.T) {
commitDelayTime: "20",
disruption: onlineDDL,
},
{
disruptionName: "MoveTables",
commitDelayTime: "20",
disruption: moveTables,
},
{
disruptionName: "EmergencyReparentShard",
commitDelayTime: "5",
Expand Down Expand Up @@ -239,6 +245,33 @@ func mysqlRestartShard3(t *testing.T) error {
return syscallutil.Kill(pid, syscall.SIGKILL)
}

// moveTables runs a move tables command.
func moveTables(t *testing.T) error {
workflow := "TestDisruptions"
mvt := vreplication.NewExternalMoveTables(t, clusterInstance, workflow, unshardedKeyspaceName, keyspaceName, "twopc_t1")
// Initiate MoveTables for twopc_t1.
mvt.Create()
// Wait for vreplication to catchup. Should be very fast since we don't have a lot of rows.
for _, ks := range clusterInstance.Keyspaces {
if ks.Name != unshardedKeyspaceName {
continue
}
for _, shard := range ks.Shards {
vttablet := shard.PrimaryTablet().VttabletProcess
vttablet.WaitForVReplicationToCatchup(t, workflow, fmt.Sprintf("vt_%s", vttablet.Keyspace), "", 10*time.Second)
}
}
// SwitchTraffic
mvt.SwitchReadsAndWrites()
// Wait for a couple of seconds and then switch the traffic back
time.Sleep(2 * time.Second)
mvt.ReverseReadsAndWrites()
// Wait another couple of seconds and then cancel the workflow
time.Sleep(2 * time.Second)
mvt.Cancel()
return nil
}

var orderedDDL = []string{
"alter table twopc_t1 add column extra_col1 varchar(20)",
"alter table twopc_t1 add column extra_col2 varchar(20)",
Expand All @@ -256,13 +289,13 @@ func onlineDDL(t *testing.T) error {
require.NoError(t, err)
count++
fmt.Println("uuid: ", output)
status := WaitForMigrationStatus(t, &vtParams, clusterInstance.Keyspaces[0].Shards, strings.TrimSpace(output), 2*time.Minute, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed)
status := waitForMigrationStatus(t, &vtParams, clusterInstance.Keyspaces[0].Shards, strings.TrimSpace(output), 2*time.Minute, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed)
onlineddl.CheckMigrationStatus(t, &vtParams, clusterInstance.Keyspaces[0].Shards, strings.TrimSpace(output), status)
require.Equal(t, schema.OnlineDDLStatusComplete, status)
return nil
}

func WaitForMigrationStatus(t *testing.T, vtParams *mysql.ConnParams, shards []cluster.Shard, uuid string, timeout time.Duration, expectStatuses ...schema.OnlineDDLStatus) schema.OnlineDDLStatus {
func waitForMigrationStatus(t *testing.T, vtParams *mysql.ConnParams, shards []cluster.Shard, uuid string, timeout time.Duration, expectStatuses ...schema.OnlineDDLStatus) schema.OnlineDDLStatus {
shardNames := map[string]bool{}
for _, shard := range shards {
shardNames[shard.Name] = true
Expand Down

0 comments on commit 073ad7c

Please sign in to comment.