diff --git a/go/vt/vttablet/tabletserver/dt_executor_test.go b/go/vt/vttablet/tabletserver/dt_executor_test.go index a3486c4370e..9aec50ff59f 100644 --- a/go/vt/vttablet/tabletserver/dt_executor_test.go +++ b/go/vt/vttablet/tabletserver/dt_executor_test.go @@ -26,6 +26,7 @@ import ( "time" "vitess.io/vitess/go/event/syslogger" + "vitess.io/vitess/go/mysql/sqlerror" "vitess.io/vitess/go/vt/vtenv" "vitess.io/vitess/go/vt/vttablet/tabletserver/rules" "vitess.io/vitess/go/vt/vttablet/tabletserver/schema" @@ -683,6 +684,7 @@ func newTestTxExecutor(t *testing.T, ctx context.Context) (txe *DTExecutor, tsv db.AddQuery("delete from _vt.redo_state where dtid = 'aa'", &sqltypes.Result{}) db.AddQuery("delete from _vt.redo_statement where dtid = 'aa'", &sqltypes.Result{}) db.AddQuery("update test_table set `name` = 2 where pk = 1 limit 10001", &sqltypes.Result{}) + db.AddRejectedQuery("bogus", sqlerror.NewSQLError(sqlerror.ERUnknownError, sqlerror.SSUnknownSQLState, "bogus query")) return &DTExecutor{ ctx: ctx, logStats: logStats, diff --git a/go/vt/vttablet/tabletserver/tabletserver_test.go b/go/vt/vttablet/tabletserver/tabletserver_test.go index 443ab9cbe09..f4119aabd20 100644 --- a/go/vt/vttablet/tabletserver/tabletserver_test.go +++ b/go/vt/vttablet/tabletserver/tabletserver_test.go @@ -231,12 +231,14 @@ func TestTabletServerRedoLogIsKeptBetweenRestarts(t *testing.T) { {Type: sqltypes.Uint64}, {Type: sqltypes.Uint64}, {Type: sqltypes.VarBinary}, + {Type: sqltypes.Text}, }, Rows: [][]sqltypes.Value{{ sqltypes.NewVarBinary("dtid0"), sqltypes.NewInt64(RedoStatePrepared), sqltypes.NewVarBinary(""), sqltypes.NewVarBinary("update test_table set `name` = 2 where pk = 1 limit 10001"), + sqltypes.NULL, }}, }) turnOnTxEngine() @@ -257,22 +259,26 @@ func TestTabletServerRedoLogIsKeptBetweenRestarts(t *testing.T) { {Type: sqltypes.Uint64}, {Type: sqltypes.Uint64}, {Type: sqltypes.VarBinary}, + {Type: sqltypes.Text}, }, Rows: [][]sqltypes.Value{{ sqltypes.NewVarBinary("bogus"), sqltypes.NewInt64(RedoStatePrepared), sqltypes.NewVarBinary(""), sqltypes.NewVarBinary("bogus"), + sqltypes.NULL, }, { sqltypes.NewVarBinary("a:b:10"), sqltypes.NewInt64(RedoStatePrepared), sqltypes.NewVarBinary(""), sqltypes.NewVarBinary("update test_table set `name` = 2 where pk = 1 limit 10001"), + sqltypes.NULL, }, { sqltypes.NewVarBinary("a:b:20"), sqltypes.NewInt64(RedoStateFailed), sqltypes.NewVarBinary(""), sqltypes.NewVarBinary("unused"), + sqltypes.TestValue(sqltypes.Text, "deadlock detected, transaction rolled back"), }}, }) turnOnTxEngine() diff --git a/go/vt/vttablet/tabletserver/twopc.go b/go/vt/vttablet/tabletserver/twopc.go index 00d4cf388e1..bc3cdbf9d68 100644 --- a/go/vt/vttablet/tabletserver/twopc.go +++ b/go/vt/vttablet/tabletserver/twopc.go @@ -47,7 +47,7 @@ const ( // DTStateRollback represents the ROLLBACK state for dt_state. DTStateRollback = querypb.TransactionState_ROLLBACK - readAllRedo = `select t.dtid, t.state, t.time_created, s.statement + readAllRedo = `select t.dtid, t.state, t.time_created, s.statement, t.message from %s.redo_state t join %s.redo_statement s on t.dtid = s.dtid order by t.dtid, s.id` @@ -245,8 +245,9 @@ func (tpc *TwoPC) ReadAllRedo(ctx context.Context) (prepared, failed []*tx.Prepa // which is harmless. tm, _ := row[2].ToCastInt64() curTx = &tx.PreparedTx{ - Dtid: dtid, - Time: time.Unix(0, tm), + Dtid: dtid, + Time: time.Unix(0, tm), + Message: row[4].ToString(), } st, err := row[1].ToCastInt64() if err != nil { diff --git a/go/vt/vttablet/tabletserver/tx/twopc.go b/go/vt/vttablet/tabletserver/tx/twopc.go index 56cfbd1a51f..6412fc53b4d 100644 --- a/go/vt/vttablet/tabletserver/tx/twopc.go +++ b/go/vt/vttablet/tabletserver/tx/twopc.go @@ -36,4 +36,5 @@ type PreparedTx struct { Dtid string Queries []string Time time.Time + Message string } diff --git a/go/vt/vttablet/tabletserver/tx_engine_test.go b/go/vt/vttablet/tabletserver/tx_engine_test.go index be840c2a683..a46488b92cf 100644 --- a/go/vt/vttablet/tabletserver/tx_engine_test.go +++ b/go/vt/vttablet/tabletserver/tx_engine_test.go @@ -658,6 +658,10 @@ func TestCheckReceivedError(t *testing.T) { t.Run(tc.receivedErr.Error(), func(t *testing.T) { nonRetryable := te.checkErrorAndMarkFailed(context.Background(), "aa", tc.receivedErr) require.Equal(t, tc.nonRetryable, nonRetryable) + if tc.nonRetryable { + require.Equal(t, errPrepFailed, te.preparedPool.reserved["aa"]) + } + delete(te.preparedPool.reserved, "aa") }) } }