From a5f2c545ceb8053f794d58330596c946c5b8b8b5 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Thu, 2 May 2024 09:28:53 +0300 Subject: [PATCH 1/8] refactor getNonConflictingMigration function Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- go/vt/vttablet/onlineddl/executor.go | 83 ++++++++++++++-------------- 1 file changed, 42 insertions(+), 41 deletions(-) diff --git a/go/vt/vttablet/onlineddl/executor.go b/go/vt/vttablet/onlineddl/executor.go index 42b2a4f827b..e3bf5a69866 100644 --- a/go/vt/vttablet/onlineddl/executor.go +++ b/go/vt/vttablet/onlineddl/executor.go @@ -3384,6 +3384,47 @@ func (e *Executor) executeMigration(ctx context.Context, onlineDDL *schema.Onlin return nil } +// getNonConflictingMigration finds a single 'ready' migration which does not conflict with running migrations. +// Conflicts are: +// - a migration is 'ready' but is not set to run _concurrently_, and there's a running migration that is also non-concurrent +// - a migration is 'ready' but there's another migration 'running' on the exact same table +func (e *Executor) getNonConflictingMigration(ctx context.Context) (*schema.OnlineDDL, error) { + pendingMigrationsUUIDs, err := e.readPendingMigrationsUUIDs(ctx) + if err != nil { + return nil, err + } + r, err := e.execQuery(ctx, sqlSelectReadyMigrations) + if err != nil { + return nil, err + } + for _, row := range r.Named().Rows { + uuid := row["migration_uuid"].ToString() + onlineDDL, migrationRow, err := e.readMigration(ctx, uuid) + if err != nil { + return nil, err + } + isImmediateOperation := migrationRow.AsBool("is_immediate_operation", false) + + if conflictFound, _ := e.isAnyConflictingMigrationRunning(onlineDDL); conflictFound { + continue // this migration conflicts with a running one + } + if e.countOwnedRunningMigrations() >= maxConcurrentOnlineDDLs { + continue // too many running migrations + } + if isImmediateOperation && 
onlineDDL.StrategySetting().IsInOrderCompletion() { + // This migration is immediate: if we run it now, it will complete within a second or two at most. + if len(pendingMigrationsUUIDs) > 0 && pendingMigrationsUUIDs[0] != onlineDDL.UUID { + continue + } + } + // This migration seems good to go + return onlineDDL, err + } + // no non-conflicting migration found... + // Either all ready migrations are conflicting, or there are no ready migrations... + return nil, nil +} + // runNextMigration picks up to one 'ready' migration that is able to run, and executes it. // Possible scenarios: // - no migration is in 'ready' state -- nothing to be done @@ -3405,47 +3446,7 @@ func (e *Executor) runNextMigration(ctx context.Context) error { return nil } - // getNonConflictingMigration finds a single 'ready' migration which does not conflict with running migrations. - // Conflicts are: - // - a migration is 'ready' but is not set to run _concurrently_, and there's a running migration that is also non-concurrent - // - a migration is 'ready' but there's another migration 'running' on the exact same table - getNonConflictingMigration := func() (*schema.OnlineDDL, error) { - pendingMigrationsUUIDs, err := e.readPendingMigrationsUUIDs(ctx) - if err != nil { - return nil, err - } - r, err := e.execQuery(ctx, sqlSelectReadyMigrations) - if err != nil { - return nil, err - } - for _, row := range r.Named().Rows { - uuid := row["migration_uuid"].ToString() - onlineDDL, migrationRow, err := e.readMigration(ctx, uuid) - if err != nil { - return nil, err - } - isImmediateOperation := migrationRow.AsBool("is_immediate_operation", false) - - if conflictFound, _ := e.isAnyConflictingMigrationRunning(onlineDDL); conflictFound { - continue // this migration conflicts with a running one - } - if e.countOwnedRunningMigrations() >= maxConcurrentOnlineDDLs { - continue // too many running migrations - } - if isImmediateOperation && onlineDDL.StrategySetting().IsInOrderCompletion() { - // This 
migration is immediate: if we run it now, it will complete within a second or two at most. - if len(pendingMigrationsUUIDs) > 0 && pendingMigrationsUUIDs[0] != onlineDDL.UUID { - continue - } - } - // This migration seems good to go - return onlineDDL, err - } - // no non-conflicting migration found... - // Either all ready migrations are conflicting, or there are no ready migrations... - return nil, nil - } - onlineDDL, err := getNonConflictingMigration() + onlineDDL, err := e.getNonConflictingMigration(ctx) if err != nil { return err } From 768d426d6a7722db12b161e40874f16dbeece4d3 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Mon, 6 May 2024 07:49:31 +0300 Subject: [PATCH 2/8] early break Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- go/vt/vttablet/onlineddl/executor.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/vt/vttablet/onlineddl/executor.go b/go/vt/vttablet/onlineddl/executor.go index e3bf5a69866..e87e8c6c084 100644 --- a/go/vt/vttablet/onlineddl/executor.go +++ b/go/vt/vttablet/onlineddl/executor.go @@ -3409,7 +3409,7 @@ func (e *Executor) getNonConflictingMigration(ctx context.Context) (*schema.Onli continue // this migration conflicts with a running one } if e.countOwnedRunningMigrations() >= maxConcurrentOnlineDDLs { - continue // too many running migrations + return nil, nil // too many running migrations } if isImmediateOperation && onlineDDL.StrategySetting().IsInOrderCompletion() { // This migration is immediate: if we run it now, it will complete within a second or two at most. From e73ce7a65971a7b138a8f8dc104b827edcc6bfb5 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Wed, 5 Jun 2024 15:09:23 +0300 Subject: [PATCH 3/8] Adding scheduler tests for sequential in-order completion, expect early bail out when one migration fails. 
This last test will fail and a followup commit will implement the code Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- .../scheduler/onlineddl_scheduler_test.go | 78 +++++++++++++++++++ 1 file changed, 78 insertions(+) diff --git a/go/test/endtoend/onlineddl/scheduler/onlineddl_scheduler_test.go b/go/test/endtoend/onlineddl/scheduler/onlineddl_scheduler_test.go index 4362069af66..0f996e7a5d6 100644 --- a/go/test/endtoend/onlineddl/scheduler/onlineddl_scheduler_test.go +++ b/go/test/endtoend/onlineddl/scheduler/onlineddl_scheduler_test.go @@ -422,6 +422,14 @@ func testScheduler(t *testing.T) { assert.GreaterOrEqual(t, endTime2, endTime1) }) } + testTableCompletionAndStartTimes := func(t *testing.T, uuid1, uuid2 string) { + // expect uuid1 to complete before uuid2 + t.Run("Compare t1, t2 completion times", func(t *testing.T) { + endTime1 := testReadTimestamp(t, uuid1, "completed_timestamp") + startedTime2 := testReadTimestamp(t, uuid2, "started_timestamp") + assert.GreaterOrEqual(t, startedTime2, endTime1) + }) + } testAllowConcurrent := func(t *testing.T, name string, uuid string, expect int64) { t.Run("verify allow_concurrent: "+name, func(t *testing.T) { rs := onlineddl.ReadMigrations(t, &vtParams, uuid) @@ -1183,6 +1191,36 @@ func testScheduler(t *testing.T) { }) }) // in-order-completion + t.Run("in-order-completion: multiple drops for nonexistent tables and views, sequential", func(t *testing.T) { + u, err := schema.CreateOnlineDDLUUID() + require.NoError(t, err) + + sqls := []string{ + fmt.Sprintf("drop table if exists t4_%s", u), + fmt.Sprintf("drop view if exists t1_%s", u), + fmt.Sprintf("drop table if exists t2_%s", u), + fmt.Sprintf("drop view if exists t3_%s", u), + } + sql := strings.Join(sqls, ";") + var vuuids []string + t.Run("drop multiple tables and views, in-order-completion", func(t *testing.T) { + uuidList := testOnlineDDLStatement(t, createParams(sql, ddlStrategy+" --in-order-completion", "vtctl", "", "", true)) 
// skip wait + vuuids = strings.Split(uuidList, "\n") + assert.Equal(t, 4, len(vuuids)) + for _, uuid := range vuuids { + status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, uuid, normalWaitTime, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed) + fmt.Printf("# Migration status (for debug purposes): <%s>\n", status) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete) + } + }) + require.Equal(t, 4, len(vuuids)) + for i := range vuuids { + if i > 0 { + testTableCompletionTimes(t, vuuids[i-1], vuuids[i]) + testTableCompletionAndStartTimes(t, vuuids[i-1], vuuids[i]) + } + } + }) t.Run("in-order-completion: multiple drops for nonexistent tables and views", func(t *testing.T) { u, err := schema.CreateOnlineDDLUUID() require.NoError(t, err) @@ -1212,6 +1250,46 @@ func testScheduler(t *testing.T) { } } }) + t.Run("in-order-completion: bail out on first error", func(t *testing.T) { + u, err := schema.CreateOnlineDDLUUID() + require.NoError(t, err) + + sqls := []string{ + fmt.Sprintf("drop table if exists t4_%s", u), + fmt.Sprintf("drop view if exists t1_%s", u), + fmt.Sprintf("drop table t2_%s", u), // non existent + fmt.Sprintf("drop view if exists t3_%s", u), + } + sql := strings.Join(sqls, ";") + var vuuids []string + t.Run("apply schema", func(t *testing.T) { + uuidList := testOnlineDDLStatement(t, createParams(sql, ddlStrategy+" --in-order-completion", "vtctl", "", "", true)) // skip wait + vuuids = strings.Split(uuidList, "\n") + assert.Equal(t, 4, len(vuuids)) + for _, uuid := range vuuids[0:2] { + status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, uuid, normalWaitTime, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed) + fmt.Printf("# Migration status (for debug purposes): <%s>\n", status) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete) + } + { + uuid := vuuids[2] // the failed one + status := onlineddl.WaitForMigrationStatus(t, 
&vtParams, shards, uuid, normalWaitTime, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed) + fmt.Printf("# Migration status (for debug purposes): <%s>\n", status) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusFailed) + } + { + uuid := vuuids[3] // should not run + status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, uuid, normalWaitTime, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed, schema.OnlineDDLStatusQueued, schema.OnlineDDLStatusReady) + fmt.Printf("# Migration status (for debug purposes): <%s>\n", status) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusQueued, schema.OnlineDDLStatusReady) + <-time.After(4 * time.Second) // wait for a while to ensure it doesn't change + onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusQueued, schema.OnlineDDLStatusReady) + } + }) + testTableCompletionTimes(t, vuuids[0], vuuids[1]) + testTableCompletionAndStartTimes(t, vuuids[0], vuuids[1]) + testTableCompletionAndStartTimes(t, vuuids[1], vuuids[2]) + }) t.Run("in-order-completion: two new views, one depends on the other", func(t *testing.T) { u, err := schema.CreateOnlineDDLUUID() require.NoError(t, err) From a24634eba3f5523a94d60e265d5c46fd0d7f5d28 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Wed, 5 Jun 2024 15:09:55 +0300 Subject: [PATCH 4/8] add query to select migrations by context Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- go/vt/vttablet/onlineddl/schema.go | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/go/vt/vttablet/onlineddl/schema.go b/go/vt/vttablet/onlineddl/schema.go index 2ba566703e5..7bc3bb97759 100644 --- a/go/vt/vttablet/onlineddl/schema.go +++ b/go/vt/vttablet/onlineddl/schema.go @@ -303,6 +303,7 @@ const ( FROM _vt.schema_migrations WHERE migration_status='running' + ORDER BY id ` 
sqlSelectCompleteMigrationsOnTable = `SELECT migration_uuid, @@ -333,6 +334,17 @@ const ( WHERE migration_status='running' AND liveness_timestamp < NOW() - INTERVAL %a MINUTE + ORDER BY id + ` + sqlSelectMigrationsByContext = `SELECT + migration_uuid, + migration_context, + mysql_table, + migration_status + FROM _vt.schema_migrations + WHERE + migration_context=%a + ORDER BY id ` sqlSelectPendingMigrations = `SELECT migration_uuid, @@ -365,6 +377,7 @@ const ( NOW() - INTERVAL %a SECOND, NOW() - INTERVAL retain_artifacts_seconds SECOND ) + ORDER BY id ` sqlFixCompletedTimestamp = `UPDATE _vt.schema_migrations SET From d2eff24d36ed689d560de0730bc4a32ef7210c84 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Wed, 5 Jun 2024 17:06:17 +0300 Subject: [PATCH 5/8] Test that a running in-order migration fails if a prior migration is cancelled/failed Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- .../scheduler/onlineddl_scheduler_test.go | 47 ++++++++++++++++--- 1 file changed, 40 insertions(+), 7 deletions(-) diff --git a/go/test/endtoend/onlineddl/scheduler/onlineddl_scheduler_test.go b/go/test/endtoend/onlineddl/scheduler/onlineddl_scheduler_test.go index 0f996e7a5d6..abbd9aea791 100644 --- a/go/test/endtoend/onlineddl/scheduler/onlineddl_scheduler_test.go +++ b/go/test/endtoend/onlineddl/scheduler/onlineddl_scheduler_test.go @@ -198,7 +198,7 @@ func waitForReadyToComplete(t *testing.T, uuid string, expected bool) { case <-ticker.C: case <-ctx.Done(): } - require.NoError(t, ctx.Err()) + require.NoError(t, ctx.Err(), "waiting for ready_to_complete=%t for %v", expected, uuid) } } @@ -442,7 +442,7 @@ func testScheduler(t *testing.T) { } // CREATE - t.Run("CREATE TABLEs t1, t1", func(t *testing.T) { + t.Run("CREATE TABLEs t1, t2", func(t *testing.T) { { // The table does not exist t1uuid = testOnlineDDLStatement(t, createParams(createT1Statement, ddlStrategy, "vtgate", "just-created", "",
false)) onlineddl.CheckMigrationStatus(t, &vtParams, shards, t1uuid, schema.OnlineDDLStatusComplete) @@ -1250,6 +1250,7 @@ func testScheduler(t *testing.T) { } } }) + t.Run("in-order-completion: bail out on first error", func(t *testing.T) { u, err := schema.CreateOnlineDDLUUID() require.NoError(t, err) @@ -1278,18 +1279,50 @@ func testScheduler(t *testing.T) { onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusFailed) } { - uuid := vuuids[3] // should not run - status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, uuid, normalWaitTime, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed, schema.OnlineDDLStatusQueued, schema.OnlineDDLStatusReady) + uuid := vuuids[3] // should consequently fail without even running + status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, uuid, normalWaitTime, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed) fmt.Printf("# Migration status (for debug purposes): <%s>\n", status) - onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusQueued, schema.OnlineDDLStatusReady) - <-time.After(4 * time.Second) // wait for a while to ensure it doesn't change - onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusQueued, schema.OnlineDDLStatusReady) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusQueued, schema.OnlineDDLStatusFailed) + + rs := onlineddl.ReadMigrations(t, &vtParams, uuid) + require.NotNil(t, rs) + for _, row := range rs.Named().Rows { + message := row["message"].ToString() + require.Contains(t, message, vuuids[2]) // Indicating this migration failed due to vuuids[2] failure + } } }) testTableCompletionTimes(t, vuuids[0], vuuids[1]) testTableCompletionAndStartTimes(t, vuuids[0], vuuids[1]) testTableCompletionAndStartTimes(t, vuuids[1], vuuids[2]) }) + t.Run("in-order-completion concurrent: bail out on first error", func(t *testing.T) { + sqls := []string{ + `alter table 
t1_test force`, + `alter table t2_test force`, + } + sql := strings.Join(sqls, ";") + var vuuids []string + t.Run("apply schema", func(t *testing.T) { + uuidList := testOnlineDDLStatement(t, createParams(sql, ddlStrategy+" --in-order-completion --postpone-completion --allow-concurrent", "vtctl", "", "", true)) // skip wait + vuuids = strings.Split(uuidList, "\n") + assert.Equal(t, 2, len(vuuids)) + for _, uuid := range vuuids { + waitForReadyToComplete(t, uuid, true) + } + t.Run("cancel 1st migration", func(t *testing.T) { + onlineddl.CheckCancelMigration(t, &vtParams, shards, vuuids[0], true) + status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, vuuids[0], normalWaitTime, schema.OnlineDDLStatusFailed, schema.OnlineDDLStatusCancelled) + fmt.Printf("# Migration status (for debug purposes): <%s>\n", status) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, vuuids[0], schema.OnlineDDLStatusCancelled) + }) + t.Run("expect 2nd migration to fail", func(t *testing.T) { + status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, vuuids[1], normalWaitTime, schema.OnlineDDLStatusFailed, schema.OnlineDDLStatusCancelled) + fmt.Printf("# Migration status (for debug purposes): <%s>\n", status) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, vuuids[1], schema.OnlineDDLStatusFailed) + }) + }) + }) t.Run("in-order-completion: two new views, one depends on the other", func(t *testing.T) { u, err := schema.CreateOnlineDDLUUID() require.NoError(t, err) From 7d6cee01694ef906b4dfdb5887edfea0af92d004 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Wed, 5 Jun 2024 20:57:50 +0300 Subject: [PATCH 6/8] all in one 'sqlSelectFailedCancelledMigrationsInContextBeforeMigration' query picks a failed or cancelled migration with a given migration context, and which comes before a given migration Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- go/vt/vttablet/onlineddl/schema.go | 11 
++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/go/vt/vttablet/onlineddl/schema.go b/go/vt/vttablet/onlineddl/schema.go index 7bc3bb97759..30f132bd0e3 100644 --- a/go/vt/vttablet/onlineddl/schema.go +++ b/go/vt/vttablet/onlineddl/schema.go @@ -336,14 +336,15 @@ const ( AND liveness_timestamp < NOW() - INTERVAL %a MINUTE ORDER BY id ` - sqlSelectMigrationsByContext = `SELECT - migration_uuid, - migration_context, - mysql_table, - migration_status + sqlSelectFailedCancelledMigrationsInContextBeforeMigration = `SELECT + migration_uuid FROM _vt.schema_migrations WHERE migration_context=%a + AND migration_status IN ('failed', 'cancelled') + AND id < ( + SELECT id FROM _vt.schema_migrations WHERE migration_uuid=%a + ) ORDER BY id ` sqlSelectPendingMigrations = `SELECT From 1cf18bef02c73732610b1cdd5eee7af7404905b0 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Wed, 5 Jun 2024 21:01:07 +0300 Subject: [PATCH 7/8] Fail a --in-order-completion migration, if a _prior_ migration within the same context is 'failed' or 'cancelled'. 
Failing happens either at 'ready' state or at 'running' state Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- go/vt/vttablet/onlineddl/executor.go | 66 ++++++++++++++++++++++++++++ 1 file changed, 66 insertions(+) diff --git a/go/vt/vttablet/onlineddl/executor.go b/go/vt/vttablet/onlineddl/executor.go index e87e8c6c084..ebce8f0619c 100644 --- a/go/vt/vttablet/onlineddl/executor.go +++ b/go/vt/vttablet/onlineddl/executor.go @@ -2854,6 +2854,31 @@ func (e *Executor) getCompletedMigrationByContextAndSQL(ctx context.Context, onl return completedUUID, nil } +// readFailedCancelledMigrationsInContextBeforeMigration returns UUIDs for migrations that are failed/cancelled +// and are in the same context as given migration and _precede_ it chronologically (have lower `id` value) +func (e *Executor) readFailedCancelledMigrationsInContextBeforeMigration(ctx context.Context, onlineDDL *schema.OnlineDDL) (uuids []string, err error) { + if onlineDDL.MigrationContext == "" { + // only applies to migrations with an explicit context + return nil, nil + } + query, err := sqlparser.ParseAndBind(sqlSelectFailedCancelledMigrationsInContextBeforeMigration, + sqltypes.StringBindVariable(onlineDDL.MigrationContext), + sqltypes.StringBindVariable(onlineDDL.UUID), + ) + if err != nil { + return nil, err + } + r, err := e.execQuery(ctx, query) + if err != nil { + return uuids, err + } + for _, row := range r.Named().Rows { + uuid := row["migration_uuid"].ToString() + uuids = append(uuids, uuid) + } + return uuids, err +} + // failMigration marks a migration as failed func (e *Executor) failMigration(ctx context.Context, onlineDDL *schema.OnlineDDL, withError error) error { defer e.triggerNextCheckInterval() @@ -2865,6 +2890,23 @@ func (e *Executor) failMigration(ctx context.Context, onlineDDL *schema.OnlineDD return withError } +// validateInOrderMigration checks whether an in-order migration should be forced to fail, either before running or +// while running. 
+// This may happen if a prior migration in the same context has failed or was cancelled. +func (e *Executor) validateInOrderMigration(ctx context.Context, onlineDDL *schema.OnlineDDL) (wasFailed bool, err error) { + if !onlineDDL.StrategySetting().IsInOrderCompletion() { + return false, nil + } + uuids, err := e.readFailedCancelledMigrationsInContextBeforeMigration(ctx, onlineDDL) + if err != nil { + return false, err + } + if len(uuids) == 0 { + return false, err + } + return true, e.failMigration(ctx, onlineDDL, fmt.Errorf("migration %v cannot run because prior migration %v in same context has failed/was cancelled", onlineDDL.UUID, uuids[0])) +} + // analyzeDropDDLActionMigration analyzes a DROP migration. func (e *Executor) analyzeDropDDLActionMigration(ctx context.Context, onlineDDL *schema.OnlineDDL) error { // Schema analysis: @@ -3417,6 +3459,17 @@ func (e *Executor) getNonConflictingMigration(ctx context.Context) (*schema.Onli continue } } + // We will fail an in-order migration if there's _prior_ migrations within the same migration-context + // which have failed. + if onlineDDL.StrategySetting().IsInOrderCompletion() { + wasFailed, err := e.validateInOrderMigration(ctx, onlineDDL) + if err != nil { + return nil, err + } + if wasFailed { + continue + } + } // This migration seems good to go return onlineDDL, err } @@ -3793,6 +3846,19 @@ func (e *Executor) reviewRunningMigrations(ctx context.Context) (countRunnning i _ = e.updateMigrationETASecondsByProgress(ctx, uuid) _ = e.updateMigrationLastThrottled(ctx, uuid, time.Unix(s.timeThrottled, 0), s.componentThrottled) + if onlineDDL.StrategySetting().IsInOrderCompletion() { + // We will fail an in-order migration if there's _prior_ migrations within the same migration-context + // which have failed. 
+ wasFailed, err := e.validateInOrderMigration(ctx, onlineDDL) + if err != nil { + return err + } + if wasFailed { + return nil + } + } + + // Check if the migration is ready to cut-over, and proceed to do so if it is. isReady, err := e.isVReplMigrationReadyToCutOver(ctx, onlineDDL, s) if err != nil { _ = e.updateMigrationMessage(ctx, uuid, err.Error()) From c429499dca7b0892d87143e8e2bd42ff154da2db Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Wed, 5 Jun 2024 23:09:57 +0300 Subject: [PATCH 8/8] use assert.Len, require.Len Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- .../scheduler/onlineddl_scheduler_test.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/go/test/endtoend/onlineddl/scheduler/onlineddl_scheduler_test.go b/go/test/endtoend/onlineddl/scheduler/onlineddl_scheduler_test.go index abbd9aea791..5b88b1f8678 100644 --- a/go/test/endtoend/onlineddl/scheduler/onlineddl_scheduler_test.go +++ b/go/test/endtoend/onlineddl/scheduler/onlineddl_scheduler_test.go @@ -1206,14 +1206,14 @@ func testScheduler(t *testing.T) { t.Run("drop multiple tables and views, in-order-completion", func(t *testing.T) { uuidList := testOnlineDDLStatement(t, createParams(sql, ddlStrategy+" --in-order-completion", "vtctl", "", "", true)) // skip wait vuuids = strings.Split(uuidList, "\n") - assert.Equal(t, 4, len(vuuids)) + assert.Len(t, vuuids, 4) for _, uuid := range vuuids { status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, uuid, normalWaitTime, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed) fmt.Printf("# Migration status (for debug purposes): <%s>\n", status) onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete) } }) - require.Equal(t, 4, len(vuuids)) + require.Len(t, vuuids, 4) for i := range vuuids { if i > 0 { testTableCompletionTimes(t, vuuids[i-1], vuuids[i]) @@ -1236,14 +1236,14 @@ func 
testScheduler(t *testing.T) { t.Run("drop multiple tables and views, in-order-completion", func(t *testing.T) { uuidList := testOnlineDDLStatement(t, createParams(sql, ddlStrategy+" --allow-concurrent --in-order-completion", "vtctl", "", "", true)) // skip wait vuuids = strings.Split(uuidList, "\n") - assert.Equal(t, 4, len(vuuids)) + assert.Len(t, vuuids, 4) for _, uuid := range vuuids { status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, uuid, normalWaitTime, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed) fmt.Printf("# Migration status (for debug purposes): <%s>\n", status) onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete) } }) - require.Equal(t, 4, len(vuuids)) + require.Len(t, vuuids, 4) for i := range vuuids { if i > 0 { testTableCompletionTimes(t, vuuids[i-1], vuuids[i]) @@ -1266,7 +1266,7 @@ func testScheduler(t *testing.T) { t.Run("apply schema", func(t *testing.T) { uuidList := testOnlineDDLStatement(t, createParams(sql, ddlStrategy+" --in-order-completion", "vtctl", "", "", true)) // skip wait vuuids = strings.Split(uuidList, "\n") - assert.Equal(t, 4, len(vuuids)) + assert.Len(t, vuuids, 4) for _, uuid := range vuuids[0:2] { status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, uuid, normalWaitTime, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed) fmt.Printf("# Migration status (for debug purposes): <%s>\n", status) @@ -1306,7 +1306,7 @@ func testScheduler(t *testing.T) { t.Run("apply schema", func(t *testing.T) { uuidList := testOnlineDDLStatement(t, createParams(sql, ddlStrategy+" --in-order-completion --postpone-completion --allow-concurrent", "vtctl", "", "", true)) // skip wait vuuids = strings.Split(uuidList, "\n") - assert.Equal(t, 2, len(vuuids)) + assert.Len(t, vuuids, 2) for _, uuid := range vuuids { waitForReadyToComplete(t, uuid, true) } @@ -1336,14 +1336,14 @@ func testScheduler(t *testing.T) { t.Run("create two views, expect both complete", func(t 
*testing.T) { uuidList := testOnlineDDLStatement(t, createParams(sql, ddlStrategy+" --allow-concurrent --in-order-completion", "vtctl", "", "", true)) // skip wait vuuids = strings.Split(uuidList, "\n") - assert.Equal(t, 2, len(vuuids)) + assert.Len(t, vuuids, 2) for _, uuid := range vuuids { status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, uuid, normalWaitTime, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed) fmt.Printf("# Migration status (for debug purposes): <%s>\n", status) onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete) } }) - require.Equal(t, 2, len(vuuids)) + require.Len(t, vuuids, 2) testTableCompletionTimes(t, vuuids[0], vuuids[1]) }) t.Run("in-order-completion: new table column, new view depends on said column", func(t *testing.T) {