Skip to content

Commit

Permalink
Trim down PR
Browse files Browse the repository at this point in the history
These changes did not end up being necessary.

Signed-off-by: Matt Lord <mattalord@gmail.com>
  • Loading branch information
mattlord committed Jun 10, 2024
1 parent f66d6c2 commit 4fc0037
Show file tree
Hide file tree
Showing 14 changed files with 86 additions and 90 deletions.
4 changes: 2 additions & 2 deletions go/test/endtoend/vreplication/fk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ func (ls *fkLoadSimulator) simulateLoad() {

func (ls *fkLoadSimulator) getNumRowsParent(vtgateConn *mysql.Conn) int {
t := ls.t
qr := execQueryWithDatabase(t, vtgateConn, "fksource", "SELECT COUNT(*) FROM parent")
qr := execVtgateQuery(t, vtgateConn, "fksource", "SELECT COUNT(*) FROM parent")
require.NotNil(t, qr)
numRows, err := strconv.Atoi(qr.Rows[0][0].ToString())
require.NoError(t, err)
Expand Down Expand Up @@ -296,7 +296,7 @@ func (ls *fkLoadSimulator) exec(query string) *sqltypes.Result {
t := ls.t
vtgateConn, closeConn := getVTGateConn()
defer closeConn()
qr := execQueryWithDatabase(t, vtgateConn, "fksource", query)
qr := execVtgateQuery(t, vtgateConn, "fksource", query)
require.NotNil(t, qr)
return qr
}
Expand Down
23 changes: 9 additions & 14 deletions go/test/endtoend/vreplication/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func execMultipleQueries(t *testing.T, conn *mysql.Conn, database string, lines
if strings.HasPrefix(query, "--") {
continue
}
execQueryWithDatabase(t, conn, database, string(query))
execVtgateQuery(t, conn, database, string(query))
}
}

Expand Down Expand Up @@ -134,7 +134,7 @@ func getConnection(t *testing.T, hostname string, port int) *mysql.Conn {
return conn
}

func execQueryWithDatabase(t *testing.T, conn *mysql.Conn, database string, query string) *sqltypes.Result {
func execVtgateQuery(t *testing.T, conn *mysql.Conn, database string, query string) *sqltypes.Result {
if strings.TrimSpace(query) == "" {
return nil
}
Expand All @@ -158,7 +158,7 @@ func waitForQueryResult(t *testing.T, conn *mysql.Conn, database string, query s
timer := time.NewTimer(defaultTimeout)
defer timer.Stop()
for {
qr := execQueryWithDatabase(t, conn, database, query)
qr := execVtgateQuery(t, conn, database, query)
require.NotNil(t, qr)
if want == fmt.Sprintf("%v", qr.Rows) {
return
Expand Down Expand Up @@ -232,7 +232,7 @@ func waitForNoWorkflowLag(t *testing.T, vc *VitessCluster, keyspace, worfklow st
// verifyNoInternalTables can e.g. be used to confirm that no internal tables were
// copied from a source to a target during a MoveTables or Reshard operation.
func verifyNoInternalTables(t *testing.T, conn *mysql.Conn, keyspaceShard string) {
qr := execQueryWithDatabase(t, conn, keyspaceShard, "show tables")
qr := execVtgateQuery(t, conn, keyspaceShard, "show tables")
require.NotNil(t, qr)
require.NotNil(t, qr.Rows)
for _, row := range qr.Rows {
Expand All @@ -247,7 +247,7 @@ func waitForRowCount(t *testing.T, conn *mysql.Conn, database string, table stri
timer := time.NewTimer(defaultTimeout)
defer timer.Stop()
for {
qr := execQueryWithDatabase(t, conn, database, query)
qr := execVtgateQuery(t, conn, database, query)
require.NotNil(t, qr)
if wantRes == fmt.Sprintf("%v", qr.Rows) {
return
Expand Down Expand Up @@ -319,7 +319,7 @@ func executeOnTablet(t *testing.T, conn *mysql.Conn, tablet *cluster.VttabletPro

count0, body0 := getQueryCount(t, queryStatsURL, matchQuery)

qr := execQueryWithDatabase(t, conn, ksName, query)
qr := execVtgateQuery(t, conn, ksName, query)
require.NotNil(t, qr)

count1, body1 := getQueryCount(t, queryStatsURL, matchQuery)
Expand Down Expand Up @@ -595,15 +595,10 @@ func expectNumberOfStreams(t *testing.T, vtgateConn *mysql.Conn, name string, wo

// confirmAllStreamsRunning confirms that all of the workflow's streams are
// in the running state.
func confirmAllStreamsRunning(t *testing.T, keyspace, shard string) {
func confirmAllStreamsRunning(t *testing.T, vtgateConn *mysql.Conn, database string) {
query := sqlparser.BuildParsedQuery("select count(*) from %s.vreplication where state != '%s'",
sidecarDBIdentifier, binlogdatapb.VReplicationWorkflowState_Running.String()).Query
tablet := vc.getPrimaryTablet(t, keyspace, shard)
// Query the tablet's mysqld directly as the target may have denied table entries.
dbc, err := tablet.TabletConn(keyspace, true)
require.NoError(t, err)
defer dbc.Close()
waitForQueryResult(t, dbc, sidecarDBName, query, `[[INT64(0)]]`)
waitForQueryResult(t, vtgateConn, database, query, `[[INT64(0)]]`)
}

func printShardPositions(vc *VitessCluster, ksShards []string) {
Expand Down Expand Up @@ -1005,7 +1000,7 @@ func vexplain(t *testing.T, database, query string) *VExplainPlan {
vtgateConn := vc.GetVTGateConn(t)
defer vtgateConn.Close()

qr := execQueryWithDatabase(t, vtgateConn, database, fmt.Sprintf("vexplain %s", query))
qr := execVtgateQuery(t, vtgateConn, database, fmt.Sprintf("vexplain %s", query))
require.NotNil(t, qr)
require.Equal(t, 1, len(qr.Rows))
json := qr.Rows[0][0].ToString()
Expand Down
30 changes: 15 additions & 15 deletions go/test/endtoend/vreplication/initial_data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ func insertInitialData(t *testing.T) {
log.Infof("Inserting initial data")
lines, _ := os.ReadFile("unsharded_init_data.sql")
execMultipleQueries(t, vtgateConn, "product:0", string(lines))
execQueryWithDatabase(t, vtgateConn, "product:0", "insert into customer_seq(id, next_id, cache) values(0, 100, 100);")
execQueryWithDatabase(t, vtgateConn, "product:0", "insert into order_seq(id, next_id, cache) values(0, 100, 100);")
execQueryWithDatabase(t, vtgateConn, "product:0", "insert into customer_seq2(id, next_id, cache) values(0, 100, 100);")
execVtgateQuery(t, vtgateConn, "product:0", "insert into customer_seq(id, next_id, cache) values(0, 100, 100);")
execVtgateQuery(t, vtgateConn, "product:0", "insert into order_seq(id, next_id, cache) values(0, 100, 100);")
execVtgateQuery(t, vtgateConn, "product:0", "insert into customer_seq2(id, next_id, cache) values(0, 100, 100);")
log.Infof("Done inserting initial data")

waitForRowCount(t, vtgateConn, "product:0", "product", 2)
Expand All @@ -52,12 +52,12 @@ func insertJSONValues(t *testing.T) {
// insert null value combinations
vtgateConn, closeConn := getVTGateConn()
defer closeConn()
execQueryWithDatabase(t, vtgateConn, "product:0", "insert into json_tbl(id, j3) values(1, \"{}\")")
execQueryWithDatabase(t, vtgateConn, "product:0", "insert into json_tbl(id, j1, j3) values(2, \"{}\", \"{}\")")
execQueryWithDatabase(t, vtgateConn, "product:0", "insert into json_tbl(id, j2, j3) values(3, \"{}\", \"{}\")")
execQueryWithDatabase(t, vtgateConn, "product:0", "insert into json_tbl(id, j1, j2, j3) values(4, NULL, 'null', '\"null\"')")
execQueryWithDatabase(t, vtgateConn, "product:0", "insert into json_tbl(id, j3) values(5, JSON_QUOTE('null'))")
execQueryWithDatabase(t, vtgateConn, "product:0", "insert into json_tbl(id, j3) values(6, '{}')")
execVtgateQuery(t, vtgateConn, "product:0", "insert into json_tbl(id, j3) values(1, \"{}\")")
execVtgateQuery(t, vtgateConn, "product:0", "insert into json_tbl(id, j1, j3) values(2, \"{}\", \"{}\")")
execVtgateQuery(t, vtgateConn, "product:0", "insert into json_tbl(id, j2, j3) values(3, \"{}\", \"{}\")")
execVtgateQuery(t, vtgateConn, "product:0", "insert into json_tbl(id, j1, j2, j3) values(4, NULL, 'null', '\"null\"')")
execVtgateQuery(t, vtgateConn, "product:0", "insert into json_tbl(id, j3) values(5, JSON_QUOTE('null'))")
execVtgateQuery(t, vtgateConn, "product:0", "insert into json_tbl(id, j3) values(6, '{}')")

id := 8 // 6 inserted above and one after copy phase is done

Expand All @@ -68,7 +68,7 @@ func insertJSONValues(t *testing.T) {
j1 := rand.IntN(numJsonValues)
j2 := rand.IntN(numJsonValues)
query := fmt.Sprintf(q, id, jsonValues[j1], jsonValues[j2])
execQueryWithDatabase(t, vtgateConn, "product:0", query)
execVtgateQuery(t, vtgateConn, "product:0", query)
}
}

Expand Down Expand Up @@ -97,28 +97,28 @@ func insertMoreCustomers(t *testing.T, numCustomers int) {
}
cid++
}
execQueryWithDatabase(t, vtgateConn, "customer", sql)
execVtgateQuery(t, vtgateConn, "customer", sql)
}

func insertMoreProducts(t *testing.T) {
vtgateConn, closeConn := getVTGateConn()
defer closeConn()
sql := "insert into product(pid, description) values(3, 'cpu'),(4, 'camera'),(5, 'mouse');"
execQueryWithDatabase(t, vtgateConn, "product", sql)
execVtgateQuery(t, vtgateConn, "product", sql)
}

func insertMoreProductsForSourceThrottler(t *testing.T) {
vtgateConn, closeConn := getVTGateConn()
defer closeConn()
sql := "insert into product(pid, description) values(103, 'new-cpu'),(104, 'new-camera'),(105, 'new-mouse');"
execQueryWithDatabase(t, vtgateConn, "product", sql)
execVtgateQuery(t, vtgateConn, "product", sql)
}

func insertMoreProductsForTargetThrottler(t *testing.T) {
vtgateConn, closeConn := getVTGateConn()
defer closeConn()
sql := "insert into product(pid, description) values(203, 'new-cpu'),(204, 'new-camera'),(205, 'new-mouse');"
execQueryWithDatabase(t, vtgateConn, "product", sql)
execVtgateQuery(t, vtgateConn, "product", sql)
}

var blobTableQueries = []string{
Expand All @@ -137,6 +137,6 @@ func insertIntoBlobTable(t *testing.T) {
vtgateConn, closeConn := getVTGateConn()
defer closeConn()
for _, query := range blobTableQueries {
execQueryWithDatabase(t, vtgateConn, "product:0", query)
execVtgateQuery(t, vtgateConn, "product:0", query)
}
}
2 changes: 1 addition & 1 deletion go/test/endtoend/vreplication/materialize_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ func testMaterialize(t *testing.T, useVtctldClient bool) {
waitForQueryResult(t, vtgateConn, targetKs, "select id, val, ts, day, month, x from mat2", want)

// insert data to test the replication phase
execQueryWithDatabase(t, vtgateConn, sourceKs, "insert into mat(id, val, ts) values (3, 'ghi', '2021-12-11 16:17:36')")
execVtgateQuery(t, vtgateConn, sourceKs, "insert into mat(id, val, ts) values (3, 'ghi', '2021-12-11 16:17:36')")

// validate data after the replication phase
waitForQueryResult(t, vtgateConn, targetKs, "select count(*) from mat2", "[[INT64(3)]]")
Expand Down
22 changes: 11 additions & 11 deletions go/test/endtoend/vreplication/migrate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,11 @@ import (
func insertInitialDataIntoExternalCluster(t *testing.T, conn *mysql.Conn) {
t.Run("insertInitialData", func(t *testing.T) {
fmt.Printf("Inserting initial data\n")
execQueryWithDatabase(t, conn, "rating:0", "insert into review(rid, pid, review) values(1, 1, 'review1');")
execQueryWithDatabase(t, conn, "rating:0", "insert into review(rid, pid, review) values(2, 1, 'review2');")
execQueryWithDatabase(t, conn, "rating:0", "insert into review(rid, pid, review) values(3, 2, 'review3');")
execQueryWithDatabase(t, conn, "rating:0", "insert into rating(gid, pid, rating) values(1, 1, 4);")
execQueryWithDatabase(t, conn, "rating:0", "insert into rating(gid, pid, rating) values(2, 2, 5);")
execVtgateQuery(t, conn, "rating:0", "insert into review(rid, pid, review) values(1, 1, 'review1');")
execVtgateQuery(t, conn, "rating:0", "insert into review(rid, pid, review) values(2, 1, 'review2');")
execVtgateQuery(t, conn, "rating:0", "insert into review(rid, pid, review) values(3, 2, 'review3');")
execVtgateQuery(t, conn, "rating:0", "insert into rating(gid, pid, rating) values(1, 1, 4);")
execVtgateQuery(t, conn, "rating:0", "insert into rating(gid, pid, rating) values(2, 2, 5);")
})
}

Expand Down Expand Up @@ -109,8 +109,8 @@ func TestVtctlMigrate(t *testing.T) {
expectNumberOfStreams(t, vtgateConn, "migrate", "e1", "product:0", 1)
waitForRowCount(t, vtgateConn, "product:0", "rating", 2)
waitForRowCount(t, vtgateConn, "product:0", "review", 3)
execQueryWithDatabase(t, extVtgateConn, "rating", "insert into review(rid, pid, review) values(4, 1, 'review4');")
execQueryWithDatabase(t, extVtgateConn, "rating", "insert into rating(gid, pid, rating) values(3, 1, 3);")
execVtgateQuery(t, extVtgateConn, "rating", "insert into review(rid, pid, review) values(4, 1, 'review4');")
execVtgateQuery(t, extVtgateConn, "rating", "insert into rating(gid, pid, rating) values(3, 1, 3);")
waitForRowCount(t, vtgateConn, "product:0", "rating", 3)
waitForRowCount(t, vtgateConn, "product:0", "review", 4)
vdiffSideBySide(t, ksWorkflow, "extcell1")
Expand All @@ -122,7 +122,7 @@ func TestVtctlMigrate(t *testing.T) {
expectNumberOfStreams(t, vtgateConn, "migrate", "e1", "product:0", 0)
})
t.Run("cancel migrate workflow", func(t *testing.T) {
execQueryWithDatabase(t, vtgateConn, "product", "drop table review,rating")
execVtgateQuery(t, vtgateConn, "product", "drop table review,rating")

if output, err = vc.VtctlClient.ExecuteCommandWithOutput("Migrate", "--", "--all", "--auto_start=false", "--cells=extcell1",
"--source=ext1.rating", "create", ksWorkflow); err != nil {
Expand Down Expand Up @@ -234,8 +234,8 @@ func TestVtctldMigrate(t *testing.T) {
expectNumberOfStreams(t, vtgateConn, "migrate", "e1", "product:0", 1)
waitForRowCount(t, vtgateConn, "product:0", "rating", 2)
waitForRowCount(t, vtgateConn, "product:0", "review", 3)
execQueryWithDatabase(t, extVtgateConn, "rating", "insert into review(rid, pid, review) values(4, 1, 'review4');")
execQueryWithDatabase(t, extVtgateConn, "rating", "insert into rating(gid, pid, rating) values(3, 1, 3);")
execVtgateQuery(t, extVtgateConn, "rating", "insert into review(rid, pid, review) values(4, 1, 'review4');")
execVtgateQuery(t, extVtgateConn, "rating", "insert into rating(gid, pid, rating) values(3, 1, 3);")
waitForRowCount(t, vtgateConn, "product:0", "rating", 3)
waitForRowCount(t, vtgateConn, "product:0", "review", 4)
vdiffSideBySide(t, ksWorkflow, "extcell1")
Expand All @@ -261,7 +261,7 @@ func TestVtctldMigrate(t *testing.T) {
expectNumberOfStreams(t, vtgateConn, "migrate", "e1", "product:0", 0)
})
t.Run("cancel migrate workflow", func(t *testing.T) {
execQueryWithDatabase(t, vtgateConn, "product", "drop table review,rating")
execVtgateQuery(t, vtgateConn, "product", "drop table review,rating")
output, err = vc.VtctldClient.ExecuteCommandWithOutput("Migrate",
"--target-keyspace", "product", "--workflow", "e1", "Create", "--source-keyspace", "rating",
"--mount-name", "ext1", "--all-tables", "--auto-start=false", "--cells=extcell1")
Expand Down
6 changes: 3 additions & 3 deletions go/test/endtoend/vreplication/partial_movetables_seq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -532,7 +532,7 @@ var lastCustomerId int64
func getCustomerCount(t *testing.T, msg string) int64 {
vtgateConn, closeConn := getVTGateConn()
defer closeConn()
qr := execQueryWithDatabase(t, vtgateConn, "", "select count(*) from customer")
qr := execVtgateQuery(t, vtgateConn, "", "select count(*) from customer")
require.NotNil(t, qr)
count, err := qr.Rows[0][0].ToInt64()
require.NoError(t, err)
Expand All @@ -542,7 +542,7 @@ func getCustomerCount(t *testing.T, msg string) int64 {
func confirmLastCustomerIdHasIncreased(t *testing.T) {
vtgateConn, closeConn := getVTGateConn()
defer closeConn()
qr := execQueryWithDatabase(t, vtgateConn, "", "select cid from customer order by cid desc limit 1")
qr := execVtgateQuery(t, vtgateConn, "", "select cid from customer order by cid desc limit 1")
require.NotNil(t, qr)
currentCustomerId, err := qr.Rows[0][0].ToInt64()
require.NoError(t, err)
Expand All @@ -554,7 +554,7 @@ func insertCustomers(t *testing.T) {
vtgateConn, closeConn := getVTGateConn()
defer closeConn()
for i := int64(1); i < newCustomerCount+1; i++ {
execQueryWithDatabase(t, vtgateConn, "customer@primary", fmt.Sprintf("insert into customer(name) values ('name-%d')", currentCustomerCount+i))
execVtgateQuery(t, vtgateConn, "customer@primary", fmt.Sprintf("insert into customer(name) values ('name-%d')", currentCustomerCount+i))
}
customerCount = getCustomerCount(t, "")
require.Equal(t, currentCustomerCount+newCustomerCount, customerCount)
Expand Down
2 changes: 1 addition & 1 deletion go/test/endtoend/vreplication/reference_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func TestReferenceTableMaterializationAndRouting(t *testing.T) {
execRefQuery(t, "update sks.mfg2 set name = concat(name, '-updated') where id = 4")

waitForRowCount(t, vtgateConn, uks, "mfg", 8)
qr := execQueryWithDatabase(t, vtgateConn, "uks", "select count(*) from uks.mfg where name like '%updated%'")
qr := execVtgateQuery(t, vtgateConn, "uks", "select count(*) from uks.mfg where name like '%updated%'")
require.NotNil(t, qr)
require.Equal(t, "4", qr.Rows[0][0].ToString())

Expand Down
Loading

0 comments on commit 4fc0037

Please sign in to comment.