diff --git a/go/test/endtoend/vreplication/cluster_test.go b/go/test/endtoend/vreplication/cluster_test.go
index 13268fc749c..923498a70f2 100644
--- a/go/test/endtoend/vreplication/cluster_test.go
+++ b/go/test/endtoend/vreplication/cluster_test.go
@@ -441,6 +441,7 @@ func (vc *VitessCluster) CleanupDataroot(t *testing.T, recreate bool) {
 	// https://docs.github.com/en/actions/learn-github-actions/variables#default-environment-variables
 	ci, ok := os.LookupEnv("CI")
 	if !ok || strings.ToLower(ci) != "true" {
+		fmt.Println("Not running in CI, skipping cleanup")
 		// Leave the directory in place to support local debugging.
 		return
 	}
diff --git a/go/test/endtoend/vreplication/config_test.go b/go/test/endtoend/vreplication/config_test.go
index 25a4b734259..41e2211f082 100644
--- a/go/test/endtoend/vreplication/config_test.go
+++ b/go/test/endtoend/vreplication/config_test.go
@@ -69,6 +69,7 @@ create table `+"`blüb_tbl`"+` (id int, val1 varchar(20), `+"`blöb1`"+` blob,
 create table reftable (id int, val1 varchar(20), primary key(id), key(val1));
 create table loadtest (id int, name varchar(256), primary key(id), key(name));
 create table nopk (name varchar(128), age int unsigned);
+create table admins(team_id int, email varchar(128), val varchar(256), primary key(team_id), unique key(email));
 `, strings.Join(customerTypes, ","))
 	// These should always be ignored in vreplication
 	internalSchema = `
@@ -85,6 +86,7 @@ create table nopk (name varchar(128), age int unsigned);
     "tables": {
       "product": {},
       "merchant": {},
+      "admins": {},
       "orders": {},
       "loadtest": {},
       "customer": {},
@@ -158,8 +160,16 @@ create table nopk (name varchar(128), age int unsigned);
         }
       ]
     },
-    "enterprise_customer": {
+    "admins": {
       "column_vindexes": [
+        {
+          "column": "team_id",
+          "name": "reverse_bits"
+        }
+      ]
+    },
+    "enterprise_customer": {
+      "column_vindexes": [
         {
           "column": "cid",
           "name": "xxhash"
diff --git a/go/test/endtoend/vreplication/helper_test.go b/go/test/endtoend/vreplication/helper_test.go
index 44c35d0acea..766d3f8bdc9 100644
--- a/go/test/endtoend/vreplication/helper_test.go
+++ b/go/test/endtoend/vreplication/helper_test.go
@@ -288,6 +288,27 @@ func waitForRowCountInTablet(t *testing.T, vttablet *cluster.VttabletProcess, da
 	}
 }
 
+// waitForResult waits until the given query, run against the specified tablet
+// and database, returns the expected result, failing the test on timeout.
+func waitForResult(t *testing.T, vttablet *cluster.VttabletProcess, database string, query string, want string, timeout time.Duration) {
+	timer := time.NewTimer(timeout)
+	defer timer.Stop()
+	for {
+		qr, err := vttablet.QueryTablet(query, database, true)
+		require.NoError(t, err)
+		require.NotNil(t, qr)
+		if want == fmt.Sprintf("%v", qr.Rows) {
+			return
+		}
+		select {
+		case <-timer.C:
+			require.FailNow(t, fmt.Sprintf("query %q did not reach the expected result (%s) on tablet %q before the timeout of %s; last seen result: %s",
+				query, want, vttablet.Name, timeout, qr.Rows))
+		default:
+			time.Sleep(defaultTick)
+		}
+	}
+}
+
 // waitForSequenceValue queries the provided sequence name in the
 // provided database using the provided vtgate connection until
 // we get a next value from it. This allows us to move forward
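For context: waitForResult compares `want` against `fmt.Sprintf("%v", qr.Rows)`, so the expected string must match the sqltypes textual form exactly. A minimal standalone sketch of how such a string is produced (this uses the real sqltypes package but is not part of this PR):

package main

import (
	"fmt"

	"vitess.io/vitess/go/sqltypes"
)

func main() {
	// Build rows the way a query result carries them; Value.String()
	// renders as TYPE(value), which is what %v prints for qr.Rows.
	rows := [][]sqltypes.Value{{
		sqltypes.NewInt32(1),
		sqltypes.NewVarChar("a@example.com"),
	}}
	fmt.Printf("%v\n", rows) // [[INT32(1) VARCHAR("a@example.com")]]
}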
diff --git a/go/test/endtoend/vreplication/unsharded_init_data.sql b/go/test/endtoend/vreplication/unsharded_init_data.sql
index 8af0cab6608..e7142b04927 100644
--- a/go/test/endtoend/vreplication/unsharded_init_data.sql
+++ b/go/test/endtoend/vreplication/unsharded_init_data.sql
@@ -50,3 +50,6 @@ insert into reftable (id, val1) values (2, 'b')
 insert into reftable (id, val1) values (3, 'c')
 insert into reftable (id, val1) values (4, 'd')
 insert into reftable (id, val1) values (5, 'e')
+
+insert into admins(team_id, email, val) values(1, 'a@example.com', 'ibis-1')
+insert into admins(team_id, email, val) values(2, 'b@example.com', 'ibis-2')
diff --git a/go/test/endtoend/vreplication/workflow_duplicate_key_backoff_test.go b/go/test/endtoend/vreplication/workflow_duplicate_key_backoff_test.go
new file mode 100644
index 00000000000..81c2bb3a905
--- /dev/null
+++ b/go/test/endtoend/vreplication/workflow_duplicate_key_backoff_test.go
@@ -0,0 +1,86 @@
+package vreplication
+
+import (
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+
+	"vitess.io/vitess/go/test/endtoend/throttler"
+	"vitess.io/vitess/go/vt/log"
+	binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
+	vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata"
+	vttablet "vitess.io/vitess/go/vt/vttablet/common"
+)
+
+func TestWorkflowDuplicateKeyBackoff(t *testing.T) {
+	t.Run("TestWorkflowDuplicateKeyBackoff with batching off", func(t *testing.T) {
+		testWorkflowDuplicateKeyBackoff(t, false)
+	})
+	t.Run("TestWorkflowDuplicateKeyBackoff with batching on", func(t *testing.T) {
+		testWorkflowDuplicateKeyBackoff(t, true)
+	})
+}
+
+func testWorkflowDuplicateKeyBackoff(t *testing.T, setExperimentalFlags bool) {
+	debugMode = false
+	setSidecarDBName("_vt")
+	origDefaultRdonly := defaultRdonly
+	origDefaultReplicas := defaultReplicas
+	defer func() {
+		defaultRdonly = origDefaultRdonly
+		defaultReplicas = origDefaultReplicas
+	}()
+	defaultRdonly = 0
+	defaultReplicas = 0
+	if setExperimentalFlags {
+		setAllVTTabletExperimentalFlags()
+	}
+
+	setupMinimalCluster(t)
+	vttablet.InitVReplicationConfigDefaults()
+	defer vc.TearDown()
+
+	sourceKeyspaceName := "product"
+	targetKeyspaceName := "customer"
+	workflowName := "wf1"
+	targetTabs := setupMinimalCustomerKeyspace(t)
+	_ = targetTabs
+	tables := "customer,admins"
+
+	req := &vtctldatapb.UpdateThrottlerConfigRequest{
+		Enable: false,
+	}
+	res, err := throttler.UpdateThrottlerTopoConfigRaw(vc.VtctldClient, "customer", req, nil, nil)
+	require.NoError(t, err, res)
+	res, err = throttler.UpdateThrottlerTopoConfigRaw(vc.VtctldClient, "product", req, nil, nil)
+	require.NoError(t, err, res)
+
+	mt := createMoveTables(t, sourceKeyspaceName, targetKeyspaceName, workflowName, tables, nil, nil, nil)
+	waitForWorkflowState(t, vc, "customer.wf1", binlogdatapb.VReplicationWorkflowState_Running.String())
+	mt.SwitchReadsAndWrites()
+	vtgateConn, cancel := getVTGateConn()
+	defer cancel()
+
+	// team_id 1 => 80-, team_id 2 => -80
+	queries := []string{
+		"update admins set email = null, val = 'ibis-3' where team_id = 2",            // -80
+		"update admins set email = 'b@example.com', val = 'ibis-4' where team_id = 1", // 80-
+		"update admins set email = 'a@example.com', val = 'ibis-5' where team_id = 2", // -80
+	}
+
+	// Stop the reverse stream from -80 (id 1) so that its updates are held back.
+	vc.VtctlClient.ExecuteCommandWithOutput("VReplicationExec", "zone1-100", "update _vt.vreplication set state = 'Stopped' where id = 1")
+	for _, query := range queries {
+		execVtgateQuery(t, vtgateConn, targetKeyspaceName, query)
+	}
+	// Since the -80 stream is stopped, "update admins set email = 'b@example.com' where team_id = 1"
+	// will fail with a duplicate key error, because that email is still set for team_id = 2.
+	// With the new logic, the vplayer stream for 80- should back off, and its retry should succeed
+	// once the -80 stream is restarted and catches up.
+	time.Sleep(2 * time.Second) // FIXME: add a check that the table has the expected data after the updates
+	vc.VtctlClient.ExecuteCommandWithOutput("VReplicationExec", "zone1-100", "update _vt.vreplication set state = 'Running' where id = 1")
+	productTab := vc.Cells["zone1"].Keyspaces[sourceKeyspaceName].Shards["0"].Tablets["zone1-100"].Vttablet
+	waitForResult(t, productTab, "product", "select * from admins order by team_id",
+		"[[INT32(1) VARCHAR(\"b@example.com\") VARCHAR(\"ibis-4\")] [INT32(2) VARCHAR(\"a@example.com\") VARCHAR(\"ibis-5\")]]", 30*time.Second)
+	log.Infof("TestWorkflowDuplicateKeyBackoff passed")
+}
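For context, the ordering hazard this test creates can be sketched in isolation. The toy program below (illustrative names only, not part of the PR) mimics the unique-email constraint on the merge target and shows why the 80- stream's update must wait for the stopped -80 stream:

package main

import "fmt"

// Toy stand-in for the admins table on the merge target: team_id -> email,
// with emails required to be unique. Vreplication does not store anything
// this way; this only illustrates the cross-shard ordering problem.
func main() {
	emails := map[int]string{1: "a@example.com", 2: "b@example.com"}

	taken := func(team int, email string) bool {
		for t, e := range emails {
			if t != team && e == email {
				return true
			}
		}
		return false
	}

	// With the -80 stream stopped, only the team_id=1 update (80- stream)
	// reaches the target, where team_id=2 still holds b@example.com.
	if taken(1, "b@example.com") {
		fmt.Println("duplicate key: back off and retry")
	}

	// Once the -80 stream restarts, team 2's email is cleared first, and
	// the retried team 1 update succeeds.
	emails[2] = ""
	if !taken(1, "b@example.com") {
		emails[1] = "b@example.com"
		fmt.Println("retry after backoff succeeds")
	}
}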
diff --git a/go/vt/vttablet/tabletmanager/vreplication/vdbclient.go b/go/vt/vttablet/tabletmanager/vreplication/vdbclient.go
index b8339cdf874..d52ac306c39 100644
--- a/go/vt/vttablet/tabletmanager/vreplication/vdbclient.go
+++ b/go/vt/vttablet/tabletmanager/vreplication/vdbclient.go
@@ -18,6 +18,7 @@ package vreplication
 
 import (
 	"context"
+	"errors"
 	"io"
 	"strings"
 	"time"
@@ -102,6 +103,10 @@ func (vc *vdbClient) CommitTrxQueryBatch() error {
 	return nil
 }
 
+// GetQueries returns the queries currently accumulated in the transaction batch.
+func (vc *vdbClient) GetQueries() []string {
+	return vc.queries
+}
+
 func (vc *vdbClient) Rollback() error {
 	if !vc.InTransaction {
 		return nil
@@ -146,6 +151,17 @@ func (vc *vdbClient) AddQueryToTrxBatch(query string) error {
 	return nil
 }
 
+// PopLastQueryFromBatch removes the most recently added query from the
+// current transaction batch and adjusts the batch size accordingly.
+func (vc *vdbClient) PopLastQueryFromBatch() error {
+	if !vc.InTransaction {
+		return vterrors.Errorf(vtrpc.Code_INVALID_ARGUMENT, "cannot pop query outside of a transaction")
+	}
+	if len(vc.queries) > 0 {
+		last := vc.queries[len(vc.queries)-1]
+		vc.queries = vc.queries[:len(vc.queries)-1]
+		vc.batchSize -= int64(len(last)) + 1 // +1 for the separating semicolon accounted for when the query was added
+	}
+	return nil
+}
+
 // ExecuteQueryBatch sends the transaction's current batch of queries
 // down the wire to the database.
 func (vc *vdbClient) ExecuteTrxQueryBatch() ([]*sqltypes.Result, error) {
@@ -168,10 +184,21 @@ func (vc *vdbClient) Execute(query string) (*sqltypes.Result, error) {
 	return vc.ExecuteFetch(query, vc.relayLogMaxItems)
 }
 
+// IsRetryable reports whether the error should be retried with a backoff;
+// currently only duplicate key errors qualify.
+func (vc *vdbClient) IsRetryable(err error) bool {
+	var sqlErr *sqlerror.SQLError
+	if errors.As(err, &sqlErr) {
+		return sqlErr.Number() == sqlerror.ERDupEntry
+	}
+	return false
+}
+
 func (vc *vdbClient) ExecuteWithRetry(ctx context.Context, query string) (*sqltypes.Result, error) {
+	// Bound the total time spent retrying so that a persistent lock conflict
+	// cannot keep us in this loop indefinitely.
+	ctx2, cancel := context.WithTimeout(ctx, 10*time.Second)
+	defer cancel()
 	qr, err := vc.Execute(query)
 	for err != nil {
-		if sqlErr, ok := err.(*sqlerror.SQLError); ok && sqlErr.Number() == sqlerror.ERLockDeadlock || sqlErr.Number() == sqlerror.ERLockWaitTimeout {
+		var sqlErr *sqlerror.SQLError
+		// Note the parentheses: without them this condition would call
+		// sqlErr.Number() on a nil pointer for non-SQL errors.
+		if errors.As(err, &sqlErr) &&
+			(sqlErr.Number() == sqlerror.ERLockDeadlock || sqlErr.Number() == sqlerror.ERLockWaitTimeout) {
 			log.Infof("retryable error: %v, waiting for %v and retrying", sqlErr, dbLockRetryDelay)
 			if err := vc.Rollback(); err != nil {
 				return nil, err
@@ -179,7 +206,7 @@ func (vc *vdbClient) ExecuteWithRetry(ctx context.Context, query string) (*sqlty
 			time.Sleep(dbLockRetryDelay)
 			// Check context here. Otherwise this can become an infinite loop.
 			select {
-			case <-ctx.Done():
+			case <-ctx2.Done():
 				return nil, io.EOF
 			default:
 			}
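For context on the parenthesization fix above: in Go, && binds tighter than ||, so the unparenthesized condition would evaluate sqlErr.Number() on a nil pointer whenever the error is not a *sqlerror.SQLError. A self-contained sketch with a toy error type (not the real sqlerror package) demonstrating the corrected form:

package main

import (
	"errors"
	"fmt"
)

type sqlError struct{ num int }

func (e *sqlError) Error() string { return fmt.Sprintf("mysql error %d", e.num) }
func (e *sqlError) Number() int   { return e.num }

const (
	erLockDeadlock    = 1213
	erLockWaitTimeout = 1205
)

func retryable(err error) bool {
	var sqlErr *sqlError
	// Without the parentheses, `a && b || c` parses as `(a && b) || c`, so a
	// plain non-SQL error would still reach sqlErr.Number() and panic.
	return errors.As(err, &sqlErr) &&
		(sqlErr.Number() == erLockDeadlock || sqlErr.Number() == erLockWaitTimeout)
}

func main() {
	fmt.Println(retryable(&sqlError{num: erLockDeadlock})) // true
	fmt.Println(retryable(errors.New("not a sql error")))  // false, no panic
}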
diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go
index db2f3f341ac..25696c05afd 100644
--- a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go
+++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go
@@ -26,6 +26,9 @@ import (
 	"strings"
 	"time"
 
+	"vitess.io/vitess/go/mysql/sqlerror"
+	"vitess.io/vitess/go/vt/proto/vtrpc"
+
 	"vitess.io/vitess/go/mysql/replication"
 	"vitess.io/vitess/go/sqltypes"
 	"vitess.io/vitess/go/vt/binlog/binlogplayer"
@@ -60,6 +63,8 @@ type vplayer struct {
 	replicatorPlan *ReplicatorPlan
 	tablePlans     map[string]*TablePlan
 
+	// ctx is stashed by backoffAndRetry so that the batch-unroll path can
+	// reuse the caller's context when it replays queries.
+	ctx context.Context
+
 	// These are set when creating the VPlayer based on whether the VPlayer
 	// is in batch (stmt and trx) execution mode or not.
 	query func(ctx context.Context, sql string) (*sqltypes.Result, error)
@@ -99,6 +104,11 @@ type vplayer struct {
 	// foreignKeyChecksStateInitialized is set to true once we have initialized the foreignKeyChecksEnabled.
 	// The initialization is done on the first row event that this vplayer sees.
 	foreignKeyChecksStateInitialized bool
+
+	// hasSkippedCommit is set while commits are being grouped and the current
+	// transaction group has not yet been committed.
+	hasSkippedCommit bool
+	// isMergeWorkflow is true when multiple source streams write to the same
+	// target, which is when duplicate-key backoff applies.
+	isMergeWorkflow bool
+	// dontSkipCommits disables commit grouping once a backoff has occurred.
+	dontSkipCommits bool
+	// inBackoff is true while a failed query or batch is being retried.
+	inBackoff bool
 }
 
 // NoForeignKeyCheckFlagBitmask is the bitmask for the 2nd bit (least significant) of the flags in a binlog row event.
@@ -125,17 +135,33 @@ func newVPlayer(vr *vreplicator, settings binlogplayer.VRSettings, copyState map
 		settings.StopPos = pausePos
 		saveStop = false
 	}
+	batchMode := false
+	if vr.workflowConfig.ExperimentalFlags&vttablet.VReplicationExperimentalFlagVPlayerBatching != 0 {
+		batchMode = true
+	}
+
+	vp := &vplayer{
+		vr:               vr,
+		startPos:         settings.StartPos,
+		pos:              settings.StartPos,
+		stopPos:          settings.StopPos,
+		saveStop:         saveStop,
+		copyState:        copyState,
+		timeLastSaved:    time.Now(),
+		tablePlans:       make(map[string]*TablePlan),
+		phase:            phase,
+		throttlerAppName: throttlerapp.VPlayerName.ConcatenateString(vr.throttlerAppName()),
+		batchMode:        batchMode,
+		// FIXME: hardcoded for this experiment; whether the workflow is a
+		// merge should be derived from the workflow itself.
+		isMergeWorkflow: true,
+	}
 	queryFunc := func(ctx context.Context, sql string) (*sqltypes.Result, error) {
 		return vr.dbClient.ExecuteWithRetry(ctx, sql)
 	}
 	commitFunc := func() error {
+		log.Infof("Commit func: %v", vr.dbClient.InTransaction)
 		return vr.dbClient.Commit()
 	}
-	batchMode := false
-	if vr.workflowConfig.ExperimentalFlags&vttablet.VReplicationExperimentalFlagVPlayerBatching != 0 {
-		batchMode = true
-	}
 	if batchMode {
 		// relayLogMaxSize is effectively the limit used when not batching.
 		maxAllowedPacket := int64(vr.workflowConfig.RelayLogMaxSize)
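For context: the constructor now builds vp first so that the hunks below can install wrappers around the base query and commit functions. A minimal sketch of that wrapping pattern, with purely illustrative names (this is not the vplayer API):

package main

import (
	"context"
	"fmt"
)

// The base funcs do the real work; thin wrappers intercept failures so a
// backoff/retry decision can be made in one place.
type player struct {
	query  func(ctx context.Context, sql string) error
	commit func() error
}

func newPlayer() *player {
	p := &player{}
	baseQuery := func(ctx context.Context, sql string) error {
		fmt.Println("exec:", sql)
		return nil
	}
	baseCommit := func() error {
		fmt.Println("commit")
		return nil
	}
	p.query = func(ctx context.Context, sql string) error {
		err := baseQuery(ctx, sql)
		// The real code checks here whether err warrants a backoff and
		// retries before giving up.
		return err
	}
	p.commit = baseCommit
	return p
}

func main() {
	p := newPlayer()
	_ = p.query(context.Background(), "update t set v = 1")
	_ = p.commit()
}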
@@ -160,29 +186,162 @@ func newVPlayer(vr *vreplicator, settings binlogplayer.VRSettings, copyState map
 			}
 			return nil, vr.dbClient.AddQueryToTrxBatch(sql) // Should become part of the trx batch
 		}
+		unrollBatch := func() error {
+			batchedQueries := vr.dbClient.GetQueries()
+			if len(batchedQueries) == 0 {
+				return nil
+			}
+			for _, query := range batchedQueries {
+				log.Infof("Unrolling batch: exec %v", query)
+				_, err := vr.dbClient.Execute(query)
+				if err == nil {
+					continue
+				}
+				log.Infof("Unrolling batch: failed to exec %v: %v", query, err)
+				if !vp.mustBackoff(err) {
+					// Not a retryable error: surface it instead of dropping it.
+					return err
+				}
+				if vp.hasSkippedCommit {
+					log.Infof("Unrolling batch: found skipped commit, committing before retrying the query: %v", query)
+					if err := vr.dbClient.Commit(); err != nil {
+						return err
+					}
+					if err := vr.dbClient.Begin(); err != nil {
+						return err
+					}
+				}
+				if _, err := vp.backoffAndRetry(vp.ctx, query); err != nil {
+					return err
+				}
+			}
+			return vr.dbClient.Commit()
+		}
 		commitFunc = func() error {
+			log.Infof("Batch Commit func: In Transaction %v", vr.dbClient.InTransaction)
+			if vp.inBackoff {
+				// We get into backoff when a query fails with an ERDupEntry error,
+				// so one of the queries in the batch is the culprit. Run the queries
+				// one at a time, and backoff/retry the one that fails.
+				return unrollBatch()
+			}
 			return vr.dbClient.CommitTrxQueryBatch() // Commit the current trx batch
 		}
 		vr.dbClient.maxBatchSize = maxAllowedPacket
 	}
-	return &vplayer{
-		vr:               vr,
-		startPos:         settings.StartPos,
-		pos:              settings.StartPos,
-		stopPos:          settings.StopPos,
-		saveStop:         saveStop,
-		copyState:        copyState,
-		timeLastSaved:    time.Now(),
-		tablePlans:       make(map[string]*TablePlan),
-		phase:            phase,
-		throttlerAppName: throttlerapp.VPlayerName.ConcatenateString(vr.throttlerAppName()),
-		query:            queryFunc,
-		commit:           commitFunc,
-		batchMode:        batchMode,
-	}
+	wrappedCommitFunc := func() error {
+		vp.hasSkippedCommit = false
+		err := commitFunc()
+		if !vp.batchMode || err == nil {
+			return err
+		}
+		// The batch commit failed: replay the batch one query at a time so
+		// that only the offending query is backed off and retried.
+		vp.inBackoff = true
+		defer func() {
+			vp.inBackoff = false
+		}()
+		log.Infof("In backoff in wrapped commit func for batch mode, batched queries: %v", vp.vr.dbClient.GetQueries())
+		return commitFunc()
+	}
+	vp.commit = wrappedCommitFunc
+
+	wrappedQueryFunc := func(ctx context.Context, sql string) (*sqltypes.Result, error) {
+		result, err := queryFunc(ctx, sql)
+		if err != nil && vp.mustBackoff(err) {
+			log.Infof("Query %v failed with a retryable error (%v), backing off", sql, err)
+			return vp.backoffAndRetry(ctx, sql)
+		}
+		return result, err
+	}
+	vp.query = wrappedQueryFunc
+
+	return vp
+}
+
+// isRetryable reports whether the error qualifies for the duplicate-key
+// backoff and retry.
+func (vp *vplayer) isRetryable(err error) bool {
+	var sqlErr *sqlerror.SQLError
+	if errors.As(err, &sqlErr) {
+		return sqlErr.Number() == sqlerror.ERDupEntry
+	}
+	return false
+}
+
+// mustBackoff reports whether the error is a duplicate key error on a merge
+// workflow, in which case we back off and retry to let the other streams
+// catch up.
+func (vp *vplayer) mustBackoff(err error) bool {
+	var sqlErr *sqlerror.SQLError
+	return errors.As(err, &sqlErr) &&
+		sqlErr.Number() == sqlerror.ERDupEntry && vp.isMergeWorkflow
+}
+
+// executeWithRetryAndBackoff retries a query that failed with a duplicate
+// entry error. Since such an error is not transient, we wait much longer
+// than we would for a deadlock or lock wait timeout. The backoff is meant
+// to let the other streams catch up, which matters especially when global
+// query ordering is important: replica lag can be skewed between shards
+// because one shard is slower or takes more write traffic than the others.
+func (vp *vplayer) executeWithRetryAndBackoff(ctx context.Context, query string) (*sqltypes.Result, error) {
+	origQuery := query
+	timeout := 1 * time.Minute
+	shortCtx, cancel := context.WithDeadline(ctx, time.Now().Add(timeout))
+	defer cancel()
+	attempts := 0
+	backoffSeconds := 10
+	for {
+		attempts++
+		// Tag the query with the attempt number so retries show up in the logs.
+		query = fmt.Sprintf("%s /* backoff attempt: %d */", origQuery, attempts)
+		qr, err := vp.vr.dbClient.ExecuteWithRetry(ctx, query)
+		if err == nil {
+			if err := vp.vr.dbClient.Commit(); err != nil {
+				return nil, err
+			}
+			return qr, nil
+		}
+		if err := vp.vr.dbClient.Rollback(); err != nil {
+			return nil, err
+		}
+		if !vp.isRetryable(err) {
+			return nil, err
+		}
+		log.Infof("Backing off for %v seconds before retrying query: %v, got err: %v", backoffSeconds, query, err)
+		select {
+		case <-shortCtx.Done():
+			return nil, vterrors.Errorf(vtrpc.Code_DEADLINE_EXCEEDED, "backoff timeout exceeded while retrying query: %v", query)
+		case <-time.After(time.Duration(backoffSeconds) * time.Second):
+		}
+		// Exponential backoff; the total retry time is bounded by shortCtx.
+		backoffSeconds *= 2
+	}
+}
+
+// backoffAndRetry retries the query after a backoff period.
+func (vp *vplayer) backoffAndRetry(ctx context.Context, sql string) (*sqltypes.Result, error) {
+	vp.ctx = ctx
+	// Stop grouping transactions: each one should now commit individually.
+	// FIXME: should dontSkipCommits be reset to false after some time?
+	vp.dontSkipCommits = true
+	vp.inBackoff = true
+	defer func() {
+		vp.inBackoff = false
+	}()
+	if !vp.batchMode {
+		if vp.hasSkippedCommit {
+			log.Infof("Found a skipped commit, issuing a commit before retrying the query: %v", sql)
+			// vp.hasSkippedCommit is reset in the wrapped commit function vp.commit().
+			if err := vp.commit(); err != nil {
+				return nil, err
+			}
+			if err := vp.vr.dbClient.Begin(); err != nil {
+				return nil, err
+			}
+		}
+		return vp.executeWithRetryAndBackoff(ctx, sql)
+	}
+	// In batch mode the wrapped commit func unrolls the current batch and
+	// backs off and retries the failing query.
+	return nil, vp.commit()
+}
+
 // play is the entry point for playing binlogs.
 func (vp *vplayer) play(ctx context.Context) error {
 	if !vp.stopPos.IsZero() && vp.startPos.AtLeast(vp.stopPos) {
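For context, the retry schedule implemented by executeWithRetryAndBackoff above: a 10-second initial wait that doubles per attempt, bounded overall by the one-minute deadline. A small sketch of the resulting schedule (the numbers are taken from the code above):

package main

import (
	"fmt"
	"time"
)

func main() {
	timeout := 1 * time.Minute
	backoff := 10 * time.Second
	elapsed := time.Duration(0)
	for attempt := 1; elapsed+backoff <= timeout; attempt++ {
		fmt.Printf("attempt %d: wait %v (elapsed %v)\n", attempt, backoff, elapsed)
		elapsed += backoff
		backoff *= 2
	}
	// Roughly 10s + 20s of sleeping fit before the one-minute deadline cuts
	// the next 40s wait short.
}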
@@ -497,11 +656,11 @@ func (vp *vplayer) applyEvents(ctx context.Context, relay *relayLog) error {
 			return ctx.Err()
 		}
 		// Check throttler.
-		if checkResult, ok := vp.vr.vre.throttlerClient.ThrottleCheckOKOrWaitAppName(ctx, throttlerapp.Name(vp.throttlerAppName)); !ok {
-			_ = vp.vr.updateTimeThrottled(throttlerapp.VPlayerName, checkResult.Summary())
-			estimateLag()
-			continue
-		}
+		// FIXME: the throttler check is disabled while experimenting; it must
+		// be restored before this change can be merged.
+		// if checkResult, ok := vp.vr.vre.throttlerClient.ThrottleCheckOKOrWaitAppName(ctx, throttlerapp.Name(vp.throttlerAppName)); !ok {
+		// 	_ = vp.vr.updateTimeThrottled(throttlerapp.VPlayerName, checkResult.Summary())
+		// 	estimateLag()
+		// 	continue
+		// }
 
 		items, err := relay.Fetch()
 		if err != nil {
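For context on the next hunk: commit grouping normally skips a commit when another commit follows in the fetched items, and dontSkipCommits turns that grouping off after a backoff. A toy sketch of the grouping decision (illustrative only, not the vplayer code):

package main

import "fmt"

func hasAnotherCommit(events []string, from int) bool {
	for _, ev := range events[from:] {
		if ev == "commit" {
			return true
		}
	}
	return false
}

func main() {
	events := []string{"row", "commit", "row", "commit"}
	dontSkipCommits := false
	for i, ev := range events {
		if ev != "commit" {
			continue
		}
		if !dontSkipCommits && hasAnotherCommit(events, i+1) {
			fmt.Println("skip commit at", i, "and keep grouping")
			continue
		}
		fmt.Println("commit at", i)
	}
}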
@@ -548,6 +707,7 @@ func (vp *vplayer) applyEvents(ctx context.Context, relay *relayLog) error {
 			// mustSave flag.
 			if !vp.stopPos.IsZero() && vp.pos.AtLeast(vp.stopPos) {
 				mustSave = true
+				vp.hasSkippedCommit = false
 				break
 			}
 			// In order to group multiple commits into a single one, we look ahead for
 			// the next commit. If there is one, we skip the current commit, which ends up
 			// applying the next set of events as part of the current transaction. This approach
 			// also handles the case where the last transaction is partial. In that case,
 			// we only group the transactions with commits we've seen so far.
-			if hasAnotherCommit(items, i, j+1) {
+			if !vp.dontSkipCommits && hasAnotherCommit(items, i, j+1) {
+				vp.hasSkippedCommit = true
 				continue
 			}
diff --git a/test/config.json b/test/config.json
index 185201cf3e0..a96229f0b35 100644
--- a/test/config.json
+++ b/test/config.json
@@ -1076,6 +1076,15 @@
         "RetryMax": 0,
         "Tags": []
     },
+    "vreplication_workflow_dup": {
+        "File": "unused.go",
+        "Args": ["vitess.io/vitess/go/test/endtoend/vreplication", "-run", "TestWorkflowDuplicateKeyBackoff"],
+        "Command": [],
+        "Manual": false,
+        "Shard": "vreplication_cellalias",
+        "RetryMax": 0,
+        "Tags": []
+    },
     "vreplication_multi_tenant": {
         "File": "unused.go",
         "Args": ["vitess.io/vitess/go/test/endtoend/vreplication","-run", "MultiTenant"],