Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] VReplication Merge Workflows: add heuristic to handle valid duplicate key errors caused by unique key columns. #17074

Draft
wants to merge 9 commits into
base: main
Choose a base branch
from
1 change: 1 addition & 0 deletions go/test/endtoend/vreplication/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,7 @@ func (vc *VitessCluster) CleanupDataroot(t *testing.T, recreate bool) {
// https://docs.github.com/en/actions/learn-github-actions/variables#default-environment-variables
ci, ok := os.LookupEnv("CI")
if !ok || strings.ToLower(ci) != "true" {
fmt.Println("Not running in CI, skipping cleanup")
// Leave the directory in place to support local debugging.
return
}
Expand Down
12 changes: 11 additions & 1 deletion go/test/endtoend/vreplication/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ create table `+"`blüb_tbl`"+` (id int, val1 varchar(20), `+"`blöb1`"+` blob,
create table reftable (id int, val1 varchar(20), primary key(id), key(val1));
create table loadtest (id int, name varchar(256), primary key(id), key(name));
create table nopk (name varchar(128), age int unsigned);
create table admins(team_id int, email varchar(128), val varchar(256), primary key(team_id), unique key(email));
`, strings.Join(customerTypes, ","))
// These should always be ignored in vreplication
internalSchema = `
Expand All @@ -85,6 +86,7 @@ create table nopk (name varchar(128), age int unsigned);
"tables": {
"product": {},
"merchant": {},
"admins": {},
"orders": {},
"loadtest": {},
"customer": {},
Expand Down Expand Up @@ -158,8 +160,16 @@ create table nopk (name varchar(128), age int unsigned);
}
]
},
"enterprise_customer": {
"admins": {
"column_vindexes": [
{
"column": "team_id",
"name": "reverse_bits"
}
]
},
"enterprise_customer": {
"column_vindexes": [
{
"column": "cid",
"name": "xxhash"
Expand Down
21 changes: 21 additions & 0 deletions go/test/endtoend/vreplication/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,27 @@ func waitForRowCountInTablet(t *testing.T, vttablet *cluster.VttabletProcess, da
}
}

// waitForResult repeatedly runs query against database on the given tablet
// until the returned rows, rendered with %v, are equal to want. Any query
// error or nil result fails the test immediately. If the rows still do not
// match once timeout has elapsed, the test is failed with a message that
// includes the last rows seen.
func waitForResult(t *testing.T, vttablet *cluster.VttabletProcess, database string, query string, want string, timeout time.Duration) {
	deadline := time.NewTimer(timeout)
	defer deadline.Stop()
	for {
		res, err := vttablet.QueryTablet(query, database, true)
		require.NoError(t, err)
		require.NotNil(t, res)
		if got := fmt.Sprintf("%v", res.Rows); got == want {
			return
		}
		// Non-blocking check of the deadline: give up if it has fired,
		// otherwise fall through and poll again after a short pause.
		select {
		case <-deadline.C:
			failure := fmt.Sprintf("query %q did not reach the expected result (%s) on tablet %q before the timeout of %s; last seen result: %s",
				query, want, vttablet.Name, timeout, res.Rows)
			require.FailNow(t, failure)
		default:
		}
		time.Sleep(defaultTick)
	}
}

// waitForSequenceValue queries the provided sequence name in the
// provided database using the provided vtgate connection until
// we get a next value from it. This allows us to move forward
Expand Down
3 changes: 3 additions & 0 deletions go/test/endtoend/vreplication/unsharded_init_data.sql
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,6 @@ insert into reftable (id, val1) values (2, 'b')
insert into reftable (id, val1) values (3, 'c')
insert into reftable (id, val1) values (4, 'd')
insert into reftable (id, val1) values (5, 'e')

insert into admins(team_id, email, val) values(1, 'a@example.com', 'ibis-1')
insert into admins(team_id, email, val) values(2, 'b@example.com', 'ibis-2')
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package vreplication

import (
"testing"
"time"

"github.com/stretchr/testify/require"

"vitess.io/vitess/go/test/endtoend/throttler"
"vitess.io/vitess/go/vt/log"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata"
vttablet "vitess.io/vitess/go/vt/vttablet/common"
)

// TestWorkflowDuplicateKeyBackoff runs the duplicate key backoff scenario as
// two sequential subtests: first without, then with, the experimental
// vttablet flags that testWorkflowDuplicateKeyBackoff enables on request.
func TestWorkflowDuplicateKeyBackoff(t *testing.T) {
	variants := []struct {
		name                 string
		setExperimentalFlags bool
	}{
		{name: "TestWorkflowDuplicateKeyBackoff with batching off", setExperimentalFlags: false},
		{name: "TestWorkflowDuplicateKeyBackoff with batching on", setExperimentalFlags: true},
	}
	for _, variant := range variants {
		t.Run(variant.name, func(t *testing.T) {
			testWorkflowDuplicateKeyBackoff(t, variant.setExperimentalFlags)
		})
	}
}

// testWorkflowDuplicateKeyBackoff exercises a MoveTables workflow whose
// traffic has been switched, while one of the vreplication streams on the
// source tablet is temporarily stopped.
//
// The admins table has a unique key on email. The test swaps the emails of
// team_id 1 and team_id 2 through vtgate while stream id 1 is stopped, so the
// update for team_id 1 reaches the product tablet before the updates for
// team_id 2 and collides on the unique key. After stream id 1 is set back to
// Running, the product tablet is expected to converge on the swapped rows.
//
// When setExperimentalFlags is true, setAllVTTabletExperimentalFlags is called
// before the cluster is set up.
func testWorkflowDuplicateKeyBackoff(t *testing.T, setExperimentalFlags bool) {
	debugMode = false
	setSidecarDBName("_vt")
	origDefaultRdonly := defaultRdonly
	origDefaultReplicas := defaultReplicas
	defer func() {
		defaultRdonly = origDefaultRdonly
		defaultReplicas = origDefaultReplicas
	}()
	defaultRdonly = 0
	defaultReplicas = 0
	if setExperimentalFlags {
		setAllVTTabletExperimentalFlags()
	}

	setupMinimalCluster(t)
	vttablet.InitVReplicationConfigDefaults()
	defer vc.TearDown()

	sourceKeyspaceName := "product"
	targetKeyspaceName := "customer"
	workflowName := "wf1"
	sourceTabletAlias := "zone1-100"
	tables := "customer,admins"
	setupMinimalCustomerKeyspace(t)

	// Disable the throttler on both keyspaces (target first, then source).
	req := &vtctldatapb.UpdateThrottlerConfigRequest{
		Enable: false,
	}
	for _, keyspace := range []string{targetKeyspaceName, sourceKeyspaceName} {
		res, err := throttler.UpdateThrottlerTopoConfigRaw(vc.VtctldClient, keyspace, req, nil, nil)
		require.NoError(t, err, res)
	}

	mt := createMoveTables(t, sourceKeyspaceName, targetKeyspaceName, workflowName, tables, nil, nil, nil)
	waitForWorkflowState(t, vc, targetKeyspaceName+"."+workflowName, binlogdatapb.VReplicationWorkflowState_Running.String())
	mt.SwitchReadsAndWrites()
	vtgateConn, cancel := getVTGateConn()
	defer cancel()

	// setStreamState updates the state of vreplication stream id 1 on the
	// source tablet and fails the test if the command itself fails, rather
	// than letting the test continue on a broken premise.
	setStreamState := func(state string) {
		t.Helper()
		output, err := vc.VtctlClient.ExecuteCommandWithOutput("VReplicationExec", sourceTabletAlias,
			"update _vt.vreplication set state = '"+state+"' where id = 1")
		require.NoError(t, err, output)
	}

	// team_id 1 => 80-, team_id 2 => -80
	queries := []string{
		"update admins set email = null, val = 'ibis-3' where team_id = 2",            // -80
		"update admins set email = 'b@example.com', val = 'ibis-4' where team_id = 1", // 80-
		"update admins set email = 'a@example.com', val = 'ibis-5' where team_id = 2", // -80
	}

	setStreamState("Stopped") // stream id 1 is the -80 stream
	for _, query := range queries {
		execVtgateQuery(t, vtgateConn, targetKeyspaceName, query)
	}
	// Since the -80 stream is stopped, the update that sets email = 'b@example.com'
	// for team_id = 1 will fail with a duplicate key error, because that email is
	// still set for team_id = 2 on the product tablet.
	// The vplayer stream should back off with the new logic, and the retry should
	// succeed once the -80 stream is restarted.
	// TODO: replace this sleep with a check that the table has the expected data after the updates.
	time.Sleep(2 * time.Second)
	setStreamState("Running")

	productTab := vc.Cells["zone1"].Keyspaces[sourceKeyspaceName].Shards["0"].Tablets[sourceTabletAlias].Vttablet
	waitForResult(t, productTab, "product", "select * from admins order by team_id",
		"[[INT32(1) VARCHAR(\"b@example.com\") VARCHAR(\"ibis-4\")] [INT32(2) VARCHAR(\"a@example.com\") VARCHAR(\"ibis-5\")]]", 30*time.Second)
	log.Infof("TestWorkflowDuplicateKeyBackoff passed")
}
31 changes: 29 additions & 2 deletions go/vt/vttablet/tabletmanager/vreplication/vdbclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package vreplication

import (
"context"
"errors"
"io"
"strings"
"time"
Expand Down Expand Up @@ -102,6 +103,10 @@ func (vc *vdbClient) CommitTrxQueryBatch() error {
return nil
}

// GetQueries returns the client's current queries slice. The slice is handed
// back directly rather than copied, so it shares its backing array with the
// client.
func (vc *vdbClient) GetQueries() []string {
	pending := vc.queries
	return pending
}

func (vc *vdbClient) Rollback() error {
if !vc.InTransaction {
return nil
Expand Down Expand Up @@ -146,6 +151,17 @@ func (vc *vdbClient) AddQueryToTrxBatch(query string) error {
return nil
}

// PopLastQueryFromBatch drops the last entry from the client's queries slice
// and decrements batchSize by one.
//
// It returns an INVALID_ARGUMENT error when the client is not in a
// transaction. When there is nothing to pop it is a no-op and returns nil.
func (vc *vdbClient) PopLastQueryFromBatch() error {
	if !vc.InTransaction {
		return vterrors.Errorf(vtrpc.Code_INVALID_ARGUMENT, "cannot pop query outside of a transaction")
	}
	// Check the slice length as well as batchSize: reslicing an empty slice
	// to len-1 panics, so batchSize alone is not a safe guard if the two
	// ever get out of step.
	if vc.batchSize <= 0 || len(vc.queries) == 0 {
		return nil
	}
	// NOTE(review): batchSize is reduced by one per popped query. If batchSize
	// tracks bytes rather than a query count, this should subtract the size
	// that was added for the popped query instead — confirm against
	// AddQueryToTrxBatch.
	vc.batchSize--
	vc.queries = vc.queries[:len(vc.queries)-1]
	return nil
}

// ExecuteQueryBatch sends the transaction's current batch of queries
// down the wire to the database.
func (vc *vdbClient) ExecuteTrxQueryBatch() ([]*sqltypes.Result, error) {
Expand All @@ -168,18 +184,29 @@ func (vc *vdbClient) Execute(query string) (*sqltypes.Result, error) {
return vc.ExecuteFetch(query, vc.relayLogMaxItems)
}

// IsRetryable reports whether err is, or wraps, a *sqlerror.SQLError whose
// number is sqlerror.ERDupEntry. It returns false for nil and for any other
// error.
//
// errors.As is used instead of a direct type assertion so that a SQL error
// wrapped by an intermediate layer is still recognized.
func (vc *vdbClient) IsRetryable(err error) bool {
	var sqlErr *sqlerror.SQLError
	if !errors.As(err, &sqlErr) {
		return false
	}
	return sqlErr.Number() == sqlerror.ERDupEntry
}

func (vc *vdbClient) ExecuteWithRetry(ctx context.Context, query string) (*sqltypes.Result, error) {
ctx2, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
qr, err := vc.Execute(query)
for err != nil {
if sqlErr, ok := err.(*sqlerror.SQLError); ok && sqlErr.Number() == sqlerror.ERLockDeadlock || sqlErr.Number() == sqlerror.ERLockWaitTimeout {
var sqlErr *sqlerror.SQLError
if errors.As(err, &sqlErr) &&
sqlErr.Number() == sqlerror.ERLockDeadlock || sqlErr.Number() == sqlerror.ERLockWaitTimeout {
log.Infof("retryable error: %v, waiting for %v and retrying", sqlErr, dbLockRetryDelay)
if err := vc.Rollback(); err != nil {
return nil, err
}
time.Sleep(dbLockRetryDelay)
// Check context here. Otherwise this can become an infinite loop.
select {
case <-ctx.Done():
case <-ctx2.Done():
return nil, io.EOF
default:
}
Expand Down
Loading
Loading