Skip to content

Commit

Permalink
feat: move code out of test files to go file so that they can be used…
Browse files Browse the repository at this point in the history
… by other packages

Signed-off-by: Manan Gupta <manan@planetscale.com>
  • Loading branch information
GuptaManan100 committed Aug 28, 2024
1 parent e86ad02 commit 844c62f
Show file tree
Hide file tree
Showing 6 changed files with 128 additions and 126 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"os/exec"
"path"
"runtime"
"strconv"
"strings"
"sync"
"testing"
Expand All @@ -35,9 +36,12 @@ import (
"vitess.io/vitess/go/test/endtoend/throttler"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/mysqlctl"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vtgate/planbuilder/plancontext"
"vitess.io/vitess/go/vt/vttablet"
throttlebase "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/base"
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp"

vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata"

Expand Down Expand Up @@ -65,6 +69,33 @@ var (
parallelInsertWorkers = "--vreplication-parallel-insert-workers=4"

throttlerConfig = throttler.Config{Threshold: 15}

vc *VitessCluster
defaultRdonly int
defaultReplicas int
sourceKsOpts = make(map[string]string)
targetKsOpts = make(map[string]string)
httpClient = throttlebase.SetupHTTPClient(time.Second)
sourceThrottlerAppName = throttlerapp.VStreamerName
targetThrottlerAppName = throttlerapp.VPlayerName

lastOutput string
currentWorkflowType binlogdatapb.VReplicationWorkflowType
)

const (
// Only used when debugging tests.
queryLog = "queries.txt"
tickInterval = 1 * time.Second

workflowActionCreate = "Create"
workflowActionMirrorTraffic = "Mirror"
workflowActionSwitchTraffic = "SwitchTraffic"
workflowActionReverseTraffic = "ReverseTraffic"
workflowActionComplete = "Complete"
workflowActionCancel = "Cancel"

BypassLagCheck = true // temporary fix for flakiness seen only in CI when lag check is introduced
)

// ClusterConfig defines the parameters like ports, tmpDir, tablet types which uniquely define a vitess cluster
Expand Down Expand Up @@ -979,3 +1010,71 @@ func setupDBTypeVersion(t *testing.T, value string) func() {
unsetVtMySQLRoot()
}
}

// tstWorkflowExecVtctl executes a MoveTables or Reshard workflow command using
// vtctlclient. It should operate exactly the same way as tstWorkflowExec, but
// using the legacy client.
func tstWorkflowExecVtctl(t *testing.T, cells, workflow, sourceKs, targetKs, tables, action, tabletTypes,
sourceShards, targetShards string, options *workflowExecOptions) error {

var args []string
if currentWorkflowType == binlogdatapb.VReplicationWorkflowType_MoveTables {
args = append(args, "MoveTables")
} else {
args = append(args, "Reshard")
}

args = append(args, "--")

if BypassLagCheck {
args = append(args, "--max_replication_lag_allowed=2542087h")
}
if options.atomicCopy {
args = append(args, "--atomic-copy")
}
switch action {
case workflowActionCreate:
if currentWorkflowType == binlogdatapb.VReplicationWorkflowType_MoveTables {
args = append(args, "--source", sourceKs)
if tables != "" {
args = append(args, "--tables", tables)
} else {
args = append(args, "--all")
}
if sourceShards != "" {
args = append(args, "--source_shards", sourceShards)
}
} else {
args = append(args, "--source_shards", sourceShards, "--target_shards", targetShards)
}
// Test new experimental --defer-secondary-keys flag
switch currentWorkflowType {
case binlogdatapb.VReplicationWorkflowType_MoveTables, binlogdatapb.VReplicationWorkflowType_Migrate, binlogdatapb.VReplicationWorkflowType_Reshard:
if !options.atomicCopy && options.deferSecondaryKeys {
args = append(args, "--defer-secondary-keys")
}
args = append(args, "--initialize-target-sequences") // Only used for MoveTables
}
case workflowActionMirrorTraffic:
args = append(args, "--percent", strconv.FormatFloat(float64(options.percent), byte('f'), -1, 32))
default:
if options.shardSubset != "" {
args = append(args, "--shards", options.shardSubset)
}
}
if cells != "" {
args = append(args, "--cells", cells)
}
if tabletTypes != "" {
args = append(args, "--tablet_types", tabletTypes)
}
args = append(args, "--timeout", time.Minute.String())
ksWorkflow := fmt.Sprintf("%s.%s", targetKs, workflow)
args = append(args, action, ksWorkflow)
output, err := vc.VtctlClient.ExecuteCommandWithOutput(args...)
lastOutput = output
if err != nil {
return fmt.Errorf("%s: %s", err, output)
}
return nil
}
4 changes: 0 additions & 4 deletions go/test/endtoend/vreplication/fk_ext_load_generator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,11 @@ import (
)

const (
// Only used when debugging tests.
queryLog = "queries.txt"

LoadGeneratorStateLoading = "loading"
LoadGeneratorStateRunning = "running"
LoadGeneratorStateStopped = "stopped"

dataLoadTimeout = 1 * time.Minute
tickInterval = 1 * time.Second
queryTimeout = 1 * time.Minute

getRandomIdQuery = "SELECT id FROM %s.parent ORDER BY RAND() LIMIT 1"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@ import (
"fmt"
"io"
"math/rand/v2"
"net"
"net/http"
"os"
"os/exec"
"regexp"
"sort"
"strconv"
"strings"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -202,6 +204,18 @@ func waitForTabletThrottlingStatus(t *testing.T, tablet *cluster.VttabletProcess
}
}

func throttlerCheckSelf(tablet *cluster.VttabletProcess, throttlerApp throttlerapp.Name) (respBody string, err error) {
apiURL := fmt.Sprintf("http://%s:%d/throttler/check-self?app=%s", tablet.TabletHostname, tablet.Port, throttlerApp.String())
resp, err := httpClient.Get(apiURL)
if err != nil {
return "", err
}
defer resp.Body.Close()
b, err := io.ReadAll(resp.Body)
respBody = string(b)
return respBody, err
}

// waitForNoWorkflowLag waits for the VReplication workflow's MaxVReplicationTransactionLag
// value to be 0.
func waitForNoWorkflowLag(t *testing.T, vc *VitessCluster, keyspace, worfklow string) {
Expand Down Expand Up @@ -713,6 +727,10 @@ func getShardRoutingRules(t *testing.T) string {
return output
}

func getVtctldGRPCURL() string {
return net.JoinHostPort("localhost", strconv.Itoa(vc.Vtctld.GrpcPort))
}

func verifyCopyStateIsOptimized(t *testing.T, tablet *cluster.VttabletProcess) {
// Update information_schem with the latest data
_, err := tablet.QueryTablet(sqlparser.BuildParsedQuery("analyze table %s.copy_state", sidecarDBIdentifier).Query, "", false)
Expand Down
97 changes: 0 additions & 97 deletions go/test/endtoend/vreplication/resharding_workflows_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ import (
"encoding/json"
"fmt"
"math/rand/v2"
"net"
"strconv"
"strings"
"sync"
"testing"
Expand Down Expand Up @@ -50,34 +48,11 @@ const (
defaultCellName = "zone1"
)

const (
workflowActionCreate = "Create"
workflowActionMirrorTraffic = "Mirror"
workflowActionSwitchTraffic = "SwitchTraffic"
workflowActionReverseTraffic = "ReverseTraffic"
workflowActionComplete = "Complete"
workflowActionCancel = "Cancel"
)

var (
targetTab1, targetTab2, targetReplicaTab1, targetRdonlyTab1 *cluster.VttabletProcess
sourceTab, sourceReplicaTab, sourceRdonlyTab *cluster.VttabletProcess

lastOutput string
currentWorkflowType binlogdatapb.VReplicationWorkflowType
)

type workflowExecOptions struct {
deferSecondaryKeys bool
atomicCopy bool
shardSubset string
percent float32
}

var defaultWorkflowExecOptions = &workflowExecOptions{
deferSecondaryKeys: true,
}

func createReshardWorkflow(t *testing.T, sourceShards, targetShards string) error {
err := tstWorkflowExec(t, defaultCellName, workflowName, targetKs, targetKs,
"", workflowActionCreate, "", sourceShards, targetShards, defaultWorkflowExecOptions)
Expand Down Expand Up @@ -180,74 +155,6 @@ func tstWorkflowExec(t *testing.T, cells, workflow, sourceKs, targetKs, tables,
return nil
}

// tstWorkflowExecVtctl executes a MoveTables or Reshard workflow command using
// vtctlclient. It should operate exactly the same way as tstWorkflowExec, but
// using the legacy client.
func tstWorkflowExecVtctl(t *testing.T, cells, workflow, sourceKs, targetKs, tables, action, tabletTypes,
sourceShards, targetShards string, options *workflowExecOptions) error {

var args []string
if currentWorkflowType == binlogdatapb.VReplicationWorkflowType_MoveTables {
args = append(args, "MoveTables")
} else {
args = append(args, "Reshard")
}

args = append(args, "--")

if BypassLagCheck {
args = append(args, "--max_replication_lag_allowed=2542087h")
}
if options.atomicCopy {
args = append(args, "--atomic-copy")
}
switch action {
case workflowActionCreate:
if currentWorkflowType == binlogdatapb.VReplicationWorkflowType_MoveTables {
args = append(args, "--source", sourceKs)
if tables != "" {
args = append(args, "--tables", tables)
} else {
args = append(args, "--all")
}
if sourceShards != "" {
args = append(args, "--source_shards", sourceShards)
}
} else {
args = append(args, "--source_shards", sourceShards, "--target_shards", targetShards)
}
// Test new experimental --defer-secondary-keys flag
switch currentWorkflowType {
case binlogdatapb.VReplicationWorkflowType_MoveTables, binlogdatapb.VReplicationWorkflowType_Migrate, binlogdatapb.VReplicationWorkflowType_Reshard:
if !options.atomicCopy && options.deferSecondaryKeys {
args = append(args, "--defer-secondary-keys")
}
args = append(args, "--initialize-target-sequences") // Only used for MoveTables
}
case workflowActionMirrorTraffic:
args = append(args, "--percent", strconv.FormatFloat(float64(options.percent), byte('f'), -1, 32))
default:
if options.shardSubset != "" {
args = append(args, "--shards", options.shardSubset)
}
}
if cells != "" {
args = append(args, "--cells", cells)
}
if tabletTypes != "" {
args = append(args, "--tablet_types", tabletTypes)
}
args = append(args, "--timeout", time.Minute.String())
ksWorkflow := fmt.Sprintf("%s.%s", targetKs, workflow)
args = append(args, action, ksWorkflow)
output, err := vc.VtctlClient.ExecuteCommandWithOutput(args...)
lastOutput = output
if err != nil {
return fmt.Errorf("%s: %s", err, output)
}
return nil
}

func tstWorkflowSwitchReads(t *testing.T, tabletTypes, cells string) {
if tabletTypes == "" {
tabletTypes = "replica,rdonly"
Expand Down Expand Up @@ -424,10 +331,6 @@ func TestBasicV2Workflows(t *testing.T) {
testReshardV2Workflow(t)
}

func getVtctldGRPCURL() string {
return net.JoinHostPort("localhost", strconv.Itoa(vc.Vtctld.GrpcPort))
}

func applyShardRoutingRules(t *testing.T, rules string) {
output, err := osExec(t, "vtctldclient", []string{"--server", getVtctldGRPCURL(), "ApplyShardRoutingRules", "--rules", rules})
log.Infof("ApplyShardRoutingRules err: %+v, output: %+v", err, output)
Expand Down
25 changes: 0 additions & 25 deletions go/test/endtoend/vreplication/vreplication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,18 +47,6 @@ import (
querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtgatepb "vitess.io/vitess/go/vt/proto/vtgate"
throttlebase "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/base"
)

var (
vc *VitessCluster
defaultRdonly int
defaultReplicas int
sourceKsOpts = make(map[string]string)
targetKsOpts = make(map[string]string)
httpClient = throttlebase.SetupHTTPClient(time.Second)
sourceThrottlerAppName = throttlerapp.VStreamerName
targetThrottlerAppName = throttlerapp.VPlayerName
)

const (
Expand All @@ -71,7 +59,6 @@ const (

merchantKeyspace = "merchant-type"
maxWait = 60 * time.Second
BypassLagCheck = true // temporary fix for flakiness seen only in CI when lag check is introduced
throttlerStatusThrottled = http.StatusExpectationFailed // 417
throttlerStatusNotThrottled = http.StatusOK // 200
)
Expand Down Expand Up @@ -101,18 +88,6 @@ func unthrottleApp(tablet *cluster.VttabletProcess, throttlerApp throttlerapp.Na
return throttleResponse(tablet, fmt.Sprintf("throttler/unthrottle-app?app=%s", throttlerApp.String()))
}

func throttlerCheckSelf(tablet *cluster.VttabletProcess, throttlerApp throttlerapp.Name) (respBody string, err error) {
apiURL := fmt.Sprintf("http://%s:%d/throttler/check-self?app=%s", tablet.TabletHostname, tablet.Port, throttlerApp.String())
resp, err := httpClient.Get(apiURL)
if err != nil {
return "", err
}
defer resp.Body.Close()
b, err := io.ReadAll(resp.Body)
respBody = string(b)
return respBody, err
}

// TestVReplicationDDLHandling tests the DDL handling in
// VReplication for the values of IGNORE, STOP, and EXEC.
// NOTE: this is a manual test. It is not executed in the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,17 @@ import (
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
)

type workflowExecOptions struct {
deferSecondaryKeys bool
atomicCopy bool
shardSubset string
percent float32
}

var defaultWorkflowExecOptions = &workflowExecOptions{
deferSecondaryKeys: true,
}

type iWorkflow interface {
Create()
Show()
Expand Down

0 comments on commit 844c62f

Please sign in to comment.