Skip to content

Commit

Permalink
Merge branch 'txmv2_stuck_tx_detection' into txmv2_enablement
Browse files Browse the repository at this point in the history
  • Loading branch information
dimriou committed Nov 19, 2024
2 parents 0d7b1c4 + 54a59d4 commit 409ac1a
Show file tree
Hide file tree
Showing 28 changed files with 3,126 additions and 3,661 deletions.
11 changes: 11 additions & 0 deletions .changeset/late-windows-clean.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
"chainlink": minor
---

#internal Updated the TXM confirmation logic to use the mined transaction count to identify re-org'd or confirmed transactions.

- Confirmer uses the mined transaction count to determine if transactions have been re-org'd or confirmed.
- Confirmer no longer sets transaction states to `confirmed_missing_receipt`. This state is maintained in queries for backwards compatibility.
- Finalizer now responsible for fetching and storing receipts for confirmed transactions.
- Finalizer now responsible for resuming pending task runs.
- Finalizer is now responsible for marking as fatal any old transactions without receipts that were broadcast before the finalized head.
2 changes: 1 addition & 1 deletion common/txmgr/broadcaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -769,7 +769,7 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) save
}
}
}
return eb.txStore.UpdateTxFatalError(ctx, etx)
return eb.txStore.UpdateTxFatalErrorAndDeleteAttempts(ctx, etx)
}

func observeTimeUntilBroadcast[CHAIN_ID types.ID](chainID CHAIN_ID, createdAt, broadcastAt time.Time) {
Expand Down
709 changes: 134 additions & 575 deletions common/txmgr/confirmer.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion common/txmgr/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ func (tr *Tracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) markTxFatal

// Set state to TxInProgress so the tracker can attempt to mark it as fatal
tx.State = TxInProgress
if err := tr.txStore.UpdateTxFatalError(ctx, tx); err != nil {
if err := tr.txStore.UpdateTxFatalErrorAndDeleteAttempts(ctx, tx); err != nil {
return fmt.Errorf("failed to mark tx %v as abandoned: %w", tx.ID, err)
}
return nil
Expand Down
1 change: 1 addition & 0 deletions common/txmgr/txmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) RegisterRe
b.resumeCallback = fn
b.broadcaster.SetResumeCallback(fn)
b.confirmer.SetResumeCallback(fn)
b.finalizer.SetResumeCallback(fn)
}

// NewTxm creates a new Txm with the given configuration.
Expand Down
7 changes: 0 additions & 7 deletions common/txmgr/types/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import "time"

type TransactionManagerChainConfig interface {
BroadcasterChainConfig
ConfirmerChainConfig
}

type TransactionManagerFeeConfig interface {
Expand Down Expand Up @@ -46,12 +45,6 @@ type ConfirmerFeeConfig interface {
// from gas.Config
BumpThreshold() uint64
MaxFeePrice() string // logging value
BumpPercent() uint16
}

type ConfirmerChainConfig interface {
RPCDefaultBatchSize() uint32
FinalityDepth() uint32
}

type ConfirmerDatabaseConfig interface {
Expand Down
5 changes: 5 additions & 0 deletions common/txmgr/types/finalizer.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package types

import (
"context"

"github.com/google/uuid"

"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink/v2/common/types"
)
Expand All @@ -9,4 +13,5 @@ type Finalizer[BLOCK_HASH types.Hashable, HEAD types.Head[BLOCK_HASH]] interface
// interfaces for running the underlying estimator
services.Service
DeliverLatestHead(head HEAD) bool
SetResumeCallback(callback func(ctx context.Context, id uuid.UUID, result interface{}, err error) error)
}
462 changes: 204 additions & 258 deletions common/txmgr/types/mocks/tx_store.go

Large diffs are not rendered by default.

9 changes: 9 additions & 0 deletions common/txmgr/types/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,15 @@ func (e *Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) GetChecker() (Transm
return t, nil
}

// HasPurgeAttempt reports whether at least one of the transaction's attempts
// is flagged as a purge attempt. It returns false when the transaction has no
// attempts.
func (e *Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) HasPurgeAttempt() bool {
	for idx := range e.TxAttempts {
		// Skip ordinary attempts; the first purge attempt settles the answer.
		if !e.TxAttempts[idx].IsPurgeAttempt {
			continue
		}
		return true
	}
	return false
}

// Provides error classification to external components in a chain agnostic way
// Only exposes the error types that could be set in the transaction error field
type ErrorClassifier interface {
Expand Down
30 changes: 16 additions & 14 deletions common/txmgr/types/tx_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ type TxStore[
// Find confirmed txes beyond the minConfirmations param that require callback but have not yet been signaled
FindTxesPendingCallback(ctx context.Context, latest, finalized int64, chainID CHAIN_ID) (receiptsPlus []ReceiptPlus[R], err error)
// Update tx to mark that its callback has been signaled
UpdateTxCallbackCompleted(ctx context.Context, pipelineTaskRunRid uuid.UUID, chainId CHAIN_ID) error
SaveFetchedReceipts(ctx context.Context, r []R, state TxState, errorMsg *string, chainID CHAIN_ID) error
UpdateTxCallbackCompleted(ctx context.Context, pipelineTaskRunRid uuid.UUID, chainID CHAIN_ID) error
SaveFetchedReceipts(ctx context.Context, r []R) error

// additional methods for tx store management
CheckTxQueueCapacity(ctx context.Context, fromAddress ADDR, maxQueuedTransactions uint64, chainID CHAIN_ID) (err error)
Expand Down Expand Up @@ -68,20 +68,19 @@ type TransactionStore[
CountUnstartedTransactions(ctx context.Context, fromAddress ADDR, chainID CHAIN_ID) (count uint32, err error)
CreateTransaction(ctx context.Context, txRequest TxRequest[ADDR, TX_HASH], chainID CHAIN_ID) (tx Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error)
DeleteInProgressAttempt(ctx context.Context, attempt TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error
FindLatestSequence(ctx context.Context, fromAddress ADDR, chainId CHAIN_ID) (SEQ, error)
FindLatestSequence(ctx context.Context, fromAddress ADDR, chainID CHAIN_ID) (SEQ, error)
// FindReorgOrIncludedTxs returns either a list of re-org'd transactions or included transactions based on the provided sequence
FindReorgOrIncludedTxs(ctx context.Context, fromAddress ADDR, nonce SEQ, chainID CHAIN_ID) (reorgTx []*Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], includedTxs []*Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error)
FindTxsRequiringGasBump(ctx context.Context, address ADDR, blockNum, gasBumpThreshold, depth int64, chainID CHAIN_ID) (etxs []*Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error)
FindTxsRequiringResubmissionDueToInsufficientFunds(ctx context.Context, address ADDR, chainID CHAIN_ID) (etxs []*Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error)
FindTxAttemptsConfirmedMissingReceipt(ctx context.Context, chainID CHAIN_ID) (attempts []TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error)
FindTxAttemptsRequiringReceiptFetch(ctx context.Context, chainID CHAIN_ID) (attempts []TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error)
FindTxAttemptsRequiringResend(ctx context.Context, olderThan time.Time, maxInFlightTransactions uint32, chainID CHAIN_ID, address ADDR) (attempts []TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error)
// Search for Tx using the idempotencyKey and chainID
FindTxWithIdempotencyKey(ctx context.Context, idempotencyKey string, chainID CHAIN_ID) (tx *Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error)
// Search for Tx using the fromAddress and sequence
FindTxWithSequence(ctx context.Context, fromAddress ADDR, seq SEQ) (etx *Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error)
FindNextUnstartedTransactionFromAddress(ctx context.Context, fromAddress ADDR, chainID CHAIN_ID) (*Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error)

// FindTransactionsConfirmedInBlockRange retrieves tx with attempts and partial receipt values for optimization purpose
FindTransactionsConfirmedInBlockRange(ctx context.Context, highBlockNumber, lowBlockNumber int64, chainID CHAIN_ID) (etxs []*Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error)
FindEarliestUnconfirmedBroadcastTime(ctx context.Context, chainID CHAIN_ID) (null.Time, error)
FindEarliestUnconfirmedTxAttemptBlock(ctx context.Context, chainID CHAIN_ID) (null.Int, error)
GetTxInProgress(ctx context.Context, fromAddress ADDR) (etx *Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error)
Expand All @@ -90,23 +89,26 @@ type TransactionStore[
GetTxByID(ctx context.Context, id int64) (tx *Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error)
HasInProgressTransaction(ctx context.Context, account ADDR, chainID CHAIN_ID) (exists bool, err error)
LoadTxAttempts(ctx context.Context, etx *Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error
MarkAllConfirmedMissingReceipt(ctx context.Context, chainID CHAIN_ID) (err error)
MarkOldTxesMissingReceiptAsErrored(ctx context.Context, blockNum int64, latestFinalizedBlockNum int64, chainID CHAIN_ID) error
PreloadTxes(ctx context.Context, attempts []TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error
SaveConfirmedMissingReceiptAttempt(ctx context.Context, timeout time.Duration, attempt *TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], broadcastAt time.Time) error
SaveConfirmedAttempt(ctx context.Context, timeout time.Duration, attempt *TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], broadcastAt time.Time) error
SaveInProgressAttempt(ctx context.Context, attempt *TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error
SaveInsufficientFundsAttempt(ctx context.Context, timeout time.Duration, attempt *TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], broadcastAt time.Time) error
SaveReplacementInProgressAttempt(ctx context.Context, oldAttempt TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], replacementAttempt *TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error
SaveSentAttempt(ctx context.Context, timeout time.Duration, attempt *TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], broadcastAt time.Time) error
SetBroadcastBeforeBlockNum(ctx context.Context, blockNum int64, chainID CHAIN_ID) error
UpdateBroadcastAts(ctx context.Context, now time.Time, etxIDs []int64) error
UpdateTxAttemptInProgressToBroadcast(ctx context.Context, etx *Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], attempt TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], NewAttemptState TxAttemptState) error
// Update tx to mark that its callback has been signaled
UpdateTxCallbackCompleted(ctx context.Context, pipelineTaskRunRid uuid.UUID, chainId CHAIN_ID) error
UpdateTxsUnconfirmed(ctx context.Context, ids []int64) error
// UpdateTxCallbackCompleted updates tx to mark that its callback has been signaled
UpdateTxCallbackCompleted(ctx context.Context, pipelineTaskRunRid uuid.UUID, chainID CHAIN_ID) error
// UpdateTxConfirmed updates transaction states to confirmed
UpdateTxConfirmed(ctx context.Context, etxIDs []int64) error
// UpdateTxFatalErrorAndDeleteAttempts updates transaction states to fatal error, deletes attempts, and clears broadcast info and sequence
UpdateTxFatalErrorAndDeleteAttempts(ctx context.Context, etx *Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error
// UpdateTxFatalError updates transaction states to fatal error with error message
UpdateTxFatalError(ctx context.Context, etxIDs []int64, errMsg string) error
UpdateTxsForRebroadcast(ctx context.Context, etxIDs []int64, attemptIDs []int64) error
UpdateTxsUnconfirmed(ctx context.Context, etxIDs []int64) error
UpdateTxUnstartedToInProgress(ctx context.Context, etx *Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], attempt *TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error
UpdateTxFatalError(ctx context.Context, etx *Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error
UpdateTxForRebroadcast(ctx context.Context, etx Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], etxAttempt TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error
}

type TxHistoryReaper[CHAIN_ID types.ID] interface {
Expand Down
12 changes: 6 additions & 6 deletions core/capabilities/ccip/ocrimpls/contract_transmitter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,12 +159,12 @@ func testTransmitter(

// wait for receipt to be written to the db
require.Eventually(t, func() bool {
rows, err := uni.db.QueryContext(testutils.Context(t), `SELECT count(*) as cnt FROM evm.receipts LIMIT 1`)
require.NoError(t, err, "failed to query receipts")
defer rows.Close()
var count int
for rows.Next() {
require.NoError(t, rows.Scan(&count), "failed to scan")
uni.backend.Commit()
var count uint32
err := uni.db.GetContext(testutils.Context(t), &count, `SELECT count(*) as cnt FROM evm.receipts LIMIT 1`)
require.NoError(t, err)
if count == 1 {
t.Log("tx receipt found in db")
}
return count == 1
}, testutils.WaitTimeout(t), 2*time.Second)
Expand Down
10 changes: 5 additions & 5 deletions core/chains/evm/txmgr/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"time"

"github.com/ethereum/go-ethereum/common"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/sqlutil"
"github.com/smartcontractkit/chainlink/v2/common/txmgr"
Expand Down Expand Up @@ -64,8 +65,8 @@ func NewTxm(
evmBroadcaster := NewEvmBroadcaster(txStore, txmClient, txmCfg, feeCfg, txConfig, listenerConfig, keyStore, txAttemptBuilder, lggr, checker, chainConfig.NonceAutoSync(), chainConfig.ChainType())
evmTracker := NewEvmTracker(txStore, keyStore, chainID, lggr)
stuckTxDetector := NewStuckTxDetector(lggr, client.ConfiguredChainID(), chainConfig.ChainType(), fCfg.PriceMax(), txConfig.AutoPurge(), estimator, txStore, client)
evmConfirmer := NewEvmConfirmer(txStore, txmClient, txmCfg, feeCfg, txConfig, dbConfig, keyStore, txAttemptBuilder, lggr, stuckTxDetector, headTracker)
evmFinalizer := NewEvmFinalizer(lggr, client.ConfiguredChainID(), chainConfig.RPCDefaultBatchSize(), txStore, client, headTracker)
evmConfirmer := NewEvmConfirmer(txStore, txmClient, feeCfg, txConfig, dbConfig, keyStore, txAttemptBuilder, lggr, stuckTxDetector, headTracker)
evmFinalizer := NewEvmFinalizer(lggr, client.ConfiguredChainID(), chainConfig.RPCDefaultBatchSize(), txConfig.ForwardersEnabled(), txStore, txmClient, headTracker)
var evmResender *Resender
if txConfig.ResendAfterThreshold() > 0 {
evmResender = NewEvmResender(lggr, txStore, txmClient, evmTracker, keyStore, txmgr.DefaultResenderPollInterval, chainConfig, txConfig)
Expand Down Expand Up @@ -167,7 +168,6 @@ func NewEvmReaper(lggr logger.Logger, store txmgrtypes.TxHistoryReaper[*big.Int]
func NewEvmConfirmer(
txStore TxStore,
client TxmClient,
chainConfig txmgrtypes.ConfirmerChainConfig,
feeConfig txmgrtypes.ConfirmerFeeConfig,
txConfig txmgrtypes.ConfirmerTransactionsConfig,
dbConfig txmgrtypes.ConfirmerDatabaseConfig,
Expand All @@ -177,7 +177,7 @@ func NewEvmConfirmer(
stuckTxDetector StuckTxDetector,
headTracker latestAndFinalizedBlockHeadTracker,
) *Confirmer {
return txmgr.NewConfirmer(txStore, client, chainConfig, feeConfig, txConfig, dbConfig, keystore, txAttemptBuilder, lggr, func(r *evmtypes.Receipt) bool { return r == nil }, stuckTxDetector, headTracker)
return txmgr.NewConfirmer(txStore, client, feeConfig, txConfig, dbConfig, keystore, txAttemptBuilder, lggr, func(r *evmtypes.Receipt) bool { return r == nil }, stuckTxDetector)
}

// NewEvmTracker instantiates a new EVM tracker for abandoned transactions
Expand All @@ -187,7 +187,7 @@ func NewEvmTracker(
chainID *big.Int,
lggr logger.Logger,
) *Tracker {
return txmgr.NewTracker(txStore, keyStore, chainID, lggr)
return txmgr.NewTracker[*big.Int, common.Address, common.Hash, common.Hash, *evmtypes.Receipt](txStore, keyStore, chainID, lggr)
}

// NewEvmBroadcaster returns a new concrete EvmBroadcaster
Expand Down
6 changes: 5 additions & 1 deletion core/chains/evm/txmgr/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func (c *evmTxmClient) BatchGetReceipts(ctx context.Context, attempts []TxAttemp
}

if err := c.client.BatchCallContext(ctx, reqs); err != nil {
return nil, nil, fmt.Errorf("EthConfirmer#batchFetchReceipts error fetching receipts with BatchCallContext: %w", err)
return nil, nil, fmt.Errorf("error fetching receipts with BatchCallContext: %w", err)
}

for _, req := range reqs {
Expand Down Expand Up @@ -192,3 +192,7 @@ func (c *evmTxmClient) CallContract(ctx context.Context, a TxAttempt, blockNumbe
// HeadByHash delegates to the wrapped client's HeadByHash for the given block
// hash and returns its head and error unchanged.
func (c *evmTxmClient) HeadByHash(ctx context.Context, hash common.Hash) (*evmtypes.Head, error) {
return c.client.HeadByHash(ctx, hash)
}

// BatchCallContext forwards the batch elements b to the wrapped client's
// BatchCallContext and returns its error unchanged.
func (c *evmTxmClient) BatchCallContext(ctx context.Context, b []rpc.BatchElem) error {
return c.client.BatchCallContext(ctx, b)
}
1 change: 0 additions & 1 deletion core/chains/evm/txmgr/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ type (
EvmTxmConfig txmgrtypes.TransactionManagerChainConfig
EvmTxmFeeConfig txmgrtypes.TransactionManagerFeeConfig
EvmBroadcasterConfig txmgrtypes.BroadcasterChainConfig
EvmConfirmerConfig txmgrtypes.ConfirmerChainConfig
EvmResenderConfig txmgrtypes.ResenderChainConfig
)

Expand Down
Loading

0 comments on commit 409ac1a

Please sign in to comment.