Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Buffering related backports #128

Merged
merged 12 commits into from
Nov 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 63 additions & 0 deletions go/test/endtoend/reparent/newfeaturetest/reparent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@ package newfeaturetest
import (
"context"
"fmt"
"sync"
"testing"
"time"

"github.com/stretchr/testify/require"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/test/endtoend/reparent/utils"
)
Expand Down Expand Up @@ -146,3 +149,63 @@ func TestChangeTypeWithoutSemiSync(t *testing.T) {
err = clusterInstance.VtctlclientProcess.ExecuteCommand("ChangeTabletType", replica.Alias, "replica")
require.NoError(t, err)
}

func TestBufferingWithMultipleDisruptions(t *testing.T) {
defer cluster.PanicHandler(t)
clusterInstance := utils.SetupShardedReparentCluster(t, "semi_sync")
defer utils.TeardownCluster(clusterInstance)

// Stop all VTOrc instances, so that they don't interfere with the test.
for _, vtorc := range clusterInstance.VTOrcProcesses {
err := vtorc.TearDown()
require.NoError(t, err)
}

// Start by reparenting all the shards to the first tablet.
keyspace := clusterInstance.Keyspaces[0]
shards := keyspace.Shards
for _, shard := range shards {
err := clusterInstance.VtctldClientProcess.PlannedReparentShard(keyspace.Name, shard.Name, shard.Vttablets[0].Alias)
require.NoError(t, err)
}

// We simulate start of external reparent or a PRS where the healthcheck update from the tablet gets lost in transit
// to vtgate by just setting the primary read only. This is also why we needed to shutdown all VTOrcs, so that they don't
// fix this.
utils.RunSQL(context.Background(), t, "set global read_only=1", shards[0].Vttablets[0])
utils.RunSQL(context.Background(), t, "set global read_only=1", shards[1].Vttablets[0])

wg := sync.WaitGroup{}
rowCount := 10
vtParams := clusterInstance.GetVTParams(keyspace.Name)
// We now spawn writes for a bunch of go routines.
// The ones going to shard 1 and shard 2 should block, since
// they're in the midst of a reparenting operation (as seen by the buffering code).
for i := 1; i <= rowCount; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
conn, err := mysql.Connect(context.Background(), &vtParams)
if err != nil {
return
}
defer conn.Close()
_, err = conn.ExecuteFetch(utils.GetInsertQuery(i), 0, false)
require.NoError(t, err)
}(i)
}

// Now, run a PRS call on the last shard. This shouldn't unbuffer the queries that are buffered for shards 1 and 2
// since the disruption on the two shards hasn't stopped.
err := clusterInstance.VtctldClientProcess.PlannedReparentShard(keyspace.Name, shards[2].Name, shards[2].Vttablets[1].Alias)
require.NoError(t, err)
// We wait a second just to make sure the PRS changes are processed by the buffering logic in vtgate.
time.Sleep(1 * time.Second)
// Finally, we'll now make the 2 shards healthy again by running PRS.
err = clusterInstance.VtctldClientProcess.PlannedReparentShard(keyspace.Name, shards[0].Name, shards[0].Vttablets[1].Alias)
require.NoError(t, err)
err = clusterInstance.VtctldClientProcess.PlannedReparentShard(keyspace.Name, shards[1].Name, shards[1].Vttablets[1].Alias)
require.NoError(t, err)
// Wait for all the writes to have succeeded.
wg.Wait()
}
54 changes: 53 additions & 1 deletion go/test/endtoend/reparent/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ var (
id bigint,
msg varchar(64),
primary key (id)
) Engine=InnoDB
) Engine=InnoDB
`
cell1 = "zone1"
cell2 = "zone2"
Expand All @@ -75,6 +75,58 @@ func SetupRangeBasedCluster(ctx context.Context, t *testing.T) *cluster.LocalPro
return setupCluster(ctx, t, ShardName, []string{cell1}, []int{2}, "semi_sync")
}

// SetupShardedReparentCluster is used to setup a sharded cluster for testing
func SetupShardedReparentCluster(t *testing.T, durability string) *cluster.LocalProcessCluster {
clusterInstance := cluster.NewCluster(cell1, Hostname)
// Start topo server
err := clusterInstance.StartTopo()
require.NoError(t, err)

clusterInstance.VtTabletExtraArgs = append(clusterInstance.VtTabletExtraArgs,
"--lock_tables_timeout", "5s",
// Fast health checks help find corner cases.
"--health_check_interval", "1s",
"--track_schema_versions=true",
"--queryserver_enable_online_ddl=false")
clusterInstance.VtGateExtraArgs = append(clusterInstance.VtGateExtraArgs,
"--enable_buffer",
// Long timeout in case failover is slow.
"--buffer_window", "10m",
"--buffer_max_failover_duration", "10m",
"--buffer_min_time_between_failovers", "20m",
)

// Start keyspace
keyspace := &cluster.Keyspace{
Name: KeyspaceName,
SchemaSQL: sqlSchema,
VSchema: `{"sharded": true, "vindexes": {"hash_index": {"type": "hash"}}, "tables": {"vt_insert_test": {"column_vindexes": [{"column": "id", "name": "hash_index"}]}}}`,
}
err = clusterInstance.StartKeyspace(*keyspace, []string{"-40", "40-80", "80-"}, 2, false)
require.NoError(t, err)

if clusterInstance.VtctlMajorVersion >= 14 {
clusterInstance.VtctldClientProcess = *cluster.VtctldClientProcessInstance("localhost", clusterInstance.VtctldProcess.GrpcPort, clusterInstance.TmpDirectory)
out, err := clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("SetKeyspaceDurabilityPolicy", KeyspaceName, fmt.Sprintf("--durability-policy=%s", durability))
require.NoError(t, err, out)
}

// Start Vtgate
err = clusterInstance.StartVtgate()
require.NoError(t, err)
return clusterInstance
}

// GetInsertQuery returns a built insert query to insert a row.
func GetInsertQuery(idx int) string {
return fmt.Sprintf(insertSQL, idx, idx)
}

// GetSelectionQuery returns a built selection query read the data.
func GetSelectionQuery() string {
return `select * from vt_insert_test`
}

// TeardownCluster is used to teardown the reparent cluster. When
// run in a CI environment -- which is considered true when the
// "CI" env variable is set to "true" -- the teardown also removes
Expand Down
5 changes: 3 additions & 2 deletions go/test/endtoend/vreplication/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,9 @@ var (
sidecarDBIdentifier = sqlparser.String(sqlparser.NewIdentifierCS(sidecarDBName))
mainClusterConfig *ClusterConfig
externalClusterConfig *ClusterConfig
extraVTGateArgs = []string{"--tablet_refresh_interval", "10ms", "--enable_buffer", "--buffer_window", loadTestBufferingWindowDurationStr,
"--buffer_size", "100000", "--buffer_min_time_between_failovers", "0s", "--buffer_max_failover_duration", loadTestBufferingWindowDurationStr}
extraVTGateArgs = []string{"--tablet_refresh_interval", "10ms", "--enable_buffer", "--buffer_window", loadTestBufferingWindowDuration.String(),
"--buffer_size", "250000", "--buffer_min_time_between_failovers", "1s", "--buffer_max_failover_duration", loadTestBufferingWindowDuration.String(),
"--buffer_drain_concurrency", "10"}
extraVtctldArgs = []string{"--remote_operation_timeout", "600s", "--topo_etcd_lease_ttl", "120"}
// This variable can be used within specific tests to alter vttablet behavior
extraVTTabletArgs = []string{}
Expand Down
127 changes: 74 additions & 53 deletions go/test/endtoend/vreplication/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,17 @@ package vreplication

import (
"context"
"crypto/rand"
"encoding/hex"
"encoding/json"
"fmt"
"io"
"math/rand"
"net/http"
"os/exec"
"regexp"
"sort"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
Expand Down Expand Up @@ -75,9 +76,10 @@ func execQuery(t *testing.T, conn *mysql.Conn, query string) *sqltypes.Result {

func getConnection(t *testing.T, hostname string, port int) *mysql.Conn {
vtParams := mysql.ConnParams{
Host: hostname,
Port: port,
Uname: "vt_dba",
Host: hostname,
Port: port,
Uname: "vt_dba",
ConnectTimeoutMs: 1000,
}
ctx := context.Background()
conn, err := mysql.Connect(ctx, &vtParams)
Expand Down Expand Up @@ -714,94 +716,113 @@ func isBinlogRowImageNoBlob(t *testing.T, tablet *cluster.VttabletProcess) bool
}

const (
loadTestBufferingWindowDurationStr = "30s"
loadTestPostBufferingInsertWindow = 60 * time.Second // should be greater than loadTestBufferingWindowDurationStr
loadTestWaitForCancel = 30 * time.Second
loadTestWaitBetweenQueries = 2 * time.Millisecond
loadTestBufferingWindowDuration = 10 * time.Second
loadTestAvgWaitBetweenQueries = 500 * time.Microsecond
loadTestDefaultConnections = 100
)

type loadGenerator struct {
t *testing.T
vc *VitessCluster
ctx context.Context
cancel context.CancelFunc
t *testing.T
vc *VitessCluster
ctx context.Context
cancel context.CancelFunc
connections int
wg sync.WaitGroup
}

func newLoadGenerator(t *testing.T, vc *VitessCluster) *loadGenerator {
return &loadGenerator{
t: t,
vc: vc,
t: t,
vc: vc,
connections: loadTestDefaultConnections,
}
}

func (lg *loadGenerator) stop() {
time.Sleep(loadTestPostBufferingInsertWindow) // wait for buffering to stop and additional records to be inserted by startLoad after traffic is switched
// Wait for buffering to stop and additional records to be inserted by start
// after traffic is switched.
time.Sleep(loadTestBufferingWindowDuration * 2)
log.Infof("Canceling load")
lg.cancel()
time.Sleep(loadTestWaitForCancel) // wait for cancel to take effect
lg.wg.Wait()
log.Flush()

}

func (lg *loadGenerator) start() {
t := lg.t
lg.ctx, lg.cancel = context.WithCancel(context.Background())
var connectionCount atomic.Int64

var id int64
log.Infof("startLoad: starting")
log.Infof("loadGenerator: starting")
queryTemplate := "insert into loadtest(id, name) values (%d, 'name-%d')"
var totalQueries, successfulQueries int64
var deniedErrors, ambiguousErrors, reshardedErrors, tableNotFoundErrors, otherErrors int64
lg.wg.Add(1)
defer func() {

log.Infof("startLoad: totalQueries: %d, successfulQueries: %d, deniedErrors: %d, ambiguousErrors: %d, reshardedErrors: %d, tableNotFoundErrors: %d, otherErrors: %d",
defer lg.wg.Done()
log.Infof("loadGenerator: totalQueries: %d, successfulQueries: %d, deniedErrors: %d, ambiguousErrors: %d, reshardedErrors: %d, tableNotFoundErrors: %d, otherErrors: %d",
totalQueries, successfulQueries, deniedErrors, ambiguousErrors, reshardedErrors, tableNotFoundErrors, otherErrors)
}()
logOnce := true
for {
select {
case <-lg.ctx.Done():
log.Infof("startLoad: context cancelled")
log.Infof("startLoad: deniedErrors: %d, ambiguousErrors: %d, reshardedErrors: %d, tableNotFoundErrors: %d, otherErrors: %d",
log.Infof("loadGenerator: context cancelled")
log.Infof("loadGenerator: deniedErrors: %d, ambiguousErrors: %d, reshardedErrors: %d, tableNotFoundErrors: %d, otherErrors: %d",
deniedErrors, ambiguousErrors, reshardedErrors, tableNotFoundErrors, otherErrors)
require.Equal(t, int64(0), deniedErrors)
require.Equal(t, int64(0), otherErrors)
require.Equal(t, int64(0), reshardedErrors)
require.Equal(t, totalQueries, successfulQueries)
return
default:
go func() {
conn := vc.GetVTGateConn(t)
defer conn.Close()
atomic.AddInt64(&id, 1)
query := fmt.Sprintf(queryTemplate, id, id)
_, err := conn.ExecuteFetch(query, 1, false)
atomic.AddInt64(&totalQueries, 1)
if err != nil {
sqlErr := err.(*sqlerror.SQLError)
if strings.Contains(strings.ToLower(err.Error()), "denied tables") {
log.Infof("startLoad: denied tables error executing query: %d:%v", sqlErr.Number(), err)
atomic.AddInt64(&deniedErrors, 1)
} else if strings.Contains(strings.ToLower(err.Error()), "ambiguous") {
// this can happen when a second keyspace is setup with the same tables, but there are no routing rules
// set yet by MoveTables. So we ignore these errors.
atomic.AddInt64(&ambiguousErrors, 1)
} else if strings.Contains(strings.ToLower(err.Error()), "current keyspace is being resharded") {
atomic.AddInt64(&reshardedErrors, 1)
} else if strings.Contains(strings.ToLower(err.Error()), "not found") {
atomic.AddInt64(&tableNotFoundErrors, 1)
} else {
if logOnce {
log.Infof("startLoad: error executing query: %d:%v", sqlErr.Number(), err)
logOnce = false
if int(connectionCount.Load()) < lg.connections {
connectionCount.Add(1)
lg.wg.Add(1)
go func() {
defer lg.wg.Done()
defer connectionCount.Add(-1)
conn := vc.GetVTGateConn(t)
defer conn.Close()
for {
select {
case <-lg.ctx.Done():
return
default:
}
newID := atomic.AddInt64(&id, 1)
query := fmt.Sprintf(queryTemplate, newID, newID)
_, err := conn.ExecuteFetch(query, 1, false)
atomic.AddInt64(&totalQueries, 1)
if err != nil {
sqlErr := err.(*sqlerror.SQLError)
if strings.Contains(strings.ToLower(err.Error()), "denied tables") {
if debugMode {
t.Logf("loadGenerator: denied tables error executing query: %d:%v", sqlErr.Number(), err)
}
atomic.AddInt64(&deniedErrors, 1)
} else if strings.Contains(strings.ToLower(err.Error()), "ambiguous") {
// This can happen when a second keyspace is setup with the same tables, but
// there are no routing rules set yet by MoveTables. So we ignore these errors.
atomic.AddInt64(&ambiguousErrors, 1)
} else if strings.Contains(strings.ToLower(err.Error()), "current keyspace is being resharded") {
atomic.AddInt64(&reshardedErrors, 1)
} else if strings.Contains(strings.ToLower(err.Error()), "not found") {
atomic.AddInt64(&tableNotFoundErrors, 1)
} else {
if debugMode {
t.Logf("loadGenerator: error executing query: %d:%v", sqlErr.Number(), err)
}
atomic.AddInt64(&otherErrors, 1)
}
} else {
atomic.AddInt64(&successfulQueries, 1)
}
atomic.AddInt64(&otherErrors, 1)
time.Sleep(time.Duration(int64(float64(loadTestAvgWaitBetweenQueries.Microseconds()) * rand.Float64())))
}
time.Sleep(loadTestWaitBetweenQueries)
} else {
atomic.AddInt64(&successfulQueries, 1)
}
}()
time.Sleep(loadTestWaitBetweenQueries)
}()
}
}
}
}
Expand Down
9 changes: 7 additions & 2 deletions go/test/endtoend/vreplication/movetables_buffering_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package vreplication

import (
"testing"
"time"

"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -34,8 +35,12 @@ func TestMoveTablesBuffering(t *testing.T) {
catchup(t, targetTab2, workflowName, "MoveTables")
vdiffSideBySide(t, ksWorkflow, "")
waitForLowLag(t, "customer", workflowName)
tstWorkflowSwitchReads(t, "", "")
tstWorkflowSwitchWrites(t)
for i := 0; i < 10; i++ {
tstWorkflowSwitchReadsAndWrites(t)
time.Sleep(loadTestBufferingWindowDuration + 1*time.Second)
tstWorkflowReverseReadsAndWrites(t)
time.Sleep(loadTestBufferingWindowDuration + 1*time.Second)
}
log.Infof("SwitchWrites done")
lg.stop()

Expand Down
Loading
Loading