diff --git a/go/test/endtoend/onlineddl/vrepl_stress/onlineddl_vrepl_mini_stress_test.go b/go/test/endtoend/onlineddl/vrepl_stress/onlineddl_vrepl_mini_stress_test.go index 107050c2708..983739a976d 100644 --- a/go/test/endtoend/onlineddl/vrepl_stress/onlineddl_vrepl_mini_stress_test.go +++ b/go/test/endtoend/onlineddl/vrepl_stress/onlineddl_vrepl_mini_stress_test.go @@ -25,7 +25,6 @@ import ( "path" "strings" "sync" - "sync/atomic" "testing" "time" @@ -138,6 +137,7 @@ var ( const ( maxTableRows = 4096 + workloadDuration = 5 * time.Second maxConcurrency = 20 singleConnectionSleepInterval = 2 * time.Millisecond countIterations = 5 @@ -227,6 +227,8 @@ func TestMain(m *testing.M) { func TestSchemaChange(t *testing.T) { defer cluster.PanicHandler(t) + ctx := context.Background() + shards = clusterInstance.Keyspaces[0].Shards require.Equal(t, 1, len(shards)) @@ -251,16 +253,17 @@ func TestSchemaChange(t *testing.T) { // that our testing/metrics logic is sound in the first place. testName := fmt.Sprintf("workload without ALTER TABLE %d/%d", (i + 1), countIterations) t.Run(testName, func(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) initTable(t) + + ctx, cancel := context.WithTimeout(ctx, workloadDuration) + defer cancel() + var wg sync.WaitGroup wg.Add(1) go func() { defer wg.Done() runMultipleConnections(ctx, t) }() - time.Sleep(5 * time.Second) - cancel() // will cause runMultipleConnections() to terminate wg.Wait() testSelectTableMetrics(t) }) @@ -285,7 +288,7 @@ func TestSchemaChange(t *testing.T) { // the vreplication/ALTER TABLE did not corrupt our data and we are happy. testName := fmt.Sprintf("ALTER TABLE with workload %d/%d", (i + 1), countIterations) t.Run(testName, func(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) + ctx := context.Background() t.Run("create schema", func(t *testing.T) { testWithInitialSchema(t) }) @@ -293,6 +296,9 @@ func TestSchemaChange(t *testing.T) { initTable(t) }) t.Run("migrate", func(t *testing.T) { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + var wg sync.WaitGroup wg.Add(1) go func() { @@ -302,7 +308,7 @@ func TestSchemaChange(t *testing.T) { hint := fmt.Sprintf("hint-alter-with-workload-%d", i) uuid := testOnlineDDLStatement(t, fmt.Sprintf(alterHintStatement, hint), onlineDDLStrategy, "vtgate", hint) onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete) - cancel() // will cause runMultipleConnections() to terminate + cancel() // Now that the migration is complete, we can stop the workload. wg.Wait() }) t.Run("validate metrics", func(t *testing.T) { @@ -485,7 +491,7 @@ func generateDelete(t *testing.T, conn *mysql.Conn) error { return err } -func runSingleConnection(ctx context.Context, t *testing.T, done *int64) { +func runSingleConnection(ctx context.Context, t *testing.T) { log.Infof("Running single connection") conn, err := mysql.Connect(ctx, &vtParams) require.Nil(t, err) @@ -497,10 +503,6 @@ func runSingleConnection(ctx context.Context, t *testing.T, done *int64) { require.Nil(t, err) for { - if atomic.LoadInt64(done) == 1 { - log.Infof("Terminating single connection") - return - } switch rand.Int31n(3) { case 0: err = generateInsert(t, conn) @@ -509,27 +511,28 @@ func runSingleConnection(ctx context.Context, t *testing.T, done *int64) { case 2: err = generateDelete(t, conn) } + select { + case <-ctx.Done(): + log.Infof("Terminating single connection") + return + case <-time.After(singleConnectionSleepInterval): + } assert.Nil(t, err) - time.Sleep(singleConnectionSleepInterval) } } func runMultipleConnections(ctx context.Context, t *testing.T) { log.Infof("Running multiple connections") - var done int64 var wg sync.WaitGroup for i := 0; i < maxConcurrency; i++ { wg.Add(1) go func() { defer wg.Done() - runSingleConnection(ctx, t, &done) + runSingleConnection(ctx, t) }() } - <-ctx.Done() - atomic.StoreInt64(&done, 1) - log.Infof("Running multiple connections: done") wg.Wait() - log.Infof("All connections cancelled") + log.Infof("Running multiple connections: done") } func initTable(t *testing.T) {