Skip to content

Commit

Permalink
Online DDL: Fail a --in-order-completion migration, if a _prior_ migr…
Browse files Browse the repository at this point in the history
…ation within the same context is 'failed' or 'cancelled' (#16071)

Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>
  • Loading branch information
shlomi-noach authored Jun 6, 2024
1 parent 3fb15b0 commit b81c617
Show file tree
Hide file tree
Showing 3 changed files with 239 additions and 47 deletions.
123 changes: 117 additions & 6 deletions go/test/endtoend/onlineddl/scheduler/onlineddl_scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ func waitForReadyToComplete(t *testing.T, uuid string, expected bool) {
case <-ticker.C:
case <-ctx.Done():
}
require.NoError(t, ctx.Err())
require.NoError(t, ctx.Err(), "waiting for ready_to_complete=%t for %v", expected, uuid)
}
}

Expand Down Expand Up @@ -422,6 +422,14 @@ func testScheduler(t *testing.T) {
assert.GreaterOrEqual(t, endTime2, endTime1)
})
}
testTableCompletionAndStartTimes := func(t *testing.T, uuid1, uuid2 string) {
	// Expect uuid1 to complete before uuid2 even starts: uuid2's `started_timestamp`
	// must not precede uuid1's `completed_timestamp`.
	// The subtest label names both timestamps so that it describes what is actually compared.
	t.Run("Compare t1 completion time, t2 start time", func(t *testing.T) {
		endTime1 := testReadTimestamp(t, uuid1, "completed_timestamp")
		startedTime2 := testReadTimestamp(t, uuid2, "started_timestamp")
		assert.GreaterOrEqual(t, startedTime2, endTime1)
	})
}
testAllowConcurrent := func(t *testing.T, name string, uuid string, expect int64) {
t.Run("verify allow_concurrent: "+name, func(t *testing.T) {
rs := onlineddl.ReadMigrations(t, &vtParams, uuid)
Expand All @@ -434,7 +442,7 @@ func testScheduler(t *testing.T) {
}

// CREATE
t.Run("CREATE TABLEs t1, t1", func(t *testing.T) {
t.Run("CREATE TABLEs t1, t2", func(t *testing.T) {
{ // The table does not exist
t1uuid = testOnlineDDLStatement(t, createParams(createT1Statement, ddlStrategy, "vtgate", "just-created", "", false))
onlineddl.CheckMigrationStatus(t, &vtParams, shards, t1uuid, schema.OnlineDDLStatusComplete)
Expand Down Expand Up @@ -1183,6 +1191,36 @@ func testScheduler(t *testing.T) {
})
})
// in-order-completion
t.Run("in-order-completion: multiple drops for nonexistent tables and views, sequential", func(t *testing.T) {
	suffix, err := schema.CreateOnlineDDLUUID()
	require.NoError(t, err)

	// Four `DROP ... IF EXISTS` statements, submitted together as a single multi-statement request.
	// Object names carry a freshly generated suffix.
	dropStatements := strings.Join([]string{
		fmt.Sprintf("drop table if exists t4_%s", suffix),
		fmt.Sprintf("drop view if exists t1_%s", suffix),
		fmt.Sprintf("drop table if exists t2_%s", suffix),
		fmt.Sprintf("drop view if exists t3_%s", suffix),
	}, ";")
	var vuuids []string
	t.Run("drop multiple tables and views, in-order-completion", func(t *testing.T) {
		params := createParams(dropStatements, ddlStrategy+" --in-order-completion", "vtctl", "", "", true) // skip wait
		vuuids = strings.Split(testOnlineDDLStatement(t, params), "\n")
		assert.Len(t, vuuids, 4)
		// Every migration is expected to end up 'complete'.
		for _, uuid := range vuuids {
			status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, uuid, normalWaitTime, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed)
			fmt.Printf("# Migration status (for debug purposes): <%s>\n", status)
			onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete)
		}
	})
	require.Len(t, vuuids, 4)
	// Compare each migration with its immediate predecessor.
	for i := 1; i < len(vuuids); i++ {
		previous, current := vuuids[i-1], vuuids[i]
		testTableCompletionTimes(t, previous, current)
		testTableCompletionAndStartTimes(t, previous, current)
	}
})
t.Run("in-order-completion: multiple drops for nonexistent tables and views", func(t *testing.T) {
u, err := schema.CreateOnlineDDLUUID()
require.NoError(t, err)
Expand All @@ -1198,20 +1236,93 @@ func testScheduler(t *testing.T) {
t.Run("drop multiple tables and views, in-order-completion", func(t *testing.T) {
uuidList := testOnlineDDLStatement(t, createParams(sql, ddlStrategy+" --allow-concurrent --in-order-completion", "vtctl", "", "", true)) // skip wait
vuuids = strings.Split(uuidList, "\n")
assert.Equal(t, 4, len(vuuids))
assert.Len(t, vuuids, 4)
for _, uuid := range vuuids {
status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, uuid, normalWaitTime, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed)
fmt.Printf("# Migration status (for debug purposes): <%s>\n", status)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete)
}
})
require.Equal(t, 4, len(vuuids))
require.Len(t, vuuids, 4)
for i := range vuuids {
if i > 0 {
testTableCompletionTimes(t, vuuids[i-1], vuuids[i])
}
}
})

t.Run("in-order-completion: bail out on first error", func(t *testing.T) {
	u, err := schema.CreateOnlineDDLUUID()
	require.NoError(t, err)

	// The third statement has no `IF EXISTS`, and is therefore expected to fail.
	sqls := []string{
		fmt.Sprintf("drop table if exists t4_%s", u),
		fmt.Sprintf("drop view if exists t1_%s", u),
		fmt.Sprintf("drop table t2_%s", u), // non existent
		fmt.Sprintf("drop view if exists t3_%s", u),
	}
	sql := strings.Join(sqls, ";")
	var vuuids []string
	t.Run("apply schema", func(t *testing.T) {
		uuidList := testOnlineDDLStatement(t, createParams(sql, ddlStrategy+" --in-order-completion", "vtctl", "", "", true)) // skip wait
		vuuids = strings.Split(uuidList, "\n")
		// The code below indexes vuuids[0..3]: stop this subtest right here, rather than
		// panic on an out-of-range index, if an unexpected number of UUIDs came back.
		require.Len(t, vuuids, 4)
		for _, uuid := range vuuids[0:2] {
			status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, uuid, normalWaitTime, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed)
			fmt.Printf("# Migration status (for debug purposes): <%s>\n", status)
			onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete)
		}
		{
			uuid := vuuids[2] // the failed one
			status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, uuid, normalWaitTime, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed)
			fmt.Printf("# Migration status (for debug purposes): <%s>\n", status)
			onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusFailed)
		}
		{
			uuid := vuuids[3] // should consequently fail without even running
			status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, uuid, normalWaitTime, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed)
			fmt.Printf("# Migration status (for debug purposes): <%s>\n", status)
			onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusQueued, schema.OnlineDDLStatusFailed)

			rs := onlineddl.ReadMigrations(t, &vtParams, uuid)
			require.NotNil(t, rs)
			for _, row := range rs.Named().Rows {
				message := row["message"].ToString()
				require.Contains(t, message, vuuids[2]) // Indicating this migration failed due to vuuids[2] failure
			}
		}
	})
	// A failed "apply schema" subtest does not stop this parent test: guard the indexing below.
	require.Len(t, vuuids, 4)
	testTableCompletionTimes(t, vuuids[0], vuuids[1])
	testTableCompletionAndStartTimes(t, vuuids[0], vuuids[1])
	testTableCompletionAndStartTimes(t, vuuids[1], vuuids[2])
})
t.Run("in-order-completion concurrent: bail out on first error", func(t *testing.T) {
	sqls := []string{
		`alter table t1_test force`,
		`alter table t2_test force`,
	}
	sql := strings.Join(sqls, ";")
	var vuuids []string
	t.Run("apply schema", func(t *testing.T) {
		uuidList := testOnlineDDLStatement(t, createParams(sql, ddlStrategy+" --in-order-completion --postpone-completion --allow-concurrent", "vtctl", "", "", true)) // skip wait
		vuuids = strings.Split(uuidList, "\n")
		// The nested subtests index vuuids[0] and vuuids[1]: stop right here, rather than
		// panic on an out-of-range index, if an unexpected number of UUIDs came back.
		require.Len(t, vuuids, 2)
		// Both migrations must be ready to complete before the first one is cancelled.
		for _, uuid := range vuuids {
			waitForReadyToComplete(t, uuid, true)
		}
		t.Run("cancel 1st migration", func(t *testing.T) {
			onlineddl.CheckCancelMigration(t, &vtParams, shards, vuuids[0], true)
			status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, vuuids[0], normalWaitTime, schema.OnlineDDLStatusFailed, schema.OnlineDDLStatusCancelled)
			fmt.Printf("# Migration status (for debug purposes): <%s>\n", status)
			onlineddl.CheckMigrationStatus(t, &vtParams, shards, vuuids[0], schema.OnlineDDLStatusCancelled)
		})
		t.Run("expect 2nd migration to fail", func(t *testing.T) {
			status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, vuuids[1], normalWaitTime, schema.OnlineDDLStatusFailed, schema.OnlineDDLStatusCancelled)
			fmt.Printf("# Migration status (for debug purposes): <%s>\n", status)
			onlineddl.CheckMigrationStatus(t, &vtParams, shards, vuuids[1], schema.OnlineDDLStatusFailed)
		})
	})
})
t.Run("in-order-completion: two new views, one depends on the other", func(t *testing.T) {
u, err := schema.CreateOnlineDDLUUID()
require.NoError(t, err)
Expand All @@ -1225,14 +1336,14 @@ func testScheduler(t *testing.T) {
t.Run("create two views, expect both complete", func(t *testing.T) {
uuidList := testOnlineDDLStatement(t, createParams(sql, ddlStrategy+" --allow-concurrent --in-order-completion", "vtctl", "", "", true)) // skip wait
vuuids = strings.Split(uuidList, "\n")
assert.Equal(t, 2, len(vuuids))
assert.Len(t, vuuids, 2)
for _, uuid := range vuuids {
status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, uuid, normalWaitTime, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed)
fmt.Printf("# Migration status (for debug purposes): <%s>\n", status)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete)
}
})
require.Equal(t, 2, len(vuuids))
require.Len(t, vuuids, 2)
testTableCompletionTimes(t, vuuids[0], vuuids[1])
})
t.Run("in-order-completion: new table column, new view depends on said column", func(t *testing.T) {
Expand Down
149 changes: 108 additions & 41 deletions go/vt/vttablet/onlineddl/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2854,6 +2854,31 @@ func (e *Executor) getCompletedMigrationByContextAndSQL(ctx context.Context, onl
return completedUUID, nil
}

// readFailedCancelledMigrationsInContextBeforeMigration lists the UUIDs of failed/cancelled migrations which
// share the given migration's context and chronologically _precede_ it (have a lower `id` value).
// A migration without an explicit context yields no UUIDs and no error.
func (e *Executor) readFailedCancelledMigrationsInContextBeforeMigration(ctx context.Context, onlineDDL *schema.OnlineDDL) (uuids []string, err error) {
	if onlineDDL.MigrationContext == "" {
		// Nothing to look up: this only applies to migrations with an explicit context.
		return nil, nil
	}
	contextVar := sqltypes.StringBindVariable(onlineDDL.MigrationContext)
	uuidVar := sqltypes.StringBindVariable(onlineDDL.UUID)
	boundQuery, err := sqlparser.ParseAndBind(sqlSelectFailedCancelledMigrationsInContextBeforeMigration, contextVar, uuidVar)
	if err != nil {
		return nil, err
	}
	result, err := e.execQuery(ctx, boundQuery)
	if err != nil {
		return nil, err
	}
	// Collect the UUIDs in the order the query returned them.
	for _, row := range result.Named().Rows {
		uuids = append(uuids, row["migration_uuid"].ToString())
	}
	return uuids, nil
}

// failMigration marks a migration as failed
func (e *Executor) failMigration(ctx context.Context, onlineDDL *schema.OnlineDDL, withError error) error {
defer e.triggerNextCheckInterval()
Expand All @@ -2865,6 +2890,23 @@ func (e *Executor) failMigration(ctx context.Context, onlineDDL *schema.OnlineDD
return withError
}

// validateInOrderMigration decides whether an in-order-completion migration must be forced to fail, be it
// before it runs or while it is running.
// That is the case when a prior migration in the same context has failed or was cancelled.
// When wasFailed is true, failMigration has been invoked for the given migration, and the returned error
// is whatever failMigration returned.
func (e *Executor) validateInOrderMigration(ctx context.Context, onlineDDL *schema.OnlineDDL) (wasFailed bool, err error) {
	if !onlineDDL.StrategySetting().IsInOrderCompletion() {
		// Not an in-order migration: nothing to validate.
		return false, nil
	}
	priorUUIDs, err := e.readFailedCancelledMigrationsInContextBeforeMigration(ctx, onlineDDL)
	switch {
	case err != nil:
		return false, err
	case len(priorUUIDs) == 0:
		// No failed/cancelled predecessors: the migration is free to proceed.
		return false, nil
	}
	// Report the first offending predecessor as the reason for failing this migration.
	reason := fmt.Errorf("migration %v cannot run because prior migration %v in same context has failed/was cancelled", onlineDDL.UUID, priorUUIDs[0])
	return true, e.failMigration(ctx, onlineDDL, reason)
}

// analyzeDropDDLActionMigration analyzes a DROP <TABLE|VIEW> migration.
func (e *Executor) analyzeDropDDLActionMigration(ctx context.Context, onlineDDL *schema.OnlineDDL) error {
// Schema analysis:
Expand Down Expand Up @@ -3384,6 +3426,58 @@ func (e *Executor) executeMigration(ctx context.Context, onlineDDL *schema.Onlin
return nil
}

// getNonConflictingMigration picks a single 'ready' migration which does not conflict with running migrations,
// or returns nil when there is no such migration.
// A 'ready' migration is in conflict when:
//   - it is not set to run _concurrently_, and there's a running migration that is also non-concurrent
//   - there's another migration 'running' on the exact same table
//
// In-order-completion migrations are subject to further checks, see inline comments.
func (e *Executor) getNonConflictingMigration(ctx context.Context) (*schema.OnlineDDL, error) {
	pendingUUIDs, err := e.readPendingMigrationsUUIDs(ctx)
	if err != nil {
		return nil, err
	}
	readyRows, err := e.execQuery(ctx, sqlSelectReadyMigrations)
	if err != nil {
		return nil, err
	}
	for _, readyRow := range readyRows.Named().Rows {
		candidate, candidateRow, err := e.readMigration(ctx, readyRow["migration_uuid"].ToString())
		if err != nil {
			return nil, err
		}
		isImmediate := candidateRow.AsBool("is_immediate_operation", false)

		conflictFound, _ := e.isAnyConflictingMigrationRunning(candidate)
		if conflictFound {
			// This candidate conflicts with a running migration; try the next one.
			continue
		}
		if e.countOwnedRunningMigrations() >= maxConcurrentOnlineDDLs {
			// Too many running migrations: nothing more can be started right now.
			return nil, nil
		}
		if !candidate.StrategySetting().IsInOrderCompletion() {
			// No ordering constraints: this migration seems good to go.
			return candidate, nil
		}
		// An immediate migration would complete within a second or two at most if run now,
		// hence it is held back unless it is the first of the pending migrations.
		if isImmediate && len(pendingUUIDs) > 0 && pendingUUIDs[0] != candidate.UUID {
			continue
		}
		// An in-order migration is failed if there are _prior_ failed/cancelled migrations
		// within the same migration-context.
		wasFailed, err := e.validateInOrderMigration(ctx, candidate)
		if err != nil {
			return nil, err
		}
		if wasFailed {
			continue
		}
		return candidate, nil
	}
	// Either all ready migrations are conflicting, or there are no ready migrations.
	return nil, nil
}

// runNextMigration picks up to one 'ready' migration that is able to run, and executes it.
// Possible scenarios:
// - no migration is in 'ready' state -- nothing to be done
Expand All @@ -3405,47 +3499,7 @@ func (e *Executor) runNextMigration(ctx context.Context) error {
return nil
}

// getNonConflictingMigration finds a single 'ready' migration which does not conflict with running migrations.
// Conflicts are:
// - a migration is 'ready' but is not set to run _concurrently_, and there's a running migration that is also non-concurrent
// - a migration is 'ready' but there's another migration 'running' on the exact same table
getNonConflictingMigration := func() (*schema.OnlineDDL, error) {
pendingMigrationsUUIDs, err := e.readPendingMigrationsUUIDs(ctx)
if err != nil {
return nil, err
}
r, err := e.execQuery(ctx, sqlSelectReadyMigrations)
if err != nil {
return nil, err
}
for _, row := range r.Named().Rows {
uuid := row["migration_uuid"].ToString()
onlineDDL, migrationRow, err := e.readMigration(ctx, uuid)
if err != nil {
return nil, err
}
isImmediateOperation := migrationRow.AsBool("is_immediate_operation", false)

if conflictFound, _ := e.isAnyConflictingMigrationRunning(onlineDDL); conflictFound {
continue // this migration conflicts with a running one
}
if e.countOwnedRunningMigrations() >= maxConcurrentOnlineDDLs {
continue // too many running migrations
}
if isImmediateOperation && onlineDDL.StrategySetting().IsInOrderCompletion() {
// This migration is immediate: if we run it now, it will complete within a second or two at most.
if len(pendingMigrationsUUIDs) > 0 && pendingMigrationsUUIDs[0] != onlineDDL.UUID {
continue
}
}
// This migration seems good to go
return onlineDDL, err
}
// no non-conflicting migration found...
// Either all ready migrations are conflicting, or there are no ready migrations...
return nil, nil
}
onlineDDL, err := getNonConflictingMigration()
onlineDDL, err := e.getNonConflictingMigration(ctx)
if err != nil {
return err
}
Expand Down Expand Up @@ -3792,6 +3846,19 @@ func (e *Executor) reviewRunningMigrations(ctx context.Context) (countRunnning i
_ = e.updateMigrationETASecondsByProgress(ctx, uuid)
_ = e.updateMigrationLastThrottled(ctx, uuid, time.Unix(s.timeThrottled, 0), s.componentThrottled)

if onlineDDL.StrategySetting().IsInOrderCompletion() {
// We will fail an in-order migration if there are _prior_ migrations within the same migration-context
// which have failed or were cancelled.
wasFailed, err := e.validateInOrderMigration(ctx, onlineDDL)
if err != nil {
return err
}
if wasFailed {
return nil
}
}

// Check if the migration is ready to cut-over, and proceed to do so if it is.
isReady, err := e.isVReplMigrationReadyToCutOver(ctx, onlineDDL, s)
if err != nil {
_ = e.updateMigrationMessage(ctx, uuid, err.Error())
Expand Down
Loading

0 comments on commit b81c617

Please sign in to comment.