Atomic Transactions handling with Online DDL (#16585)
Signed-off-by: Harshit Gangal <harshit@planetscale.com>
Signed-off-by: Manan Gupta <manan@planetscale.com>
Co-authored-by: Manan Gupta <manan@planetscale.com>
harshit-gangal and GuptaManan100 authored Aug 28, 2024
1 parent 28d831b commit 773a216
Showing 21 changed files with 490 additions and 103 deletions.
14 changes: 14 additions & 0 deletions doc/design-docs/AtomicTransactionsWithDisruptions.md
@@ -35,3 +35,17 @@ Therefore, the safest option was to always check if we need to redo the prepared

When Vttablet restarts, all the previous connections are dropped. It starts in a non-serving state, and then, after reading the shard and tablet records from the topo, it transitions to a serving state.
As part of this transition, we need to ensure that we redo the prepared transactions before we start accepting any writes. This is done as part of the `TxEngine.transition` function when we transition to an `AcceptingReadWrite` state. We call the same code for redoing the prepared transactions that we call for MySQL restarts, PRS, and ERS.

## Online DDL

During an Online DDL cutover, we need to ensure that all prepared transactions on the Online DDL table are completed before we can proceed with the cutover.
This is because the cutover involves a schema change, and we cannot have any prepared transactions that depend on the old schema.

As part of the cutover process, Online DDL adds query rules to buffer new queries on the table.
It then checks for any open prepared transactions on the table; if it finds one, it waits for up to 100ms and checks again.
If it finds no prepared transactions on the table, it moves forward with the cutover; otherwise the cutover fails, and the Online DDL mechanism will retry it later.

In the Prepare code, we check the query rules before adding the transaction to the prepared list, and we re-check the rules before storing the transaction logs in the transaction redo table.
Any transaction that gets past the first check will fail the second check if the cutover proceeds in between.

Checking on both sides ensures that the two operations cannot both succeed when they race: either the cutover fails, or the transaction fails to prepare.
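
To make the interplay concrete, the following is a minimal, self-contained Go sketch of the two-sided check described above. All names here (`buffering`, `prepared`, `prepareTx`, `cutover`) are illustrative stand-ins rather than the actual vttablet query-rule or transaction-engine APIs, and the single wait-and-recheck cycle mirrors the description above rather than the exact implementation:

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// Hypothetical shared state, standing in for the real query rules and
// prepared-transaction bookkeeping inside vttablet.
var (
	mu        sync.Mutex
	buffering = map[string]bool{} // tables with an active cutover buffering rule
	prepared  = map[string]int{}  // open prepared transactions per table
)

// prepareTx models the Prepare path: check the query rules before adding the
// transaction to the prepared list, then re-check them before persisting the
// redo log. A cutover that starts between the two checks fails the prepare.
func prepareTx(table string) error {
	mu.Lock()
	if buffering[table] {
		mu.Unlock()
		return errors.New("table is being cut over: cannot prepare")
	}
	prepared[table]++ // transaction added to the prepared list
	mu.Unlock()

	// ... transaction logs would be written to the redo table here ...

	mu.Lock()
	defer mu.Unlock()
	if buffering[table] { // second check, before the redo log is committed
		prepared[table]--
		return errors.New("cutover started during prepare: rolling back")
	}
	return nil
}

// cutover models the Online DDL side: install the buffering rule, check for
// open prepared transactions, wait up to 100ms, and check once more before
// giving up.
func cutover(table string) error {
	mu.Lock()
	buffering[table] = true // new queries on the table are now buffered
	mu.Unlock()

	for attempt := 0; attempt < 2; attempt++ {
		mu.Lock()
		open := prepared[table]
		mu.Unlock()
		if open == 0 {
			fmt.Printf("cutover of %s proceeds\n", table)
			return nil
		}
		if attempt == 0 {
			time.Sleep(100 * time.Millisecond)
		}
	}

	mu.Lock()
	buffering[table] = false // give up; the migration retries the cutover later
	mu.Unlock()
	return errors.New("open prepared transactions remain: cutover fails")
}

func main() {
	if err := prepareTx("twopc_t1"); err == nil {
		fmt.Println("transaction prepared")
	}
	fmt.Println(cutover("twopc_t1")) // fails: a prepared transaction is still open
}
```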
48 changes: 36 additions & 12 deletions go/test/endtoend/transaction/twopc/stress/fuzzer_test.go
@@ -34,7 +34,9 @@ import (

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/syscallutil"
"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/schema"
)

var (
@@ -78,7 +80,7 @@ func TestTwoPCFuzzTest(t *testing.T) {
threads int
updateSets int
timeForTesting time.Duration
clusterDisruptions []func()
clusterDisruptions []func(t *testing.T)
disruptionProbability []int
}{
{
@@ -100,20 +102,20 @@
timeForTesting: 5 * time.Second,
},
{
name: "Multiple Threads - Multiple Set - PRS, ERS, and MySQL and Vttablet restart disruptions",
name: "Multiple Threads - Multiple Set - PRS, ERS, and MySQL & Vttablet restart, OnlineDDL disruptions",
threads: 15,
updateSets: 15,
timeForTesting: 5 * time.Second,
clusterDisruptions: []func(){prs, ers, mysqlRestarts, vttabletRestarts},
disruptionProbability: []int{5, 5, 5, 5},
clusterDisruptions: []func(t *testing.T){prs, ers, mysqlRestarts, vttabletRestarts, onlineDDLFuzzer},
disruptionProbability: []int{5, 5, 5, 5, 5},
},
}

for _, tt := range testcases {
t.Run(tt.name, func(t *testing.T) {
conn, closer := start(t)
defer closer()
fz := newFuzzer(tt.threads, tt.updateSets, tt.clusterDisruptions, tt.disruptionProbability)
fz := newFuzzer(t, tt.threads, tt.updateSets, tt.clusterDisruptions, tt.disruptionProbability)

fz.initialize(t, conn)
conn.Close()
@@ -190,6 +192,7 @@ func getThreadIDsForUpdateSetFromFuzzInsert(t *testing.T, conn *mysql.Conn, upda
type fuzzer struct {
threads int
updateSets int
t *testing.T

// shouldStop is an internal state variable, that tells the fuzzer
// whether it should stop or not.
@@ -199,14 +202,15 @@ type fuzzer struct {
// updateRowVals are the rows that we use to ensure 1 update on each shard with the same increment.
updateRowsVals [][]int
// clusterDisruptions are the cluster level disruptions that can happen in a running cluster.
clusterDisruptions []func()
clusterDisruptions []func(t *testing.T)
// disruptionProbability is the chance for the disruption to happen. We check this every 100 milliseconds.
disruptionProbability []int
}

// newFuzzer creates a new fuzzer struct.
func newFuzzer(threads int, updateSets int, clusterDisruptions []func(), disruptionProbability []int) *fuzzer {
func newFuzzer(t *testing.T, threads int, updateSets int, clusterDisruptions []func(t *testing.T), disruptionProbability []int) *fuzzer {
fz := &fuzzer{
t: t,
threads: threads,
updateSets: updateSets,
wg: sync.WaitGroup{},
@@ -364,7 +368,7 @@ func (fz *fuzzer) runClusterDisruptionThread(t *testing.T) {
func (fz *fuzzer) runClusterDisruption(t *testing.T) {
for idx, prob := range fz.disruptionProbability {
if rand.Intn(100) < prob {
fz.clusterDisruptions[idx]()
fz.clusterDisruptions[idx](fz.t)
return
}
}
@@ -374,7 +378,7 @@ func (fz *fuzzer) runClusterDisruption(t *testing.T) {
Cluster Level Disruptions for the fuzzer
*/

func prs() {
func prs(t *testing.T) {
shards := clusterInstance.Keyspaces[0].Shards
shard := shards[rand.Intn(len(shards))]
vttablets := shard.Vttablets
@@ -386,7 +390,7 @@
}
}

func ers() {
func ers(t *testing.T) {
shards := clusterInstance.Keyspaces[0].Shards
shard := shards[rand.Intn(len(shards))]
vttablets := shard.Vttablets
@@ -398,7 +402,7 @@
}
}

func vttabletRestarts() {
func vttabletRestarts(t *testing.T) {
shards := clusterInstance.Keyspaces[0].Shards
shard := shards[rand.Intn(len(shards))]
vttablets := shard.Vttablets
@@ -422,7 +426,27 @@
}
}

func mysqlRestarts() {
var orderedDDLFuzzer = []string{
"alter table twopc_fuzzer_insert add column extra_col1 varchar(20)",
"alter table twopc_fuzzer_insert add column extra_col2 varchar(20)",
"alter table twopc_fuzzer_insert drop column extra_col1",
"alter table twopc_fuzzer_insert drop column extra_col2",
}

// onlineDDLFuzzer runs an online DDL statement as a fuzzer disruption, ignoring any errors, since the fuzzer doesn't require it to succeed.
func onlineDDLFuzzer(t *testing.T) {
output, err := clusterInstance.VtctldClientProcess.ApplySchemaWithOutput(keyspaceName, orderedDDLFuzzer[count%len(orderedDDLFuzzer)], cluster.ApplySchemaParams{
DDLStrategy: "vitess --force-cut-over-after=1ms",
})
count++
if err != nil {
return
}
fmt.Println("Running online DDL with uuid: ", output)
WaitForMigrationStatus(t, &vtParams, clusterInstance.Keyspaces[0].Shards, strings.TrimSpace(output), 2*time.Minute, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed)
}

func mysqlRestarts(t *testing.T) {
shards := clusterInstance.Keyspaces[0].Shards
shard := shards[rand.Intn(len(shards))]
vttablets := shard.Vttablets
1 change: 1 addition & 0 deletions go/test/endtoend/transaction/twopc/stress/main_test.go
@@ -71,6 +71,7 @@ func TestMain(m *testing.M) {
clusterInstance.VtTabletExtraArgs = append(clusterInstance.VtTabletExtraArgs,
"--twopc_enable",
"--twopc_abandon_age", "1",
"--migration_check_interval", "2s",
)

// Start keyspace
106 changes: 95 additions & 11 deletions go/test/endtoend/transaction/twopc/stress/stress_test.go
@@ -34,22 +34,25 @@ import (
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/syscallutil"
"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/test/endtoend/onlineddl"
twopcutil "vitess.io/vitess/go/test/endtoend/transaction/twopc/utils"
"vitess.io/vitess/go/test/endtoend/utils"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/schema"
)

// TestDisruptions tests that atomic transactions persevere through various disruptions.
func TestDisruptions(t *testing.T) {
testcases := []struct {
disruptionName string
commitDelayTime string
disruption func() error
disruption func(t *testing.T) error
}{
{
disruptionName: "No Disruption",
commitDelayTime: "1",
disruption: func() error {
disruption: func(t *testing.T) error {
return nil
},
},
@@ -68,6 +71,11 @@ func TestDisruptions(t *testing.T) {
commitDelayTime: "5",
disruption: vttabletRestartShard3,
},
{
disruptionName: "OnlineDDL",
commitDelayTime: "20",
disruption: onlineDDL,
},
{
disruptionName: "EmergencyReparentShard",
commitDelayTime: "5",
@@ -119,7 +127,7 @@
}()
}
// Run the disruption.
err := tt.disruption()
err := tt.disruption(t)
require.NoError(t, err)
// Wait for the commit to have returned. We don't actually check for an error in the commit because the user might receive an error.
// But since we are waiting in CommitPrepared, the decision to commit the transaction should have already been taken.
@@ -145,6 +153,7 @@ func threadToWrite(t *testing.T, ctx context.Context, id int) {
continue
}
_, _ = utils.ExecAllowError(t, conn, fmt.Sprintf("insert into twopc_t1(id, col) values(%d, %d)", id, rand.Intn(10000)))
conn.Close()
}
}

@@ -170,11 +179,13 @@ func waitForResults(t *testing.T, query string, resultExpected string, waitTime
ctx := context.Background()
conn, err := mysql.Connect(ctx, &vtParams)
if err == nil {
res := utils.Exec(t, conn, query)
res, _ := utils.ExecAllowError(t, conn, query)
conn.Close()
prevRes = res.Rows
if fmt.Sprintf("%v", res.Rows) == resultExpected {
return
if res != nil {
prevRes = res.Rows
if fmt.Sprintf("%v", res.Rows) == resultExpected {
return
}
}
}
time.Sleep(100 * time.Millisecond)
@@ -187,29 +198,29 @@ Cluster Level Disruptions for the fuzzer
*/

// prsShard3 runs a PRS in shard 3 of the keyspace. It promotes the second tablet to be the new primary.
func prsShard3() error {
func prsShard3(t *testing.T) error {
shard := clusterInstance.Keyspaces[0].Shards[2]
newPrimary := shard.Vttablets[1]
return clusterInstance.VtctldClientProcess.PlannedReparentShard(keyspaceName, shard.Name, newPrimary.Alias)
}

// ersShard3 runs a ERS in shard 3 of the keyspace. It promotes the second tablet to be the new primary.
func ersShard3() error {
func ersShard3(t *testing.T) error {
shard := clusterInstance.Keyspaces[0].Shards[2]
newPrimary := shard.Vttablets[1]
_, err := clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("EmergencyReparentShard", fmt.Sprintf("%s/%s", keyspaceName, shard.Name), "--new-primary", newPrimary.Alias)
return err
}

// vttabletRestartShard3 restarts the first vttablet of the third shard.
func vttabletRestartShard3() error {
func vttabletRestartShard3(t *testing.T) error {
shard := clusterInstance.Keyspaces[0].Shards[2]
tablet := shard.Vttablets[0]
return tablet.RestartOnlyTablet()
}

// mysqlRestartShard3 restarts MySQL on the first tablet of the third shard.
func mysqlRestartShard3() error {
func mysqlRestartShard3(t *testing.T) error {
shard := clusterInstance.Keyspaces[0].Shards[2]
vttablets := shard.Vttablets
tablet := vttablets[0]
@@ -227,3 +238,76 @@
}
return syscallutil.Kill(pid, syscall.SIGKILL)
}

var orderedDDL = []string{
"alter table twopc_t1 add column extra_col1 varchar(20)",
"alter table twopc_t1 add column extra_col2 varchar(20)",
"alter table twopc_t1 add column extra_col3 varchar(20)",
"alter table twopc_t1 add column extra_col4 varchar(20)",
}

var count = 0

// onlineDDL runs an online DDL statement and waits for the migration to complete.
func onlineDDL(t *testing.T) error {
output, err := clusterInstance.VtctldClientProcess.ApplySchemaWithOutput(keyspaceName, orderedDDL[count%len(orderedDDL)], cluster.ApplySchemaParams{
DDLStrategy: "vitess --force-cut-over-after=1ms",
})
require.NoError(t, err)
count++
fmt.Println("uuid: ", output)
status := WaitForMigrationStatus(t, &vtParams, clusterInstance.Keyspaces[0].Shards, strings.TrimSpace(output), 2*time.Minute, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed)
onlineddl.CheckMigrationStatus(t, &vtParams, clusterInstance.Keyspaces[0].Shards, strings.TrimSpace(output), status)
require.Equal(t, schema.OnlineDDLStatusComplete, status)
return nil
}

func WaitForMigrationStatus(t *testing.T, vtParams *mysql.ConnParams, shards []cluster.Shard, uuid string, timeout time.Duration, expectStatuses ...schema.OnlineDDLStatus) schema.OnlineDDLStatus {
shardNames := map[string]bool{}
for _, shard := range shards {
shardNames[shard.Name] = true
}
query := fmt.Sprintf("show vitess_migrations like '%s'", uuid)

statusesMap := map[string]bool{}
for _, status := range expectStatuses {
statusesMap[string(status)] = true
}
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
ticker := time.NewTicker(time.Second)
defer ticker.Stop()

lastKnownStatus := ""
for {
countMatchedShards := 0
conn, err := mysql.Connect(ctx, vtParams)
if err != nil {
continue
}
r, err := utils.ExecAllowError(t, conn, query)
conn.Close()
if err != nil {
continue
}
for _, row := range r.Named().Rows {
shardName := row["shard"].ToString()
if !shardNames[shardName] {
// irrelevant shard
continue
}
lastKnownStatus = row["migration_status"].ToString()
if row["migration_uuid"].ToString() == uuid && statusesMap[lastKnownStatus] {
countMatchedShards++
}
}
if countMatchedShards == len(shards) {
return schema.OnlineDDLStatus(lastKnownStatus)
}
select {
case <-ctx.Done():
return schema.OnlineDDLStatus(lastKnownStatus)
case <-ticker.C:
}
}
}
2 changes: 1 addition & 1 deletion go/test/endtoend/transaction/twopc/utils/utils.go
@@ -61,9 +61,9 @@ func ClearOutTable(t *testing.T, vtParams mysql.ConnParams, tableName string) {
return
}
_, err = conn.ExecuteFetch(fmt.Sprintf("DELETE FROM %v LIMIT 10000", tableName), 10000, false)
conn.Close()
if err != nil {
fmt.Printf("Error in cleanup deletion - %v\n", err)
conn.Close()
time.Sleep(100 * time.Millisecond)
continue
}
21 changes: 21 additions & 0 deletions go/vt/sqlparser/ast_funcs.go
@@ -2818,3 +2818,24 @@ func (lock Lock) GetHighestOrderLock(newLock Lock) Lock {
func Clone[K SQLNode](x K) K {
return CloneSQLNode(x).(K)
}

// ExtractAllTables returns all the table names in the SQLNode as a slice of strings.
func ExtractAllTables(stmt Statement) []string {
var tables []string
tableMap := make(map[string]any)
_ = Walk(func(node SQLNode) (kontinue bool, err error) {
switch node := node.(type) {
case *AliasedTableExpr:
if tblName, ok := node.Expr.(TableName); ok {
name := String(tblName)
if _, exists := tableMap[name]; !exists {
tableMap[name] = nil
tables = append(tables, name)
}
return false, nil
}
}
return true, nil
}, stmt)
return tables
}
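
A short usage sketch for the new helper. The parser construction below assumes the `sqlparser.NewTestParser` entry point; adjust to however your code already obtains a `sqlparser.Parser`:

```go
package main

import (
	"fmt"

	"vitess.io/vitess/go/vt/sqlparser"
)

func main() {
	parser := sqlparser.NewTestParser()
	stmt, err := parser.Parse("select a from t1 join t2 on t1.id = t2.id where exists (select 1 from t3)")
	if err != nil {
		panic(err)
	}
	// Each distinct table name is reported once, in order of first appearance,
	// including tables referenced only inside subqueries.
	fmt.Println(sqlparser.ExtractAllTables(stmt)) // [t1 t2 t3]
}
```

Note that the walk returns `false` once a plain `TableName` is recorded, so it does not descend further into that table expression, while table expressions that are not plain names (for example, derived tables) fall through and are still walked.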