Skip to content

Commit

Permalink
retrieve message from redo log state
Browse files Browse the repository at this point in the history
Signed-off-by: Harshit Gangal <harshit@planetscale.com>
  • Loading branch information
harshit-gangal committed Sep 20, 2024
1 parent cfde812 commit f15a1b8
Show file tree
Hide file tree
Showing 5 changed files with 17 additions and 3 deletions.
2 changes: 2 additions & 0 deletions go/vt/vttablet/tabletserver/dt_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"time"

"vitess.io/vitess/go/event/syslogger"
"vitess.io/vitess/go/mysql/sqlerror"
"vitess.io/vitess/go/vt/vtenv"
"vitess.io/vitess/go/vt/vttablet/tabletserver/rules"
"vitess.io/vitess/go/vt/vttablet/tabletserver/schema"
Expand Down Expand Up @@ -683,6 +684,7 @@ func newTestTxExecutor(t *testing.T, ctx context.Context) (txe *DTExecutor, tsv
db.AddQuery("delete from _vt.redo_state where dtid = 'aa'", &sqltypes.Result{})
db.AddQuery("delete from _vt.redo_statement where dtid = 'aa'", &sqltypes.Result{})
db.AddQuery("update test_table set `name` = 2 where pk = 1 limit 10001", &sqltypes.Result{})
db.AddRejectedQuery("bogus", sqlerror.NewSQLError(sqlerror.ERUnknownError, sqlerror.SSUnknownSQLState, "bogus query"))
return &DTExecutor{
ctx: ctx,
logStats: logStats,
Expand Down
6 changes: 6 additions & 0 deletions go/vt/vttablet/tabletserver/tabletserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,12 +231,14 @@ func TestTabletServerRedoLogIsKeptBetweenRestarts(t *testing.T) {
{Type: sqltypes.Uint64},
{Type: sqltypes.Uint64},
{Type: sqltypes.VarBinary},
{Type: sqltypes.Text},
},
Rows: [][]sqltypes.Value{{
sqltypes.NewVarBinary("dtid0"),
sqltypes.NewInt64(RedoStatePrepared),
sqltypes.NewVarBinary(""),
sqltypes.NewVarBinary("update test_table set `name` = 2 where pk = 1 limit 10001"),
sqltypes.NULL,
}},
})
turnOnTxEngine()
Expand All @@ -257,22 +259,26 @@ func TestTabletServerRedoLogIsKeptBetweenRestarts(t *testing.T) {
{Type: sqltypes.Uint64},
{Type: sqltypes.Uint64},
{Type: sqltypes.VarBinary},
{Type: sqltypes.Text},
},
Rows: [][]sqltypes.Value{{
sqltypes.NewVarBinary("bogus"),
sqltypes.NewInt64(RedoStatePrepared),
sqltypes.NewVarBinary(""),
sqltypes.NewVarBinary("bogus"),
sqltypes.NULL,
}, {
sqltypes.NewVarBinary("a:b:10"),
sqltypes.NewInt64(RedoStatePrepared),
sqltypes.NewVarBinary(""),
sqltypes.NewVarBinary("update test_table set `name` = 2 where pk = 1 limit 10001"),
sqltypes.NULL,
}, {
sqltypes.NewVarBinary("a:b:20"),
sqltypes.NewInt64(RedoStateFailed),
sqltypes.NewVarBinary(""),
sqltypes.NewVarBinary("unused"),
sqltypes.TestValue(sqltypes.Text, "deadlock detected, transaction rolled back"),
}},
})
turnOnTxEngine()
Expand Down
7 changes: 4 additions & 3 deletions go/vt/vttablet/tabletserver/twopc.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ const (
// DTStateRollback represents the ROLLBACK state for dt_state.
DTStateRollback = querypb.TransactionState_ROLLBACK

readAllRedo = `select t.dtid, t.state, t.time_created, s.statement
readAllRedo = `select t.dtid, t.state, t.time_created, s.statement, t.message
from %s.redo_state t
join %s.redo_statement s on t.dtid = s.dtid
order by t.dtid, s.id`
Expand Down Expand Up @@ -245,8 +245,9 @@ func (tpc *TwoPC) ReadAllRedo(ctx context.Context) (prepared, failed []*tx.Prepa
// which is harmless.
tm, _ := row[2].ToCastInt64()
curTx = &tx.PreparedTx{
Dtid: dtid,
Time: time.Unix(0, tm),
Dtid: dtid,
Time: time.Unix(0, tm),
Message: row[4].ToString(),
}
st, err := row[1].ToCastInt64()
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions go/vt/vttablet/tabletserver/tx/twopc.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,5 @@ type PreparedTx struct {
Dtid string
Queries []string
Time time.Time
Message string
}
4 changes: 4 additions & 0 deletions go/vt/vttablet/tabletserver/tx_engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -658,6 +658,10 @@ func TestCheckReceivedError(t *testing.T) {
t.Run(tc.receivedErr.Error(), func(t *testing.T) {
nonRetryable := te.checkErrorAndMarkFailed(context.Background(), "aa", tc.receivedErr)
require.Equal(t, tc.nonRetryable, nonRetryable)
if tc.nonRetryable {
require.Equal(t, errPrepFailed, te.preparedPool.reserved["aa"])
}
delete(te.preparedPool.reserved, "aa")
})
}
}

0 comments on commit f15a1b8

Please sign in to comment.