diff --git a/go/vt/schemamanager/tablet_executor.go b/go/vt/schemamanager/tablet_executor.go index 592c64e7073..5397cf9343f 100644 --- a/go/vt/schemamanager/tablet_executor.go +++ b/go/vt/schemamanager/tablet_executor.go @@ -19,12 +19,14 @@ package schemamanager import ( "context" "fmt" + "strconv" "strings" "sync" "time" "golang.org/x/sync/semaphore" + "vitess.io/vitess/go/protoutil" "vitess.io/vitess/go/timer" "vitess.io/vitess/go/vt/logutil" "vitess.io/vitess/go/vt/schema" @@ -32,12 +34,15 @@ import ( "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/vtctl/schematools" "vitess.io/vitess/go/vt/vterrors" + "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle" + "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp" "vitess.io/vitess/go/vt/vttablet/tmclient" querypb "vitess.io/vitess/go/vt/proto/query" tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" topodatapb "vitess.io/vitess/go/vt/proto/topodata" - "vitess.io/vitess/go/vt/proto/vtrpc" + vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata" + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" ) // TabletExecutor applies schema changes to all tablets. @@ -146,7 +151,7 @@ func (exec *TabletExecutor) parseDDLs(sqls []string) error { for _, sql := range sqls { stmt, err := exec.parser.Parse(sql) if err != nil { - return vterrors.Errorf(vtrpc.Code_INVALID_ARGUMENT, "failed to parse sql: %s, got error: %v", sql, err) + return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "failed to parse sql: %s, got error: %v", sql, err) } switch stmt.(type) { case sqlparser.DDLStatement: @@ -155,7 +160,7 @@ func (exec *TabletExecutor) parseDDLs(sqls []string) error { case *sqlparser.AlterMigration: default: if len(exec.tablets) != 1 { - return vterrors.Errorf(vtrpc.Code_FAILED_PRECONDITION, "non-ddl statements can only be executed for single shard keyspaces: %s", sql) + return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "non-ddl statements can only be executed for single shard keyspaces: %s", sql) } } } @@ -190,6 +195,76 @@ func (exec *TabletExecutor) isOnlineSchemaDDL(stmt sqlparser.Statement) (isOnlin return false } +func validateThrottleParams(alterMigrationType sqlparser.AlterMigrationType, expireString string, ratioLiteral *sqlparser.Literal) (duration time.Duration, ratio float64, err error) { + switch alterMigrationType { + case sqlparser.UnthrottleMigrationType, + sqlparser.UnthrottleAllMigrationType: + // Unthrottling is like throttling with duration=0 + duration = 0 + default: + duration = throttle.DefaultAppThrottleDuration + if expireString != "" { + duration, err = time.ParseDuration(expireString) + if err != nil || duration < 0 { + return duration, ratio, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "invalid EXPIRE value: %s. Try '120s', '30m', '1h', etc. Allowed units are (s)ec, (m)in, (h)hour", expireString) + } + } + } + ratio = 1.0 + if ratioLiteral != nil { + ratio, err = strconv.ParseFloat(ratioLiteral.Val, 64) + if err != nil || ratio < 0 || ratio > 1 { + return duration, ratio, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "invalid RATIO value: %s. Try any decimal number between '0.0' (no throttle) and `1.0` (fully throttled)", ratioLiteral.Val) + } + } + return duration, ratio, nil +} + +func (exec *TabletExecutor) executeAlterMigrationThrottle(ctx context.Context, alterMigration *sqlparser.AlterMigration) (err error) { + duration, ratio, err := validateThrottleParams(alterMigration.Type, alterMigration.Expire, alterMigration.Ratio) + if err != nil { + return err + } + expireAt := time.Now().Add(duration) + appName := alterMigration.UUID + if appName == "" { + appName = throttlerapp.OnlineDDLName.String() + } + throttledAppRule := &topodatapb.ThrottledAppRule{ + Name: appName, + ExpiresAt: protoutil.TimeToProto(expireAt), + Ratio: ratio, + } + + req := &vtctldatapb.UpdateThrottlerConfigRequest{ + Keyspace: exec.keyspace, + ThrottledApp: throttledAppRule, + } + + update := func(throttlerConfig *topodatapb.ThrottlerConfig) *topodatapb.ThrottlerConfig { + if throttlerConfig == nil { + throttlerConfig = &topodatapb.ThrottlerConfig{} + } + if throttlerConfig.ThrottledApps == nil { + throttlerConfig.ThrottledApps = make(map[string]*topodatapb.ThrottledAppRule) + } + throttlerConfig.ThrottledApps[req.ThrottledApp.Name] = req.ThrottledApp + return throttlerConfig + } + // We have already locked the keyspace + ki, err := exec.ts.GetKeyspace(ctx, req.Keyspace) + if err != nil { + return err + } + ki.ThrottlerConfig = update(ki.ThrottlerConfig) + err = exec.ts.UpdateKeyspace(ctx, ki) + if err != nil { + return err + } + _, err = exec.ts.UpdateSrvKeyspaceThrottlerConfig(ctx, req.Keyspace, []string{}, update) + return err +} + // executeSQL executes a single SQL statement either as online DDL or synchronously on all tablets. // In online DDL case, the query may be exploded into multiple queries during func (exec *TabletExecutor) executeSQL(ctx context.Context, sql string, providedUUID string, execResult *ExecuteResult) (executedAsynchronously bool, err error) { @@ -235,8 +310,21 @@ func (exec *TabletExecutor) executeSQL(ctx context.Context, sql string, provided exec.logger.Printf("%s\n", onlineDDL.UUID) return true, nil case *sqlparser.AlterMigration: - exec.executeOnAllTablets(ctx, execResult, sql, true) - return true, nil + switch stmt.Type { + case sqlparser.ThrottleMigrationType, + sqlparser.ThrottleAllMigrationType, + sqlparser.UnthrottleMigrationType, + sqlparser.UnthrottleAllMigrationType: + err := exec.executeAlterMigrationThrottle(ctx, stmt) + if err != nil { + execResult.ExecutorErr = err.Error() + return false, err + } + return true, nil + default: + exec.executeOnAllTablets(ctx, execResult, sql, true) + return true, nil + } } // Got here? The statement needs to be executed directly. return executeViaFetch() @@ -267,7 +355,7 @@ func allSQLsAreCreateQueries(sqls []string, parser *sqlparser.Parser) (bool, err for _, sql := range sqls { stmt, err := parser.Parse(sql) if err != nil { - return false, vterrors.Errorf(vtrpc.Code_INVALID_ARGUMENT, "failed to parse sql: %s, got error: %v", sql, err) + return false, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "failed to parse sql: %s, got error: %v", sql, err) } switch stmt.(type) { case *sqlparser.CreateTable, *sqlparser.CreateView: