Skip to content

Commit

Permalink
ApplySchema: special handling of 'ALTER VITESS_MIGRATION ... [UN]THRO…
Browse files Browse the repository at this point in the history
…TTLE' command

Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>
  • Loading branch information
shlomi-noach committed May 30, 2024
1 parent 3a016d2 commit 8a18b88
Showing 1 changed file with 94 additions and 6 deletions.
100 changes: 94 additions & 6 deletions go/vt/schemamanager/tablet_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,25 +19,30 @@ package schemamanager
import (
"context"
"fmt"
"strconv"
"strings"
"sync"
"time"

"golang.org/x/sync/semaphore"

"vitess.io/vitess/go/protoutil"
"vitess.io/vitess/go/timer"
"vitess.io/vitess/go/vt/logutil"
"vitess.io/vitess/go/vt/schema"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/vtctl/schematools"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle"
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp"
"vitess.io/vitess/go/vt/vttablet/tmclient"

querypb "vitess.io/vitess/go/vt/proto/query"
tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/proto/vtrpc"
vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)

// TabletExecutor applies schema changes to all tablets.
Expand Down Expand Up @@ -146,7 +151,7 @@ func (exec *TabletExecutor) parseDDLs(sqls []string) error {
for _, sql := range sqls {
stmt, err := exec.parser.Parse(sql)
if err != nil {
return vterrors.Errorf(vtrpc.Code_INVALID_ARGUMENT, "failed to parse sql: %s, got error: %v", sql, err)
return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "failed to parse sql: %s, got error: %v", sql, err)
}
switch stmt.(type) {
case sqlparser.DDLStatement:
Expand All @@ -155,7 +160,7 @@ func (exec *TabletExecutor) parseDDLs(sqls []string) error {
case *sqlparser.AlterMigration:
default:
if len(exec.tablets) != 1 {
return vterrors.Errorf(vtrpc.Code_FAILED_PRECONDITION, "non-ddl statements can only be executed for single shard keyspaces: %s", sql)
return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "non-ddl statements can only be executed for single shard keyspaces: %s", sql)
}
}
}
Expand Down Expand Up @@ -190,6 +195,76 @@ func (exec *TabletExecutor) isOnlineSchemaDDL(stmt sqlparser.Statement) (isOnlin
return false
}

func validateThrottleParams(alterMigrationType sqlparser.AlterMigrationType, expireString string, ratioLiteral *sqlparser.Literal) (duration time.Duration, ratio float64, err error) {
switch alterMigrationType {
case sqlparser.UnthrottleMigrationType,
sqlparser.UnthrottleAllMigrationType:
// Unthrottling is like throttling with duration=0
duration = 0
default:
duration = throttle.DefaultAppThrottleDuration
if expireString != "" {
duration, err = time.ParseDuration(expireString)
if err != nil || duration < 0 {
return duration, ratio, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "invalid EXPIRE value: %s. Try '120s', '30m', '1h', etc. Allowed units are (s)ec, (m)in, (h)hour", expireString)
}
}
}
ratio = 1.0
if ratioLiteral != nil {
ratio, err = strconv.ParseFloat(ratioLiteral.Val, 64)
if err != nil || ratio < 0 || ratio > 1 {
return duration, ratio, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "invalid RATIO value: %s. Try any decimal number between '0.0' (no throttle) and `1.0` (fully throttled)", ratioLiteral.Val)
}
}
return duration, ratio, nil
}

func (exec *TabletExecutor) executeAlterMigrationThrottle(ctx context.Context, alterMigration *sqlparser.AlterMigration) (err error) {
duration, ratio, err := validateThrottleParams(alterMigration.Type, alterMigration.Expire, alterMigration.Ratio)
if err != nil {
return err
}
expireAt := time.Now().Add(duration)
appName := alterMigration.UUID
if appName == "" {
appName = throttlerapp.OnlineDDLName.String()
}
throttledAppRule := &topodatapb.ThrottledAppRule{
Name: appName,
ExpiresAt: protoutil.TimeToProto(expireAt),
Ratio: ratio,
}

req := &vtctldatapb.UpdateThrottlerConfigRequest{
Keyspace: exec.keyspace,
ThrottledApp: throttledAppRule,
}

update := func(throttlerConfig *topodatapb.ThrottlerConfig) *topodatapb.ThrottlerConfig {
if throttlerConfig == nil {
throttlerConfig = &topodatapb.ThrottlerConfig{}
}
if throttlerConfig.ThrottledApps == nil {
throttlerConfig.ThrottledApps = make(map[string]*topodatapb.ThrottledAppRule)
}
throttlerConfig.ThrottledApps[req.ThrottledApp.Name] = req.ThrottledApp
return throttlerConfig
}
// We have already locked the keyspace
ki, err := exec.ts.GetKeyspace(ctx, req.Keyspace)
if err != nil {
return err
}
ki.ThrottlerConfig = update(ki.ThrottlerConfig)
err = exec.ts.UpdateKeyspace(ctx, ki)
if err != nil {
return err
}
_, err = exec.ts.UpdateSrvKeyspaceThrottlerConfig(ctx, req.Keyspace, []string{}, update)
return err
}

// executeSQL executes a single SQL statement either as online DDL or synchronously on all tablets.
// In online DDL case, the query may be exploded into multiple queries during
func (exec *TabletExecutor) executeSQL(ctx context.Context, sql string, providedUUID string, execResult *ExecuteResult) (executedAsynchronously bool, err error) {
Expand Down Expand Up @@ -235,8 +310,21 @@ func (exec *TabletExecutor) executeSQL(ctx context.Context, sql string, provided
exec.logger.Printf("%s\n", onlineDDL.UUID)
return true, nil
case *sqlparser.AlterMigration:
exec.executeOnAllTablets(ctx, execResult, sql, true)
return true, nil
switch stmt.Type {
case sqlparser.ThrottleMigrationType,
sqlparser.ThrottleAllMigrationType,
sqlparser.UnthrottleMigrationType,
sqlparser.UnthrottleAllMigrationType:
err := exec.executeAlterMigrationThrottle(ctx, stmt)
if err != nil {
execResult.ExecutorErr = err.Error()
return false, err
}
return true, nil
default:
exec.executeOnAllTablets(ctx, execResult, sql, true)
return true, nil
}
}
// Got here? The statement needs to be executed directly.
return executeViaFetch()
Expand Down Expand Up @@ -267,7 +355,7 @@ func allSQLsAreCreateQueries(sqls []string, parser *sqlparser.Parser) (bool, err
for _, sql := range sqls {
stmt, err := parser.Parse(sql)
if err != nil {
return false, vterrors.Errorf(vtrpc.Code_INVALID_ARGUMENT, "failed to parse sql: %s, got error: %v", sql, err)
return false, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "failed to parse sql: %s, got error: %v", sql, err)
}
switch stmt.(type) {
case *sqlparser.CreateTable, *sqlparser.CreateView:
Expand Down

0 comments on commit 8a18b88

Please sign in to comment.