Skip to content

Commit

Permalink
check for retryable error, if not update redo log state to fails and …
Browse files Browse the repository at this point in the history
…store the error message

Signed-off-by: Harshit Gangal <harshit@planetscale.com>
  • Loading branch information
harshit-gangal committed Sep 18, 2024
1 parent 8816a2d commit 64ab0ac
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 62 deletions.
1 change: 1 addition & 0 deletions go/vt/sidecardb/schema/twopc/redo_state.sql
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,6 @@ CREATE TABLE IF NOT EXISTS redo_state(
dtid varbinary(512) NOT NULL,
state bigint NOT NULL,
time_created bigint NOT NULL,
message text,
primary key(dtid)
) ENGINE = InnoDB CHARSET = utf8mb4
29 changes: 1 addition & 28 deletions go/vt/vttablet/tabletserver/dt_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,8 @@ func (dte *DTExecutor) CommitPrepared(dtid string) (err error) {
ctx := trace.CopySpan(context.Background(), dte.ctx)
defer func() {
if err != nil {
dte.markFailed(ctx, dtid)
log.Warningf("failed to commit the prepared transaction '%s' with error: %v", dtid, err)
dte.te.checkErrorAndMarkFailed(ctx, dtid, err)
}
dte.te.txPool.RollbackAndRelease(ctx, conn)
}()
Expand All @@ -172,33 +172,6 @@ func (dte *DTExecutor) CommitPrepared(dtid string) (err error) {
return nil
}

// markFailed does the necessary work to mark a CommitPrepared
// as failed. It marks the dtid as failed in the prepared pool,
// increments the InternalErros counter, and also changes the
// state of the transaction in the redo log as failed. If the
// state change does not succeed, it just logs the event.
// The function uses the passed in context that has no timeout
// instead of DTExecutor's context.
func (dte *DTExecutor) markFailed(ctx context.Context, dtid string) {
dte.te.env.Stats().InternalErrors.Add("TwopcCommit", 1)
dte.te.preparedPool.SetFailed(dtid)
conn, _, _, err := dte.te.txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil, nil)
if err != nil {
log.Errorf("markFailed: Begin failed for dtid %s: %v", dtid, err)
return
}
defer dte.te.txPool.RollbackAndRelease(ctx, conn)

if err = dte.te.twoPC.UpdateRedo(ctx, conn, dtid, RedoStateFailed); err != nil {
log.Errorf("markFailed: UpdateRedo failed for dtid %s: %v", dtid, err)
return
}

if _, err = dte.te.txPool.Commit(ctx, conn); err != nil {
log.Errorf("markFailed: Commit failed for dtid %s: %v", dtid, err)
}
}

// RollbackPrepared rolls back a prepared transaction. This function handles
// the case of an incomplete prepare.
//
Expand Down
11 changes: 6 additions & 5 deletions go/vt/vttablet/tabletserver/twopc.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,8 @@ func (tpc *TwoPC) initializeQueries() {
"insert into %s.redo_statement(dtid, id, statement) values %a",
dbname, ":vals")
tpc.updateRedoTx = sqlparser.BuildParsedQuery(
"update %s.redo_state set state = %a where dtid = %a",
dbname, ":state", ":dtid")
"update %s.redo_state set state = %a, message = %a where dtid = %a",
dbname, ":state", ":message", ":dtid")
tpc.deleteRedoTx = sqlparser.BuildParsedQuery(
"delete from %s.redo_state where dtid = %a",
dbname, ":dtid")
Expand Down Expand Up @@ -200,10 +200,11 @@ func (tpc *TwoPC) SaveRedo(ctx context.Context, conn *StatefulConnection, dtid s
}

// UpdateRedo changes the state of the redo log for the dtid.
func (tpc *TwoPC) UpdateRedo(ctx context.Context, conn *StatefulConnection, dtid string, state int) error {
func (tpc *TwoPC) UpdateRedo(ctx context.Context, conn *StatefulConnection, dtid string, state int, message string) error {
bindVars := map[string]*querypb.BindVariable{
"dtid": sqltypes.StringBindVariable(dtid),
"state": sqltypes.Int64BindVariable(int64(state)),
"dtid": sqltypes.StringBindVariable(dtid),
"state": sqltypes.Int64BindVariable(int64(state)),
"message": sqltypes.StringBindVariable(message),
}
_, err := tpc.exec(ctx, conn, tpc.updateRedoTx, bindVars)
return err
Expand Down
125 changes: 96 additions & 29 deletions go/vt/vttablet/tabletserver/tx_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ import (
"sync"
"time"

"vitess.io/vitess/go/mysql/sqlerror"
"vitess.io/vitess/go/pools/smartconnpool"
"vitess.io/vitess/go/timer"
"vitess.io/vitess/go/trace"
"vitess.io/vitess/go/vt/concurrency"
"vitess.io/vitess/go/vt/dtids"
"vitess.io/vitess/go/vt/log"
querypb "vitess.io/vitess/go/vt/proto/query"
Expand Down Expand Up @@ -411,58 +411,125 @@ func (te *TxEngine) shutdownLocked() {
// to ensure there are no future collisions.
func (te *TxEngine) prepareFromRedo() error {
ctx := tabletenv.LocalContext()
var allErr concurrency.AllErrorRecorder
prepared, failed, err := te.twoPC.ReadAllRedo(ctx)
if err != nil {
return err

prepared, failed, readErr := te.twoPC.ReadAllRedo(ctx)
if readErr != nil {
return readErr
}

maxid := int64(0)
var (
maxID = int64(0)
preparedCounter = 0
failedCounter = len(failed)
lastDtid string
lastErr error
allErrs []error
)

outer:
for _, preparedTx := range prepared {
txid, err := dtids.TransactionID(preparedTx.Dtid)
if err != nil {
log.Errorf("Error extracting transaction ID from dtid: %v", err)
var conn *StatefulConnection

txID, _ := dtids.TransactionID(preparedTx.Dtid)
if txID > maxID {
maxID = txID
}
if txid > maxid {
maxid = txid

// check last error to record failure.
if lastErr != nil {
allErrs = append(allErrs, vterrors.Wrapf(lastErr, "dtid - %v", lastDtid))
if te.checkErrorAndMarkFailed(ctx, lastDtid, lastErr) {
failedCounter++
}
}

lastDtid = preparedTx.Dtid

// We need to redo the prepared transactions using a dba user because MySQL might still be in read only mode.
conn, err := te.beginNewDbaConnection(ctx)
if err != nil {
allErr.RecordError(vterrors.Wrapf(err, "dtid - %v", preparedTx.Dtid))
conn, lastErr = te.beginNewDbaConnection(ctx)
if lastErr != nil {
continue
}
for _, stmt := range preparedTx.Queries {
conn.TxProperties().RecordQuery(stmt, te.env.Environment().Parser())
_, err := conn.Exec(ctx, stmt, 1, false)
if err != nil {
allErr.RecordError(vterrors.Wrapf(err, "dtid - %v", preparedTx.Dtid))
if _, lastErr = conn.Exec(ctx, stmt, 1, false); lastErr != nil {
te.txPool.RollbackAndRelease(ctx, conn)
continue outer
}
}
// We should not use the external Prepare because
// we don't want to write again to the redo log.
err = te.preparedPool.Put(conn, preparedTx.Dtid)
if err != nil {
allErr.RecordError(vterrors.Wrapf(err, "dtid - %v", preparedTx.Dtid))
if lastErr = te.preparedPool.Put(conn, preparedTx.Dtid); lastErr != nil {
continue
}
preparedCounter++
}
for _, preparedTx := range failed {
txid, err := dtids.TransactionID(preparedTx.Dtid)
if err != nil {
log.Errorf("Error extracting transaction ID from dtid: %v", err)

// check last error to record failure.
if lastErr != nil {
allErrs = append(allErrs, vterrors.Wrapf(lastErr, "dtid - %v", lastDtid))
if te.checkErrorAndMarkFailed(ctx, lastDtid, lastErr) {
failedCounter++
}
if txid > maxid {
maxid = txid
}

for _, preparedTx := range failed {
txID, _ := dtids.TransactionID(preparedTx.Dtid)
if txID > maxID {
maxID = txID
}
te.preparedPool.SetFailed(preparedTx.Dtid)
}
te.txPool.AdjustLastID(maxid)
log.Infof("TwoPC: Prepared %d transactions, and registered %d failures.", len(prepared), len(failed))
return allErr.Error()
te.txPool.AdjustLastID(maxID)
log.Infof("TwoPC: Prepared %d transactions, and registered %d failures.", preparedCounter, failedCounter)
return vterrors.Aggregate(allErrs)
}

// checkErrorAndMarkFailed check that the error is retryable or non-retryable error.
// If it is a non-retryable error than it marks the dtid as failed in the prepared pool,
// increments the InternalErrors counter, and also changes the state of the transaction in the redo log as failed.
func (te *TxEngine) checkErrorAndMarkFailed(ctx context.Context, dtid string, receivedErr error) (fail bool) {
if isRetryableError(receivedErr) {
log.Infof("retryable error for dtid: %s", dtid)
return
}

fail = true
te.env.Stats().InternalErrors.Add("TwopcCommit", 1)
te.preparedPool.SetFailed(dtid)
conn, _, _, err := te.txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil, nil)
if err != nil {
log.Errorf("markFailed: Begin failed for dtid %s: %v", dtid, err)
return
}
defer te.txPool.RollbackAndRelease(ctx, conn)

if err = te.twoPC.UpdateRedo(ctx, conn, dtid, RedoStateFailed, receivedErr.Error()); err != nil {
log.Errorf("markFailed: UpdateRedo failed for dtid %s: %v", dtid, err)
return
}

if _, err = te.txPool.Commit(ctx, conn); err != nil {
log.Errorf("markFailed: Commit failed for dtid %s: %v", dtid, err)
}
return
}

func isRetryableError(err error) bool {
switch vterrors.Code(err) {
case vtrpcpb.Code_OK,
vtrpcpb.Code_DEADLINE_EXCEEDED,
vtrpcpb.Code_CANCELED,
vtrpcpb.Code_UNAVAILABLE:
return true
case vtrpcpb.Code_UNKNOWN:
// If the error is unknown, convert to SQL Error.
sqlErr := sqlerror.NewSQLErrorFromError(err)
// Connection errors are retryable
return sqlerror.IsConnErr(sqlErr)
default:
return false
}
}

// shutdownTransactions rolls back all open transactions that are idol.
Expand Down

0 comments on commit 64ab0ac

Please sign in to comment.