Skip to content

Commit

Permalink
Merge branch 'develop' into task/KS-297/db-syncer
Browse files Browse the repository at this point in the history
  • Loading branch information
bolekk authored Aug 15, 2024
2 parents ef5f3c7 + 5e99bdb commit dca3aec
Show file tree
Hide file tree
Showing 8 changed files with 146 additions and 10 deletions.
5 changes: 5 additions & 0 deletions .changeset/yellow-cougars-act.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": minor
---

Added client error classification for terminally stuck transactions in the TXM #internal
14 changes: 7 additions & 7 deletions common/txmgr/broadcaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,16 +493,16 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) hand
errType, err = eb.validateOnChainSequence(ctx, lgr, errType, err, etx, retryCount)
}

if errType != client.Fatal {
etx.InitialBroadcastAt = &initialBroadcastAt
etx.BroadcastAt = &initialBroadcastAt
}

switch errType {
case client.Fatal:
if errType == client.Fatal || errType == client.TerminallyStuck {
eb.SvcErrBuffer.Append(err)
etx.Error = null.StringFrom(err.Error())
return eb.saveFatallyErroredTransaction(lgr, &etx), true
}

etx.InitialBroadcastAt = &initialBroadcastAt
etx.BroadcastAt = &initialBroadcastAt

switch errType {
case client.TransactionAlreadyKnown:
fallthrough
case client.Successful:
Expand Down
17 changes: 16 additions & 1 deletion common/txmgr/confirmer.go
Original file line number Diff line number Diff line change
Expand Up @@ -491,7 +491,7 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Pro
go func(tx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) {
defer wg.Done()
lggr := tx.GetLogger(ec.lggr)
// Create an purge attempt for tx
// Create a purge attempt for tx
purgeAttempt, err := ec.TxAttemptBuilder.NewPurgeTxAttempt(ctx, tx, lggr)
if err != nil {
errMu.Lock()
Expand Down Expand Up @@ -999,6 +999,21 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) han
ec.SvcErrBuffer.Append(sendError)
// This will loop continuously on every new head so it must be handled manually by the node operator!
return ec.txStore.DeleteInProgressAttempt(ctx, attempt)
case client.TerminallyStuck:
// A transaction could broadcast successfully but then be considered terminally stuck on another attempt
// Even though the transaction can succeed under different circumstances, we want to purge this transaction as soon as we get this error
lggr.Warnw("terminally stuck transaction detected", "err", sendError.Error())
ec.SvcErrBuffer.Append(sendError)
// Create a purge attempt for tx
purgeAttempt, err := ec.TxAttemptBuilder.NewPurgeTxAttempt(ctx, etx, lggr)
if err != nil {
return fmt.Errorf("NewPurgeTxAttempt failed: %w", err)
}
// Replace the in progress attempt with the purge attempt
if err := ec.txStore.SaveReplacementInProgressAttempt(ctx, attempt, &purgeAttempt); err != nil {
return fmt.Errorf("saveReplacementInProgressAttempt failed: %w", err)
}
return ec.handleInProgressAttempt(ctx, lggr, etx, purgeAttempt, blockHeight)
case client.TransactionAlreadyKnown:
// Sequence too low indicated that a transaction at this sequence was confirmed already.
// Mark confirmed_missing_receipt and wait for the next cycle to try to get a receipt
Expand Down
5 changes: 5 additions & 0 deletions core/chains/evm/client/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -596,6 +596,11 @@ func ClassifySendError(err error, clientErrors config.ClientErrors, lggr logger.
)
return commonclient.ExceedsMaxFee
}
if sendError.IsTerminallyStuckConfigError(configErrors) {
lggr.Warnw("Transaction that would have been terminally stuck in the mempool detected on send. Marking as fatal error.", "err", sendError, "etx", tx)
// Attempt is thrown away in this case; we don't need it since it never got accepted by a node
return commonclient.TerminallyStuck
}
lggr.Criticalw("Unknown error encountered when sending transaction", "err", err, "etx", tx)
return commonclient.Unknown
}
22 changes: 20 additions & 2 deletions core/chains/evm/client/errors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ func Test_Eth_Errors(t *testing.T) {
})

t.Run("Metis gas price errors", func(t *testing.T) {
err := evmclient.NewSendErrorS("primary websocket (wss://ws-mainnet.metis.io) call failed: gas price too low: 18000000000 wei, use at least tx.gasPrice = 19500000000 wei")
err = evmclient.NewSendErrorS("primary websocket (wss://ws-mainnet.metis.io) call failed: gas price too low: 18000000000 wei, use at least tx.gasPrice = 19500000000 wei")
assert.True(t, err.L2FeeTooLow(clientErrors))
err = newSendErrorWrapped("primary websocket (wss://ws-mainnet.metis.io) call failed: gas price too low: 18000000000 wei, use at least tx.gasPrice = 19500000000 wei")
assert.True(t, err.L2FeeTooLow(clientErrors))
Expand All @@ -302,7 +302,7 @@ func Test_Eth_Errors(t *testing.T) {
})

t.Run("moonriver errors", func(t *testing.T) {
err := evmclient.NewSendErrorS("primary http (http://***REDACTED***:9933) call failed: submit transaction to pool failed: Pool(Stale)")
err = evmclient.NewSendErrorS("primary http (http://***REDACTED***:9933) call failed: submit transaction to pool failed: Pool(Stale)")
assert.True(t, err.IsNonceTooLowError(clientErrors))
assert.False(t, err.IsTransactionAlreadyInMempool(clientErrors))
assert.False(t, err.Fatal(clientErrors))
Expand All @@ -311,6 +311,24 @@ func Test_Eth_Errors(t *testing.T) {
assert.False(t, err.IsNonceTooLowError(clientErrors))
assert.False(t, err.Fatal(clientErrors))
})

t.Run("IsTerminallyStuck", func(t *testing.T) {
tests := []errorCase{
{"failed to add tx to the pool: not enough step counters to continue the execution", true, "zkEVM"},
{"failed to add tx to the pool: not enough step counters to continue the execution", true, "Xlayer"},
{"failed to add tx to the pool: not enough keccak counters to continue the execution", true, "zkEVM"},
{"failed to add tx to the pool: not enough keccak counters to continue the execution", true, "Xlayer"},
}

for _, test := range tests {
t.Run(test.network, func(t *testing.T) {
err = evmclient.NewSendErrorS(test.message)
assert.Equal(t, err.IsTerminallyStuckConfigError(clientErrors), test.expect)
err = newSendErrorWrapped(test.message)
assert.Equal(t, err.IsTerminallyStuckConfigError(clientErrors), test.expect)
})
}
})
}

func Test_Eth_Errors_Fatal(t *testing.T) {
Expand Down
19 changes: 19 additions & 0 deletions core/chains/evm/txmgr/broadcaster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -521,6 +521,25 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_Success(t *testing.T) {
assert.True(t, ethTx.Error.Valid)
assert.Equal(t, "transaction reverted during simulation: json-rpc error { Code = 42, Message = 'oh no, it reverted', Data = 'KqYi' }", ethTx.Error.String)
})

t.Run("terminally stuck transaction is marked as fatal", func(t *testing.T) {
terminallyStuckError := "failed to add tx to the pool: not enough step counters to continue the execution"
etx := mustCreateUnstartedTx(t, txStore, fromAddress, toAddress, []byte{42, 42, 0}, gasLimit, big.Int(assets.NewEthValue(243)), testutils.FixtureChainID)
ethClient.On("SendTransactionReturnCode", mock.Anything, mock.MatchedBy(func(tx *gethTypes.Transaction) bool {
return tx.Nonce() == uint64(346) && tx.Value().Cmp(big.NewInt(243)) == 0
}), fromAddress).Return(commonclient.Fatal, errors.New(terminallyStuckError)).Once()

// Start processing unstarted transactions
retryable, err := eb.ProcessUnstartedTxs(tests.Context(t), fromAddress)
assert.NoError(t, err)
assert.False(t, retryable)

dbTx, err := txStore.FindTxWithAttempts(ctx, etx.ID)
require.NoError(t, err)
assert.Equal(t, txmgrcommon.TxFatalError, dbTx.State)
assert.True(t, dbTx.Error.Valid)
assert.Equal(t, terminallyStuckError, dbTx.Error.String)
})
})
}

Expand Down
52 changes: 52 additions & 0 deletions core/chains/evm/txmgr/confirmer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2673,6 +2673,58 @@ func TestEthConfirmer_RebroadcastWhereNecessary_WhenOutOfEth(t *testing.T) {
})
}

func TestEthConfirmer_RebroadcastWhereNecessary_TerminallyStuckError(t *testing.T) {
t.Parallel()

db := pgtest.NewSqlxDB(t)
cfg := configtest.NewGeneralConfig(t, func(c *chainlink.Config, s *chainlink.Secrets) {
c.EVM[0].GasEstimator.PriceMax = assets.GWei(500)
})
txStore := cltest.NewTestTxStore(t, db)
ctx := tests.Context(t)

ethClient := testutils.NewEthClientMockWithDefaultChain(t)
ethKeyStore := cltest.NewKeyStore(t, db).Eth()
_, fromAddress := cltest.MustInsertRandomKey(t, ethKeyStore)

evmcfg := evmtest.NewChainScopedConfig(t, cfg)

// Use a mock keystore for this test
ec := newEthConfirmer(t, txStore, ethClient, cfg, evmcfg, ethKeyStore, nil)
currentHead := int64(30)
oldEnough := int64(19)
nonce := int64(0)
terminallyStuckError := "failed to add tx to the pool: not enough step counters to continue the execution"

t.Run("terminally stuck transaction replaced with purge attempt", func(t *testing.T) {
originalBroadcastAt := time.Unix(1616509100, 0)
etx := cltest.MustInsertUnconfirmedEthTxWithBroadcastLegacyAttempt(t, txStore, nonce, fromAddress, originalBroadcastAt)
nonce++
attempt1_1 := etx.TxAttempts[0]
var dbAttempt txmgr.DbEthTxAttempt
require.NoError(t, db.Get(&dbAttempt, `UPDATE evm.tx_attempts SET broadcast_before_block_num=$1 WHERE id=$2 RETURNING *`, oldEnough, attempt1_1.ID))

// Return terminally stuck error on first rebroadcast
ethClient.On("SendTransactionReturnCode", mock.Anything, mock.MatchedBy(func(tx *types.Transaction) bool {
return tx.Nonce() == uint64(*etx.Sequence)
}), fromAddress).Return(commonclient.TerminallyStuck, errors.New(terminallyStuckError)).Once()
// Return successful for purge attempt
ethClient.On("SendTransactionReturnCode", mock.Anything, mock.MatchedBy(func(tx *types.Transaction) bool {
return tx.Nonce() == uint64(*etx.Sequence)
}), fromAddress).Return(commonclient.Successful, nil).Once()

// Start processing transactions for rebroadcast
require.NoError(t, ec.RebroadcastWhereNecessary(tests.Context(t), currentHead))
var err error
etx, err = txStore.FindTxWithAttempts(ctx, etx.ID)
require.NoError(t, err)

require.Len(t, etx.TxAttempts, 2)
purgeAttempt := etx.TxAttempts[0]
require.True(t, purgeAttempt.IsPurgeAttempt)
})
}

func TestEthConfirmer_EnsureConfirmedTransactionsInLongestChain(t *testing.T) {
t.Parallel()

Expand Down
22 changes: 22 additions & 0 deletions core/chains/evm/txmgr/txmgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -786,6 +786,7 @@ func TestTxm_GetTransactionStatus(t *testing.T) {
t.Run("returns fatal for fatal error state with terminally stuck error", func(t *testing.T) {
idempotencyKey := uuid.New().String()
_, fromAddress := cltest.MustInsertRandomKey(t, ethKeyStore)
// Test the internal terminally stuck error returns Fatal
nonce := evmtypes.Nonce(0)
broadcast := time.Now()
tx := &txmgr.Tx{
Expand All @@ -804,6 +805,27 @@ func TestTxm_GetTransactionStatus(t *testing.T) {
state, err := txm.GetTransactionStatus(ctx, idempotencyKey)
require.Equal(t, commontypes.Fatal, state)
require.Error(t, err, evmclient.TerminallyStuckMsg)

// Test a terminally stuck client error returns Fatal
nonce = evmtypes.Nonce(1)
idempotencyKey = uuid.New().String()
terminallyStuckClientError := "failed to add tx to the pool: not enough step counters to continue the execution"
tx = &txmgr.Tx{
Sequence: &nonce,
IdempotencyKey: &idempotencyKey,
FromAddress: fromAddress,
EncodedPayload: []byte{1, 2, 3},
FeeLimit: feeLimit,
State: txmgrcommon.TxFatalError,
Error: null.NewString(terminallyStuckClientError, true),
BroadcastAt: &broadcast,
InitialBroadcastAt: &broadcast,
}
err = txStore.InsertTx(ctx, tx)
require.NoError(t, err)
state, err = txm.GetTransactionStatus(ctx, idempotencyKey)
require.Equal(t, commontypes.Fatal, state)
require.Error(t, err, evmclient.TerminallyStuckMsg)
})

t.Run("returns failed for fatal error state with other error", func(t *testing.T) {
Expand Down

0 comments on commit dca3aec

Please sign in to comment.