diff --git a/.changeset/chatty-spiders-double.md b/.changeset/chatty-spiders-double.md new file mode 100644 index 00000000000..750a11628fe --- /dev/null +++ b/.changeset/chatty-spiders-double.md @@ -0,0 +1,5 @@ +--- +"chainlink": minor +--- + +remove dependency on FinalityDepth in EVM TXM code. #internal diff --git a/common/txmgr/confirmer.go b/common/txmgr/confirmer.go index 1e3922fdbfb..3b421191782 100644 --- a/common/txmgr/confirmer.go +++ b/common/txmgr/confirmer.go @@ -102,6 +102,10 @@ var ( }, []string{"chainID"}) ) +type confirmerHeadTracker[HEAD types.Head[BLOCK_HASH], BLOCK_HASH types.Hashable] interface { + LatestAndFinalizedBlock(ctx context.Context) (latest, finalized HEAD, err error) +} + // Confirmer is a broad service which performs four different tasks in sequence on every new longest chain // Step 1: Mark that all currently pending transaction attempts were broadcast before this block // Step 2: Check pending transactions for receipts @@ -133,14 +137,15 @@ type Confirmer[ ks txmgrtypes.KeyStore[ADDR, CHAIN_ID, SEQ] enabledAddresses []ADDR - mb *mailbox.Mailbox[HEAD] - stopCh services.StopChan - wg sync.WaitGroup - initSync sync.Mutex - isStarted bool - + mb *mailbox.Mailbox[HEAD] + stopCh services.StopChan + wg sync.WaitGroup + initSync sync.Mutex + isStarted bool nConsecutiveBlocksChainTooShort int isReceiptNil func(R) bool + + headTracker confirmerHeadTracker[HEAD, BLOCK_HASH] } func NewConfirmer[ @@ -164,6 +169,7 @@ func NewConfirmer[ lggr logger.Logger, isReceiptNil func(R) bool, stuckTxDetector txmgrtypes.StuckTxDetector[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], + headTracker confirmerHeadTracker[HEAD, BLOCK_HASH], ) *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE] { lggr = logger.Named(lggr, "Confirmer") return &Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]{ @@ -181,6 +187,7 @@ func NewConfirmer[ mb: mailbox.NewSingle[HEAD](), isReceiptNil: isReceiptNil, stuckTxDetector: stuckTxDetector, + headTracker: headTracker, } } @@ -297,7 +304,20 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) pro return fmt.Errorf("CheckConfirmedMissingReceipt failed: %w", err) } - if err := ec.CheckForReceipts(ctx, head.BlockNumber()); err != nil { + _, latestFinalizedHead, err := ec.headTracker.LatestAndFinalizedBlock(ctx) + if err != nil { + return fmt.Errorf("failed to retrieve latest finalized head: %w", err) + } + + if !latestFinalizedHead.IsValid() { + return fmt.Errorf("latest finalized head is not valid") + } + + if latestFinalizedHead.BlockNumber() > head.BlockNumber() { + ec.lggr.Debugw("processHead received old block", "latestFinalizedHead", latestFinalizedHead.BlockNumber(), "headNum", head.BlockNumber(), "time", time.Since(mark), "id", "confirmer") + } + + if err := ec.CheckForReceipts(ctx, head.BlockNumber(), latestFinalizedHead.BlockNumber()); err != nil { return fmt.Errorf("CheckForReceipts failed: %w", err) } @@ -318,7 +338,7 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) pro ec.lggr.Debugw("Finished RebroadcastWhereNecessary", "headNum", head.BlockNumber(), "time", time.Since(mark), "id", "confirmer") mark = time.Now() - if err := ec.EnsureConfirmedTransactionsInLongestChain(ctx, head); err != nil { + if err := ec.EnsureConfirmedTransactionsInLongestChain(ctx, head, latestFinalizedHead.BlockNumber()); err != nil { return fmt.Errorf("EnsureConfirmedTransactionsInLongestChain failed: %w", err) } @@ -395,8 +415,8 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Che return } -// CheckForReceipts finds attempts that are still pending and checks to see if a receipt is present for the given block number -func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) CheckForReceipts(ctx context.Context, blockNum int64) error { +// CheckForReceipts finds attempts that are still pending and checks to see if a receipt is present for the given block number. +func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) CheckForReceipts(ctx context.Context, blockNum int64, latestFinalizedBlockNum int64) error { attempts, err := ec.txStore.FindTxAttemptsRequiringReceiptFetch(ctx, ec.chainID) if err != nil { return fmt.Errorf("FindTxAttemptsRequiringReceiptFetch failed: %w", err) @@ -443,7 +463,7 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Che return fmt.Errorf("unable to mark txes as 'confirmed_missing_receipt': %w", err) } - if err := ec.txStore.MarkOldTxesMissingReceiptAsErrored(ctx, blockNum, ec.chainConfig.FinalityDepth(), ec.chainID); err != nil { + if err := ec.txStore.MarkOldTxesMissingReceiptAsErrored(ctx, blockNum, latestFinalizedBlockNum, ec.chainID); err != nil { return fmt.Errorf("unable to confirm buried unconfirmed txes': %w", err) } return nil @@ -1004,22 +1024,30 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) han } } -// EnsureConfirmedTransactionsInLongestChain finds all confirmed txes up to the depth +// EnsureConfirmedTransactionsInLongestChain finds all confirmed txes up to the earliest head // of the given chain and ensures that every one has a receipt with a block hash that is // in the given chain. // // If any of the confirmed transactions does not have a receipt in the chain, it has been // re-org'd out and will be rebroadcast. -func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) EnsureConfirmedTransactionsInLongestChain(ctx context.Context, head types.Head[BLOCK_HASH]) error { - if head.ChainLength() < ec.chainConfig.FinalityDepth() { - logArgs := []interface{}{ - "chainLength", head.ChainLength(), "finalityDepth", ec.chainConfig.FinalityDepth(), - } +func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) EnsureConfirmedTransactionsInLongestChain(ctx context.Context, head types.Head[BLOCK_HASH], latestFinalizedHeadNumber int64) error { + logArgs := []interface{}{ + "chainLength", head.ChainLength(), "latestFinalizedHead number", latestFinalizedHeadNumber, + } + + if head.BlockNumber() < latestFinalizedHeadNumber { + errMsg := "current head is shorter than latest finalized head" + ec.lggr.Errorw(errMsg, append(logArgs, "head block number", head.BlockNumber())...) + return errors.New(errMsg) + } + + calculatedFinalityDepth := uint32(head.BlockNumber() - latestFinalizedHeadNumber) + if head.ChainLength() < calculatedFinalityDepth { if ec.nConsecutiveBlocksChainTooShort > logAfterNConsecutiveBlocksChainTooShort { - warnMsg := "Chain length supplied for re-org detection was shorter than FinalityDepth. Re-org protection is not working properly. This could indicate a problem with the remote RPC endpoint, a compatibility issue with a particular blockchain, a bug with this particular blockchain, heads table being truncated too early, remote node out of sync, or something else. If this happens a lot please raise a bug with the Chainlink team including a log output sample and details of the chain and RPC endpoint you are using." + warnMsg := "Chain length supplied for re-org detection was shorter than the depth from the latest head to the finalized head. Re-org protection is not working properly. This could indicate a problem with the remote RPC endpoint, a compatibility issue with a particular blockchain, a bug with this particular blockchain, heads table being truncated too early, remote node out of sync, or something else. If this happens a lot please raise a bug with the Chainlink team including a log output sample and details of the chain and RPC endpoint you are using." ec.lggr.Warnw(warnMsg, append(logArgs, "nConsecutiveBlocksChainTooShort", ec.nConsecutiveBlocksChainTooShort)...) } else { - logMsg := "Chain length supplied for re-org detection was shorter than FinalityDepth" + logMsg := "Chain length supplied for re-org detection was shorter than the depth from the latest head to the finalized head" ec.lggr.Debugw(logMsg, append(logArgs, "nConsecutiveBlocksChainTooShort", ec.nConsecutiveBlocksChainTooShort)...) } ec.nConsecutiveBlocksChainTooShort++ diff --git a/common/txmgr/types/mocks/tx_store.go b/common/txmgr/types/mocks/tx_store.go index 0b9c7110660..4467729e167 100644 --- a/common/txmgr/types/mocks/tx_store.go +++ b/common/txmgr/types/mocks/tx_store.go @@ -1854,17 +1854,17 @@ func (_c *TxStore_MarkAllConfirmedMissingReceipt_Call[ADDR, CHAIN_ID, TX_HASH, B return _c } -// MarkOldTxesMissingReceiptAsErrored provides a mock function with given fields: ctx, blockNum, finalityDepth, chainID -func (_m *TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MarkOldTxesMissingReceiptAsErrored(ctx context.Context, blockNum int64, finalityDepth uint32, chainID CHAIN_ID) error { - ret := _m.Called(ctx, blockNum, finalityDepth, chainID) +// MarkOldTxesMissingReceiptAsErrored provides a mock function with given fields: ctx, blockNum, latestFinalizedBlockNum, chainID +func (_m *TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MarkOldTxesMissingReceiptAsErrored(ctx context.Context, blockNum int64, latestFinalizedBlockNum int64, chainID CHAIN_ID) error { + ret := _m.Called(ctx, blockNum, latestFinalizedBlockNum, chainID) if len(ret) == 0 { panic("no return value specified for MarkOldTxesMissingReceiptAsErrored") } var r0 error - if rf, ok := ret.Get(0).(func(context.Context, int64, uint32, CHAIN_ID) error); ok { - r0 = rf(ctx, blockNum, finalityDepth, chainID) + if rf, ok := ret.Get(0).(func(context.Context, int64, int64, CHAIN_ID) error); ok { + r0 = rf(ctx, blockNum, latestFinalizedBlockNum, chainID) } else { r0 = ret.Error(0) } @@ -1880,15 +1880,15 @@ type TxStore_MarkOldTxesMissingReceiptAsErrored_Call[ADDR types.Hashable, CHAIN_ // MarkOldTxesMissingReceiptAsErrored is a helper method to define mock.On call // - ctx context.Context // - blockNum int64 -// - finalityDepth uint32 +// - latestFinalizedBlockNum int64 // - chainID CHAIN_ID -func (_e *TxStore_Expecter[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MarkOldTxesMissingReceiptAsErrored(ctx interface{}, blockNum interface{}, finalityDepth interface{}, chainID interface{}) *TxStore_MarkOldTxesMissingReceiptAsErrored_Call[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE] { - return &TxStore_MarkOldTxesMissingReceiptAsErrored_Call[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]{Call: _e.mock.On("MarkOldTxesMissingReceiptAsErrored", ctx, blockNum, finalityDepth, chainID)} +func (_e *TxStore_Expecter[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MarkOldTxesMissingReceiptAsErrored(ctx interface{}, blockNum interface{}, latestFinalizedBlockNum interface{}, chainID interface{}) *TxStore_MarkOldTxesMissingReceiptAsErrored_Call[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE] { + return &TxStore_MarkOldTxesMissingReceiptAsErrored_Call[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]{Call: _e.mock.On("MarkOldTxesMissingReceiptAsErrored", ctx, blockNum, latestFinalizedBlockNum, chainID)} } -func (_c *TxStore_MarkOldTxesMissingReceiptAsErrored_Call[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Run(run func(ctx context.Context, blockNum int64, finalityDepth uint32, chainID CHAIN_ID)) *TxStore_MarkOldTxesMissingReceiptAsErrored_Call[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE] { +func (_c *TxStore_MarkOldTxesMissingReceiptAsErrored_Call[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Run(run func(ctx context.Context, blockNum int64, latestFinalizedBlockNum int64, chainID CHAIN_ID)) *TxStore_MarkOldTxesMissingReceiptAsErrored_Call[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE] { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(int64), args[2].(uint32), args[3].(CHAIN_ID)) + run(args[0].(context.Context), args[1].(int64), args[2].(int64), args[3].(CHAIN_ID)) }) return _c } @@ -1898,7 +1898,7 @@ func (_c *TxStore_MarkOldTxesMissingReceiptAsErrored_Call[ADDR, CHAIN_ID, TX_HAS return _c } -func (_c *TxStore_MarkOldTxesMissingReceiptAsErrored_Call[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) RunAndReturn(run func(context.Context, int64, uint32, CHAIN_ID) error) *TxStore_MarkOldTxesMissingReceiptAsErrored_Call[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE] { +func (_c *TxStore_MarkOldTxesMissingReceiptAsErrored_Call[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) RunAndReturn(run func(context.Context, int64, int64, CHAIN_ID) error) *TxStore_MarkOldTxesMissingReceiptAsErrored_Call[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE] { _c.Call.Return(run) return _c } diff --git a/common/txmgr/types/tx_store.go b/common/txmgr/types/tx_store.go index 63b56dd169a..5489a57e636 100644 --- a/common/txmgr/types/tx_store.go +++ b/common/txmgr/types/tx_store.go @@ -89,7 +89,7 @@ type TransactionStore[ HasInProgressTransaction(ctx context.Context, account ADDR, chainID CHAIN_ID) (exists bool, err error) LoadTxAttempts(ctx context.Context, etx *Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error MarkAllConfirmedMissingReceipt(ctx context.Context, chainID CHAIN_ID) (err error) - MarkOldTxesMissingReceiptAsErrored(ctx context.Context, blockNum int64, finalityDepth uint32, chainID CHAIN_ID) error + MarkOldTxesMissingReceiptAsErrored(ctx context.Context, blockNum int64, latestFinalizedBlockNum int64, chainID CHAIN_ID) error PreloadTxes(ctx context.Context, attempts []TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error SaveConfirmedMissingReceiptAttempt(ctx context.Context, timeout time.Duration, attempt *TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], broadcastAt time.Time) error SaveInProgressAttempt(ctx context.Context, attempt *TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error diff --git a/core/chains/evm/txmgr/builder.go b/core/chains/evm/txmgr/builder.go index d85d6acdc8c..cbfb8775cfb 100644 --- a/core/chains/evm/txmgr/builder.go +++ b/core/chains/evm/txmgr/builder.go @@ -1,6 +1,7 @@ package txmgr import ( + "context" "math/big" "time" @@ -13,12 +14,15 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/chains/evm/config/chaintype" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/forwarders" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/gas" - httypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/headtracker/types" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/keystore" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller" evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" ) +type latestAndFinalizedBlockHeadTracker interface { + LatestAndFinalizedBlock(ctx context.Context) (latest, finalized *evmtypes.Head, err error) +} + // NewTxm constructs the necessary dependencies for the EvmTxm (broadcaster, confirmer, etc) and returns a new EvmTxManager func NewTxm( ds sqlutil.DataSource, @@ -33,7 +37,7 @@ func NewTxm( logPoller logpoller.LogPoller, keyStore keystore.Eth, estimator gas.EvmFeeEstimator, - headTracker httypes.HeadTracker, + headTracker latestAndFinalizedBlockHeadTracker, ) (txm TxManager, err error, ) { @@ -55,7 +59,7 @@ func NewTxm( evmBroadcaster := NewEvmBroadcaster(txStore, txmClient, txmCfg, feeCfg, txConfig, listenerConfig, keyStore, txAttemptBuilder, lggr, checker, chainConfig.NonceAutoSync(), chainConfig.ChainType()) evmTracker := NewEvmTracker(txStore, keyStore, chainID, lggr) stuckTxDetector := NewStuckTxDetector(lggr, client.ConfiguredChainID(), chainConfig.ChainType(), fCfg.PriceMax(), txConfig.AutoPurge(), estimator, txStore, client) - evmConfirmer := NewEvmConfirmer(txStore, txmClient, txmCfg, feeCfg, txConfig, dbConfig, keyStore, txAttemptBuilder, lggr, stuckTxDetector) + evmConfirmer := NewEvmConfirmer(txStore, txmClient, txmCfg, feeCfg, txConfig, dbConfig, keyStore, txAttemptBuilder, lggr, stuckTxDetector, headTracker) evmFinalizer := NewEvmFinalizer(lggr, client.ConfiguredChainID(), chainConfig.RPCDefaultBatchSize(), txStore, client, headTracker) var evmResender *Resender if txConfig.ResendAfterThreshold() > 0 { @@ -116,8 +120,9 @@ func NewEvmConfirmer( txAttemptBuilder TxAttemptBuilder, lggr logger.Logger, stuckTxDetector StuckTxDetector, + headTracker latestAndFinalizedBlockHeadTracker, ) *Confirmer { - return txmgr.NewConfirmer(txStore, client, chainConfig, feeConfig, txConfig, dbConfig, keystore, txAttemptBuilder, lggr, func(r *evmtypes.Receipt) bool { return r == nil }, stuckTxDetector) + return txmgr.NewConfirmer(txStore, client, chainConfig, feeConfig, txConfig, dbConfig, keystore, txAttemptBuilder, lggr, func(r *evmtypes.Receipt) bool { return r == nil }, stuckTxDetector, headTracker) } // NewEvmTracker instantiates a new EVM tracker for abandoned transactions diff --git a/core/chains/evm/txmgr/confirmer_test.go b/core/chains/evm/txmgr/confirmer_test.go index 6b107b222a6..cce6dc8fc65 100644 --- a/core/chains/evm/txmgr/confirmer_test.go +++ b/core/chains/evm/txmgr/confirmer_test.go @@ -34,6 +34,7 @@ import ( evmconfig "github.com/smartcontractkit/chainlink/v2/core/chains/evm/config" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/gas" gasmocks "github.com/smartcontractkit/chainlink/v2/core/chains/evm/gas/mocks" + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/headtracker" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/keystore" ksmocks "github.com/smartcontractkit/chainlink/v2/core/chains/evm/keystore/mocks" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/testutils" @@ -131,7 +132,8 @@ func TestEthConfirmer_Lifecycle(t *testing.T) { feeEstimator := gas.NewEvmFeeEstimator(lggr, newEst, ge.EIP1559DynamicFees(), ge) txBuilder := txmgr.NewEvmTxAttemptBuilder(*ethClient.ConfiguredChainID(), ge, ethKeyStore, feeEstimator) stuckTxDetector := txmgr.NewStuckTxDetector(lggr, testutils.FixtureChainID, "", assets.NewWei(assets.NewEth(100).ToInt()), config.EVM().Transactions().AutoPurge(), feeEstimator, txStore, ethClient) - ec := txmgr.NewEvmConfirmer(txStore, txmgr.NewEvmTxmClient(ethClient, nil), txmgr.NewEvmTxmConfig(config.EVM()), txmgr.NewEvmTxmFeeConfig(ge), config.EVM().Transactions(), gconfig.Database(), ethKeyStore, txBuilder, lggr, stuckTxDetector) + ht := headtracker.NewSimulatedHeadTracker(ethClient, true, 0) + ec := txmgr.NewEvmConfirmer(txStore, txmgr.NewEvmTxmClient(ethClient, nil), txmgr.NewEvmTxmConfig(config.EVM()), txmgr.NewEvmTxmFeeConfig(ge), config.EVM().Transactions(), gconfig.Database(), ethKeyStore, txBuilder, lggr, stuckTxDetector, ht) ctx := tests.Context(t) // Can't close unstarted instance @@ -145,19 +147,27 @@ func TestEthConfirmer_Lifecycle(t *testing.T) { // Can't start an already started instance err = ec.Start(ctx) require.Error(t, err) + + latestFinalizedHead := evmtypes.Head{ + Number: 8, + Hash: testutils.NewHash(), + Parent: nil, + IsFinalized: true, // We are guaranteed to receive a latestFinalizedHead. + } + head := evmtypes.Head{ Hash: testutils.NewHash(), Number: 10, Parent: &evmtypes.Head{ Hash: testutils.NewHash(), Number: 9, - Parent: &evmtypes.Head{ - Number: 8, - Hash: testutils.NewHash(), - Parent: nil, - }, + Parent: &latestFinalizedHead, }, } + + ethClient.On("HeadByNumber", mock.Anything, (*big.Int)(nil)).Return(&head, nil).Once() + ethClient.On("LatestFinalizedBlock", mock.Anything).Return(&latestFinalizedHead, nil).Once() + err = ec.ProcessHead(ctx, &head) require.NoError(t, err) // Can successfully close once @@ -199,6 +209,7 @@ func TestEthConfirmer_CheckForReceipts(t *testing.T) { nonce := int64(0) ctx := tests.Context(t) blockNum := int64(0) + latestFinalizedBlockNum := int64(0) t.Run("only finds eth_txes in unconfirmed state with at least one broadcast attempt", func(t *testing.T) { mustInsertFatalErrorEthTx(t, txStore, fromAddress) @@ -211,7 +222,7 @@ func TestEthConfirmer_CheckForReceipts(t *testing.T) { mustCreateUnstartedGeneratedTx(t, txStore, fromAddress, config.EVM().ChainID()) // Do the thing - require.NoError(t, ec.CheckForReceipts(ctx, blockNum)) + require.NoError(t, ec.CheckForReceipts(ctx, blockNum, latestFinalizedBlockNum)) }) etx1 := cltest.MustInsertUnconfirmedEthTxWithBroadcastLegacyAttempt(t, txStore, nonce, fromAddress) @@ -232,7 +243,7 @@ func TestEthConfirmer_CheckForReceipts(t *testing.T) { }).Once() // Do the thing - require.NoError(t, ec.CheckForReceipts(ctx, blockNum)) + require.NoError(t, ec.CheckForReceipts(ctx, blockNum, latestFinalizedBlockNum)) var err error etx1, err = txStore.FindTxWithAttempts(ctx, etx1.ID) @@ -261,7 +272,7 @@ func TestEthConfirmer_CheckForReceipts(t *testing.T) { }).Once() // No error because it is merely logged - require.NoError(t, ec.CheckForReceipts(ctx, blockNum)) + require.NoError(t, ec.CheckForReceipts(ctx, blockNum, latestFinalizedBlockNum)) etx, err := txStore.FindTxWithAttempts(ctx, etx1.ID) require.NoError(t, err) @@ -289,7 +300,7 @@ func TestEthConfirmer_CheckForReceipts(t *testing.T) { }).Once() // No error because it is merely logged - require.NoError(t, ec.CheckForReceipts(ctx, blockNum)) + require.NoError(t, ec.CheckForReceipts(ctx, blockNum, latestFinalizedBlockNum)) etx, err := txStore.FindTxWithAttempts(ctx, etx1.ID) require.NoError(t, err) @@ -326,7 +337,7 @@ func TestEthConfirmer_CheckForReceipts(t *testing.T) { }).Once() // Do the thing - require.NoError(t, ec.CheckForReceipts(ctx, blockNum)) + require.NoError(t, ec.CheckForReceipts(ctx, blockNum, latestFinalizedBlockNum)) // Check that the receipt was saved etx, err := txStore.FindTxWithAttempts(ctx, etx1.ID) @@ -388,7 +399,7 @@ func TestEthConfirmer_CheckForReceipts(t *testing.T) { }).Once() // Do the thing - require.NoError(t, ec.CheckForReceipts(ctx, blockNum)) + require.NoError(t, ec.CheckForReceipts(ctx, blockNum, latestFinalizedBlockNum)) // Check that the state was updated etx, err := txStore.FindTxWithAttempts(ctx, etx2.ID) @@ -416,7 +427,7 @@ func TestEthConfirmer_CheckForReceipts(t *testing.T) { }).Once() // Do the thing - require.NoError(t, ec.CheckForReceipts(ctx, blockNum)) + require.NoError(t, ec.CheckForReceipts(ctx, blockNum, latestFinalizedBlockNum)) // No receipt, but no error either etx, err := txStore.FindTxWithAttempts(ctx, etx3.ID) @@ -443,7 +454,7 @@ func TestEthConfirmer_CheckForReceipts(t *testing.T) { }).Once() // Do the thing - require.NoError(t, ec.CheckForReceipts(ctx, blockNum)) + require.NoError(t, ec.CheckForReceipts(ctx, blockNum, latestFinalizedBlockNum)) // No receipt, but no error either etx, err := txStore.FindTxWithAttempts(ctx, etx3.ID) @@ -472,7 +483,7 @@ func TestEthConfirmer_CheckForReceipts(t *testing.T) { }).Once() // Do the thing - require.NoError(t, ec.CheckForReceipts(ctx, blockNum)) + require.NoError(t, ec.CheckForReceipts(ctx, blockNum, latestFinalizedBlockNum)) // Check that the receipt was unchanged etx, err := txStore.FindTxWithAttempts(ctx, etx3.ID) @@ -523,7 +534,7 @@ func TestEthConfirmer_CheckForReceipts(t *testing.T) { }).Once() // Do the thing - require.NoError(t, ec.CheckForReceipts(ctx, blockNum)) + require.NoError(t, ec.CheckForReceipts(ctx, blockNum, latestFinalizedBlockNum)) // Check that the state was updated var err error @@ -576,7 +587,7 @@ func TestEthConfirmer_CheckForReceipts(t *testing.T) { }).Once() // Do the thing - require.NoError(t, ec.CheckForReceipts(ctx, blockNum)) + require.NoError(t, ec.CheckForReceipts(ctx, blockNum, latestFinalizedBlockNum)) // Check that the state was updated etx5, err = txStore.FindTxWithAttempts(ctx, etx5.ID) @@ -614,6 +625,7 @@ func TestEthConfirmer_CheckForReceipts_batching(t *testing.T) { etx := cltest.MustInsertUnconfirmedEthTx(t, txStore, 0, fromAddress) var attempts []txmgr.TxAttempt + latestFinalizedBlockNum := int64(0) // Total of 5 attempts should lead to 3 batched fetches (2, 2, 1) for i := 0; i < 5; i++ { @@ -650,7 +662,7 @@ func TestEthConfirmer_CheckForReceipts_batching(t *testing.T) { elems[0].Result = &evmtypes.Receipt{} }).Once() - require.NoError(t, ec.CheckForReceipts(ctx, 42)) + require.NoError(t, ec.CheckForReceipts(ctx, 42, latestFinalizedBlockNum)) } func TestEthConfirmer_CheckForReceipts_HandlesNonFwdTxsWithForwardingEnabled(t *testing.T) { @@ -671,6 +683,8 @@ func TestEthConfirmer_CheckForReceipts_HandlesNonFwdTxsWithForwardingEnabled(t * _, fromAddress := cltest.MustInsertRandomKeyReturningState(t, ethKeyStore) ec := newEthConfirmer(t, txStore, ethClient, cfg, evmcfg, ethKeyStore, nil) ctx := tests.Context(t) + latestFinalizedBlockNum := int64(0) + // tx is not forwarded and doesn't have meta set. EthConfirmer should handle nil meta values etx := cltest.MustInsertUnconfirmedEthTx(t, txStore, 0, fromAddress) attempt := newBroadcastLegacyEthTxAttempt(t, etx.ID, 2) @@ -697,7 +711,7 @@ func TestEthConfirmer_CheckForReceipts_HandlesNonFwdTxsWithForwardingEnabled(t * *(elems[0].Result.(*evmtypes.Receipt)) = txmReceipt // confirmed }).Once() - require.NoError(t, ec.CheckForReceipts(ctx, 42)) + require.NoError(t, ec.CheckForReceipts(ctx, 42, latestFinalizedBlockNum)) // Check receipt is inserted correctly. dbtx, err = txStore.FindTxWithAttempts(ctx, etx.ID) @@ -724,6 +738,7 @@ func TestEthConfirmer_CheckForReceipts_only_likely_confirmed(t *testing.T) { ec := newEthConfirmer(t, txStore, ethClient, cfg, evmcfg, ethKeyStore, nil) ctx := tests.Context(t) + latestFinalizedBlockNum := int64(0) var attempts []txmgr.TxAttempt // inserting in DESC nonce order to test DB ASC ordering @@ -755,7 +770,7 @@ func TestEthConfirmer_CheckForReceipts_only_likely_confirmed(t *testing.T) { elems[3].Result = &evmtypes.Receipt{} }).Once() - require.NoError(t, ec.CheckForReceipts(ctx, 42)) + require.NoError(t, ec.CheckForReceipts(ctx, 42, latestFinalizedBlockNum)) cltest.BatchElemMustMatchParams(t, captured[0], attempts[0].Hash, "eth_getTransactionReceipt") cltest.BatchElemMustMatchParams(t, captured[1], attempts[1].Hash, "eth_getTransactionReceipt") @@ -778,6 +793,7 @@ func TestEthConfirmer_CheckForReceipts_should_not_check_for_likely_unconfirmed(t ec := newEthConfirmer(t, txStore, ethClient, gconfig, config, ethKeyStore, nil) ctx := tests.Context(t) + latestFinalizedBlockNum := int64(0) etx := cltest.MustInsertUnconfirmedEthTx(t, txStore, 1, fromAddress) for i := 0; i < 4; i++ { @@ -788,7 +804,7 @@ func TestEthConfirmer_CheckForReceipts_should_not_check_for_likely_unconfirmed(t // latest nonce is lower that all attempts' nonces ethClient.On("SequenceAt", mock.Anything, mock.Anything, mock.Anything).Return(evmtypes.Nonce(0), nil) - require.NoError(t, ec.CheckForReceipts(ctx, 42)) + require.NoError(t, ec.CheckForReceipts(ctx, 42, latestFinalizedBlockNum)) } func TestEthConfirmer_CheckForReceipts_confirmed_missing_receipt_scoped_to_key(t *testing.T) { @@ -809,6 +825,7 @@ func TestEthConfirmer_CheckForReceipts_confirmed_missing_receipt_scoped_to_key(t ec := newEthConfirmer(t, txStore, ethClient, cfg, evmcfg, ethKeyStore, nil) ctx := tests.Context(t) + latestFinalizedBlockNum := int64(0) // STATE // key 1, tx with nonce 0 is unconfirmed @@ -832,7 +849,7 @@ func TestEthConfirmer_CheckForReceipts_confirmed_missing_receipt_scoped_to_key(t *(elems[0].Result.(*evmtypes.Receipt)) = txmReceipt2_9 }).Once() - require.NoError(t, ec.CheckForReceipts(ctx, 10)) + require.NoError(t, ec.CheckForReceipts(ctx, 10, latestFinalizedBlockNum)) mustTxBeInState(t, txStore, etx1_0, txmgrcommon.TxUnconfirmed) mustTxBeInState(t, txStore, etx1_1, txmgrcommon.TxUnconfirmed) @@ -850,7 +867,7 @@ func TestEthConfirmer_CheckForReceipts_confirmed_missing_receipt_scoped_to_key(t *(elems[0].Result.(*evmtypes.Receipt)) = txmReceipt1_1 }).Once() - require.NoError(t, ec.CheckForReceipts(ctx, 11)) + require.NoError(t, ec.CheckForReceipts(ctx, 11, latestFinalizedBlockNum)) mustTxBeInState(t, txStore, etx1_0, txmgrcommon.TxConfirmedMissingReceipt) mustTxBeInState(t, txStore, etx1_1, txmgrcommon.TxConfirmed) @@ -861,9 +878,7 @@ func TestEthConfirmer_CheckForReceipts_confirmed_missing_receipt(t *testing.T) { t.Parallel() db := pgtest.NewSqlxDB(t) - cfg := configtest.NewGeneralConfig(t, func(c *chainlink.Config, s *chainlink.Secrets) { - c.EVM[0].FinalityDepth = ptr[uint32](50) - }) + cfg := configtest.NewGeneralConfig(t, func(c *chainlink.Config, s *chainlink.Secrets) {}) txStore := cltest.NewTestTxStore(t, db) ethKeyStore := cltest.NewKeyStore(t, db).Eth() @@ -876,6 +891,7 @@ func TestEthConfirmer_CheckForReceipts_confirmed_missing_receipt(t *testing.T) { ec := newEthConfirmer(t, txStore, ethClient, cfg, evmcfg, ethKeyStore, nil) ctx := tests.Context(t) + latestFinalizedBlockNum := int64(0) // STATE // eth_txes with nonce 0 has two attempts (broadcast before block 21 and 41) the first of which will get a receipt @@ -949,7 +965,7 @@ func TestEthConfirmer_CheckForReceipts_confirmed_missing_receipt(t *testing.T) { // PERFORM // Block num of 43 is one higher than the receipt (as would generally be expected) - require.NoError(t, ec.CheckForReceipts(ctx, 43)) + require.NoError(t, ec.CheckForReceipts(ctx, 43, latestFinalizedBlockNum)) // Expected state is that the "top" eth_tx is now confirmed, with the // two below it "confirmed_missing_receipt" and the "bottom" eth_tx also confirmed @@ -1009,7 +1025,7 @@ func TestEthConfirmer_CheckForReceipts_confirmed_missing_receipt(t *testing.T) { // PERFORM // Block num of 44 is one higher than the receipt (as would generally be expected) - require.NoError(t, ec.CheckForReceipts(ctx, 44)) + require.NoError(t, ec.CheckForReceipts(ctx, 44, latestFinalizedBlockNum)) // Expected state is that the "top" two eth_txes are now confirmed, with the // one below it still "confirmed_missing_receipt" and the bottom one remains confirmed @@ -1038,7 +1054,7 @@ func TestEthConfirmer_CheckForReceipts_confirmed_missing_receipt(t *testing.T) { // eth_txes with nonce 2 is confirmed // eth_txes with nonce 3 is confirmed - t.Run("continues to leave eth_txes with state 'confirmed_missing_receipt' unchanged if at least one attempt is above EVM.FinalityDepth", func(t *testing.T) { + t.Run("continues to leave eth_txes with state 'confirmed_missing_receipt' unchanged if at least one attempt is above LatestFinalizedBlockNum", func(t *testing.T) { ethClient.On("SequenceAt", mock.Anything, mock.Anything, mock.Anything).Return(evmtypes.Nonce(10), nil) ethClient.On("BatchCallContext", mock.Anything, mock.MatchedBy(func(b []rpc.BatchElem) bool { return len(b) == 2 && @@ -1051,9 +1067,11 @@ func TestEthConfirmer_CheckForReceipts_confirmed_missing_receipt(t *testing.T) { elems[1].Result = &evmtypes.Receipt{} }).Once() + latestFinalizedBlockNum = 30 + // PERFORM // Block num of 80 puts the first attempt (21) below threshold but second attempt (41) still above - require.NoError(t, ec.CheckForReceipts(ctx, 80)) + require.NoError(t, ec.CheckForReceipts(ctx, 80, latestFinalizedBlockNum)) // Expected state is that the "top" two eth_txes are now confirmed, with the // one below it still "confirmed_missing_receipt" and the bottom one remains confirmed @@ -1078,7 +1096,7 @@ func TestEthConfirmer_CheckForReceipts_confirmed_missing_receipt(t *testing.T) { // eth_txes with nonce 2 is confirmed // eth_txes with nonce 3 is confirmed - t.Run("marks eth_Txes with state 'confirmed_missing_receipt' as 'errored' if a receipt fails to show up and all attempts are buried deeper than EVM.FinalityDepth", func(t *testing.T) { + t.Run("marks eth_Txes with state 'confirmed_missing_receipt' as 'errored' if a receipt fails to show up and all attempts are buried deeper than LatestFinalizedBlockNum", func(t *testing.T) { ethClient.On("SequenceAt", mock.Anything, mock.Anything, mock.Anything).Return(evmtypes.Nonce(10), nil) ethClient.On("BatchCallContext", mock.Anything, mock.MatchedBy(func(b []rpc.BatchElem) bool { return len(b) == 2 && @@ -1091,9 +1109,11 @@ func TestEthConfirmer_CheckForReceipts_confirmed_missing_receipt(t *testing.T) { elems[1].Result = &evmtypes.Receipt{} }).Once() + latestFinalizedBlockNum = 50 + // PERFORM // Block num of 100 puts the first attempt (21) and second attempt (41) below threshold - require.NoError(t, ec.CheckForReceipts(ctx, 100)) + require.NoError(t, ec.CheckForReceipts(ctx, 100, latestFinalizedBlockNum)) // Expected state is that the "top" two eth_txes are now confirmed, with the // one below it marked as "fatal_error" and the bottom one remains confirmed @@ -1117,9 +1137,7 @@ func TestEthConfirmer_CheckConfirmedMissingReceipt(t *testing.T) { t.Parallel() db := pgtest.NewSqlxDB(t) - cfg := configtest.NewGeneralConfig(t, func(c *chainlink.Config, s *chainlink.Secrets) { - c.EVM[0].FinalityDepth = ptr[uint32](50) - }) + cfg := configtest.NewGeneralConfig(t, func(c *chainlink.Config, s *chainlink.Secrets) {}) txStore := cltest.NewTestTxStore(t, db) ethKeyStore := cltest.NewKeyStore(t, db).Eth() @@ -1197,9 +1215,7 @@ func TestEthConfirmer_CheckConfirmedMissingReceipt_batchSendTransactions_fails(t t.Parallel() db := pgtest.NewSqlxDB(t) - cfg := configtest.NewGeneralConfig(t, func(c *chainlink.Config, s *chainlink.Secrets) { - c.EVM[0].FinalityDepth = ptr[uint32](50) - }) + cfg := configtest.NewGeneralConfig(t, func(c *chainlink.Config, s *chainlink.Secrets) {}) txStore := cltest.NewTestTxStore(t, db) ethKeyStore := cltest.NewKeyStore(t, db).Eth() @@ -1262,7 +1278,6 @@ func TestEthConfirmer_CheckConfirmedMissingReceipt_smallEvmRPCBatchSize_middleBa db := pgtest.NewSqlxDB(t) cfg := configtest.NewGeneralConfig(t, func(c *chainlink.Config, s *chainlink.Secrets) { - c.EVM[0].FinalityDepth = ptr[uint32](50) c.EVM[0].RPCDefaultBatchSize = ptr[uint32](1) }) txStore := cltest.NewTestTxStore(t, db) @@ -1651,8 +1666,9 @@ func TestEthConfirmer_RebroadcastWhereNecessary_WithConnectivityCheck(t *testing addresses := []gethCommon.Address{fromAddress} kst.On("EnabledAddressesForChain", mock.Anything, &cltest.FixtureChainID).Return(addresses, nil).Maybe() stuckTxDetector := txmgr.NewStuckTxDetector(lggr, testutils.FixtureChainID, "", assets.NewWei(assets.NewEth(100).ToInt()), ccfg.EVM().Transactions().AutoPurge(), feeEstimator, txStore, ethClient) + ht := headtracker.NewSimulatedHeadTracker(ethClient, true, 0) // Create confirmer with necessary state - ec := txmgr.NewEvmConfirmer(txStore, txmgr.NewEvmTxmClient(ethClient, nil), ccfg.EVM(), txmgr.NewEvmTxmFeeConfig(ccfg.EVM().GasEstimator()), ccfg.EVM().Transactions(), cfg.Database(), kst, txBuilder, lggr, stuckTxDetector) + ec := txmgr.NewEvmConfirmer(txStore, txmgr.NewEvmTxmClient(ethClient, nil), ccfg.EVM(), txmgr.NewEvmTxmFeeConfig(ccfg.EVM().GasEstimator()), ccfg.EVM().Transactions(), cfg.Database(), kst, txBuilder, lggr, stuckTxDetector, ht) servicetest.Run(t, ec) currentHead := int64(30) oldEnough := int64(15) @@ -1700,7 +1716,8 @@ func TestEthConfirmer_RebroadcastWhereNecessary_WithConnectivityCheck(t *testing addresses := []gethCommon.Address{fromAddress} kst.On("EnabledAddressesForChain", mock.Anything, &cltest.FixtureChainID).Return(addresses, nil).Maybe() stuckTxDetector := txmgr.NewStuckTxDetector(lggr, testutils.FixtureChainID, "", assets.NewWei(assets.NewEth(100).ToInt()), ccfg.EVM().Transactions().AutoPurge(), feeEstimator, txStore, ethClient) - ec := txmgr.NewEvmConfirmer(txStore, txmgr.NewEvmTxmClient(ethClient, nil), ccfg.EVM(), txmgr.NewEvmTxmFeeConfig(ccfg.EVM().GasEstimator()), ccfg.EVM().Transactions(), cfg.Database(), kst, txBuilder, lggr, stuckTxDetector) + ht := headtracker.NewSimulatedHeadTracker(ethClient, true, 0) + ec := txmgr.NewEvmConfirmer(txStore, txmgr.NewEvmTxmClient(ethClient, nil), ccfg.EVM(), txmgr.NewEvmTxmFeeConfig(ccfg.EVM().GasEstimator()), ccfg.EVM().Transactions(), cfg.Database(), kst, txBuilder, lggr, stuckTxDetector, ht) servicetest.Run(t, ec) currentHead := int64(30) oldEnough := int64(15) @@ -2672,6 +2689,13 @@ func TestEthConfirmer_EnsureConfirmedTransactionsInLongestChain(t *testing.T) { gconfig, config := newTestChainScopedConfig(t) ec := newEthConfirmer(t, txStore, ethClient, gconfig, config, ethKeyStore, nil) + latestFinalizedHead := evmtypes.Head{ + Number: 8, + Hash: testutils.NewHash(), + Parent: nil, + IsFinalized: false, // We are guaranteed to receive a latestFinalizedHead. + } + head := evmtypes.Head{ Hash: testutils.NewHash(), Number: 10, @@ -2685,16 +2709,15 @@ func TestEthConfirmer_EnsureConfirmedTransactionsInLongestChain(t *testing.T) { }, }, } - t.Run("does nothing if there aren't any transactions", func(t *testing.T) { - require.NoError(t, ec.EnsureConfirmedTransactionsInLongestChain(tests.Context(t), &head)) + require.NoError(t, ec.EnsureConfirmedTransactionsInLongestChain(tests.Context(t), &head, latestFinalizedHead.BlockNumber())) }) t.Run("does nothing to unconfirmed transactions", func(t *testing.T) { etx := cltest.MustInsertUnconfirmedEthTxWithBroadcastLegacyAttempt(t, txStore, 0, fromAddress) // Do the thing - require.NoError(t, ec.EnsureConfirmedTransactionsInLongestChain(tests.Context(t), &head)) + require.NoError(t, ec.EnsureConfirmedTransactionsInLongestChain(tests.Context(t), &head, latestFinalizedHead.BlockNumber())) etx, err := txStore.FindTxWithAttempts(ctx, etx.ID) require.NoError(t, err) @@ -2706,7 +2729,7 @@ func TestEthConfirmer_EnsureConfirmedTransactionsInLongestChain(t *testing.T) { mustInsertEthReceipt(t, txStore, head.Number, head.Hash, etx.TxAttempts[0].Hash) // Do the thing - require.NoError(t, ec.EnsureConfirmedTransactionsInLongestChain(tests.Context(t), &head)) + require.NoError(t, ec.EnsureConfirmedTransactionsInLongestChain(tests.Context(t), &head, latestFinalizedHead.BlockNumber())) etx, err := txStore.FindTxWithAttempts(ctx, etx.ID) require.NoError(t, err) @@ -2719,7 +2742,7 @@ func TestEthConfirmer_EnsureConfirmedTransactionsInLongestChain(t *testing.T) { mustInsertEthReceipt(t, txStore, head.Parent.Parent.Number-1, testutils.NewHash(), etx.TxAttempts[0].Hash) // Do the thing - require.NoError(t, ec.EnsureConfirmedTransactionsInLongestChain(tests.Context(t), &head)) + require.NoError(t, ec.EnsureConfirmedTransactionsInLongestChain(tests.Context(t), &head, latestFinalizedHead.BlockNumber())) etx, err := txStore.FindTxWithAttempts(ctx, etx.ID) require.NoError(t, err) @@ -2740,7 +2763,7 @@ func TestEthConfirmer_EnsureConfirmedTransactionsInLongestChain(t *testing.T) { }), fromAddress).Return(commonclient.Successful, nil).Once() // Do the thing - require.NoError(t, ec.EnsureConfirmedTransactionsInLongestChain(tests.Context(t), &head)) + require.NoError(t, ec.EnsureConfirmedTransactionsInLongestChain(tests.Context(t), &head, latestFinalizedHead.BlockNumber())) etx, err := txStore.FindTxWithAttempts(ctx, etx.ID) require.NoError(t, err) @@ -2763,7 +2786,7 @@ func TestEthConfirmer_EnsureConfirmedTransactionsInLongestChain(t *testing.T) { commonclient.Successful, nil).Once() // Do the thing - require.NoError(t, ec.EnsureConfirmedTransactionsInLongestChain(tests.Context(t), &head)) + require.NoError(t, ec.EnsureConfirmedTransactionsInLongestChain(tests.Context(t), &head, latestFinalizedHead.BlockNumber())) etx, err := txStore.FindTxWithAttempts(ctx, etx.ID) require.NoError(t, err) @@ -2798,7 +2821,7 @@ func TestEthConfirmer_EnsureConfirmedTransactionsInLongestChain(t *testing.T) { }), fromAddress).Return(commonclient.Successful, nil).Once() // Do the thing - require.NoError(t, ec.EnsureConfirmedTransactionsInLongestChain(tests.Context(t), &head)) + require.NoError(t, ec.EnsureConfirmedTransactionsInLongestChain(tests.Context(t), &head, latestFinalizedHead.BlockNumber())) etx, err := txStore.FindTxWithAttempts(ctx, etx.ID) require.NoError(t, err) @@ -2818,7 +2841,7 @@ func TestEthConfirmer_EnsureConfirmedTransactionsInLongestChain(t *testing.T) { // Add receipt that is higher than head mustInsertEthReceipt(t, txStore, head.Number+1, testutils.NewHash(), attempt.Hash) - require.NoError(t, ec.EnsureConfirmedTransactionsInLongestChain(tests.Context(t), &head)) + require.NoError(t, ec.EnsureConfirmedTransactionsInLongestChain(tests.Context(t), &head, latestFinalizedHead.BlockNumber())) etx, err := txStore.FindTxWithAttempts(ctx, etx.ID) require.NoError(t, err) @@ -3158,7 +3181,8 @@ func TestEthConfirmer_ProcessStuckTransactions(t *testing.T) { ge := evmcfg.EVM().GasEstimator() txBuilder := txmgr.NewEvmTxAttemptBuilder(*ethClient.ConfiguredChainID(), ge, ethKeyStore, feeEstimator) stuckTxDetector := txmgr.NewStuckTxDetector(lggr, testutils.FixtureChainID, "", assets.NewWei(assets.NewEth(100).ToInt()), evmcfg.EVM().Transactions().AutoPurge(), feeEstimator, txStore, ethClient) - ec := txmgr.NewEvmConfirmer(txStore, txmgr.NewEvmTxmClient(ethClient, nil), txmgr.NewEvmTxmConfig(evmcfg.EVM()), txmgr.NewEvmTxmFeeConfig(ge), evmcfg.EVM().Transactions(), cfg.Database(), ethKeyStore, txBuilder, lggr, stuckTxDetector) + ht := headtracker.NewSimulatedHeadTracker(ethClient, true, 0) + ec := txmgr.NewEvmConfirmer(txStore, txmgr.NewEvmTxmClient(ethClient, nil), txmgr.NewEvmTxmConfig(evmcfg.EVM()), txmgr.NewEvmTxmFeeConfig(ge), evmcfg.EVM().Transactions(), cfg.Database(), ethKeyStore, txBuilder, lggr, stuckTxDetector, ht) servicetest.Run(t, ec) ctx := tests.Context(t) @@ -3172,9 +3196,13 @@ func TestEthConfirmer_ProcessStuckTransactions(t *testing.T) { tx := mustInsertUnconfirmedTxWithBroadcastAttempts(t, txStore, nonce, fromAddress, autoPurgeMinAttempts, blockNum-int64(autoPurgeThreshold), marketGasPrice.Add(oneGwei)) head := evmtypes.Head{ - Hash: testutils.NewHash(), - Number: blockNum, + Hash: testutils.NewHash(), + Number: blockNum, + IsFinalized: true, } + + ethClient.On("HeadByNumber", mock.Anything, (*big.Int)(nil)).Return(&head, nil).Once() + ethClient.On("LatestFinalizedBlock", mock.Anything).Return(&head, nil).Once() ethClient.On("SequenceAt", mock.Anything, mock.Anything, mock.Anything).Return(evmtypes.Nonce(0), nil).Once() ethClient.On("BatchCallContext", mock.Anything, mock.Anything).Return(nil).Once() @@ -3196,9 +3224,12 @@ func TestEthConfirmer_ProcessStuckTransactions(t *testing.T) { require.Equal(t, bumpedFee.Legacy, latestAttempt.TxFee.Legacy) head = evmtypes.Head{ - Hash: testutils.NewHash(), - Number: blockNum + 1, + Hash: testutils.NewHash(), + Number: blockNum + 1, + IsFinalized: true, } + ethClient.On("HeadByNumber", mock.Anything, (*big.Int)(nil)).Return(&head, nil).Once() + ethClient.On("LatestFinalizedBlock", mock.Anything).Return(&head, nil).Once() ethClient.On("SequenceAt", mock.Anything, mock.Anything, mock.Anything).Return(evmtypes.Nonce(1), nil) ethClient.On("BatchCallContext", mock.Anything, mock.MatchedBy(func(b []rpc.BatchElem) bool { return len(b) == 4 && cltest.BatchElemMatchesParams(b[0], latestAttempt.Hash, "eth_getTransactionReceipt") @@ -3237,7 +3268,8 @@ func newEthConfirmer(t testing.TB, txStore txmgr.EvmTxStore, ethClient client.Cl }, ge.EIP1559DynamicFees(), ge) txBuilder := txmgr.NewEvmTxAttemptBuilder(*ethClient.ConfiguredChainID(), ge, ks, estimator) stuckTxDetector := txmgr.NewStuckTxDetector(lggr, testutils.FixtureChainID, "", assets.NewWei(assets.NewEth(100).ToInt()), config.EVM().Transactions().AutoPurge(), estimator, txStore, ethClient) - ec := txmgr.NewEvmConfirmer(txStore, txmgr.NewEvmTxmClient(ethClient, nil), txmgr.NewEvmTxmConfig(config.EVM()), txmgr.NewEvmTxmFeeConfig(ge), config.EVM().Transactions(), gconfig.Database(), ks, txBuilder, lggr, stuckTxDetector) + ht := headtracker.NewSimulatedHeadTracker(ethClient, true, 0) + ec := txmgr.NewEvmConfirmer(txStore, txmgr.NewEvmTxmClient(ethClient, nil), txmgr.NewEvmTxmConfig(config.EVM()), txmgr.NewEvmTxmFeeConfig(ge), config.EVM().Transactions(), gconfig.Database(), ks, txBuilder, lggr, stuckTxDetector, ht) ec.SetResumeCallback(fn) servicetest.Run(t, ec) return ec diff --git a/core/chains/evm/txmgr/evm_tx_store.go b/core/chains/evm/txmgr/evm_tx_store.go index 45de437e443..4bdf191376b 100644 --- a/core/chains/evm/txmgr/evm_tx_store.go +++ b/core/chains/evm/txmgr/evm_tx_store.go @@ -34,8 +34,8 @@ import ( var ( ErrKeyNotUpdated = errors.New("evmTxStore: Key not updated") - // ErrCouldNotGetReceipt is the error string we save if we reach our finality depth for a confirmed transaction without ever getting a receipt - // This most likely happened because an external wallet used the account for this nonce + // ErrCouldNotGetReceipt is the error string we save if we reach our LatestFinalizedBlockNum for a confirmed transaction + // without ever getting a receipt. This most likely happened because an external wallet used the account for this nonce ErrCouldNotGetReceipt = "could not get receipt" ) @@ -959,11 +959,11 @@ func (o *evmTxStore) SaveFetchedReceipts(ctx context.Context, r []*evmtypes.Rece // NOTE: We continue to attempt to resend evm.txes in this state on // every head to guard against the extremely rare scenario of nonce gap due to // reorg that excludes the transaction (from another wallet) that had this -// nonce (until finality depth is reached, after which we make the explicit +// nonce (until LatestFinalizedBlockNum is reached, after which we make the explicit // decision to give up). This is done in the EthResender. // // We will continue to try to fetch a receipt for these attempts until all -// attempts are below the finality depth from current head. +// attempts are equal to or below the LatestFinalizedBlockNum from current head. func (o *evmTxStore) MarkAllConfirmedMissingReceipt(ctx context.Context, chainID *big.Int) (err error) { var cancel context.CancelFunc ctx, cancel = o.stopCh.Ctx(ctx) @@ -1430,23 +1430,18 @@ ORDER BY nonce ASC // markOldTxesMissingReceiptAsErrored // -// Once eth_tx has all of its attempts broadcast before some cutoff threshold +// Once eth_tx has all of its attempts broadcast equal to or before latestFinalizedBlockNum // without receiving any receipts, we mark it as fatally errored (never sent). // // The job run will also be marked as errored in this case since we never got a // receipt and thus cannot pass on any transaction hash -func (o *evmTxStore) MarkOldTxesMissingReceiptAsErrored(ctx context.Context, blockNum int64, finalityDepth uint32, chainID *big.Int) error { +func (o *evmTxStore) MarkOldTxesMissingReceiptAsErrored(ctx context.Context, blockNum int64, latestFinalizedBlockNum int64, chainID *big.Int) error { var cancel context.CancelFunc ctx, cancel = o.stopCh.Ctx(ctx) defer cancel() - // cutoffBlockNum is a block height - // Any 'confirmed_missing_receipt' eth_tx with all attempts older than this block height will be marked as errored - // We will not try to query for receipts for this transaction any more - cutoff := blockNum - int64(finalityDepth) - if cutoff <= 0 { - return nil - } - if cutoff <= 0 { + // Any 'confirmed_missing_receipt' eth_tx with all attempts equal to or older than latestFinalizedBlockNum will be marked as errored + // We will not try to query for receipts for this transaction anymore + if latestFinalizedBlockNum <= 0 { return nil } // note: if QOpt passes in a sql.Tx this will reuse it @@ -1466,12 +1461,12 @@ FROM ( WHERE e2.state = 'confirmed_missing_receipt' AND e2.evm_chain_id = $3 GROUP BY e2.id - HAVING max(evm.tx_attempts.broadcast_before_block_num) < $2 + HAVING max(evm.tx_attempts.broadcast_before_block_num) <= $2 ) FOR UPDATE OF e1 ) e0 WHERE e0.id = evm.txes.id -RETURNING e0.id, e0.nonce`, ErrCouldNotGetReceipt, cutoff, chainID.String()) +RETURNING e0.id, e0.nonce`, ErrCouldNotGetReceipt, latestFinalizedBlockNum, chainID.String()) if err != nil { return pkgerrors.Wrap(err, "markOldTxesMissingReceiptAsErrored failed to query") diff --git a/core/chains/evm/txmgr/evm_tx_store_test.go b/core/chains/evm/txmgr/evm_tx_store_test.go index 191a0a5fed2..992bd1f434c 100644 --- a/core/chains/evm/txmgr/evm_tx_store_test.go +++ b/core/chains/evm/txmgr/evm_tx_store_test.go @@ -1117,13 +1117,14 @@ func TestORM_MarkOldTxesMissingReceiptAsErrored(t *testing.T) { ethKeyStore := cltest.NewKeyStore(t, db).Eth() ethClient := evmtest.NewEthClientMockWithDefaultChain(t) _, fromAddress := cltest.MustInsertRandomKeyReturningState(t, ethKeyStore) + latestFinalizedBlockNum := int64(8) // tx state should be confirmed missing receipt - // attempt should be broadcast before cutoff time + // attempt should be before latestFinalizedBlockNum t.Run("successfully mark errored transactions", func(t *testing.T) { etx := mustInsertConfirmedMissingReceiptEthTxWithLegacyAttempt(t, txStore, 1, 7, time.Now(), fromAddress) - err := txStore.MarkOldTxesMissingReceiptAsErrored(tests.Context(t), 10, 2, ethClient.ConfiguredChainID()) + err := txStore.MarkOldTxesMissingReceiptAsErrored(tests.Context(t), 10, latestFinalizedBlockNum, ethClient.ConfiguredChainID()) require.NoError(t, err) etx, err = txStore.FindTxWithAttempts(ctx, etx.ID) @@ -1133,7 +1134,7 @@ func TestORM_MarkOldTxesMissingReceiptAsErrored(t *testing.T) { t.Run("successfully mark errored transactions w/ qopt passing in sql.Tx", func(t *testing.T) { etx := mustInsertConfirmedMissingReceiptEthTxWithLegacyAttempt(t, txStore, 1, 7, time.Now(), fromAddress) - err := txStore.MarkOldTxesMissingReceiptAsErrored(tests.Context(t), 10, 2, ethClient.ConfiguredChainID()) + err := txStore.MarkOldTxesMissingReceiptAsErrored(tests.Context(t), 10, latestFinalizedBlockNum, ethClient.ConfiguredChainID()) require.NoError(t, err) // must run other query outside of postgres transaction so changes are committed diff --git a/core/chains/evm/txmgr/mocks/evm_tx_store.go b/core/chains/evm/txmgr/mocks/evm_tx_store.go index b40c0ca8376..a9a175e3d94 100644 --- a/core/chains/evm/txmgr/mocks/evm_tx_store.go +++ b/core/chains/evm/txmgr/mocks/evm_tx_store.go @@ -2214,17 +2214,17 @@ func (_c *EvmTxStore_MarkAllConfirmedMissingReceipt_Call) RunAndReturn(run func( return _c } -// MarkOldTxesMissingReceiptAsErrored provides a mock function with given fields: ctx, blockNum, finalityDepth, chainID -func (_m *EvmTxStore) MarkOldTxesMissingReceiptAsErrored(ctx context.Context, blockNum int64, finalityDepth uint32, chainID *big.Int) error { - ret := _m.Called(ctx, blockNum, finalityDepth, chainID) +// MarkOldTxesMissingReceiptAsErrored provides a mock function with given fields: ctx, blockNum, latestFinalizedBlockNum, chainID +func (_m *EvmTxStore) MarkOldTxesMissingReceiptAsErrored(ctx context.Context, blockNum int64, latestFinalizedBlockNum int64, chainID *big.Int) error { + ret := _m.Called(ctx, blockNum, latestFinalizedBlockNum, chainID) if len(ret) == 0 { panic("no return value specified for MarkOldTxesMissingReceiptAsErrored") } var r0 error - if rf, ok := ret.Get(0).(func(context.Context, int64, uint32, *big.Int) error); ok { - r0 = rf(ctx, blockNum, finalityDepth, chainID) + if rf, ok := ret.Get(0).(func(context.Context, int64, int64, *big.Int) error); ok { + r0 = rf(ctx, blockNum, latestFinalizedBlockNum, chainID) } else { r0 = ret.Error(0) } @@ -2240,15 +2240,15 @@ type EvmTxStore_MarkOldTxesMissingReceiptAsErrored_Call struct { // MarkOldTxesMissingReceiptAsErrored is a helper method to define mock.On call // - ctx context.Context // - blockNum int64 -// - finalityDepth uint32 +// - latestFinalizedBlockNum int64 // - chainID *big.Int -func (_e *EvmTxStore_Expecter) MarkOldTxesMissingReceiptAsErrored(ctx interface{}, blockNum interface{}, finalityDepth interface{}, chainID interface{}) *EvmTxStore_MarkOldTxesMissingReceiptAsErrored_Call { - return &EvmTxStore_MarkOldTxesMissingReceiptAsErrored_Call{Call: _e.mock.On("MarkOldTxesMissingReceiptAsErrored", ctx, blockNum, finalityDepth, chainID)} +func (_e *EvmTxStore_Expecter) MarkOldTxesMissingReceiptAsErrored(ctx interface{}, blockNum interface{}, latestFinalizedBlockNum interface{}, chainID interface{}) *EvmTxStore_MarkOldTxesMissingReceiptAsErrored_Call { + return &EvmTxStore_MarkOldTxesMissingReceiptAsErrored_Call{Call: _e.mock.On("MarkOldTxesMissingReceiptAsErrored", ctx, blockNum, latestFinalizedBlockNum, chainID)} } -func (_c *EvmTxStore_MarkOldTxesMissingReceiptAsErrored_Call) Run(run func(ctx context.Context, blockNum int64, finalityDepth uint32, chainID *big.Int)) *EvmTxStore_MarkOldTxesMissingReceiptAsErrored_Call { +func (_c *EvmTxStore_MarkOldTxesMissingReceiptAsErrored_Call) Run(run func(ctx context.Context, blockNum int64, latestFinalizedBlockNum int64, chainID *big.Int)) *EvmTxStore_MarkOldTxesMissingReceiptAsErrored_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(int64), args[2].(uint32), args[3].(*big.Int)) + run(args[0].(context.Context), args[1].(int64), args[2].(int64), args[3].(*big.Int)) }) return _c } @@ -2258,7 +2258,7 @@ func (_c *EvmTxStore_MarkOldTxesMissingReceiptAsErrored_Call) Return(_a0 error) return _c } -func (_c *EvmTxStore_MarkOldTxesMissingReceiptAsErrored_Call) RunAndReturn(run func(context.Context, int64, uint32, *big.Int) error) *EvmTxStore_MarkOldTxesMissingReceiptAsErrored_Call { +func (_c *EvmTxStore_MarkOldTxesMissingReceiptAsErrored_Call) RunAndReturn(run func(context.Context, int64, int64, *big.Int) error) *EvmTxStore_MarkOldTxesMissingReceiptAsErrored_Call { _c.Call.Return(run) return _c } diff --git a/core/chains/evm/txmgr/txmgr_test.go b/core/chains/evm/txmgr/txmgr_test.go index 5f932db8720..3d52e6eb4f0 100644 --- a/core/chains/evm/txmgr/txmgr_test.go +++ b/core/chains/evm/txmgr/txmgr_test.go @@ -627,6 +627,7 @@ func TestTxm_GetTransactionStatus(t *testing.T) { ethClient.On("PendingNonceAt", mock.Anything, mock.Anything).Return(uint64(0), nil).Maybe() ethClient.On("HeadByNumber", mock.Anything, mock.Anything).Return(head, nil).Once() ethClient.On("HeadByNumber", mock.Anything, mock.Anything).Return(head.Parent, nil).Once() + ethClient.On("HeadByNumber", mock.Anything, mock.Anything).Return(head, nil) feeEstimator := gasmocks.NewEvmFeeEstimator(t) feeEstimator.On("Start", mock.Anything).Return(nil).Once() feeEstimator.On("Close", mock.Anything).Return(nil).Once() diff --git a/core/cmd/shell_local.go b/core/cmd/shell_local.go index e19cc485d8b..ed8b653c05b 100644 --- a/core/cmd/shell_local.go +++ b/core/cmd/shell_local.go @@ -669,7 +669,7 @@ func (s *Shell) RebroadcastTransactions(c *cli.Context) (err error) { feeCfg := txmgr.NewEvmTxmFeeConfig(chain.Config().EVM().GasEstimator()) stuckTxDetector := txmgr.NewStuckTxDetector(lggr, ethClient.ConfiguredChainID(), "", assets.NewWei(assets.NewEth(100).ToInt()), chain.Config().EVM().Transactions().AutoPurge(), nil, orm, ethClient) ec := txmgr.NewEvmConfirmer(orm, txmgr.NewEvmTxmClient(ethClient, chain.Config().EVM().NodePool().Errors()), - cfg, feeCfg, chain.Config().EVM().Transactions(), app.GetConfig().Database(), keyStore.Eth(), txBuilder, chain.Logger(), stuckTxDetector) + cfg, feeCfg, chain.Config().EVM().Transactions(), app.GetConfig().Database(), keyStore.Eth(), txBuilder, chain.Logger(), stuckTxDetector, chain.HeadTracker()) totalNonces := endingNonce - beginningNonce + 1 nonces := make([]evmtypes.Nonce, totalNonces) for i := int64(0); i < totalNonces; i++ { diff --git a/core/cmd/shell_local_test.go b/core/cmd/shell_local_test.go index 60545269e29..8ed48dcaa20 100644 --- a/core/cmd/shell_local_test.go +++ b/core/cmd/shell_local_test.go @@ -11,9 +11,8 @@ import ( commonconfig "github.com/smartcontractkit/chainlink-common/pkg/config" "github.com/smartcontractkit/chainlink-common/pkg/utils/mailbox" - "github.com/smartcontractkit/chainlink/v2/core/capabilities" - "github.com/smartcontractkit/chainlink/v2/common/client" + "github.com/smartcontractkit/chainlink/v2/core/capabilities" "github.com/smartcontractkit/chainlink/v2/core/chains/legacyevm" "github.com/smartcontractkit/chainlink/v2/core/cmd" cmdMocks "github.com/smartcontractkit/chainlink/v2/core/cmd/mocks" diff --git a/core/internal/cltest/mocks.go b/core/internal/cltest/mocks.go index 9e0ee2f3f20..fd01f72c131 100644 --- a/core/internal/cltest/mocks.go +++ b/core/internal/cltest/mocks.go @@ -392,6 +392,7 @@ func NewLegacyChainsWithMockChain(t testing.TB, ethClient evmclient.Client, cfg scopedCfg := evmtest.NewChainScopedConfig(t, cfg) ch.On("ID").Return(scopedCfg.EVM().ChainID()) ch.On("Config").Return(scopedCfg) + ch.On("HeadTracker").Return(nil) return NewLegacyChainsWithChain(ch, cfg) }