diff --git a/go/vt/vttablet/onlineddl/executor.go b/go/vt/vttablet/onlineddl/executor.go
index e87e8c6c084..ebce8f0619c 100644
--- a/go/vt/vttablet/onlineddl/executor.go
+++ b/go/vt/vttablet/onlineddl/executor.go
@@ -2854,6 +2854,31 @@ func (e *Executor) getCompletedMigrationByContextAndSQL(ctx context.Context, onl
 	return completedUUID, nil
 }
 
+// readFailedCancelledMigrationsInContextBeforeMigration returns UUIDs for migrations that are failed/cancelled,
+// are in the same context as the given migration and _precede_ it chronologically (have lower `id` value).
+func (e *Executor) readFailedCancelledMigrationsInContextBeforeMigration(ctx context.Context, onlineDDL *schema.OnlineDDL) (uuids []string, err error) {
+	if onlineDDL.MigrationContext == "" {
+		// only applies to migrations with an explicit context
+		return nil, nil
+	}
+	query, err := sqlparser.ParseAndBind(sqlSelectFailedCancelledMigrationsInContextBeforeMigration,
+		sqltypes.StringBindVariable(onlineDDL.MigrationContext),
+		sqltypes.StringBindVariable(onlineDDL.UUID),
+	)
+	if err != nil {
+		return nil, err
+	}
+	r, err := e.execQuery(ctx, query)
+	if err != nil {
+		return nil, err
+	}
+	// Collect the UUID of each returned row; uuids stays nil when the query returns no rows.
+	for _, row := range r.Named().Rows {
+		uuids = append(uuids, row["migration_uuid"].ToString())
+	}
+	return uuids, nil
+}
+
 // failMigration marks a migration as failed
 func (e *Executor) failMigration(ctx context.Context, onlineDDL *schema.OnlineDDL, withError error) error {
 	defer e.triggerNextCheckInterval()
@@ -2865,6 +2890,23 @@ func (e *Executor) failMigration(ctx context.Context, onlineDDL *schema.OnlineDD
 	return withError
 }
 
+// validateInOrderMigration checks whether an in-order migration should be forced to fail, either before running or
+// while running.
+// This may happen if a prior migration in the same context has failed or was cancelled.
+func (e *Executor) validateInOrderMigration(ctx context.Context, onlineDDL *schema.OnlineDDL) (wasFailed bool, err error) {
+	if !onlineDDL.StrategySetting().IsInOrderCompletion() {
+		return false, nil // not an in-order migration: nothing to validate
+	}
+	uuids, err := e.readFailedCancelledMigrationsInContextBeforeMigration(ctx, onlineDDL)
+	if err != nil {
+		return false, err
+	}
+	if len(uuids) == 0 {
+		return false, nil // no prior failed/cancelled migration was found
+	}
+	return true, e.failMigration(ctx, onlineDDL, fmt.Errorf("migration %v cannot run because prior migration %v in same context has failed/was cancelled", onlineDDL.UUID, uuids[0])) // NOTE(review): failMigration seems to hand back this same error, so err is likely non-nil whenever wasFailed is true — confirm callers expect that
+}
+
 // analyzeDropDDLActionMigration analyzes a DROP migration.
 func (e *Executor) analyzeDropDDLActionMigration(ctx context.Context, onlineDDL *schema.OnlineDDL) error {
 	// Schema analysis:
@@ -3417,6 +3459,17 @@ func (e *Executor) getNonConflictingMigration(ctx context.Context) (*schema.Onli
 				continue
 			}
 		}
+		// We will fail an in-order migration if there are _prior_ migrations within the same migration-context
+		// which have failed or were cancelled.
+		if onlineDDL.StrategySetting().IsInOrderCompletion() {
+			wasFailed, err := e.validateInOrderMigration(ctx, onlineDDL)
+			if err != nil {
+				return nil, err
+			}
+			if wasFailed {
+				continue
+			}
+		}
 		// This migration seems good to go
 		return onlineDDL, err
 	}
@@ -3793,6 +3846,19 @@ func (e *Executor) reviewRunningMigrations(ctx context.Context) (countRunnning i
 				_ = e.updateMigrationETASecondsByProgress(ctx, uuid)
 				_ = e.updateMigrationLastThrottled(ctx, uuid, time.Unix(s.timeThrottled, 0), s.componentThrottled)
 
+				if onlineDDL.StrategySetting().IsInOrderCompletion() {
+					// We will fail an in-order migration if there are _prior_ migrations within the same migration-context
+					// which have failed or were cancelled.
+					wasFailed, err := e.validateInOrderMigration(ctx, onlineDDL)
+					if err != nil {
+						return err
+					}
+					if wasFailed {
+						return nil
+					}
+				}
+
+				// Check if the migration is ready to cut-over, and proceed to do so if it is.
isReady, err := e.isVReplMigrationReadyToCutOver(ctx, onlineDDL, s) if err != nil { _ = e.updateMigrationMessage(ctx, uuid, err.Error())