Merge pull request #128 from github/arthur/buffering-backports
Buffering related backports
arthurschreiber authored Nov 8, 2024
2 parents f30abe0 + fd8c3cb commit eb53017
Showing 32 changed files with 1,288 additions and 369 deletions.
63 changes: 63 additions & 0 deletions go/test/endtoend/reparent/newfeaturetest/reparent_test.go
@@ -19,10 +19,13 @@ package newfeaturetest
import (
"context"
"fmt"
"sync"
"testing"
"time"

"github.com/stretchr/testify/require"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/test/endtoend/reparent/utils"
)
@@ -146,3 +149,63 @@ func TestChangeTypeWithoutSemiSync(t *testing.T) {
err = clusterInstance.VtctlclientProcess.ExecuteCommand("ChangeTabletType", replica.Alias, "replica")
require.NoError(t, err)
}

func TestBufferingWithMultipleDisruptions(t *testing.T) {
defer cluster.PanicHandler(t)
clusterInstance := utils.SetupShardedReparentCluster(t, "semi_sync")
defer utils.TeardownCluster(clusterInstance)

// Stop all VTOrc instances, so that they don't interfere with the test.
for _, vtorc := range clusterInstance.VTOrcProcesses {
err := vtorc.TearDown()
require.NoError(t, err)
}

// Start by reparenting all the shards to the first tablet.
keyspace := clusterInstance.Keyspaces[0]
shards := keyspace.Shards
for _, shard := range shards {
err := clusterInstance.VtctldClientProcess.PlannedReparentShard(keyspace.Name, shard.Name, shard.Vttablets[0].Alias)
require.NoError(t, err)
}

// We simulate the start of an external reparent, or a PRS in which the healthcheck update from the tablet is lost
// in transit to vtgate, by simply setting the primary to read-only. This is also why we shut down all VTOrc
// instances above, so that they don't repair the state.
utils.RunSQL(context.Background(), t, "set global read_only=1", shards[0].Vttablets[0])
utils.RunSQL(context.Background(), t, "set global read_only=1", shards[1].Vttablets[0])

wg := sync.WaitGroup{}
rowCount := 10
vtParams := clusterInstance.GetVTParams(keyspace.Name)
// We now spawn writes from a bunch of goroutines.
// The ones going to the first two shards should block, since
// those shards appear to be mid-reparent as far as the buffering code is concerned.
for i := 1; i <= rowCount; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
conn, err := mysql.Connect(context.Background(), &vtParams)
if err != nil {
return
}
defer conn.Close()
_, err = conn.ExecuteFetch(utils.GetInsertQuery(i), 0, false)
require.NoError(t, err)
}(i)
}

// Now, run a PRS on the last shard. This shouldn't unbuffer the queries buffered for the first two shards,
// since the disruption on those shards hasn't stopped.
err := clusterInstance.VtctldClientProcess.PlannedReparentShard(keyspace.Name, shards[2].Name, shards[2].Vttablets[1].Alias)
require.NoError(t, err)
// We wait a second just to make sure the PRS changes are processed by the buffering logic in vtgate.
time.Sleep(1 * time.Second)
// Finally, we make the first two shards healthy again by running PRS on each.
err = clusterInstance.VtctldClientProcess.PlannedReparentShard(keyspace.Name, shards[0].Name, shards[0].Vttablets[1].Alias)
require.NoError(t, err)
err = clusterInstance.VtctldClientProcess.PlannedReparentShard(keyspace.Name, shards[1].Name, shards[1].Vttablets[1].Alias)
require.NoError(t, err)
// Wait for all the writes to have succeeded.
wg.Wait()
}
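
The test above treats wg.Wait() returning as success, since every spawned insert goes through require.NoError; a hang here is the failure signal. A minimal sketch of an explicit follow-up check that reads the rows back through vtgate is shown below. It is not part of this commit: it reuses GetSelectionQuery from the utils hunk further down, and it assumes every goroutine managed to open its connection (connect errors in the loop above are silently skipped).

	// Hypothetical verification step (not in this commit): read the rows back
	// through vtgate and check that all buffered inserts are visible.
	conn, err := mysql.Connect(context.Background(), &vtParams)
	require.NoError(t, err)
	defer conn.Close()
	qr, err := conn.ExecuteFetch(utils.GetSelectionQuery(), rowCount+1, false)
	require.NoError(t, err)
	require.Len(t, qr.Rows, rowCount)
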
54 changes: 53 additions & 1 deletion go/test/endtoend/reparent/utils/utils.go
@@ -54,7 +54,7 @@ var (
id bigint,
msg varchar(64),
primary key (id)
) Engine=InnoDB
) Engine=InnoDB
`
cell1 = "zone1"
cell2 = "zone2"
@@ -75,6 +75,58 @@ func SetupRangeBasedCluster(ctx context.Context, t *testing.T) *cluster.LocalPro
return setupCluster(ctx, t, ShardName, []string{cell1}, []int{2}, "semi_sync")
}

// SetupShardedReparentCluster sets up a sharded cluster for reparent testing.
func SetupShardedReparentCluster(t *testing.T, durability string) *cluster.LocalProcessCluster {
clusterInstance := cluster.NewCluster(cell1, Hostname)
// Start topo server
err := clusterInstance.StartTopo()
require.NoError(t, err)

clusterInstance.VtTabletExtraArgs = append(clusterInstance.VtTabletExtraArgs,
"--lock_tables_timeout", "5s",
// Fast health checks help find corner cases.
"--health_check_interval", "1s",
"--track_schema_versions=true",
"--queryserver_enable_online_ddl=false")
clusterInstance.VtGateExtraArgs = append(clusterInstance.VtGateExtraArgs,
"--enable_buffer",
// Long timeout in case failover is slow.
"--buffer_window", "10m",
"--buffer_max_failover_duration", "10m",
"--buffer_min_time_between_failovers", "20m",
)

// Start keyspace
keyspace := &cluster.Keyspace{
Name: KeyspaceName,
SchemaSQL: sqlSchema,
VSchema: `{"sharded": true, "vindexes": {"hash_index": {"type": "hash"}}, "tables": {"vt_insert_test": {"column_vindexes": [{"column": "id", "name": "hash_index"}]}}}`,
}
err = clusterInstance.StartKeyspace(*keyspace, []string{"-40", "40-80", "80-"}, 2, false)
require.NoError(t, err)

if clusterInstance.VtctlMajorVersion >= 14 {
clusterInstance.VtctldClientProcess = *cluster.VtctldClientProcessInstance("localhost", clusterInstance.VtctldProcess.GrpcPort, clusterInstance.TmpDirectory)
out, err := clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("SetKeyspaceDurabilityPolicy", KeyspaceName, fmt.Sprintf("--durability-policy=%s", durability))
require.NoError(t, err, out)
}

// Start Vtgate
err = clusterInstance.StartVtgate()
require.NoError(t, err)
return clusterInstance
}
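
The VSchema above is passed as a single-line string literal. For readability, here is the same document pretty-printed; the content is unchanged, and the constant name shardedVSchema is only illustrative and does not appear in this commit.

const shardedVSchema = `{
	"sharded": true,
	"vindexes": {
		"hash_index": {"type": "hash"}
	},
	"tables": {
		"vt_insert_test": {
			"column_vindexes": [{"column": "id", "name": "hash_index"}]
		}
	}
}`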

// GetInsertQuery returns an insert query for inserting a row with the given id.
func GetInsertQuery(idx int) string {
return fmt.Sprintf(insertSQL, idx, idx)
}
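
GetInsertQuery fills in the package-level insertSQL template, which is not visible in this hunk. Based on the vt_insert_test schema at the top of this file (id bigint, msg varchar(64)), it is presumably a format string along these lines; this is an assumption, not something shown in the diff.

// Assumed shape of insertSQL; the actual constant lives elsewhere in utils.go.
insertSQL = "insert into vt_insert_test(id, msg) values (%d, 'test %d')"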

// GetSelectionQuery returns a select query for reading back the inserted data.
func GetSelectionQuery() string {
return `select * from vt_insert_test`
}

// TeardownCluster is used to teardown the reparent cluster. When
// run in a CI environment -- which is considered true when the
// "CI" env variable is set to "true" -- the teardown also removes
5 changes: 3 additions & 2 deletions go/test/endtoend/vreplication/cluster_test.go
@@ -54,8 +54,9 @@ var (
sidecarDBIdentifier = sqlparser.String(sqlparser.NewIdentifierCS(sidecarDBName))
mainClusterConfig *ClusterConfig
externalClusterConfig *ClusterConfig
extraVTGateArgs = []string{"--tablet_refresh_interval", "10ms", "--enable_buffer", "--buffer_window", loadTestBufferingWindowDurationStr,
"--buffer_size", "100000", "--buffer_min_time_between_failovers", "0s", "--buffer_max_failover_duration", loadTestBufferingWindowDurationStr}
extraVTGateArgs = []string{"--tablet_refresh_interval", "10ms", "--enable_buffer", "--buffer_window", loadTestBufferingWindowDuration.String(),
"--buffer_size", "250000", "--buffer_min_time_between_failovers", "1s", "--buffer_max_failover_duration", loadTestBufferingWindowDuration.String(),
"--buffer_drain_concurrency", "10"}
extraVtctldArgs = []string{"--remote_operation_timeout", "600s", "--topo_etcd_lease_ttl", "120"}
// This variable can be used within specific tests to alter vttablet behavior
extraVTTabletArgs = []string{}
127 changes: 74 additions & 53 deletions go/test/endtoend/vreplication/helper_test.go
@@ -18,16 +18,17 @@ package vreplication

import (
"context"
"crypto/rand"
"encoding/hex"
"encoding/json"
"fmt"
"io"
"math/rand"
"net/http"
"os/exec"
"regexp"
"sort"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
Expand Down Expand Up @@ -75,9 +76,10 @@ func execQuery(t *testing.T, conn *mysql.Conn, query string) *sqltypes.Result {

func getConnection(t *testing.T, hostname string, port int) *mysql.Conn {
vtParams := mysql.ConnParams{
Host: hostname,
Port: port,
Uname: "vt_dba",
Host: hostname,
Port: port,
Uname: "vt_dba",
ConnectTimeoutMs: 1000,
}
ctx := context.Background()
conn, err := mysql.Connect(ctx, &vtParams)
@@ -714,94 +716,113 @@ func isBinlogRowImageNoBlob(t *testing.T, tablet *cluster.VttabletProcess) bool
}

const (
loadTestBufferingWindowDurationStr = "30s"
loadTestPostBufferingInsertWindow = 60 * time.Second // should be greater than loadTestBufferingWindowDurationStr
loadTestWaitForCancel = 30 * time.Second
loadTestWaitBetweenQueries = 2 * time.Millisecond
loadTestBufferingWindowDuration = 10 * time.Second
loadTestAvgWaitBetweenQueries = 500 * time.Microsecond
loadTestDefaultConnections = 100
)

type loadGenerator struct {
t *testing.T
vc *VitessCluster
ctx context.Context
cancel context.CancelFunc
t *testing.T
vc *VitessCluster
ctx context.Context
cancel context.CancelFunc
connections int
wg sync.WaitGroup
}

func newLoadGenerator(t *testing.T, vc *VitessCluster) *loadGenerator {
return &loadGenerator{
t: t,
vc: vc,
t: t,
vc: vc,
connections: loadTestDefaultConnections,
}
}

func (lg *loadGenerator) stop() {
time.Sleep(loadTestPostBufferingInsertWindow) // wait for buffering to stop and additional records to be inserted by startLoad after traffic is switched
// Wait for buffering to stop and additional records to be inserted by start
// after traffic is switched.
time.Sleep(loadTestBufferingWindowDuration * 2)
log.Infof("Canceling load")
lg.cancel()
time.Sleep(loadTestWaitForCancel) // wait for cancel to take effect
lg.wg.Wait()
log.Flush()

}

func (lg *loadGenerator) start() {
t := lg.t
lg.ctx, lg.cancel = context.WithCancel(context.Background())
var connectionCount atomic.Int64

var id int64
log.Infof("startLoad: starting")
log.Infof("loadGenerator: starting")
queryTemplate := "insert into loadtest(id, name) values (%d, 'name-%d')"
var totalQueries, successfulQueries int64
var deniedErrors, ambiguousErrors, reshardedErrors, tableNotFoundErrors, otherErrors int64
lg.wg.Add(1)
defer func() {

log.Infof("startLoad: totalQueries: %d, successfulQueries: %d, deniedErrors: %d, ambiguousErrors: %d, reshardedErrors: %d, tableNotFoundErrors: %d, otherErrors: %d",
defer lg.wg.Done()
log.Infof("loadGenerator: totalQueries: %d, successfulQueries: %d, deniedErrors: %d, ambiguousErrors: %d, reshardedErrors: %d, tableNotFoundErrors: %d, otherErrors: %d",
totalQueries, successfulQueries, deniedErrors, ambiguousErrors, reshardedErrors, tableNotFoundErrors, otherErrors)
}()
logOnce := true
for {
select {
case <-lg.ctx.Done():
log.Infof("startLoad: context cancelled")
log.Infof("startLoad: deniedErrors: %d, ambiguousErrors: %d, reshardedErrors: %d, tableNotFoundErrors: %d, otherErrors: %d",
log.Infof("loadGenerator: context cancelled")
log.Infof("loadGenerator: deniedErrors: %d, ambiguousErrors: %d, reshardedErrors: %d, tableNotFoundErrors: %d, otherErrors: %d",
deniedErrors, ambiguousErrors, reshardedErrors, tableNotFoundErrors, otherErrors)
require.Equal(t, int64(0), deniedErrors)
require.Equal(t, int64(0), otherErrors)
require.Equal(t, int64(0), reshardedErrors)
require.Equal(t, totalQueries, successfulQueries)
return
default:
go func() {
conn := vc.GetVTGateConn(t)
defer conn.Close()
atomic.AddInt64(&id, 1)
query := fmt.Sprintf(queryTemplate, id, id)
_, err := conn.ExecuteFetch(query, 1, false)
atomic.AddInt64(&totalQueries, 1)
if err != nil {
sqlErr := err.(*sqlerror.SQLError)
if strings.Contains(strings.ToLower(err.Error()), "denied tables") {
log.Infof("startLoad: denied tables error executing query: %d:%v", sqlErr.Number(), err)
atomic.AddInt64(&deniedErrors, 1)
} else if strings.Contains(strings.ToLower(err.Error()), "ambiguous") {
// this can happen when a second keyspace is setup with the same tables, but there are no routing rules
// set yet by MoveTables. So we ignore these errors.
atomic.AddInt64(&ambiguousErrors, 1)
} else if strings.Contains(strings.ToLower(err.Error()), "current keyspace is being resharded") {
atomic.AddInt64(&reshardedErrors, 1)
} else if strings.Contains(strings.ToLower(err.Error()), "not found") {
atomic.AddInt64(&tableNotFoundErrors, 1)
} else {
if logOnce {
log.Infof("startLoad: error executing query: %d:%v", sqlErr.Number(), err)
logOnce = false
if int(connectionCount.Load()) < lg.connections {
connectionCount.Add(1)
lg.wg.Add(1)
go func() {
defer lg.wg.Done()
defer connectionCount.Add(-1)
conn := vc.GetVTGateConn(t)
defer conn.Close()
for {
select {
case <-lg.ctx.Done():
return
default:
}
newID := atomic.AddInt64(&id, 1)
query := fmt.Sprintf(queryTemplate, newID, newID)
_, err := conn.ExecuteFetch(query, 1, false)
atomic.AddInt64(&totalQueries, 1)
if err != nil {
sqlErr := err.(*sqlerror.SQLError)
if strings.Contains(strings.ToLower(err.Error()), "denied tables") {
if debugMode {
t.Logf("loadGenerator: denied tables error executing query: %d:%v", sqlErr.Number(), err)
}
atomic.AddInt64(&deniedErrors, 1)
} else if strings.Contains(strings.ToLower(err.Error()), "ambiguous") {
// This can happen when a second keyspace is setup with the same tables, but
// there are no routing rules set yet by MoveTables. So we ignore these errors.
atomic.AddInt64(&ambiguousErrors, 1)
} else if strings.Contains(strings.ToLower(err.Error()), "current keyspace is being resharded") {
atomic.AddInt64(&reshardedErrors, 1)
} else if strings.Contains(strings.ToLower(err.Error()), "not found") {
atomic.AddInt64(&tableNotFoundErrors, 1)
} else {
if debugMode {
t.Logf("loadGenerator: error executing query: %d:%v", sqlErr.Number(), err)
}
atomic.AddInt64(&otherErrors, 1)
}
} else {
atomic.AddInt64(&successfulQueries, 1)
}
atomic.AddInt64(&otherErrors, 1)
time.Sleep(time.Duration(int64(float64(loadTestAvgWaitBetweenQueries.Microseconds()) * rand.Float64())))
}
time.Sleep(loadTestWaitBetweenQueries)
} else {
atomic.AddInt64(&successfulQueries, 1)
}
}()
time.Sleep(loadTestWaitBetweenQueries)
}()
}
}
}
}
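
Since start() loops until the context is cancelled and stop() waits out the buffering window before cancelling and joining the workers, the generator presumably runs on its own goroutine around the failover being exercised. Below is a minimal usage sketch under that assumption; the test name and body are hypothetical, written against the helpers and package-level vc shown above.

func TestBufferingSketch(t *testing.T) {
	lg := newLoadGenerator(t, vc)
	go lg.start() // start() only returns once lg.cancel() fires, so it needs its own goroutine
	time.Sleep(loadTestBufferingWindowDuration) // build up some load before the disruption
	// ... perform the failover / traffic switch under test here ...
	lg.stop() // sleeps out the buffering window, cancels the context, and waits for all workers
}
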
9 changes: 7 additions & 2 deletions go/test/endtoend/vreplication/movetables_buffering_test.go
@@ -2,6 +2,7 @@ package vreplication

import (
"testing"
"time"

"github.com/stretchr/testify/require"

@@ -34,8 +35,12 @@ func TestMoveTablesBuffering(t *testing.T) {
catchup(t, targetTab2, workflowName, "MoveTables")
vdiffSideBySide(t, ksWorkflow, "")
waitForLowLag(t, "customer", workflowName)
tstWorkflowSwitchReads(t, "", "")
tstWorkflowSwitchWrites(t)
for i := 0; i < 10; i++ {
tstWorkflowSwitchReadsAndWrites(t)
time.Sleep(loadTestBufferingWindowDuration + 1*time.Second)
tstWorkflowReverseReadsAndWrites(t)
time.Sleep(loadTestBufferingWindowDuration + 1*time.Second)
}
log.Infof("SwitchWrites done")
lg.stop()
