From 073ad7cdc7e707d0429fb8b1a80214bf8f0f8998 Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Wed, 28 Aug 2024 21:53:53 +0530 Subject: [PATCH] feat: run move tables in stress test Signed-off-by: Manan Gupta --- .../transaction/twopc/stress/fuzzer_test.go | 2 +- .../transaction/twopc/stress/main_test.go | 28 +++++++++----- .../transaction/twopc/stress/stress_test.go | 37 ++++++++++++++++++- 3 files changed, 55 insertions(+), 12 deletions(-) diff --git a/go/test/endtoend/transaction/twopc/stress/fuzzer_test.go b/go/test/endtoend/transaction/twopc/stress/fuzzer_test.go index 932fcae1217..3ba29e50713 100644 --- a/go/test/endtoend/transaction/twopc/stress/fuzzer_test.go +++ b/go/test/endtoend/transaction/twopc/stress/fuzzer_test.go @@ -443,7 +443,7 @@ func onlineDDLFuzzer(t *testing.T) { return } fmt.Println("Running online DDL with uuid: ", output) - WaitForMigrationStatus(t, &vtParams, clusterInstance.Keyspaces[0].Shards, strings.TrimSpace(output), 2*time.Minute, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed) + waitForMigrationStatus(t, &vtParams, clusterInstance.Keyspaces[0].Shards, strings.TrimSpace(output), 2*time.Minute, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed) } func mysqlRestarts(t *testing.T) { diff --git a/go/test/endtoend/transaction/twopc/stress/main_test.go b/go/test/endtoend/transaction/twopc/stress/main_test.go index ef8e454be15..05525171d2d 100644 --- a/go/test/endtoend/transaction/twopc/stress/main_test.go +++ b/go/test/endtoend/transaction/twopc/stress/main_test.go @@ -32,13 +32,13 @@ import ( ) var ( - clusterInstance *cluster.LocalProcessCluster - vtParams mysql.ConnParams - vtgateGrpcAddress string - keyspaceName = "ks" - cell = "zone1" - hostname = "localhost" - sidecarDBName = "vt_ks" + clusterInstance *cluster.LocalProcessCluster + vtParams mysql.ConnParams + vtgateGrpcAddress string + keyspaceName = "ks" + unshardedKeyspaceName = "uks" + cell = "zone1" + hostname = "localhost" //go:embed schema.sql SchemaSQL string @@ -79,18 +79,28 @@ func TestMain(m *testing.M) { Name: keyspaceName, SchemaSQL: SchemaSQL, VSchema: VSchema, - SidecarDBName: sidecarDBName, DurabilityPolicy: "semi_sync", } if err := clusterInstance.StartKeyspace(*keyspace, []string{"-40", "40-80", "80-"}, 2, false); err != nil { return 1 } + // Start an unsharded keyspace + unshardedKeyspace := &cluster.Keyspace{ + Name: unshardedKeyspaceName, + SchemaSQL: "", + VSchema: "{}", + DurabilityPolicy: "semi_sync", + } + if err := clusterInstance.StartUnshardedKeyspace(*unshardedKeyspace, 2, false); err != nil { + return 1 + } + // Start Vtgate if err := clusterInstance.StartVtgate(); err != nil { return 1 } - vtParams = clusterInstance.GetVTParams(keyspaceName) + vtParams = clusterInstance.GetVTParams("") vtgateGrpcAddress = fmt.Sprintf("%s:%d", clusterInstance.Hostname, clusterInstance.VtgateGrpcPort) return m.Run() diff --git a/go/test/endtoend/transaction/twopc/stress/stress_test.go b/go/test/endtoend/transaction/twopc/stress/stress_test.go index 4dae0156b9d..40d78e7694d 100644 --- a/go/test/endtoend/transaction/twopc/stress/stress_test.go +++ b/go/test/endtoend/transaction/twopc/stress/stress_test.go @@ -38,6 +38,7 @@ import ( "vitess.io/vitess/go/test/endtoend/onlineddl" twopcutil "vitess.io/vitess/go/test/endtoend/transaction/twopc/utils" "vitess.io/vitess/go/test/endtoend/utils" + "vitess.io/vitess/go/test/endtoend/vreplication" "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/schema" ) @@ -76,6 +77,11 @@ func TestDisruptions(t *testing.T) { commitDelayTime: "20", disruption: onlineDDL, }, + { + disruptionName: "MoveTables", + commitDelayTime: "20", + disruption: moveTables, + }, { disruptionName: "EmergencyReparentShard", commitDelayTime: "5", @@ -239,6 +245,33 @@ func mysqlRestartShard3(t *testing.T) error { return syscallutil.Kill(pid, syscall.SIGKILL) } +// moveTables runs a move tables command. +func moveTables(t *testing.T) error { + workflow := "TestDisruptions" + mvt := vreplication.NewExternalMoveTables(t, clusterInstance, workflow, unshardedKeyspaceName, keyspaceName, "twopc_t1") + // Initiate MoveTables for twopc_t1. + mvt.Create() + // Wait for vreplication to catchup. Should be very fast since we don't have a lot of rows. + for _, ks := range clusterInstance.Keyspaces { + if ks.Name != unshardedKeyspaceName { + continue + } + for _, shard := range ks.Shards { + vttablet := shard.PrimaryTablet().VttabletProcess + vttablet.WaitForVReplicationToCatchup(t, workflow, fmt.Sprintf("vt_%s", vttablet.Keyspace), "", 10*time.Second) + } + } + // SwitchTraffic + mvt.SwitchReadsAndWrites() + // Wait for a couple of seconds and then switch the traffic back + time.Sleep(2 * time.Second) + mvt.ReverseReadsAndWrites() + // Wait another couple of seconds and then cancel the workflow + time.Sleep(2 * time.Second) + mvt.Cancel() + return nil +} + var orderedDDL = []string{ "alter table twopc_t1 add column extra_col1 varchar(20)", "alter table twopc_t1 add column extra_col2 varchar(20)", @@ -256,13 +289,13 @@ func onlineDDL(t *testing.T) error { require.NoError(t, err) count++ fmt.Println("uuid: ", output) - status := WaitForMigrationStatus(t, &vtParams, clusterInstance.Keyspaces[0].Shards, strings.TrimSpace(output), 2*time.Minute, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed) + status := waitForMigrationStatus(t, &vtParams, clusterInstance.Keyspaces[0].Shards, strings.TrimSpace(output), 2*time.Minute, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed) onlineddl.CheckMigrationStatus(t, &vtParams, clusterInstance.Keyspaces[0].Shards, strings.TrimSpace(output), status) require.Equal(t, schema.OnlineDDLStatusComplete, status) return nil } -func WaitForMigrationStatus(t *testing.T, vtParams *mysql.ConnParams, shards []cluster.Shard, uuid string, timeout time.Duration, expectStatuses ...schema.OnlineDDLStatus) schema.OnlineDDLStatus { +func waitForMigrationStatus(t *testing.T, vtParams *mysql.ConnParams, shards []cluster.Shard, uuid string, timeout time.Duration, expectStatuses ...schema.OnlineDDLStatus) schema.OnlineDDLStatus { shardNames := map[string]bool{} for _, shard := range shards { shardNames[shard.Name] = true