diff --git a/go/test/endtoend/onlineddl/scheduler/onlineddl_scheduler_test.go b/go/test/endtoend/onlineddl/scheduler/onlineddl_scheduler_test.go index e471931a20c..057c937d6cf 100644 --- a/go/test/endtoend/onlineddl/scheduler/onlineddl_scheduler_test.go +++ b/go/test/endtoend/onlineddl/scheduler/onlineddl_scheduler_test.go @@ -363,6 +363,9 @@ func testScheduler(t *testing.T) { createViewDependsOnExtraColumn = ` CREATE VIEW t1_test_view AS SELECT id, extra_column FROM t1_test ` + alterNonexistent = ` + ALTER TABLE nonexistent FORCE + ` ) testReadTimestamp := func(t *testing.T, uuid string, timestampColumn string) (timestamp string) { @@ -960,6 +963,22 @@ func testScheduler(t *testing.T) { }) }) } + // Failure scenarios + t.Run("fail nonexistent", func(t *testing.T) { + uuid := testOnlineDDLStatement(t, createParams(alterNonexistent, "vitess", "vtgate", "", "", false)) + + status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, uuid, normalWaitTime, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed) + fmt.Printf("# Migration status (for debug purposes): <%s>\n", status) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusFailed) + + rs := onlineddl.ReadMigrations(t, &vtParams, uuid) + require.NotNil(t, rs) + for _, row := range rs.Named().Rows { + message := row["message"].ToString() + require.Contains(t, message, "errno 1146") + } + }) + // 'mysql' strategy t.Run("mysql strategy", func(t *testing.T) { t.Run("declarative", func(t *testing.T) { diff --git a/go/vt/vttablet/onlineddl/executor.go b/go/vt/vttablet/onlineddl/executor.go index 3a6cbc8912f..7b216f0b1ad 100644 --- a/go/vt/vttablet/onlineddl/executor.go +++ b/go/vt/vttablet/onlineddl/executor.go @@ -2310,6 +2310,64 @@ func (e *Executor) reviewImmediateOperations(ctx context.Context, capableOf mysq return false, nil } +// reviewQueuedMigration investigates a single migration found in `queued` state. +// It analyzes whether the migration can & should be fulfilled immediately (e.g. via INSTANT DDL or just because it's a CREATE or DROP), +// or backfils necessary information if it's a REVERT. +// If all goes well, it sets `reviewed_timestamp` which then allows the state machine to schedule the migration. +func (e *Executor) reviewQueuedMigration(ctx context.Context, uuid string, capableOf mysql.CapableOf) error { + onlineDDL, row, err := e.readMigration(ctx, uuid) + if err != nil { + return err + } + // handle REVERT migrations: populate table name and update ddl action and is_view: + ddlAction := row["ddl_action"].ToString() + isRevert := false + if ddlAction == schema.RevertActionStr { + isRevert = true + rowModified, err := e.reviewEmptyTableRevertMigrations(ctx, onlineDDL) + if err != nil { + return err + } + if rowModified { + // re-read migration and entire row + onlineDDL, row, err = e.readMigration(ctx, uuid) + if err != nil { + return err + } + ddlAction = row["ddl_action"].ToString() + } + } + isView := row.AsBool("is_view", false) + isImmediate, err := e.reviewImmediateOperations(ctx, capableOf, onlineDDL, ddlAction, isRevert, isView) + if err != nil { + return err + } + if isImmediate { + if err := e.updateMigrationSetImmediateOperation(ctx, onlineDDL.UUID); err != nil { + return err + } + } + // Find conditions where the migration cannot take place: + switch onlineDDL.Strategy { + case schema.DDLStrategyMySQL: + strategySetting := onlineDDL.StrategySetting() + if strategySetting.IsPostponeCompletion() { + return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "--postpone-completion not supported in 'mysql' strategy") + } + if strategySetting.IsAllowZeroInDateFlag() { + return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "--allow-zero-in-date not supported in 'mysql' strategy") + } + } + + // The review is complete. We've backfilled details on the migration row. We mark + // the migration as having been reviewed. The function scheduleNextMigration() will then + // have access to this row. + if err := e.updateMigrationTimestamp(ctx, "reviewed_timestamp", uuid); err != nil { + return err + } + return nil +} + // reviewQueuedMigrations iterates through queued migrations and sees if any information needs to be updated. // The function analyzes the queued migration and fills in some blanks: // - If this is a REVERT migration, what table is affected? What's the operation? @@ -2332,57 +2390,9 @@ func (e *Executor) reviewQueuedMigrations(ctx context.Context) error { for _, uuidRow := range r.Named().Rows { uuid := uuidRow["migration_uuid"].ToString() - onlineDDL, row, err := e.readMigration(ctx, uuid) - if err != nil { - return err + if err := e.reviewQueuedMigration(ctx, uuid, capableOf); err != nil { + e.failMigration(ctx, &schema.OnlineDDL{UUID: uuid}, err) } - // handle REVERT migrations: populate table name and update ddl action and is_view: - ddlAction := row["ddl_action"].ToString() - isRevert := false - if ddlAction == schema.RevertActionStr { - isRevert = true - rowModified, err := e.reviewEmptyTableRevertMigrations(ctx, onlineDDL) - if err != nil { - return err - } - if rowModified { - // re-read migration and entire row - onlineDDL, row, err = e.readMigration(ctx, uuid) - if err != nil { - return err - } - ddlAction = row["ddl_action"].ToString() - } - } - isView := row.AsBool("is_view", false) - isImmediate, err := e.reviewImmediateOperations(ctx, capableOf, onlineDDL, ddlAction, isRevert, isView) - if err != nil { - return err - } - if isImmediate { - if err := e.updateMigrationSetImmediateOperation(ctx, onlineDDL.UUID); err != nil { - return err - } - } - // Find conditions where the migration cannot take place: - switch onlineDDL.Strategy { - case schema.DDLStrategyMySQL: - strategySetting := onlineDDL.StrategySetting() - if strategySetting.IsPostponeCompletion() { - e.failMigration(ctx, onlineDDL, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "--postpone-completion not supported in 'mysql' strategy")) - } - if strategySetting.IsAllowZeroInDateFlag() { - e.failMigration(ctx, onlineDDL, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "--allow-zero-in-date not supported in 'mysql' strategy")) - } - } - - // The review is complete. We've backfilled details on the migration row. We mark - // the migration as having been reviewed. The function scheduleNextMigration() will then - // have access to this row. - if err := e.updateMigrationTimestamp(ctx, "reviewed_timestamp", uuid); err != nil { - return err - } - } return nil } diff --git a/go/vt/vttablet/tabletmanager/vreplication/controller.go b/go/vt/vttablet/tabletmanager/vreplication/controller.go index 3262dca9bc6..d38b18e33bf 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/controller.go +++ b/go/vt/vttablet/tabletmanager/vreplication/controller.go @@ -259,12 +259,14 @@ func (ct *controller) runBlp(ctx context.Context) (err error) { err = vr.Replicate(ctx) ct.lastWorkflowError.Record(err) - // If this is a mysql error that we know needs manual intervention OR - // we cannot identify this as non-recoverable, but it has persisted - // beyond the retry limit (maxTimeToRetryError). - // In addition, we cannot restart a workflow started with AtomicCopy which has _any_ error. + // If this is a MySQL error that we know needs manual intervention or + // it's a FAILED_PRECONDITION vterror, OR we cannot identify this as + // non-recoverable BUT it has persisted beyond the retry limit + // (maxTimeToRetryError). In addition, we cannot restart a workflow + // started with AtomicCopy which has _any_ error. if (err != nil && vr.WorkflowSubType == int32(binlogdatapb.VReplicationWorkflowSubType_AtomicCopy)) || - isUnrecoverableError(err) || !ct.lastWorkflowError.ShouldRetry() { + isUnrecoverableError(err) || + !ct.lastWorkflowError.ShouldRetry() { log.Errorf("vreplication stream %d going into error state due to %+v", ct.id, err) if errSetState := vr.setState(binlogdatapb.VReplicationWorkflowState_Error, err.Error()); errSetState != nil { diff --git a/go/vt/vttablet/tabletmanager/vreplication/table_plan_builder.go b/go/vt/vttablet/tabletmanager/vreplication/table_plan_builder.go index da1b4dfc2f3..d94d0640529 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/table_plan_builder.go +++ b/go/vt/vttablet/tabletmanager/vreplication/table_plan_builder.go @@ -26,10 +26,13 @@ import ( "vitess.io/vitess/go/textutil" "vitess.io/vitess/go/vt/binlog/binlogplayer" "vitess.io/vitess/go/vt/key" - binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" - querypb "vitess.io/vitess/go/vt/proto/query" "vitess.io/vitess/go/vt/schema" "vitess.io/vitess/go/vt/sqlparser" + "vitess.io/vitess/go/vt/vterrors" + + binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" + querypb "vitess.io/vitess/go/vt/proto/query" + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" ) // This file contains just the builders for ReplicatorPlan and TablePlan. @@ -629,7 +632,7 @@ func (tpb *tablePlanBuilder) analyzeExtraSourcePkCols(colInfos []*ColumnInfo, so if !col.IsGenerated { // We shouldn't get here in any normal scenario. If a column is part of colInfos, // then it must also exist in tpb.colExprs. - return fmt.Errorf("column %s not found in table expressions", col.Name) + return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "column %s not found in table expressions", col.Name) } } } diff --git a/go/vt/vttablet/tabletmanager/vreplication/utils.go b/go/vt/vttablet/tabletmanager/vreplication/utils.go index 42aa4351647..21c3a61c9f1 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/utils.go +++ b/go/vt/vttablet/tabletmanager/vreplication/utils.go @@ -26,6 +26,9 @@ import ( "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/sqlparser" + "vitess.io/vitess/go/vt/vterrors" + + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" ) const ( @@ -123,6 +126,9 @@ func isUnrecoverableError(err error) bool { if err == nil { return false } + if vterrors.Code(err) == vtrpcpb.Code_FAILED_PRECONDITION { + return true + } sqlErr, isSQLErr := sqlerror.NewSQLErrorFromError(err).(*sqlerror.SQLError) if !isSQLErr { return false diff --git a/go/vt/vttablet/tabletserver/vstreamer/planbuilder.go b/go/vt/vttablet/tabletserver/vstreamer/planbuilder.go index c9bb0121571..30fbfdb7a01 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/planbuilder.go +++ b/go/vt/vttablet/tabletserver/vstreamer/planbuilder.go @@ -23,23 +23,20 @@ import ( "strconv" "strings" - "vitess.io/vitess/go/mysql/collations" - "vitess.io/vitess/go/vt/vtgate/evalengine" - - vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" - "vitess.io/vitess/go/vt/vterrors" - - "vitess.io/vitess/go/vt/log" - "vitess.io/vitess/go/mysql" + "vitess.io/vitess/go/mysql/collations" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/key" + "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/sqlparser" + "vitess.io/vitess/go/vt/vterrors" + "vitess.io/vitess/go/vt/vtgate/evalengine" "vitess.io/vitess/go/vt/vtgate/vindexes" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" querypb "vitess.io/vitess/go/vt/proto/query" topodatapb "vitess.io/vitess/go/vt/proto/topodata" + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" ) // Plan represents the plan for a table. @@ -865,5 +862,5 @@ func findColumn(ti *Table, name sqlparser.IdentifierCI) (int, error) { return i, nil } } - return 0, fmt.Errorf("column %s not found in table %s", sqlparser.String(name), ti.Name) + return 0, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "column %s not found in table %s", sqlparser.String(name), ti.Name) }