Skip to content

Commit

Permalink
Some cleanup. Attempt to fix flaky throttler test: throttle-app-customer
Browse files Browse the repository at this point in the history
Signed-off-by: Rohit Nayak <rohit@planetscale.com>
  • Loading branch information
rohit-nayak-ps committed Nov 8, 2024
1 parent 405c70f commit 779de8c
Show file tree
Hide file tree
Showing 5 changed files with 18 additions and 32 deletions.
1 change: 0 additions & 1 deletion go/test/endtoend/vreplication/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,6 @@ func (vc *VitessCluster) CleanupDataroot(t *testing.T, recreate bool) {
// https://docs.github.com/en/actions/learn-github-actions/variables#default-environment-variables
ci, ok := os.LookupEnv("CI")
if !ok || strings.ToLower(ci) != "true" {
fmt.Println("Not running in CI, skipping cleanup")
// Leave the directory in place to support local debugging.
return
}
Expand Down
12 changes: 6 additions & 6 deletions go/test/endtoend/vreplication/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,12 +162,12 @@ create table nopk (name varchar(128), age int unsigned);
},
"admins": {
"column_vindexes": [
{
"column": "team_id",
"name": "reverse_bits"
}
]
},
{
"column": "team_id",
"name": "reverse_bits"
}
]
},
"enterprise_customer": {
"column_vindexes": [
{
Expand Down
8 changes: 7 additions & 1 deletion go/test/endtoend/vreplication/vreplication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1259,12 +1259,18 @@ func materializeProduct(t *testing.T, useVtctldClient bool) {
// Now, throttle vreplication on the target side (vplayer), and insert some
// more rows.
for _, tab := range customerTablets {
{
body, err := unthrottleApp(tab, sourceThrottlerAppName)
assert.NoError(t, err)
assert.Contains(t, body, sourceThrottlerAppName)
waitForTabletThrottlingStatus(t, tab, sourceThrottlerAppName, throttlerStatusNotThrottled)
}
body, err := throttleApp(tab, targetThrottlerAppName)
assert.NoError(t, err)
assert.Contains(t, body, targetThrottlerAppName)
// Wait for throttling to take effect (caching will expire by this time):
waitForTabletThrottlingStatus(t, tab, targetThrottlerAppName, throttlerStatusThrottled)
waitForTabletThrottlingStatus(t, tab, sourceThrottlerAppName, throttlerStatusNotThrottled)
waitForTabletThrottlingStatus(t, tab, targetThrottlerAppName, throttlerStatusThrottled)
}
insertMoreProductsForTargetThrottler(t)
// To be fair to the test, we give the target time to apply the new changes.
Expand Down
24 changes: 2 additions & 22 deletions go/vt/vttablet/tabletmanager/vreplication/vdbclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,18 +151,7 @@ func (vc *vdbClient) AddQueryToTrxBatch(query string) error {
return nil
}

func (vc *vdbClient) PopLastQueryFromBatch() error {
if !vc.InTransaction {
return vterrors.Errorf(vtrpc.Code_INVALID_ARGUMENT, "cannot pop query outside of a transaction")
}
if vc.batchSize > 0 {
vc.batchSize -= 1
vc.queries = vc.queries[:len(vc.queries)-1]
}
return nil
}

// ExecuteQueryBatch sends the transaction's current batch of queries
// ExecuteTrxQueryBatch sends the transaction's current batch of queries
// down the wire to the database.
func (vc *vdbClient) ExecuteTrxQueryBatch() ([]*sqltypes.Result, error) {
defer vc.stats.Timings.Record(binlogplayer.BlplMultiQuery, time.Now())
Expand All @@ -184,16 +173,7 @@ func (vc *vdbClient) Execute(query string) (*sqltypes.Result, error) {
return vc.ExecuteFetch(query, vc.relayLogMaxItems)
}

func (vc *vdbClient) IsRetryable(err error) bool {
if sqlErr, ok := err.(*sqlerror.SQLError); ok {
return sqlErr.Number() == sqlerror.ERDupEntry
}
return false
}

func (vc *vdbClient) ExecuteWithRetry(ctx context.Context, query string) (*sqltypes.Result, error) {
ctx2, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
qr, err := vc.Execute(query)
for err != nil {
var sqlErr *sqlerror.SQLError
Expand All @@ -206,7 +186,7 @@ func (vc *vdbClient) ExecuteWithRetry(ctx context.Context, query string) (*sqlty
time.Sleep(dbLockRetryDelay)
// Check context here. Otherwise this can become an infinite loop.
select {
case <-ctx2.Done():
case <-ctx.Done():
return nil, io.EOF
default:
}
Expand Down
5 changes: 3 additions & 2 deletions go/vt/vttablet/tabletmanager/vreplication/vplayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,8 +292,9 @@ func (vp *vplayer) executeWithRetryAndBackoff(ctx context.Context, query string)
for {
i++
query = fmt.Sprintf("%s /* backoff:: %d */", origQuery, i)
qr, err := vp.vr.dbClient.ExecuteWithRetry(ctx, query)
log.Flush()
shorterCtx, cancel2 := context.WithTimeout(shortCtx, time.Duration(backoffSeconds)*time.Second)
defer cancel2()
qr, err := vp.vr.dbClient.ExecuteWithRetry(shorterCtx, query)
if err == nil {
vp.vr.dbClient.Commit()
return qr, nil
Expand Down

0 comments on commit 779de8c

Please sign in to comment.