diff --git a/go/test/endtoend/transaction/twopc/stress/fuzzer_test.go b/go/test/endtoend/transaction/twopc/fuzz/fuzzer_test.go similarity index 96% rename from go/test/endtoend/transaction/twopc/stress/fuzzer_test.go rename to go/test/endtoend/transaction/twopc/fuzz/fuzzer_test.go index 596b54d55e3..5bbb484ec1e 100644 --- a/go/test/endtoend/transaction/twopc/stress/fuzzer_test.go +++ b/go/test/endtoend/transaction/twopc/fuzz/fuzzer_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package stress +package fuzz import ( "context" @@ -36,6 +36,7 @@ import ( "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/syscallutil" "vitess.io/vitess/go/test/endtoend/cluster" + twopcutil "vitess.io/vitess/go/test/endtoend/transaction/twopc/utils" "vitess.io/vitess/go/vt/log" topodatapb "vitess.io/vitess/go/vt/proto/topodata" "vitess.io/vitess/go/vt/schema" @@ -131,7 +132,7 @@ func TestTwoPCFuzzTest(t *testing.T) { fz.stop() // Wait for all transactions to be resolved. - waitForResults(t, fmt.Sprintf(`show unresolved transactions for %v`, keyspaceName), "[]", 30*time.Second) + twopcutil.WaitForResults(t, &vtParams, fmt.Sprintf(`show unresolved transactions for %v`, keyspaceName), "[]", 30*time.Second) // Verify that all the transactions run were actually atomic and no data issues have occurred. fz.verifyTransactionsWereAtomic(t) @@ -428,12 +429,16 @@ func vttabletRestarts(t *testing.T) { } } -var orderedDDLFuzzer = []string{ - "alter table twopc_fuzzer_insert add column extra_col1 varchar(20)", - "alter table twopc_fuzzer_insert add column extra_col2 varchar(20)", - "alter table twopc_fuzzer_insert drop column extra_col1", - "alter table twopc_fuzzer_insert drop column extra_col2", -} +var ( + count = 0 + + orderedDDLFuzzer = []string{ + "alter table twopc_fuzzer_insert add column extra_col1 varchar(20)", + "alter table twopc_fuzzer_insert add column extra_col2 varchar(20)", + "alter table twopc_fuzzer_insert drop column extra_col1", + "alter table twopc_fuzzer_insert drop column extra_col2", + } +) // onlineDDLFuzzer runs an online DDL statement while ignoring any errors for the fuzzer. func onlineDDLFuzzer(t *testing.T) { @@ -445,7 +450,8 @@ func onlineDDLFuzzer(t *testing.T) { return } fmt.Println("Running online DDL with uuid: ", output) - waitForMigrationStatus(t, &vtParams, clusterInstance.Keyspaces[0].Shards, strings.TrimSpace(output), 2*time.Minute, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed) + twopcutil.WaitForMigrationStatus(t, &vtParams, keyspaceName, clusterInstance.Keyspaces[0].Shards, + strings.TrimSpace(output), 2*time.Minute, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed) } var moveTablesCount int diff --git a/go/test/endtoend/transaction/twopc/fuzz/main_test.go b/go/test/endtoend/transaction/twopc/fuzz/main_test.go new file mode 100644 index 00000000000..86e524e648f --- /dev/null +++ b/go/test/endtoend/transaction/twopc/fuzz/main_test.go @@ -0,0 +1,129 @@ +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package fuzz + +import ( + "context" + _ "embed" + "flag" + "fmt" + "os" + "testing" + + "github.com/stretchr/testify/require" + + "vitess.io/vitess/go/mysql" + "vitess.io/vitess/go/test/endtoend/cluster" + "vitess.io/vitess/go/test/endtoend/transaction/twopc/utils" +) + +var ( + clusterInstance *cluster.LocalProcessCluster + vtParams mysql.ConnParams + vtgateGrpcAddress string + keyspaceName = "ks" + unshardedKeyspaceName = "uks" + cell = "zone1" + hostname = "localhost" + + //go:embed schema.sql + SchemaSQL string + + //go:embed vschema.json + VSchema string +) + +func TestMain(m *testing.M) { + defer cluster.PanicHandler(nil) + flag.Parse() + + exitcode := func() int { + clusterInstance = cluster.NewCluster(cell, hostname) + defer clusterInstance.Teardown() + + // Start topo server + if err := clusterInstance.StartTopo(); err != nil { + return 1 + } + + // Reserve vtGate port in order to pass it to vtTablet + clusterInstance.VtgateGrpcPort = clusterInstance.GetAndReservePort() + + // Set extra args for twopc + clusterInstance.VtGateExtraArgs = append(clusterInstance.VtGateExtraArgs, + "--transaction_mode", "TWOPC", + "--grpc_use_effective_callerid", + ) + clusterInstance.VtTabletExtraArgs = append(clusterInstance.VtTabletExtraArgs, + "--twopc_enable", + "--twopc_abandon_age", "1", + "--migration_check_interval", "2s", + ) + + // Start keyspace + keyspace := &cluster.Keyspace{ + Name: keyspaceName, + SchemaSQL: SchemaSQL, + VSchema: VSchema, + DurabilityPolicy: "semi_sync", + } + if err := clusterInstance.StartKeyspace(*keyspace, []string{"-40", "40-80", "80-"}, 2, false); err != nil { + return 1 + } + + // Start an unsharded keyspace + unshardedKeyspace := &cluster.Keyspace{ + Name: unshardedKeyspaceName, + SchemaSQL: "", + VSchema: "{}", + DurabilityPolicy: "semi_sync", + } + if err := clusterInstance.StartUnshardedKeyspace(*unshardedKeyspace, 2, false); err != nil { + return 1 + } + + // Start Vtgate + if err := clusterInstance.StartVtgate(); err != nil { + return 1 + } + vtParams = clusterInstance.GetVTParams("") + vtgateGrpcAddress = fmt.Sprintf("%s:%d", clusterInstance.Hostname, clusterInstance.VtgateGrpcPort) + + return m.Run() + }() + os.Exit(exitcode) +} + +func start(t *testing.T) (*mysql.Conn, func()) { + ctx := context.Background() + conn, err := mysql.Connect(ctx, &vtParams) + require.NoError(t, err) + cleanup(t) + + return conn, func() { + conn.Close() + cleanup(t) + } +} + +func cleanup(t *testing.T) { + cluster.PanicHandler(t) + + utils.ClearOutTable(t, vtParams, "twopc_fuzzer_insert") + utils.ClearOutTable(t, vtParams, "twopc_fuzzer_update") + utils.ClearOutTable(t, vtParams, "twopc_t1") +} diff --git a/go/test/endtoend/transaction/twopc/fuzz/schema.sql b/go/test/endtoend/transaction/twopc/fuzz/schema.sql new file mode 100644 index 00000000000..5173166bfd4 --- /dev/null +++ b/go/test/endtoend/transaction/twopc/fuzz/schema.sql @@ -0,0 +1,20 @@ +create table twopc_fuzzer_update ( + id bigint, + col bigint, + primary key (id) +) Engine=InnoDB; + +create table twopc_fuzzer_insert ( + id bigint, + updateSet bigint, + threadId bigint, + col bigint auto_increment, + key(col), + primary key (id, col) +) Engine=InnoDB; + +create table twopc_t1 ( + id bigint, + col bigint, + primary key (id) +) Engine=InnoDB; diff --git a/go/test/endtoend/transaction/twopc/fuzz/vschema.json b/go/test/endtoend/transaction/twopc/fuzz/vschema.json new file mode 100644 index 00000000000..415b5958f54 --- /dev/null +++ b/go/test/endtoend/transaction/twopc/fuzz/vschema.json @@ -0,0 +1,34 @@ +{ + "sharded":true, + "vindexes": { + "reverse_bits": { + "type": "reverse_bits" + } + }, + "tables": { + "twopc_fuzzer_update": { + "column_vindexes": [ + { + "column": "id", + "name": "reverse_bits" + } + ] + }, + "twopc_fuzzer_insert": { + "column_vindexes": [ + { + "column": "id", + "name": "reverse_bits" + } + ] + }, + "twopc_t1": { + "column_vindexes": [ + { + "column": "id", + "name": "reverse_bits" + } + ] + } + } +} \ No newline at end of file diff --git a/go/test/endtoend/transaction/twopc/stress/stress_test.go b/go/test/endtoend/transaction/twopc/stress/stress_test.go index f9f46ab4b40..819e7ea48d3 100644 --- a/go/test/endtoend/transaction/twopc/stress/stress_test.go +++ b/go/test/endtoend/transaction/twopc/stress/stress_test.go @@ -32,7 +32,6 @@ import ( "golang.org/x/exp/rand" "vitess.io/vitess/go/mysql" - "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/syscallutil" "vitess.io/vitess/go/test/endtoend/cluster" "vitess.io/vitess/go/test/endtoend/onlineddl" @@ -146,7 +145,7 @@ func TestDisruptions(t *testing.T) { // But since we are waiting in CommitPrepared, the decision to commit the transaction should have already been taken. wg.Wait() // Check the data in the table. - waitForResults(t, "select id, col from twopc_t1 where col = 4 order by id", `[[INT64(4) INT64(4)] [INT64(6) INT64(4)] [INT64(9) INT64(4)]]`, 30*time.Second) + twopcutil.WaitForResults(t, &vtParams, "select id, col from twopc_t1 where col = 4 order by id", `[[INT64(4) INT64(4)] [INT64(6) INT64(4)] [INT64(9) INT64(4)]]`, 30*time.Second) writeCancel() writerWg.Wait() @@ -184,32 +183,6 @@ func reparentToFirstTablet(t *testing.T) { } } -// waitForResults waits for the results of the query to be as expected. -func waitForResults(t *testing.T, query string, resultExpected string, waitTime time.Duration) { - timeout := time.After(waitTime) - var prevRes []sqltypes.Row - for { - select { - case <-timeout: - t.Fatalf("didn't reach expected results for %s. Last results - %v", query, prevRes) - default: - ctx := context.Background() - conn, err := mysql.Connect(ctx, &vtParams) - if err == nil { - res, _ := utils.ExecAllowError(t, conn, query) - conn.Close() - if res != nil { - prevRes = res.Rows - if fmt.Sprintf("%v", res.Rows) == resultExpected { - return - } - } - } - time.Sleep(100 * time.Millisecond) - } - } -} - /* Cluster Level Disruptions for the fuzzer */ @@ -328,58 +301,9 @@ func onlineDDL(t *testing.T) error { require.NoError(t, err) count++ fmt.Println("uuid: ", output) - status := waitForMigrationStatus(t, &vtParams, clusterInstance.Keyspaces[0].Shards, strings.TrimSpace(output), 2*time.Minute, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed) + status := twopcutil.WaitForMigrationStatus(t, &vtParams, keyspaceName, clusterInstance.Keyspaces[0].Shards, + strings.TrimSpace(output), 2*time.Minute, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed) onlineddl.CheckMigrationStatus(t, &vtParams, clusterInstance.Keyspaces[0].Shards, strings.TrimSpace(output), status) require.Equal(t, schema.OnlineDDLStatusComplete, status) return nil } - -func waitForMigrationStatus(t *testing.T, vtParams *mysql.ConnParams, shards []cluster.Shard, uuid string, timeout time.Duration, expectStatuses ...schema.OnlineDDLStatus) schema.OnlineDDLStatus { - shardNames := map[string]bool{} - for _, shard := range shards { - shardNames[shard.Name] = true - } - query := fmt.Sprintf("show vitess_migrations from %s like '%s'", keyspaceName, uuid) - - statusesMap := map[string]bool{} - for _, status := range expectStatuses { - statusesMap[string(status)] = true - } - ctx, cancel := context.WithTimeout(context.Background(), timeout) - defer cancel() - ticker := time.NewTicker(time.Second) - defer ticker.Stop() - - lastKnownStatus := "" - for { - select { - case <-ctx.Done(): - return schema.OnlineDDLStatus(lastKnownStatus) - case <-ticker.C: - } - countMatchedShards := 0 - conn, err := mysql.Connect(ctx, vtParams) - if err != nil { - continue - } - r, err := utils.ExecAllowError(t, conn, query) - conn.Close() - if err != nil { - continue - } - for _, row := range r.Named().Rows { - shardName := row["shard"].ToString() - if !shardNames[shardName] { - // irrelevant shard - continue - } - lastKnownStatus = row["migration_status"].ToString() - if row["migration_uuid"].ToString() == uuid && statusesMap[lastKnownStatus] { - countMatchedShards++ - } - } - if countMatchedShards == len(shards) { - return schema.OnlineDDLStatus(lastKnownStatus) - } - } -} diff --git a/go/test/endtoend/transaction/twopc/utils/utils.go b/go/test/endtoend/transaction/twopc/utils/utils.go index 02c09a796df..ecc91245a67 100644 --- a/go/test/endtoend/transaction/twopc/utils/utils.go +++ b/go/test/endtoend/transaction/twopc/utils/utils.go @@ -27,7 +27,11 @@ import ( "github.com/stretchr/testify/require" "vitess.io/vitess/go/mysql" + "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/test/endtoend/cluster" + "vitess.io/vitess/go/test/endtoend/utils" "vitess.io/vitess/go/vt/log" + "vitess.io/vitess/go/vt/schema" ) const ( @@ -90,3 +94,79 @@ func WriteTestCommunicationFile(t *testing.T, fileName string, content string) { func DeleteFile(fileName string) { _ = os.Remove(path.Join(os.Getenv("VTDATAROOT"), fileName)) } + +// WaitForResults waits for the results of the query to be as expected. +func WaitForResults(t *testing.T, vtParams *mysql.ConnParams, query string, resultExpected string, waitTime time.Duration) { + timeout := time.After(waitTime) + var prevRes []sqltypes.Row + for { + select { + case <-timeout: + t.Fatalf("didn't reach expected results for %s. Last results - %v", query, prevRes) + default: + ctx := context.Background() + conn, err := mysql.Connect(ctx, vtParams) + if err == nil { + res, _ := utils.ExecAllowError(t, conn, query) + conn.Close() + if res != nil { + prevRes = res.Rows + if fmt.Sprintf("%v", res.Rows) == resultExpected { + return + } + } + } + time.Sleep(100 * time.Millisecond) + } + } +} + +func WaitForMigrationStatus(t *testing.T, vtParams *mysql.ConnParams, ks string, shards []cluster.Shard, uuid string, timeout time.Duration, expectStatuses ...schema.OnlineDDLStatus) schema.OnlineDDLStatus { + shardNames := map[string]bool{} + for _, shard := range shards { + shardNames[shard.Name] = true + } + query := fmt.Sprintf("show vitess_migrations from %s like '%s'", ks, uuid) + + statusesMap := map[string]bool{} + for _, status := range expectStatuses { + statusesMap[string(status)] = true + } + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + + lastKnownStatus := "" + for { + select { + case <-ctx.Done(): + return schema.OnlineDDLStatus(lastKnownStatus) + case <-ticker.C: + } + countMatchedShards := 0 + conn, err := mysql.Connect(ctx, vtParams) + if err != nil { + continue + } + r, err := utils.ExecAllowError(t, conn, query) + conn.Close() + if err != nil { + continue + } + for _, row := range r.Named().Rows { + shardName := row["shard"].ToString() + if !shardNames[shardName] { + // irrelevant shard + continue + } + lastKnownStatus = row["migration_status"].ToString() + if row["migration_uuid"].ToString() == uuid && statusesMap[lastKnownStatus] { + countMatchedShards++ + } + } + if countMatchedShards == len(shards) { + return schema.OnlineDDLStatus(lastKnownStatus) + } + } +} diff --git a/test/config.json b/test/config.json index f1a8f1bcf74..d7abad8452b 100644 --- a/test/config.json +++ b/test/config.json @@ -851,6 +851,15 @@ "RetryMax": 1, "Tags": [] }, + "vtgate_transaction_twopc_fuzz": { + "File": "unused.go", + "Args": ["vitess.io/vitess/go/test/endtoend/transaction/twopc/fuzz"], + "Command": [], + "Manual": false, + "Shard": "vtgate_transaction", + "RetryMax": 1, + "Tags": [] + }, "vtgate_transaction_partial_exec": { "File": "unused.go", "Args": ["vitess.io/vitess/go/test/endtoend/vtgate/partialfailure"],