Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Online DDL: Fail a --in-order-completion migration, if a _prior_ migration within the same context is 'failed' or 'cancelled' #16071

Merged
115 changes: 113 additions & 2 deletions go/test/endtoend/onlineddl/scheduler/onlineddl_scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ func waitForReadyToComplete(t *testing.T, uuid string, expected bool) {
case <-ticker.C:
case <-ctx.Done():
}
require.NoError(t, ctx.Err())
require.NoError(t, ctx.Err(), "waiting for ready_to_complete=%t for %v", expected, uuid)
}
}

Expand Down Expand Up @@ -422,6 +422,14 @@ func testScheduler(t *testing.T) {
assert.GreaterOrEqual(t, endTime2, endTime1)
})
}
testTableCompletionAndStartTimes := func(t *testing.T, uuid1, uuid2 string) {
// expect uuid1 to complete before uuid2
t.Run("Compare t1, t2 completion times", func(t *testing.T) {
endTime1 := testReadTimestamp(t, uuid1, "completed_timestamp")
startedTime2 := testReadTimestamp(t, uuid2, "started_timestamp")
assert.GreaterOrEqual(t, startedTime2, endTime1)
})
}
testAllowConcurrent := func(t *testing.T, name string, uuid string, expect int64) {
t.Run("verify allow_concurrent: "+name, func(t *testing.T) {
rs := onlineddl.ReadMigrations(t, &vtParams, uuid)
Expand All @@ -434,7 +442,7 @@ func testScheduler(t *testing.T) {
}

// CREATE
t.Run("CREATE TABLEs t1, t1", func(t *testing.T) {
t.Run("CREATE TABLEs t1, t2", func(t *testing.T) {
{ // The table does not exist
t1uuid = testOnlineDDLStatement(t, createParams(createT1Statement, ddlStrategy, "vtgate", "just-created", "", false))
onlineddl.CheckMigrationStatus(t, &vtParams, shards, t1uuid, schema.OnlineDDLStatusComplete)
Expand Down Expand Up @@ -1183,6 +1191,36 @@ func testScheduler(t *testing.T) {
})
})
// in-order-completion
t.Run("in-order-completion: multiple drops for nonexistent tables and views, sequential", func(t *testing.T) {
u, err := schema.CreateOnlineDDLUUID()
require.NoError(t, err)

sqls := []string{
fmt.Sprintf("drop table if exists t4_%s", u),
fmt.Sprintf("drop view if exists t1_%s", u),
fmt.Sprintf("drop table if exists t2_%s", u),
fmt.Sprintf("drop view if exists t3_%s", u),
}
sql := strings.Join(sqls, ";")
var vuuids []string
t.Run("drop multiple tables and views, in-order-completion", func(t *testing.T) {
uuidList := testOnlineDDLStatement(t, createParams(sql, ddlStrategy+" --in-order-completion", "vtctl", "", "", true)) // skip wait
vuuids = strings.Split(uuidList, "\n")
assert.Equal(t, 4, len(vuuids))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you didn't already know, you can use assert.Len too/instead.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did not know!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

for _, uuid := range vuuids {
status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, uuid, normalWaitTime, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed)
fmt.Printf("# Migration status (for debug purposes): <%s>\n", status)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete)
}
})
require.Equal(t, 4, len(vuuids))
for i := range vuuids {
if i > 0 {
testTableCompletionTimes(t, vuuids[i-1], vuuids[i])
testTableCompletionAndStartTimes(t, vuuids[i-1], vuuids[i])
}
}
})
t.Run("in-order-completion: multiple drops for nonexistent tables and views", func(t *testing.T) {
u, err := schema.CreateOnlineDDLUUID()
require.NoError(t, err)
Expand Down Expand Up @@ -1212,6 +1250,79 @@ func testScheduler(t *testing.T) {
}
}
})

t.Run("in-order-completion: bail out on first error", func(t *testing.T) {
u, err := schema.CreateOnlineDDLUUID()
require.NoError(t, err)

sqls := []string{
fmt.Sprintf("drop table if exists t4_%s", u),
fmt.Sprintf("drop view if exists t1_%s", u),
fmt.Sprintf("drop table t2_%s", u), // non existent
fmt.Sprintf("drop view if exists t3_%s", u),
}
sql := strings.Join(sqls, ";")
var vuuids []string
t.Run("apply schema", func(t *testing.T) {
uuidList := testOnlineDDLStatement(t, createParams(sql, ddlStrategy+" --in-order-completion", "vtctl", "", "", true)) // skip wait
vuuids = strings.Split(uuidList, "\n")
assert.Equal(t, 4, len(vuuids))
for _, uuid := range vuuids[0:2] {
status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, uuid, normalWaitTime, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed)
fmt.Printf("# Migration status (for debug purposes): <%s>\n", status)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete)
}
{
uuid := vuuids[2] // the failed one
status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, uuid, normalWaitTime, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed)
fmt.Printf("# Migration status (for debug purposes): <%s>\n", status)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusFailed)
}
{
uuid := vuuids[3] // should consequently fail without even running
status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, uuid, normalWaitTime, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed)
fmt.Printf("# Migration status (for debug purposes): <%s>\n", status)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusQueued, schema.OnlineDDLStatusFailed)

rs := onlineddl.ReadMigrations(t, &vtParams, uuid)
require.NotNil(t, rs)
for _, row := range rs.Named().Rows {
message := row["message"].ToString()
require.Contains(t, message, vuuids[2]) // Indicating this migration failed due to vuuids[2] failure
}
}
})
testTableCompletionTimes(t, vuuids[0], vuuids[1])
testTableCompletionAndStartTimes(t, vuuids[0], vuuids[1])
testTableCompletionAndStartTimes(t, vuuids[1], vuuids[2])
})
t.Run("in-order-completion concurrent: bail out on first error", func(t *testing.T) {
sqls := []string{
`alter table t1_test force`,
`alter table t2_test force`,
}
sql := strings.Join(sqls, ";")
var vuuids []string
t.Run("apply schema", func(t *testing.T) {
uuidList := testOnlineDDLStatement(t, createParams(sql, ddlStrategy+" --in-order-completion --postpone-completion --allow-concurrent", "vtctl", "", "", true)) // skip wait
vuuids = strings.Split(uuidList, "\n")
assert.Equal(t, 2, len(vuuids))
for _, uuid := range vuuids {
waitForReadyToComplete(t, uuid, true)
}
t.Run("cancel 1st migration", func(t *testing.T) {
onlineddl.CheckCancelMigration(t, &vtParams, shards, vuuids[0], true)
status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, vuuids[0], normalWaitTime, schema.OnlineDDLStatusFailed, schema.OnlineDDLStatusCancelled)
fmt.Printf("# Migration status (for debug purposes): <%s>\n", status)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, vuuids[0], schema.OnlineDDLStatusCancelled)
})
t.Run("expect 2nd migration to fail", func(t *testing.T) {
status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, vuuids[1], normalWaitTime, schema.OnlineDDLStatusFailed, schema.OnlineDDLStatusCancelled)
fmt.Printf("# Migration status (for debug purposes): <%s>\n", status)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, vuuids[1], schema.OnlineDDLStatusFailed)
})
})
})
t.Run("in-order-completion: two new views, one depends on the other", func(t *testing.T) {
u, err := schema.CreateOnlineDDLUUID()
require.NoError(t, err)
Expand Down
149 changes: 108 additions & 41 deletions go/vt/vttablet/onlineddl/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2854,6 +2854,31 @@ func (e *Executor) getCompletedMigrationByContextAndSQL(ctx context.Context, onl
return completedUUID, nil
}

// readFailedCancelledMigrationsInContextBeforeMigration returns UUIDs for migrations that are failed/cancelled
// and are in the same context as given migration and _precede_ it chronologically (have lower `id` value)
func (e *Executor) readFailedCancelledMigrationsInContextBeforeMigration(ctx context.Context, onlineDDL *schema.OnlineDDL) (uuids []string, err error) {
if onlineDDL.MigrationContext == "" {
// only applies to migrations with an explicit context
return nil, nil
}
query, err := sqlparser.ParseAndBind(sqlSelectFailedCancelledMigrationsInContextBeforeMigration,
sqltypes.StringBindVariable(onlineDDL.MigrationContext),
sqltypes.StringBindVariable(onlineDDL.UUID),
)
if err != nil {
return nil, err
}
r, err := e.execQuery(ctx, query)
if err != nil {
return uuids, err
}
for _, row := range r.Named().Rows {
uuid := row["migration_uuid"].ToString()
uuids = append(uuids, uuid)
}
return uuids, err
}

// failMigration marks a migration as failed
func (e *Executor) failMigration(ctx context.Context, onlineDDL *schema.OnlineDDL, withError error) error {
defer e.triggerNextCheckInterval()
Expand All @@ -2865,6 +2890,23 @@ func (e *Executor) failMigration(ctx context.Context, onlineDDL *schema.OnlineDD
return withError
}

// validateInOrderMigration checks whether an in-order migration should be forced to fail, either before running or
// while running.
// This may happen if a prior migration in the same context has failed or was cancelled.
func (e *Executor) validateInOrderMigration(ctx context.Context, onlineDDL *schema.OnlineDDL) (wasFailed bool, err error) {
if !onlineDDL.StrategySetting().IsInOrderCompletion() {
return false, nil
}
uuids, err := e.readFailedCancelledMigrationsInContextBeforeMigration(ctx, onlineDDL)
if err != nil {
return false, err
}
if len(uuids) == 0 {
return false, err
}
return true, e.failMigration(ctx, onlineDDL, fmt.Errorf("migration %v cannot run because prior migration %v in same context has failed/was cancelled", onlineDDL.UUID, uuids[0]))
}

// analyzeDropDDLActionMigration analyzes a DROP <TABLE|VIEW> migration.
func (e *Executor) analyzeDropDDLActionMigration(ctx context.Context, onlineDDL *schema.OnlineDDL) error {
// Schema analysis:
Expand Down Expand Up @@ -3384,6 +3426,58 @@ func (e *Executor) executeMigration(ctx context.Context, onlineDDL *schema.Onlin
return nil
}

// getNonConflictingMigration finds a single 'ready' migration which does not conflict with running migrations.
// Conflicts are:
// - a migration is 'ready' but is not set to run _concurrently_, and there's a running migration that is also non-concurrent
// - a migration is 'ready' but there's another migration 'running' on the exact same table
func (e *Executor) getNonConflictingMigration(ctx context.Context) (*schema.OnlineDDL, error) {
pendingMigrationsUUIDs, err := e.readPendingMigrationsUUIDs(ctx)
if err != nil {
return nil, err
}
r, err := e.execQuery(ctx, sqlSelectReadyMigrations)
if err != nil {
return nil, err
}
for _, row := range r.Named().Rows {
uuid := row["migration_uuid"].ToString()
onlineDDL, migrationRow, err := e.readMigration(ctx, uuid)
if err != nil {
return nil, err
}
isImmediateOperation := migrationRow.AsBool("is_immediate_operation", false)

if conflictFound, _ := e.isAnyConflictingMigrationRunning(onlineDDL); conflictFound {
continue // this migration conflicts with a running one
}
if e.countOwnedRunningMigrations() >= maxConcurrentOnlineDDLs {
return nil, nil // too many running migrations
}
if isImmediateOperation && onlineDDL.StrategySetting().IsInOrderCompletion() {
// This migration is immediate: if we run it now, it will complete within a second or two at most.
if len(pendingMigrationsUUIDs) > 0 && pendingMigrationsUUIDs[0] != onlineDDL.UUID {
continue
}
}
// We will fail an in-order migration if there's _prior_ migrations within the same migration-context
// which have failed.
if onlineDDL.StrategySetting().IsInOrderCompletion() {
wasFailed, err := e.validateInOrderMigration(ctx, onlineDDL)
if err != nil {
return nil, err
}
if wasFailed {
continue
}
}
Comment on lines +3462 to +3472
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Most of this function is just an extract/refactor out of runNextMigration() and into an independent function of its own. But these lines above are then also added to fail a migration before even running it, if the in-order terms are not met.

// This migration seems good to go
return onlineDDL, err
}
// no non-conflicting migration found...
// Either all ready migrations are conflicting, or there are no ready migrations...
return nil, nil
}

// runNextMigration picks up to one 'ready' migration that is able to run, and executes it.
// Possible scenarios:
// - no migration is in 'ready' state -- nothing to be done
Expand All @@ -3405,47 +3499,7 @@ func (e *Executor) runNextMigration(ctx context.Context) error {
return nil
}

// getNonConflictingMigration finds a single 'ready' migration which does not conflict with running migrations.
// Conflicts are:
// - a migration is 'ready' but is not set to run _concurrently_, and there's a running migration that is also non-concurrent
// - a migration is 'ready' but there's another migration 'running' on the exact same table
getNonConflictingMigration := func() (*schema.OnlineDDL, error) {
pendingMigrationsUUIDs, err := e.readPendingMigrationsUUIDs(ctx)
if err != nil {
return nil, err
}
r, err := e.execQuery(ctx, sqlSelectReadyMigrations)
if err != nil {
return nil, err
}
for _, row := range r.Named().Rows {
uuid := row["migration_uuid"].ToString()
onlineDDL, migrationRow, err := e.readMigration(ctx, uuid)
if err != nil {
return nil, err
}
isImmediateOperation := migrationRow.AsBool("is_immediate_operation", false)

if conflictFound, _ := e.isAnyConflictingMigrationRunning(onlineDDL); conflictFound {
continue // this migration conflicts with a running one
}
if e.countOwnedRunningMigrations() >= maxConcurrentOnlineDDLs {
continue // too many running migrations
}
if isImmediateOperation && onlineDDL.StrategySetting().IsInOrderCompletion() {
// This migration is immediate: if we run it now, it will complete within a second or two at most.
if len(pendingMigrationsUUIDs) > 0 && pendingMigrationsUUIDs[0] != onlineDDL.UUID {
continue
}
}
// This migration seems good to go
return onlineDDL, err
}
// no non-conflicting migration found...
// Either all ready migrations are conflicting, or there are no ready migrations...
return nil, nil
}
onlineDDL, err := getNonConflictingMigration()
onlineDDL, err := e.getNonConflictingMigration(ctx)
if err != nil {
return err
}
Expand Down Expand Up @@ -3792,6 +3846,19 @@ func (e *Executor) reviewRunningMigrations(ctx context.Context) (countRunnning i
_ = e.updateMigrationETASecondsByProgress(ctx, uuid)
_ = e.updateMigrationLastThrottled(ctx, uuid, time.Unix(s.timeThrottled, 0), s.componentThrottled)

if onlineDDL.StrategySetting().IsInOrderCompletion() {
// We will fail an in-order migration if there's _prior_ migrations within the same migration-context
// which have failed.
wasFailed, err := e.validateInOrderMigration(ctx, onlineDDL)
if err != nil {
return err
}
if wasFailed {
return nil
}
}

// Check if the migration is ready to cut-over, and proceed to do so if it is.
isReady, err := e.isVReplMigrationReadyToCutOver(ctx, onlineDDL, s)
if err != nil {
_ = e.updateMigrationMessage(ctx, uuid, err.Error())
Expand Down
14 changes: 14 additions & 0 deletions go/vt/vttablet/onlineddl/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,7 @@ const (
FROM _vt.schema_migrations
WHERE
migration_status='running'
ORDER BY id
`
sqlSelectCompleteMigrationsOnTable = `SELECT
migration_uuid,
Expand Down Expand Up @@ -333,6 +334,18 @@ const (
WHERE
migration_status='running'
AND liveness_timestamp < NOW() - INTERVAL %a MINUTE
ORDER BY id
`
sqlSelectFailedCancelledMigrationsInContextBeforeMigration = `SELECT
migration_uuid
FROM _vt.schema_migrations
WHERE
migration_context=%a
AND migration_status IN ('failed', 'cancelled')
AND id < (
SELECT id FROM _vt.schema_migrations WHERE migration_uuid=%a
)
ORDER BY id
`
sqlSelectPendingMigrations = `SELECT
migration_uuid,
Expand Down Expand Up @@ -365,6 +378,7 @@ const (
NOW() - INTERVAL %a SECOND,
NOW() - INTERVAL retain_artifacts_seconds SECOND
)
ORDER BY id
`
sqlFixCompletedTimestamp = `UPDATE _vt.schema_migrations
SET
Expand Down
Loading