Skip to content

Commit

Permalink
OnlineDDL: reduce vrepl_stress workload in forks (#14302)
Browse files Browse the repository at this point in the history
Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>
  • Loading branch information
shlomi-noach committed Oct 24, 2023
1 parent b5cbd36 commit 94572b3
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"math/rand"
"os"
"path"
"runtime"
"strings"
"sync"
"testing"
Expand Down Expand Up @@ -135,13 +136,14 @@ var (
writeMetrics WriteMetrics
)

var (
countIterations = 5
)

const (
maxTableRows = 4096
workloadDuration = 5 * time.Second
maxConcurrency = 20
singleConnectionSleepInterval = 2 * time.Millisecond
countIterations = 5
migrationWaitTimeout = 60 * time.Second
maxTableRows = 4096
workloadDuration = 5 * time.Second
migrationWaitTimeout = 60 * time.Second
)

func resetOpOrder() {
Expand Down Expand Up @@ -377,6 +379,9 @@ func checkTablesCount(t *testing.T, tablet *cluster.Vttablet, showTableName stri
query := fmt.Sprintf(`show tables like '%%%s%%';`, showTableName)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
ticker := time.NewTicker(time.Second)
defer ticker.Stop()

rowcount := 0

for {
Expand All @@ -388,7 +393,7 @@ func checkTablesCount(t *testing.T, tablet *cluster.Vttablet, showTableName stri
}

select {
case <-time.After(time.Second):
case <-ticker.C:
continue // Keep looping
case <-ctx.Done():
// Break below to the assertion
Expand Down Expand Up @@ -491,7 +496,7 @@ func generateDelete(t *testing.T, conn *mysql.Conn) error {
return err
}

func runSingleConnection(ctx context.Context, t *testing.T) {
func runSingleConnection(ctx context.Context, t *testing.T, sleepInterval time.Duration) {
log.Infof("Running single connection")
conn, err := mysql.Connect(ctx, &vtParams)
require.Nil(t, err)
Expand All @@ -502,6 +507,9 @@ func runSingleConnection(ctx context.Context, t *testing.T) {
_, err = conn.ExecuteFetch("set transaction isolation level read committed", 1000, true)
require.Nil(t, err)

ticker := time.NewTicker(sleepInterval)
defer ticker.Stop()

for {
switch rand.Int31n(3) {
case 0:
Expand All @@ -515,20 +523,31 @@ func runSingleConnection(ctx context.Context, t *testing.T) {
case <-ctx.Done():
log.Infof("Terminating single connection")
return
case <-time.After(singleConnectionSleepInterval):
case <-ticker.C:
}
assert.Nil(t, err)
}
}

func runMultipleConnections(ctx context.Context, t *testing.T) {
log.Infof("Running multiple connections")
// The workload for a 16 vCPU machine is:
// - Concurrency of 16
// - 2ms interval between queries for each connection
// As the number of vCPUs decreases, so do we decrease concurrency, and increase intervals. For example, on a 8 vCPU machine
// we run concurrency of 8 and interval of 4ms. On a 4 vCPU machine we run concurrency of 4 and interval of 8ms.
maxConcurrency := runtime.NumCPU()
sleepModifier := 16.0 / float64(maxConcurrency)
baseSleepInterval := 2 * time.Millisecond
singleConnectionSleepIntervalNanoseconds := float64(baseSleepInterval.Nanoseconds()) * sleepModifier
sleepInterval := time.Duration(int64(singleConnectionSleepIntervalNanoseconds))

log.Infof("Running multiple connections: maxConcurrency=%v, sleep interval=%v", maxConcurrency, sleepInterval)
var wg sync.WaitGroup
for i := 0; i < maxConcurrency; i++ {
wg.Add(1)
go func() {
defer wg.Done()
runSingleConnection(ctx, t)
runSingleConnection(ctx, t, sleepInterval)
}()
}
wg.Wait()
Expand Down
15 changes: 11 additions & 4 deletions go/test/endtoend/onlineddl/vtgate_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,9 +247,13 @@ func WaitForMigrationStatus(t *testing.T, vtParams *mysql.ConnParams, shards []c
for _, status := range expectStatuses {
statusesMap[string(status)] = true
}
startTime := time.Now()
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
ticker := time.NewTicker(time.Second)
defer ticker.Stop()

lastKnownStatus := ""
for time.Since(startTime) < timeout {
for {
countMatchedShards := 0
r := VtgateExecQuery(t, vtParams, query, "")
for _, row := range r.Named().Rows {
Expand All @@ -266,9 +270,12 @@ func WaitForMigrationStatus(t *testing.T, vtParams *mysql.ConnParams, shards []c
if countMatchedShards == len(shards) {
return schema.OnlineDDLStatus(lastKnownStatus)
}
time.Sleep(1 * time.Second)
select {
case <-ctx.Done():
return schema.OnlineDDLStatus(lastKnownStatus)
case <-ticker.C:
}
}
return schema.OnlineDDLStatus(lastKnownStatus)
}

// CheckMigrationArtifacts verifies given migration exists, and checks if it has artifacts
Expand Down

0 comments on commit 94572b3

Please sign in to comment.