diff --git a/common/client/mock_rpc_test.go b/common/client/mock_rpc_test.go index 72c6eb19029..0a554f7895c 100644 --- a/common/client/mock_rpc_test.go +++ b/common/client/mock_rpc_test.go @@ -366,6 +366,34 @@ func (_m *mockRPC[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OPS return r0, r1 } +// FinalizedBlock provides a mock function with given fields: ctx +func (_m *mockRPC[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OPS, TX_RECEIPT, FEE, HEAD]) FinalizedBlock(ctx context.Context) (HEAD, error) { + ret := _m.Called(ctx) + + if len(ret) == 0 { + panic("no return value specified for FinalizedBlock") + } + + var r0 HEAD + var r1 error + if rf, ok := ret.Get(0).(func(context.Context) (HEAD, error)); ok { + return rf(ctx) + } + if rf, ok := ret.Get(0).(func(context.Context) HEAD); ok { + r0 = rf(ctx) + } else { + r0 = ret.Get(0).(HEAD) + } + + if rf, ok := ret.Get(1).(func(context.Context) error); ok { + r1 = rf(ctx) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // LINKBalance provides a mock function with given fields: ctx, accountAddress, linkAddress func (_m *mockRPC[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OPS, TX_RECEIPT, FEE, HEAD]) LINKBalance(ctx context.Context, accountAddress ADDR, linkAddress ADDR) (*assets.Link, error) { ret := _m.Called(ctx, accountAddress, linkAddress) diff --git a/common/client/multi_node.go b/common/client/multi_node.go index ae9b3afd0d4..e04efd6d120 100644 --- a/common/client/multi_node.go +++ b/common/client/multi_node.go @@ -815,3 +815,11 @@ func (c *multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OP } return n.RPC().TransactionReceipt(ctx, txHash) } + +func (c *multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OPS, TX_RECEIPT, FEE, HEAD, RPC_CLIENT]) FinalizedBlock(ctx context.Context) (head HEAD, wee error) { + n, err := c.selectNode() + if err != nil { + return head, err + } + return n.RPC().FinalizedBlock(ctx) +} diff --git a/common/client/types.go b/common/client/types.go index fe9e4d7d482..b125f336b72 100644 --- a/common/client/types.go +++ b/common/client/types.go @@ -5,6 +5,7 @@ import ( "math/big" "github.com/smartcontractkit/chainlink-common/pkg/assets" + feetypes "github.com/smartcontractkit/chainlink/v2/common/fee/types" "github.com/smartcontractkit/chainlink/v2/common/types" ) @@ -113,6 +114,7 @@ type clientAPI[ BlockByNumber(ctx context.Context, number *big.Int) (HEAD, error) BlockByHash(ctx context.Context, hash BLOCK_HASH) (HEAD, error) LatestBlockHeight(context.Context) (*big.Int, error) + FinalizedBlock(ctx context.Context) (HEAD, error) // Events FilterEvents(ctx context.Context, query EVENT_OPS) ([]EVENT, error) diff --git a/common/txmgr/confirmer.go b/common/txmgr/confirmer.go index c28216467a1..b1a7f97d0f3 100644 --- a/common/txmgr/confirmer.go +++ b/common/txmgr/confirmer.go @@ -510,6 +510,69 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) get return ec.client.SequenceAt(ctx, from, nil) } +func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) isReceiptValid( + ctx context.Context, + l logger.SugaredLogger, + attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], + receipt R) bool { + + if ec.isReceiptNil(receipt) { + // NOTE: This should never happen, but it seems safer to check + // regardless to avoid a potential panic + l.AssumptionViolation("got nil receipt") + return false + } + + if receipt.IsZero() { + l.Debug("Still waiting for receipt") + return false + } + + l = l.With("blockHash", receipt.GetBlockHash().String(), "status", receipt.GetStatus(), "transactionIndex", receipt.GetTransactionIndex()) + + if receipt.IsUnmined() { + l.Debug("Got receipt for transaction but it's still in the mempool and not included in a block yet") + return false + } + + l.Debugw("Got receipt for transaction", "blockNumber", receipt.GetBlockNumber(), "feeUsed", receipt.GetFeeUsed()) + + if receipt.GetTxHash().String() != attempt.Hash.String() { + l.Errorf("Invariant violation, expected receipt with hash %s to have same hash as attempt with hash %s", receipt.GetTxHash().String(), attempt.Hash.String()) + return false + } + + if receipt.GetBlockNumber() == nil { + l.Error("Invariant violation, receipt was missing block number") + return false + } + + if receipt.GetStatus() == 0 { + rpcError, errExtract := ec.client.CallContract(ctx, attempt, receipt.GetBlockNumber()) + if errExtract == nil { + l.Warnw("transaction reverted on-chain", "hash", receipt.GetTxHash(), "rpcError", rpcError.String()) + } else { + l.Warnw("transaction reverted on-chain unable to extract revert reason", "hash", receipt.GetTxHash(), "err", errExtract) + } + // This might increment more than once e.g. in case of re-orgs going back and forth we might re-fetch the same receipt + promRevertedTxCount.WithLabelValues(ec.chainID.String()).Add(1) + } else { + promNumSuccessfulTxs.WithLabelValues(ec.chainID.String()).Add(1) + } + + // This is only recording forwarded tx that were mined and have a status. + // Counters are prone to being inaccurate due to re-orgs. + if ec.txConfig.ForwardersEnabled() { + meta, metaErr := attempt.Tx.GetMeta() + if metaErr == nil && meta != nil && meta.FwdrDestAddress != nil { + // promFwdTxCount takes two labels, chainId and a boolean of whether a tx was successful or not. + promFwdTxCount.WithLabelValues(ec.chainID.String(), strconv.FormatBool(receipt.GetStatus() != 0)).Add(1) + } + } + + return true +} + // Note this function will increment promRevertedTxCount upon receiving // a reverted transaction receipt. Should only be called with unconfirmed attempts. func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) batchFetchReceipts(ctx context.Context, attempts []txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], blockNum int64) (receipts []R, err error) { @@ -533,69 +596,17 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) bat receipt := txReceipts[i] err := txErrs[i] - l := attempt.Tx.GetLogger(lggr).With("txHash", attempt.Hash.String(), "txAttemptID", attempt.ID, - "txID", attempt.TxID, "err", err, "sequence", attempt.Tx.Sequence, - ) + l := attempt.GetLogger(lggr) if err != nil { - l.Error("FetchReceipt failed") - continue - } - - if ec.isReceiptNil(receipt) { - // NOTE: This should never happen, but it seems safer to check - // regardless to avoid a potential panic - l.AssumptionViolation("got nil receipt") - continue - } - - if receipt.IsZero() { - l.Debug("Still waiting for receipt") + l.With("err", err).Error("FetchReceipt failed") continue } - l = l.With("blockHash", receipt.GetBlockHash().String(), "status", receipt.GetStatus(), "transactionIndex", receipt.GetTransactionIndex()) - - if receipt.IsUnmined() { - l.Debug("Got receipt for transaction but it's still in the mempool and not included in a block yet") - continue - } - - l.Debugw("Got receipt for transaction", "blockNumber", receipt.GetBlockNumber(), "feeUsed", receipt.GetFeeUsed()) - - if receipt.GetTxHash().String() != attempt.Hash.String() { - l.Errorf("Invariant violation, expected receipt with hash %s to have same hash as attempt with hash %s", receipt.GetTxHash().String(), attempt.Hash.String()) + if !ec.isReceiptValid(ctx, l, attempt, receipt) { continue } - if receipt.GetBlockNumber() == nil { - l.Error("Invariant violation, receipt was missing block number") - continue - } - - if receipt.GetStatus() == 0 { - rpcError, errExtract := ec.client.CallContract(ctx, attempt, receipt.GetBlockNumber()) - if errExtract == nil { - l.Warnw("transaction reverted on-chain", "hash", receipt.GetTxHash(), "rpcError", rpcError.String()) - } else { - l.Warnw("transaction reverted on-chain unable to extract revert reason", "hash", receipt.GetTxHash(), "err", err) - } - // This might increment more than once e.g. in case of re-orgs going back and forth we might re-fetch the same receipt - promRevertedTxCount.WithLabelValues(ec.chainID.String()).Add(1) - } else { - promNumSuccessfulTxs.WithLabelValues(ec.chainID.String()).Add(1) - } - - // This is only recording forwarded tx that were mined and have a status. - // Counters are prone to being inaccurate due to re-orgs. - if ec.txConfig.ForwardersEnabled() { - meta, metaErr := attempt.Tx.GetMeta() - if metaErr == nil && meta != nil && meta.FwdrDestAddress != nil { - // promFwdTxCount takes two labels, chainId and a boolean of whether a tx was successful or not. - promFwdTxCount.WithLabelValues(ec.chainID.String(), strconv.FormatBool(receipt.GetStatus() != 0)).Add(1) - } - } - receipts = append(receipts, receipt) } @@ -910,12 +921,38 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) han } } -// EnsureConfirmedTransactionsInLongestChain finds all confirmed txes up to the depth -// of the given chain and ensures that every one has a receipt with a block hash that is -// in the given chain. -// -// If any of the confirmed transactions does not have a receipt in the chain, it has been -// re-org'd out and will be rebroadcast. +func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) findFinalizedHeadInChain(ctx context.Context, head types.Head[BLOCK_HASH]) (types.Head[BLOCK_HASH], error) { + if ec.chainConfig.FinalityTagEnabled() { + finalizedHash, finalizedBlockNumber, err := ec.client.FinalizedBlockHash(ctx) + if err != nil { + return nil, fmt.Errorf("failed to get finalized block hash: %w", err) + } + + for head != nil { + if head.BlockNumber() == finalizedBlockNumber.Int64() && finalizedHash == head.BlockHash() { + return head, nil + } + head = head.GetParent() + } + + return nil, nil + } + + finalizedBlock := head.BlockNumber() - int64(ec.chainConfig.FinalityDepth()) + for head != nil { + if head.BlockNumber() == finalizedBlock { + return head, nil + } + head = head.GetParent() + } + + return nil, nil +} + +// EnsureConfirmedTransactionsInLongestChain finds all confirmed txes tries to find them in the provided chain or fetches +// it from an RPC. +// If any of the confirmed transactions does not have a receipt in the chain, it has been re-org'd out and will be rebroadcast. +// If finalized receipt was found marks transaction and the receipt as finalized. func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) EnsureConfirmedTransactionsInLongestChain(ctx context.Context, head types.Head[BLOCK_HASH]) error { if head.ChainLength() < ec.chainConfig.FinalityDepth() { logArgs := []interface{}{ @@ -932,63 +969,240 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Ens } else { ec.nConsecutiveBlocksChainTooShort = 0 } - etxs, err := ec.txStore.FindTransactionsConfirmedInBlockRange(ctx, head.BlockNumber(), head.EarliestHeadInChain().BlockNumber(), ec.chainID) + + // fetch all confirmed transactions. + confirmedTxs, err := ec.txStore.FindConfirmedTransactions(ctx, ec.chainID) if err != nil { - return fmt.Errorf("findTransactionsConfirmedInBlockRange failed: %w", err) + return fmt.Errorf("FindConfirmedTransactions failed: %w", err) } - for _, etx := range etxs { - if !hasReceiptInLongestChain(*etx, head) { - if err := ec.markForRebroadcast(*etx, head); err != nil { - return fmt.Errorf("markForRebroadcast failed for etx %v: %w", etx.ID, err) - } + // find transactions that are not present in the local chain + missingTxs, err := ec.markFinalizedWithLocalChain(ctx, head, confirmedTxs, ec.chainConfig.RPCDefaultBatchSize()) + if err != nil { + return fmt.Errorf("failed to find tx missing from the local chain: %w", err) + } + + // find transactions that were re-orged out of the chain and we no longer can find their receipts + missingTxs, err = ec.markFinalizedWithRPCReceipts(ctx, missingTxs) + if err != nil { + return err + } + + // mark all missing tx for rebroadcast + for _, tx := range missingTxs { + if err := ec.markForRebroadcast(*tx, head); err != nil { + return fmt.Errorf("markForRebroadcast failed for tx %v: %w", tx.ID, err) } } // It is safe to process separate keys concurrently // NOTE: This design will block one key if another takes a really long time to execute var wg sync.WaitGroup - errors := []error{} + var errs []error var errMu sync.Mutex wg.Add(len(ec.enabledAddresses)) for _, address := range ec.enabledAddresses { go func(fromAddress ADDR) { + defer wg.Done() if err := ec.handleAnyInProgressAttempts(ctx, fromAddress, head.BlockNumber()); err != nil { errMu.Lock() - errors = append(errors, err) + errs = append(errs, err) errMu.Unlock() ec.lggr.Errorw("Error in handleAnyInProgressAttempts", "err", err, "fromAddress", fromAddress) } - - wg.Done() }(address) } wg.Wait() - return multierr.Combine(errors...) + return multierr.Combine(errs...) } -func hasReceiptInLongestChain[ +func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) markFinalizedWithLocalChain(ctx context.Context, + head types.Head[BLOCK_HASH], confirmedTxs []*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], batchSize uint32) ( + []*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) { + if len(confirmedTxs) == 0 { + return nil, nil + } + + finalizedHead, err := ec.findFinalizedHeadInChain(ctx, head) + if err != nil { + return nil, fmt.Errorf("failed to find finalized block in chain: %w", err) + } + + if finalizedHead != nil { + ec.lggr.With("block_num", finalizedHead.BlockNumber(), "hash", finalizedHead.BlockHash()). + Debug("found finalized block in headTracker's chain - using it to find finalized txs") + } + + txsMissingInChain := make([]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], 0, len(confirmedTxs)) + finalizedAttempts := make([]int64, 0, batchSize) + for _, tx := range confirmedTxs { + // transaction is finalized + if finalizedAttempt := findAttemptWithReceiptInChain(*tx, finalizedHead); finalizedAttempt != nil { + finalizedAttempts = append(finalizedAttempts, finalizedAttempt.ID) + + if uint32(len(finalizedAttempts)) >= batchSize { + err := ec.txStore.MarkFinalized(ctx, finalizedAttempts) + if err != nil { + return nil, fmt.Errorf("failed to mark attempts as finalized") + } + finalizedAttempts = finalizedAttempts[:0] + } + continue + } + + // tx is present in chain + if confirmedReceipt := findAttemptWithReceiptInChain(*tx, head); confirmedReceipt != nil { + continue + } + + // for some reason one of the receipts has block number from the future - it's safer to wait + if hasReceiptHigherThanHead(*tx, head) { + continue + } + + txsMissingInChain = append(txsMissingInChain, tx) + } + + if len(finalizedAttempts) >= 0 { + err := ec.txStore.MarkFinalized(ctx, finalizedAttempts) + if err != nil { + return nil, fmt.Errorf("failed to mark attempts as finalized") + } + } + + return txsMissingInChain, nil +} + +func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) markFinalizedWithRPCReceipts( + ctx context.Context, + txs []*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], +) ([]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) { + + missingTxIDs := make(map[int64]struct{}, len(txs)) + + attemptsBatch := make([]txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], + 0, ec.chainConfig.RPCDefaultBatchSize()) + for _, tx := range txs { + missingTxIDs[tx.ID] = struct{}{} + for _, attempt := range tx.TxAttempts { + attemptsBatch = append(attemptsBatch, attempt) + // save one for block fetching + if uint32(len(attemptsBatch)+1) >= ec.chainConfig.RPCDefaultBatchSize() { + err := ec.markFinalizedWithRPCReceiptsBatch(ctx, attemptsBatch, missingTxIDs) + if err != nil { + return nil, fmt.Errorf("failed to mark txs finalized: %w", err) + } + attemptsBatch = attemptsBatch[0:0] + } + } + } + + if len(attemptsBatch) > 0 { + err := ec.markFinalizedWithRPCReceiptsBatch(ctx, attemptsBatch, missingTxIDs) + if err != nil { + return nil, fmt.Errorf("failed to mark txs finalized: %w", err) + } + } + + missingTxs := make([]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], 0, len(missingTxIDs)) + for _, tx := range txs { + if _, ok := missingTxIDs[tx.ID]; ok { + missingTxs = append(missingTxs, tx) + } + } + + return missingTxs, nil +} + +func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) markFinalizedWithRPCReceiptsBatch( + ctx context.Context, + attempts []txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], + missingTxs map[int64]struct{}, +) error { + + rpcFinalizedHeight, receipts, txErrs, err := ec.client.BatchGetReceiptsWithFinalizedHeight(ctx, attempts, + ec.chainConfig.FinalityTagEnabled(), ec.chainConfig.FinalityDepth()) + if err != nil { + return err + } + + if len(receipts) != len(attempts) || len(txErrs) != len(attempts) { + const errMsg = "invariant violation expected number of attempts to match number of resulting receipts/errors" + ec.lggr.With("receipts", receipts, "attempts", attempts, "txErrs", txErrs).Criticalw(errMsg) + return errors.New(errMsg) + } + + finalizedReceipts := make([]R, 0, len(attempts)) + for i, receipt := range receipts { + attempt := attempts[i] + txErr := txErrs[i] + l := attempt.GetLogger(ec.lggr) + if txErr != nil { + l.With("err", err).Error("ReceiptFetch failed") + continue + } + if !ec.isReceiptValid(ctx, l, attempt, receipt) { + continue + } + + delete(missingTxs, attempt.TxID) + if receipt.GetBlockNumber().Cmp(rpcFinalizedHeight) > 0 { + l.With("rpc_finalized_height", rpcFinalizedHeight).Debug("attempt is not finalized") + continue + } + + finalizedReceipts = append(finalizedReceipts, receipts[i]) + } + + err = ec.txStore.SaveFinalizedReceipts(ctx, finalizedReceipts, ec.chainID) + if err != nil { + return fmt.Errorf("failed to save finalized receipts: %w", err) + } + + return nil + +} + +func hasReceiptHigherThanHead[ CHAIN_ID types.ID, ADDR types.Hashable, TX_HASH, BLOCK_HASH types.Hashable, SEQ types.Sequence, FEE feetypes.Fee, ](etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], head types.Head[BLOCK_HASH]) bool { - for { + for _, attempt := range etx.TxAttempts { + for _, receipt := range attempt.Receipts { + if receipt.GetBlockNumber().Int64() > head.BlockNumber() { + return true + } + } + } + + return false +} + +func findAttemptWithReceiptInChain[ + CHAIN_ID types.ID, + ADDR types.Hashable, + TX_HASH, BLOCK_HASH types.Hashable, + SEQ types.Sequence, + FEE feetypes.Fee, +](etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], head types.Head[BLOCK_HASH]) *txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] { + for head != nil { for _, attempt := range etx.TxAttempts { for _, receipt := range attempt.Receipts { if receipt.GetBlockHash().String() == head.BlockHash().String() && receipt.GetBlockNumber().Int64() == head.BlockNumber() { - return true + return &attempt } } } - if head.GetParent() == nil { - return false - } + head = head.GetParent() } + + return nil } func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) markForRebroadcast(etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], head types.Head[BLOCK_HASH]) error { diff --git a/common/txmgr/models.go b/common/txmgr/models.go index dd121a2c7c4..ca5e7d4f251 100644 --- a/common/txmgr/models.go +++ b/common/txmgr/models.go @@ -11,4 +11,5 @@ const ( TxUnconfirmed = txmgrtypes.TxState("unconfirmed") TxConfirmed = txmgrtypes.TxState("confirmed") TxConfirmedMissingReceipt = txmgrtypes.TxState("confirmed_missing_receipt") + TxFinalized = txmgrtypes.TxState("finalized") ) diff --git a/common/txmgr/reaper.go b/common/txmgr/reaper.go index 3ed05b2caee..2032e47e8fd 100644 --- a/common/txmgr/reaper.go +++ b/common/txmgr/reaper.go @@ -16,7 +16,6 @@ import ( // Reaper handles periodic database cleanup for Txm type Reaper[CHAIN_ID types.ID] struct { store txmgrtypes.TxHistoryReaper[CHAIN_ID] - config txmgrtypes.ReaperChainConfig txConfig txmgrtypes.ReaperTransactionsConfig chainID CHAIN_ID log logger.Logger @@ -27,10 +26,9 @@ type Reaper[CHAIN_ID types.ID] struct { } // NewReaper instantiates a new reaper object -func NewReaper[CHAIN_ID types.ID](lggr logger.Logger, store txmgrtypes.TxHistoryReaper[CHAIN_ID], config txmgrtypes.ReaperChainConfig, txConfig txmgrtypes.ReaperTransactionsConfig, chainID CHAIN_ID) *Reaper[CHAIN_ID] { +func NewReaper[CHAIN_ID types.ID](lggr logger.Logger, store txmgrtypes.TxHistoryReaper[CHAIN_ID], txConfig txmgrtypes.ReaperTransactionsConfig, chainID CHAIN_ID) *Reaper[CHAIN_ID] { r := &Reaper[CHAIN_ID]{ store, - config, txConfig, chainID, logger.Named(lggr, "Reaper"), @@ -79,7 +77,7 @@ func (r *Reaper[CHAIN_ID]) work() { if latestBlockNum < 0 { return } - err := r.ReapTxes(latestBlockNum) + err := r.ReapTxes(time.Now()) if err != nil { r.log.Error("unable to reap old txes: ", err) } @@ -98,7 +96,7 @@ func (r *Reaper[CHAIN_ID]) SetLatestBlockNum(latestBlockNum int64) { } // ReapTxes deletes old txes -func (r *Reaper[CHAIN_ID]) ReapTxes(headNum int64) error { +func (r *Reaper[CHAIN_ID]) ReapTxes(now time.Time) error { ctx, cancel := r.chStop.NewCtx() defer cancel() threshold := r.txConfig.ReaperThreshold() @@ -106,17 +104,15 @@ func (r *Reaper[CHAIN_ID]) ReapTxes(headNum int64) error { r.log.Debug("Transactions.ReaperThreshold set to 0; skipping ReapTxes") return nil } - minBlockNumberToKeep := headNum - int64(r.config.FinalityDepth()) - mark := time.Now() - timeThreshold := mark.Add(-threshold) + timeThreshold := now.Add(-threshold) - r.log.Debugw(fmt.Sprintf("reaping old txes created before %s", timeThreshold.Format(time.RFC3339)), "ageThreshold", threshold, "timeThreshold", timeThreshold, "minBlockNumberToKeep", minBlockNumberToKeep) + r.log.Debugw(fmt.Sprintf("reaping old txes created before %s", timeThreshold.Format(time.RFC3339)), "ageThreshold", threshold, "timeThreshold", timeThreshold) - if err := r.store.ReapTxHistory(ctx, minBlockNumberToKeep, timeThreshold, r.chainID); err != nil { + if err := r.store.ReapTxHistory(ctx, timeThreshold, r.chainID); err != nil { return err } - r.log.Debugf("ReapTxes completed in %v", time.Since(mark)) + r.log.Debugf("ReapTxes completed in %v", time.Since(now)) return nil } diff --git a/common/txmgr/tracker.go b/common/txmgr/tracker.go index 8b66668c41e..0b8569dd80c 100644 --- a/common/txmgr/tracker.go +++ b/common/txmgr/tracker.go @@ -254,11 +254,7 @@ func (tr *Tracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) handleTxesB } switch tx.State { - case TxConfirmed: - if err := tr.handleConfirmedTx(tx, blockHeight); err != nil { - return fmt.Errorf("failed to handle confirmed txes: %w", err) - } - case TxConfirmedMissingReceipt, TxUnconfirmed: + case TxConfirmedMissingReceipt, TxUnconfirmed, TxConfirmed: // Keep tracking tx case TxInProgress, TxUnstarted: // Tx could never be sent on chain even once. That means that we need to sign @@ -269,7 +265,7 @@ func (tr *Tracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) handleTxesB return fmt.Errorf("failed to mark tx as fatal: %w", err) } delete(tr.txCache, id) - case TxFatalError: + case TxFatalError, TxFinalized: delete(tr.txCache, id) default: tr.lggr.Errorw(fmt.Sprintf("unhandled transaction state: %v", tx.State)) @@ -279,23 +275,6 @@ func (tr *Tracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) handleTxesB return nil } -// handleConfirmedTx removes a transaction from the tracker if it's been finalized on chain -func (tr *Tracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) handleConfirmedTx( - tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], - blockHeight int64, -) error { - finalized, err := tr.txStore.IsTxFinalized(tr.ctx, blockHeight, tx.ID, tr.chainID) - if err != nil { - return fmt.Errorf("failed to check if tx is finalized: %w", err) - } - - if finalized { - delete(tr.txCache, tx.ID) - } - - return nil -} - // insertTx inserts a transaction into the tracker as an AbandonedTx func (tr *Tracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) insertTx( tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) { diff --git a/common/txmgr/txmgr.go b/common/txmgr/txmgr.go index 3e3fa9a20db..5cdf1d10d32 100644 --- a/common/txmgr/txmgr.go +++ b/common/txmgr/txmgr.go @@ -168,7 +168,7 @@ func NewTxm[ b.logger.Info("Resender: Disabled") } if txCfg.ReaperThreshold() > 0 && txCfg.ReaperInterval() > 0 { - b.reaper = NewReaper[CHAIN_ID](lggr, b.txStore, cfg, txCfg, chainId) + b.reaper = NewReaper[CHAIN_ID](lggr, b.txStore, txCfg, chainId) } else { b.logger.Info("TxReaper: Disabled") } diff --git a/common/txmgr/types/client.go b/common/txmgr/types/client.go index 0db50e97ad3..6a5ae14aded 100644 --- a/common/txmgr/types/client.go +++ b/common/txmgr/types/client.go @@ -7,12 +7,15 @@ import ( "time" "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink/v2/common/client" feetypes "github.com/smartcontractkit/chainlink/v2/common/fee/types" "github.com/smartcontractkit/chainlink/v2/common/types" ) // TxmClient is a superset of all the methods needed for the txm +// +//go:generate mockery --quiet --name TxmClient --output ./mocks/ --case=underscore type TxmClient[ CHAIN_ID types.ID, ADDR types.Hashable, @@ -30,6 +33,11 @@ type TxmClient[ ctx context.Context, attempts []TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], ) (txReceipt []R, txErr []error, err error) + BatchGetReceiptsWithFinalizedHeight(ctx context.Context, + attempts []TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], + useFinalityTag bool, finalityDepth uint32) ( + finalizedBlock *big.Int, txReceipt []R, txErr []error, funcErr error) + FinalizedBlockHash(ctx context.Context) (BLOCK_HASH, *big.Int, error) } // TransactionClient contains the methods for building, simulating, broadcasting transactions diff --git a/common/txmgr/types/config.go b/common/txmgr/types/config.go index 502a7f42d5c..0f5b91215ab 100644 --- a/common/txmgr/types/config.go +++ b/common/txmgr/types/config.go @@ -53,6 +53,7 @@ type ConfirmerFeeConfig interface { type ConfirmerChainConfig interface { RPCDefaultBatchSize() uint32 FinalityDepth() uint32 + FinalityTagEnabled() bool } type ConfirmerDatabaseConfig interface { diff --git a/common/txmgr/types/mocks/tx_store.go b/common/txmgr/types/mocks/tx_store.go index 353f398316d..945b3e8de6d 100644 --- a/common/txmgr/types/mocks/tx_store.go +++ b/common/txmgr/types/mocks/tx_store.go @@ -196,6 +196,36 @@ func (_m *TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) DeleteInPro return r0 } +// FindConfirmedTransactions provides a mock function with given fields: ctx, chainID +func (_m *TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindConfirmedTransactions(ctx context.Context, chainID CHAIN_ID) ([]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) { + ret := _m.Called(ctx, chainID) + + if len(ret) == 0 { + panic("no return value specified for FindConfirmedTransactions") + } + + var r0 []*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, CHAIN_ID) ([]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error)); ok { + return rf(ctx, chainID) + } + if rf, ok := ret.Get(0).(func(context.Context, CHAIN_ID) []*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]); ok { + r0 = rf(ctx, chainID) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, CHAIN_ID) error); ok { + r1 = rf(ctx, chainID) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // FindEarliestUnconfirmedBroadcastTime provides a mock function with given fields: ctx, chainID func (_m *TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindEarliestUnconfirmedBroadcastTime(ctx context.Context, chainID CHAIN_ID) (null.Time, error) { ret := _m.Called(ctx, chainID) @@ -298,36 +328,6 @@ func (_m *TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindNextUns return r0 } -// FindTransactionsConfirmedInBlockRange provides a mock function with given fields: ctx, highBlockNumber, lowBlockNumber, chainID -func (_m *TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTransactionsConfirmedInBlockRange(ctx context.Context, highBlockNumber int64, lowBlockNumber int64, chainID CHAIN_ID) ([]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) { - ret := _m.Called(ctx, highBlockNumber, lowBlockNumber, chainID) - - if len(ret) == 0 { - panic("no return value specified for FindTransactionsConfirmedInBlockRange") - } - - var r0 []*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] - var r1 error - if rf, ok := ret.Get(0).(func(context.Context, int64, int64, CHAIN_ID) ([]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error)); ok { - return rf(ctx, highBlockNumber, lowBlockNumber, chainID) - } - if rf, ok := ret.Get(0).(func(context.Context, int64, int64, CHAIN_ID) []*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]); ok { - r0 = rf(ctx, highBlockNumber, lowBlockNumber, chainID) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).([]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) - } - } - - if rf, ok := ret.Get(1).(func(context.Context, int64, int64, CHAIN_ID) error); ok { - r1 = rf(ctx, highBlockNumber, lowBlockNumber, chainID) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - // FindTxAttemptsConfirmedMissingReceipt provides a mock function with given fields: ctx, chainID func (_m *TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTxAttemptsConfirmedMissingReceipt(ctx context.Context, chainID CHAIN_ID) ([]txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) { ret := _m.Called(ctx, chainID) @@ -836,34 +836,6 @@ func (_m *TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) HasInProgre return r0, r1 } -// IsTxFinalized provides a mock function with given fields: ctx, blockHeight, txID, chainID -func (_m *TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) IsTxFinalized(ctx context.Context, blockHeight int64, txID int64, chainID CHAIN_ID) (bool, error) { - ret := _m.Called(ctx, blockHeight, txID, chainID) - - if len(ret) == 0 { - panic("no return value specified for IsTxFinalized") - } - - var r0 bool - var r1 error - if rf, ok := ret.Get(0).(func(context.Context, int64, int64, CHAIN_ID) (bool, error)); ok { - return rf(ctx, blockHeight, txID, chainID) - } - if rf, ok := ret.Get(0).(func(context.Context, int64, int64, CHAIN_ID) bool); ok { - r0 = rf(ctx, blockHeight, txID, chainID) - } else { - r0 = ret.Get(0).(bool) - } - - if rf, ok := ret.Get(1).(func(context.Context, int64, int64, CHAIN_ID) error); ok { - r1 = rf(ctx, blockHeight, txID, chainID) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - // LoadTxAttempts provides a mock function with given fields: ctx, etx func (_m *TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) LoadTxAttempts(ctx context.Context, etx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error { ret := _m.Called(ctx, etx) @@ -900,6 +872,24 @@ func (_m *TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MarkAllConf return r0 } +// MarkFinalized provides a mock function with given fields: ctx, attempts +func (_m *TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MarkFinalized(ctx context.Context, attempts []int64) error { + ret := _m.Called(ctx, attempts) + + if len(ret) == 0 { + panic("no return value specified for MarkFinalized") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, []int64) error); ok { + r0 = rf(ctx, attempts) + } else { + r0 = ret.Error(0) + } + + return r0 +} + // MarkOldTxesMissingReceiptAsErrored provides a mock function with given fields: ctx, blockNum, finalityDepth, chainID func (_m *TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MarkOldTxesMissingReceiptAsErrored(ctx context.Context, blockNum int64, finalityDepth uint32, chainID CHAIN_ID) error { ret := _m.Called(ctx, blockNum, finalityDepth, chainID) @@ -966,17 +956,17 @@ func (_m *TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) PruneUnstar return r0, r1 } -// ReapTxHistory provides a mock function with given fields: ctx, minBlockNumberToKeep, timeThreshold, chainID -func (_m *TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) ReapTxHistory(ctx context.Context, minBlockNumberToKeep int64, timeThreshold time.Time, chainID CHAIN_ID) error { - ret := _m.Called(ctx, minBlockNumberToKeep, timeThreshold, chainID) +// ReapTxHistory provides a mock function with given fields: ctx, timeThreshold, chainID +func (_m *TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) ReapTxHistory(ctx context.Context, timeThreshold time.Time, chainID CHAIN_ID) error { + ret := _m.Called(ctx, timeThreshold, chainID) if len(ret) == 0 { panic("no return value specified for ReapTxHistory") } var r0 error - if rf, ok := ret.Get(0).(func(context.Context, int64, time.Time, CHAIN_ID) error); ok { - r0 = rf(ctx, minBlockNumberToKeep, timeThreshold, chainID) + if rf, ok := ret.Get(0).(func(context.Context, time.Time, CHAIN_ID) error); ok { + r0 = rf(ctx, timeThreshold, chainID) } else { r0 = ret.Error(0) } @@ -1020,6 +1010,24 @@ func (_m *TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) SaveFetched return r0 } +// SaveFinalizedReceipts provides a mock function with given fields: ctx, receipts, chainID +func (_m *TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) SaveFinalizedReceipts(ctx context.Context, receipts []R, chainID CHAIN_ID) error { + ret := _m.Called(ctx, receipts, chainID) + + if len(ret) == 0 { + panic("no return value specified for SaveFinalizedReceipts") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, []R, CHAIN_ID) error); ok { + r0 = rf(ctx, receipts, chainID) + } else { + r0 = ret.Error(0) + } + + return r0 +} + // SaveInProgressAttempt provides a mock function with given fields: ctx, attempt func (_m *TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) SaveInProgressAttempt(ctx context.Context, attempt *txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error { ret := _m.Called(ctx, attempt) diff --git a/common/txmgr/types/mocks/txm_client.go b/common/txmgr/types/mocks/txm_client.go new file mode 100644 index 00000000000..8d1a52747ef --- /dev/null +++ b/common/txmgr/types/mocks/txm_client.go @@ -0,0 +1,382 @@ +// Code generated by mockery v2.38.0. DO NOT EDIT. + +package mocks + +import ( + context "context" + big "math/big" + + client "github.com/smartcontractkit/chainlink/v2/common/client" + + feetypes "github.com/smartcontractkit/chainlink/v2/common/fee/types" + + fmt "fmt" + + logger "github.com/smartcontractkit/chainlink-common/pkg/logger" + + mock "github.com/stretchr/testify/mock" + + time "time" + + txmgrtypes "github.com/smartcontractkit/chainlink/v2/common/txmgr/types" + + types "github.com/smartcontractkit/chainlink/v2/common/types" +) + +// TxmClient is an autogenerated mock type for the TxmClient type +type TxmClient[CHAIN_ID types.ID, ADDR types.Hashable, TX_HASH types.Hashable, BLOCK_HASH types.Hashable, R txmgrtypes.ChainReceipt[TX_HASH, BLOCK_HASH], SEQ types.Sequence, FEE feetypes.Fee] struct { + mock.Mock +} + +// BatchGetReceipts provides a mock function with given fields: ctx, attempts +func (_m *TxmClient[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) BatchGetReceipts(ctx context.Context, attempts []txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) ([]R, []error, error) { + ret := _m.Called(ctx, attempts) + + if len(ret) == 0 { + panic("no return value specified for BatchGetReceipts") + } + + var r0 []R + var r1 []error + var r2 error + if rf, ok := ret.Get(0).(func(context.Context, []txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) ([]R, []error, error)); ok { + return rf(ctx, attempts) + } + if rf, ok := ret.Get(0).(func(context.Context, []txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) []R); ok { + r0 = rf(ctx, attempts) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]R) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, []txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) []error); ok { + r1 = rf(ctx, attempts) + } else { + if ret.Get(1) != nil { + r1 = ret.Get(1).([]error) + } + } + + if rf, ok := ret.Get(2).(func(context.Context, []txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error); ok { + r2 = rf(ctx, attempts) + } else { + r2 = ret.Error(2) + } + + return r0, r1, r2 +} + +// BatchGetReceiptsWithFinalizedHeight provides a mock function with given fields: ctx, attempts, useFinalityTag, finalityDepth +func (_m *TxmClient[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) BatchGetReceiptsWithFinalizedHeight(ctx context.Context, attempts []txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], useFinalityTag bool, finalityDepth uint32) (*big.Int, []R, []error, error) { + ret := _m.Called(ctx, attempts, useFinalityTag, finalityDepth) + + if len(ret) == 0 { + panic("no return value specified for BatchGetReceiptsWithFinalizedHeight") + } + + var r0 *big.Int + var r1 []R + var r2 []error + var r3 error + if rf, ok := ret.Get(0).(func(context.Context, []txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], bool, uint32) (*big.Int, []R, []error, error)); ok { + return rf(ctx, attempts, useFinalityTag, finalityDepth) + } + if rf, ok := ret.Get(0).(func(context.Context, []txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], bool, uint32) *big.Int); ok { + r0 = rf(ctx, attempts, useFinalityTag, finalityDepth) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*big.Int) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, []txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], bool, uint32) []R); ok { + r1 = rf(ctx, attempts, useFinalityTag, finalityDepth) + } else { + if ret.Get(1) != nil { + r1 = ret.Get(1).([]R) + } + } + + if rf, ok := ret.Get(2).(func(context.Context, []txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], bool, uint32) []error); ok { + r2 = rf(ctx, attempts, useFinalityTag, finalityDepth) + } else { + if ret.Get(2) != nil { + r2 = ret.Get(2).([]error) + } + } + + if rf, ok := ret.Get(3).(func(context.Context, []txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], bool, uint32) error); ok { + r3 = rf(ctx, attempts, useFinalityTag, finalityDepth) + } else { + r3 = ret.Error(3) + } + + return r0, r1, r2, r3 +} + +// BatchSendTransactions provides a mock function with given fields: ctx, attempts, bathSize, lggr +func (_m *TxmClient[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) BatchSendTransactions(ctx context.Context, attempts []txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], bathSize int, lggr logger.SugaredLogger) ([]client.SendTxReturnCode, []error, time.Time, []int64, error) { + ret := _m.Called(ctx, attempts, bathSize, lggr) + + if len(ret) == 0 { + panic("no return value specified for BatchSendTransactions") + } + + var r0 []client.SendTxReturnCode + var r1 []error + var r2 time.Time + var r3 []int64 + var r4 error + if rf, ok := ret.Get(0).(func(context.Context, []txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], int, logger.SugaredLogger) ([]client.SendTxReturnCode, []error, time.Time, []int64, error)); ok { + return rf(ctx, attempts, bathSize, lggr) + } + if rf, ok := ret.Get(0).(func(context.Context, []txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], int, logger.SugaredLogger) []client.SendTxReturnCode); ok { + r0 = rf(ctx, attempts, bathSize, lggr) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]client.SendTxReturnCode) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, []txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], int, logger.SugaredLogger) []error); ok { + r1 = rf(ctx, attempts, bathSize, lggr) + } else { + if ret.Get(1) != nil { + r1 = ret.Get(1).([]error) + } + } + + if rf, ok := ret.Get(2).(func(context.Context, []txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], int, logger.SugaredLogger) time.Time); ok { + r2 = rf(ctx, attempts, bathSize, lggr) + } else { + r2 = ret.Get(2).(time.Time) + } + + if rf, ok := ret.Get(3).(func(context.Context, []txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], int, logger.SugaredLogger) []int64); ok { + r3 = rf(ctx, attempts, bathSize, lggr) + } else { + if ret.Get(3) != nil { + r3 = ret.Get(3).([]int64) + } + } + + if rf, ok := ret.Get(4).(func(context.Context, []txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], int, logger.SugaredLogger) error); ok { + r4 = rf(ctx, attempts, bathSize, lggr) + } else { + r4 = ret.Error(4) + } + + return r0, r1, r2, r3, r4 +} + +// CallContract provides a mock function with given fields: ctx, attempt, blockNumber +func (_m *TxmClient[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) CallContract(ctx context.Context, attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], blockNumber *big.Int) (fmt.Stringer, error) { + ret := _m.Called(ctx, attempt, blockNumber) + + if len(ret) == 0 { + panic("no return value specified for CallContract") + } + + var r0 fmt.Stringer + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], *big.Int) (fmt.Stringer, error)); ok { + return rf(ctx, attempt, blockNumber) + } + if rf, ok := ret.Get(0).(func(context.Context, txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], *big.Int) fmt.Stringer); ok { + r0 = rf(ctx, attempt, blockNumber) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(fmt.Stringer) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], *big.Int) error); ok { + r1 = rf(ctx, attempt, blockNumber) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// ConfiguredChainID provides a mock function with given fields: +func (_m *TxmClient[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) ConfiguredChainID() CHAIN_ID { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for ConfiguredChainID") + } + + var r0 CHAIN_ID + if rf, ok := ret.Get(0).(func() CHAIN_ID); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(CHAIN_ID) + } + + return r0 +} + +// FinalizedBlockHash provides a mock function with given fields: ctx +func (_m *TxmClient[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FinalizedBlockHash(ctx context.Context) (BLOCK_HASH, *big.Int, error) { + ret := _m.Called(ctx) + + if len(ret) == 0 { + panic("no return value specified for FinalizedBlockHash") + } + + var r0 BLOCK_HASH + var r1 *big.Int + var r2 error + if rf, ok := ret.Get(0).(func(context.Context) (BLOCK_HASH, *big.Int, error)); ok { + return rf(ctx) + } + if rf, ok := ret.Get(0).(func(context.Context) BLOCK_HASH); ok { + r0 = rf(ctx) + } else { + r0 = ret.Get(0).(BLOCK_HASH) + } + + if rf, ok := ret.Get(1).(func(context.Context) *big.Int); ok { + r1 = rf(ctx) + } else { + if ret.Get(1) != nil { + r1 = ret.Get(1).(*big.Int) + } + } + + if rf, ok := ret.Get(2).(func(context.Context) error); ok { + r2 = rf(ctx) + } else { + r2 = ret.Error(2) + } + + return r0, r1, r2 +} + +// PendingSequenceAt provides a mock function with given fields: ctx, addr +func (_m *TxmClient[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) PendingSequenceAt(ctx context.Context, addr ADDR) (SEQ, error) { + ret := _m.Called(ctx, addr) + + if len(ret) == 0 { + panic("no return value specified for PendingSequenceAt") + } + + var r0 SEQ + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, ADDR) (SEQ, error)); ok { + return rf(ctx, addr) + } + if rf, ok := ret.Get(0).(func(context.Context, ADDR) SEQ); ok { + r0 = rf(ctx, addr) + } else { + r0 = ret.Get(0).(SEQ) + } + + if rf, ok := ret.Get(1).(func(context.Context, ADDR) error); ok { + r1 = rf(ctx, addr) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// SendEmptyTransaction provides a mock function with given fields: ctx, newTxAttempt, seq, gasLimit, fee, fromAddress +func (_m *TxmClient[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) SendEmptyTransaction(ctx context.Context, newTxAttempt func(SEQ, uint32, FEE, ADDR) (txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error), seq SEQ, gasLimit uint32, fee FEE, fromAddress ADDR) (string, error) { + ret := _m.Called(ctx, newTxAttempt, seq, gasLimit, fee, fromAddress) + + if len(ret) == 0 { + panic("no return value specified for SendEmptyTransaction") + } + + var r0 string + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, func(SEQ, uint32, FEE, ADDR) (txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error), SEQ, uint32, FEE, ADDR) (string, error)); ok { + return rf(ctx, newTxAttempt, seq, gasLimit, fee, fromAddress) + } + if rf, ok := ret.Get(0).(func(context.Context, func(SEQ, uint32, FEE, ADDR) (txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error), SEQ, uint32, FEE, ADDR) string); ok { + r0 = rf(ctx, newTxAttempt, seq, gasLimit, fee, fromAddress) + } else { + r0 = ret.Get(0).(string) + } + + if rf, ok := ret.Get(1).(func(context.Context, func(SEQ, uint32, FEE, ADDR) (txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error), SEQ, uint32, FEE, ADDR) error); ok { + r1 = rf(ctx, newTxAttempt, seq, gasLimit, fee, fromAddress) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// SendTransactionReturnCode provides a mock function with given fields: ctx, tx, attempt, lggr +func (_m *TxmClient[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) SendTransactionReturnCode(ctx context.Context, tx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], lggr logger.SugaredLogger) (client.SendTxReturnCode, error) { + ret := _m.Called(ctx, tx, attempt, lggr) + + if len(ret) == 0 { + panic("no return value specified for SendTransactionReturnCode") + } + + var r0 client.SendTxReturnCode + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], logger.SugaredLogger) (client.SendTxReturnCode, error)); ok { + return rf(ctx, tx, attempt, lggr) + } + if rf, ok := ret.Get(0).(func(context.Context, txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], logger.SugaredLogger) client.SendTxReturnCode); ok { + r0 = rf(ctx, tx, attempt, lggr) + } else { + r0 = ret.Get(0).(client.SendTxReturnCode) + } + + if rf, ok := ret.Get(1).(func(context.Context, txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], logger.SugaredLogger) error); ok { + r1 = rf(ctx, tx, attempt, lggr) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// SequenceAt provides a mock function with given fields: ctx, addr, blockNum +func (_m *TxmClient[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) SequenceAt(ctx context.Context, addr ADDR, blockNum *big.Int) (SEQ, error) { + ret := _m.Called(ctx, addr, blockNum) + + if len(ret) == 0 { + panic("no return value specified for SequenceAt") + } + + var r0 SEQ + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, ADDR, *big.Int) (SEQ, error)); ok { + return rf(ctx, addr, blockNum) + } + if rf, ok := ret.Get(0).(func(context.Context, ADDR, *big.Int) SEQ); ok { + r0 = rf(ctx, addr, blockNum) + } else { + r0 = ret.Get(0).(SEQ) + } + + if rf, ok := ret.Get(1).(func(context.Context, ADDR, *big.Int) error); ok { + r1 = rf(ctx, addr, blockNum) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// NewTxmClient creates a new instance of TxmClient. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewTxmClient[CHAIN_ID types.ID, ADDR types.Hashable, TX_HASH types.Hashable, BLOCK_HASH types.Hashable, R txmgrtypes.ChainReceipt[TX_HASH, BLOCK_HASH], SEQ types.Sequence, FEE feetypes.Fee](t interface { + mock.TestingT + Cleanup(func()) +}) *TxmClient[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE] { + mock := &TxmClient[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/common/txmgr/types/tx.go b/common/txmgr/types/tx.go index 3b294adcd07..fa827520922 100644 --- a/common/txmgr/types/tx.go +++ b/common/txmgr/types/tx.go @@ -41,6 +41,7 @@ const ( TxAttemptInProgress TxAttemptState = iota + 1 TxAttemptInsufficientFunds TxAttemptBroadcast + TxAttemptFinalized txAttemptStateCount // always at end to calculate number of states ) @@ -49,6 +50,7 @@ var txAttemptStateStrings = []string{ TxAttemptInProgress: "in_progress", TxAttemptInsufficientFunds: "insufficient_funds", TxAttemptBroadcast: "broadcast", + TxAttemptFinalized: "finalized", } func NewTxAttemptState(state string) (s TxAttemptState) { @@ -189,6 +191,15 @@ func (a *TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) String() stri return fmt.Sprintf("TxAttempt(ID:%d,TxID:%d,Fee:%s,TxType:%d", a.ID, a.TxID, a.TxFee, a.TxType) } +func (a *TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) GetLogger(lgr logger.Logger) logger.SugaredLogger { + l := logger.Sugared(lgr) + if a.Tx.ID != 0 { + l = a.Tx.GetLogger(lgr) + } + + return l.With("txHash", a.Hash.String(), "txAttemptID", a.ID) +} + type Tx[ CHAIN_ID types.ID, ADDR types.Hashable, diff --git a/common/txmgr/types/tx_store.go b/common/txmgr/types/tx_store.go index 742a1740033..7d682e151ed 100644 --- a/common/txmgr/types/tx_store.go +++ b/common/txmgr/types/tx_store.go @@ -40,6 +40,8 @@ type TxStore[ // Update tx to mark that its callback has been signaled UpdateTxCallbackCompleted(ctx context.Context, pipelineTaskRunRid uuid.UUID, chainId CHAIN_ID) error SaveFetchedReceipts(ctx context.Context, receipts []R, chainID CHAIN_ID) (err error) + SaveFinalizedReceipts(ctx context.Context, receipts []R, chainID CHAIN_ID) (err error) + MarkFinalized(ctx context.Context, attempts []int64) error // additional methods for tx store management CheckTxQueueCapacity(ctx context.Context, fromAddress ADDR, maxQueuedTransactions uint64, chainID CHAIN_ID) (err error) @@ -80,7 +82,6 @@ type TransactionStore[ // Search for Tx using the fromAddress and sequence FindTxWithSequence(ctx context.Context, fromAddress ADDR, seq SEQ) (etx *Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error) FindNextUnstartedTransactionFromAddress(ctx context.Context, etx *Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], fromAddress ADDR, chainID CHAIN_ID) error - FindTransactionsConfirmedInBlockRange(ctx context.Context, highBlockNumber, lowBlockNumber int64, chainID CHAIN_ID) (etxs []*Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error) FindEarliestUnconfirmedBroadcastTime(ctx context.Context, chainID CHAIN_ID) (null.Time, error) FindEarliestUnconfirmedTxAttemptBlock(ctx context.Context, chainID CHAIN_ID) (null.Int, error) GetTxInProgress(ctx context.Context, fromAddress ADDR) (etx *Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error) @@ -106,11 +107,11 @@ type TransactionStore[ UpdateTxUnstartedToInProgress(ctx context.Context, etx *Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], attempt *TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error UpdateTxFatalError(ctx context.Context, etx *Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error UpdateTxForRebroadcast(ctx context.Context, etx Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], etxAttempt TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error - IsTxFinalized(ctx context.Context, blockHeight int64, txID int64, chainID CHAIN_ID) (finalized bool, err error) + FindConfirmedTransactions(ctx context.Context, chainID CHAIN_ID) (etxs []*Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error) } type TxHistoryReaper[CHAIN_ID types.ID] interface { - ReapTxHistory(ctx context.Context, minBlockNumberToKeep int64, timeThreshold time.Time, chainID CHAIN_ID) error + ReapTxHistory(ctx context.Context, timeThreshold time.Time, chainID CHAIN_ID) error } type UnstartedTxQueuePruner interface { diff --git a/core/chains/evm/client/chain_client.go b/core/chains/evm/client/chain_client.go index b16054b69a8..1f3c09b1c50 100644 --- a/core/chains/evm/client/chain_client.go +++ b/core/chains/evm/client/chain_client.go @@ -281,3 +281,7 @@ func (c *chainClient) TransactionReceipt(ctx context.Context, txHash common.Hash //return rpc.TransactionReceipt(ctx, txHash) return rpc.TransactionReceiptGeth(ctx, txHash) } + +func (c *chainClient) FinalizedBlock(ctx context.Context) (*evmtypes.Head, error) { + return c.multiNode.FinalizedBlock(ctx) +} diff --git a/core/chains/evm/client/client.go b/core/chains/evm/client/client.go index e2ae8c26403..e71085b3479 100644 --- a/core/chains/evm/client/client.go +++ b/core/chains/evm/client/client.go @@ -63,6 +63,7 @@ type Client interface { HeadByNumber(ctx context.Context, n *big.Int) (*evmtypes.Head, error) HeadByHash(ctx context.Context, n common.Hash) (*evmtypes.Head, error) SubscribeNewHead(ctx context.Context, ch chan<- *evmtypes.Head) (ethereum.Subscription, error) + FinalizedBlock(ctx context.Context) (head *evmtypes.Head, err error) SendTransactionReturnCode(ctx context.Context, tx *types.Transaction, fromAddress common.Address) (commonclient.SendTxReturnCode, error) @@ -285,7 +286,11 @@ func (client *client) LatestBlockHeight(ctx context.Context) (*big.Int, error) { func (client *client) HeadByNumber(ctx context.Context, number *big.Int) (head *evmtypes.Head, err error) { hex := ToBlockNumArg(number) - err = client.pool.CallContext(ctx, &head, "eth_getBlockByNumber", hex, false) + return client.headByNumber(ctx, hex) +} + +func (client *client) headByNumber(ctx context.Context, num string) (head *evmtypes.Head, err error) { + err = client.pool.CallContext(ctx, &head, "eth_getBlockByNumber", num, false) if err != nil { return nil, err } @@ -297,6 +302,10 @@ func (client *client) HeadByNumber(ctx context.Context, number *big.Int) (head * return } +func (client *client) FinalizedBlock(ctx context.Context) (head *evmtypes.Head, err error) { + return client.headByNumber(ctx, rpc.FinalizedBlockNumber.String()) +} + func (client *client) HeadByHash(ctx context.Context, hash common.Hash) (head *evmtypes.Head, err error) { err = client.pool.CallContext(ctx, &head, "eth_getBlockByHash", hash.Hex(), false) if err != nil { diff --git a/core/chains/evm/client/mocks/client.go b/core/chains/evm/client/mocks/client.go index bbaaafd7615..61121b618d8 100644 --- a/core/chains/evm/client/mocks/client.go +++ b/core/chains/evm/client/mocks/client.go @@ -367,6 +367,36 @@ func (_m *Client) FilterLogs(ctx context.Context, q ethereum.FilterQuery) ([]typ return r0, r1 } +// FinalizedBlock provides a mock function with given fields: ctx +func (_m *Client) FinalizedBlock(ctx context.Context) (*evmtypes.Head, error) { + ret := _m.Called(ctx) + + if len(ret) == 0 { + panic("no return value specified for FinalizedBlock") + } + + var r0 *evmtypes.Head + var r1 error + if rf, ok := ret.Get(0).(func(context.Context) (*evmtypes.Head, error)); ok { + return rf(ctx) + } + if rf, ok := ret.Get(0).(func(context.Context) *evmtypes.Head); ok { + r0 = rf(ctx) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*evmtypes.Head) + } + } + + if rf, ok := ret.Get(1).(func(context.Context) error); ok { + r1 = rf(ctx) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // HeadByHash provides a mock function with given fields: ctx, n func (_m *Client) HeadByHash(ctx context.Context, n common.Hash) (*evmtypes.Head, error) { ret := _m.Called(ctx, n) diff --git a/core/chains/evm/client/null_client.go b/core/chains/evm/client/null_client.go index 3cbae9e9dde..5365427f460 100644 --- a/core/chains/evm/client/null_client.go +++ b/core/chains/evm/client/null_client.go @@ -11,6 +11,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/assets" "github.com/smartcontractkit/chainlink-common/pkg/logger" + commonclient "github.com/smartcontractkit/chainlink/v2/common/client" evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" ) @@ -67,6 +68,11 @@ func (nc *NullClient) HeadByHash(ctx context.Context, h common.Hash) (*evmtypes. return nil, nil } +func (nc *NullClient) FinalizedBlock(ctx context.Context) (*evmtypes.Head, error) { + nc.lggr.Debug("FinalizedBlock") + return nil, nil +} + type nullSubscription struct { lggr logger.Logger } diff --git a/core/chains/evm/client/rpc_client.go b/core/chains/evm/client/rpc_client.go index 54656cf1d3e..3990b5458e9 100644 --- a/core/chains/evm/client/rpc_client.go +++ b/core/chains/evm/client/rpc_client.go @@ -479,9 +479,17 @@ func (r *rpcClient) HeaderByHash(ctx context.Context, hash common.Hash) (header return } +func (r *rpcClient) FinalizedBlock(ctx context.Context) (head *evmtypes.Head, err error) { + return r.blockByNumber(ctx, rpc.FinalizedBlockNumber.String()) +} + func (r *rpcClient) BlockByNumber(ctx context.Context, number *big.Int) (head *evmtypes.Head, err error) { hex := ToBlockNumArg(number) - err = r.CallContext(ctx, &head, "eth_getBlockByNumber", hex, false) + return r.blockByNumber(ctx, hex) +} + +func (r *rpcClient) blockByNumber(ctx context.Context, number string) (head *evmtypes.Head, err error) { + err = r.CallContext(ctx, &head, "eth_getBlockByNumber", number, false) if err != nil { return nil, err } diff --git a/core/chains/evm/client/simulated_backend_client.go b/core/chains/evm/client/simulated_backend_client.go index c49637e7890..4b9763bb6b6 100644 --- a/core/chains/evm/client/simulated_backend_client.go +++ b/core/chains/evm/client/simulated_backend_client.go @@ -198,13 +198,17 @@ func (c *SimulatedBackendClient) HeadByNumber(ctx context.Context, n *big.Int) ( } else if header == nil { return nil, ethereum.NotFound } + return c.headerToHead(header), nil +} + +func (c *SimulatedBackendClient) headerToHead(header *types.Header) *evmtypes.Head { return &evmtypes.Head{ EVMChainID: ubig.NewI(c.chainId.Int64()), Hash: header.Hash(), Number: header.Number.Int64(), ParentHash: header.ParentHash, Timestamp: time.Unix(int64(header.Time), 0), - }, nil + } } // HeadByHash returns our own header type. @@ -215,13 +219,7 @@ func (c *SimulatedBackendClient) HeadByHash(ctx context.Context, h common.Hash) } else if header == nil { return nil, ethereum.NotFound } - return &evmtypes.Head{ - EVMChainID: ubig.NewI(c.chainId.Int64()), - Hash: header.Hash(), - Number: header.Number.Int64(), - ParentHash: header.ParentHash, - Timestamp: time.Unix(int64(header.Time), 0), - }, nil + return c.headerToHead(header), nil } // BlockByNumber returns a geth block type. @@ -677,6 +675,11 @@ func (c *SimulatedBackendClient) ethGetHeaderByNumber(ctx context.Context, resul return nil } +func (c *SimulatedBackendClient) FinalizedBlock(ctx context.Context) (head *evmtypes.Head, err error) { + header := c.b.Blockchain().CurrentFinalBlock() + return c.headerToHead(header), nil +} + func toCallMsg(params map[string]interface{}) ethereum.CallMsg { var callMsg ethereum.CallMsg toAddr, err := interfaceToAddress(params["to"]) diff --git a/core/chains/evm/txmgr/builder.go b/core/chains/evm/txmgr/builder.go index f0cbcbf8d92..2042d98dc02 100644 --- a/core/chains/evm/txmgr/builder.go +++ b/core/chains/evm/txmgr/builder.go @@ -7,6 +7,7 @@ import ( "github.com/jmoiron/sqlx" "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink/v2/common/txmgr" txmgrtypes "github.com/smartcontractkit/chainlink/v2/common/txmgr/types" evmclient "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client" @@ -98,7 +99,7 @@ func NewEvmResender( // NewEvmReaper instantiates a new EVM-specific reaper object func NewEvmReaper(lggr logger.Logger, store txmgrtypes.TxHistoryReaper[*big.Int], config EvmReaperConfig, txConfig txmgrtypes.ReaperTransactionsConfig, chainID *big.Int) *Reaper { - return txmgr.NewReaper(lggr, store, config, txConfig, chainID) + return txmgr.NewReaper(lggr, store, txConfig, chainID) } // NewEvmConfirmer instantiates a new EVM confirmer diff --git a/core/chains/evm/txmgr/client.go b/core/chains/evm/txmgr/client.go index 0aa03536276..20d65fda33f 100644 --- a/core/chains/evm/txmgr/client.go +++ b/core/chains/evm/txmgr/client.go @@ -120,25 +120,86 @@ func (c *evmTxmClient) SequenceAt(ctx context.Context, addr common.Address, bloc func (c *evmTxmClient) BatchGetReceipts(ctx context.Context, attempts []TxAttempt) (txReceipt []*evmtypes.Receipt, txErr []error, funcErr error) { var reqs []rpc.BatchElem - for _, attempt := range attempts { + reqs, txReceipt, txErr = newGetBatchReceiptsReq(attempts) + + if err := c.client.BatchCallContext(ctx, reqs); err != nil { + return nil, nil, fmt.Errorf("EthConfirmer#batchFetchReceipts error fetching receipts with BatchCallContext: %w", err) + } + + for i, req := range reqs { + txErr[i] = req.Error + } + return txReceipt, txErr, nil +} + +func newGetBatchReceiptsReq(attempts []TxAttempt) (reqs []rpc.BatchElem, txReceipts []*evmtypes.Receipt, txErrs []error) { + reqs = make([]rpc.BatchElem, len(attempts)) + txReceipts = make([]*evmtypes.Receipt, len(attempts)) + txErrs = make([]error, len(attempts)) + for i, attempt := range attempts { res := &evmtypes.Receipt{} req := rpc.BatchElem{ Method: "eth_getTransactionReceipt", Args: []interface{}{attempt.Hash}, Result: res, } - txReceipt = append(txReceipt, res) - reqs = append(reqs, req) + txReceipts[i] = res + reqs[i] = req } + return reqs, txReceipts, txErrs +} + +// FinalizedBlockHash - returns hash and block number of latest finalized block. +// Must not be called on chains that do not support finality tag. +func (c *evmTxmClient) FinalizedBlockHash(ctx context.Context) (common.Hash, *big.Int, error) { + head, err := c.client.FinalizedBlock(ctx) + if err != nil || head == nil { + return common.Hash{}, big.NewInt(0), err + } + + return head.Hash, big.NewInt(head.BlockNumber()), nil +} + +// BatchGetReceiptsWithFinalizedHeight - returns most recently finalized block and receipts for the attempts. +// If finality tag is enabled - uses corresponding RPC request to get finalizaed block. +// Otherwise calculates it based on finalityDepth and latest block number. +func (c *evmTxmClient) BatchGetReceiptsWithFinalizedHeight(ctx context.Context, attempts []TxAttempt, useFinalityTag bool, finalityDepth uint32) ( + finalizedBlock *big.Int, receipts []*evmtypes.Receipt, receiptErrs []error, funcErr error) { + + var reqs []rpc.BatchElem + reqs, receipts, receiptErrs = newGetBatchReceiptsReq(attempts) + + blockNumber := rpc.LatestBlockNumber + if useFinalityTag { + blockNumber = rpc.FinalizedBlockNumber + } + + var head evmtypes.Head + blockRequest := rpc.BatchElem{ + Method: "eth_getBlockByNumber", + Args: []interface{}{blockNumber, false}, + Result: &head, + } + reqs = append(reqs, blockRequest) + if err := c.client.BatchCallContext(ctx, reqs); err != nil { - return nil, nil, fmt.Errorf("EthConfirmer#batchFetchReceipts error fetching receipts with BatchCallContext: %w", err) + return nil, nil, nil, fmt.Errorf("BatchGetReceiptsWithFinalizedHeight error fetching receipts with BatchCallContext: %w", err) } - for _, req := range reqs { - txErr = append(txErr, req.Error) + if blockRequest.Error != nil { + return nil, nil, nil, fmt.Errorf("failed to fetch finalized block with BatchCallContext: %w", blockRequest.Error) } - return txReceipt, txErr, nil + + for i := range receiptErrs { + receiptErrs[i] = reqs[i].Error + } + + finalizedBlock = big.NewInt(head.BlockNumber() - int64(finalityDepth)) + if useFinalityTag { + finalizedBlock = big.NewInt(head.BlockNumber()) + } + return finalizedBlock, receipts, receiptErrs, nil } // sendEmptyTransaction sends a transaction with 0 Eth and an empty payload to the burn address diff --git a/core/chains/evm/txmgr/client_test.go b/core/chains/evm/txmgr/client_test.go new file mode 100644 index 00000000000..ac2651a0a3b --- /dev/null +++ b/core/chains/evm/txmgr/client_test.go @@ -0,0 +1,89 @@ +package txmgr_test + +import ( + "math/big" + "testing" + + "github.com/ethereum/go-ethereum/rpc" + "github.com/pkg/errors" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/txmgr" + evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/evmtest" +) + +func TestClient_BatchGetReceiptsWithFinalizedHeight(t *testing.T) { + client := evmtest.NewEthClientMockWithDefaultChain(t) + txmClient := txmgr.NewEvmTxmClient(client) + testCases := []struct { + Name string + // inputs + UseFinalityTag bool + FinalityDepth uint32 + + // RPC response + RPCErr error + BlockError error + RPCHead evmtypes.Head + + // Call Results + ExpectedBlock *big.Int + ExpectedErr error + }{ + { + Name: "returns error if call fails", + UseFinalityTag: true, + FinalityDepth: 10, + + RPCErr: errors.New("failed to call RPC"), + + ExpectedErr: errors.New("failed to call RPC"), + }, + { + Name: "returns error if fail to fetch block", + + BlockError: errors.New("failed to get bock"), + + ExpectedErr: errors.New("failed to get block"), + }, + { + Name: "Returns block as is, if we are using finality tag", + UseFinalityTag: true, + FinalityDepth: 10, + + ExpectedBlock: big.NewInt(100), + + RPCHead: evmtypes.Head{Number: 100}, + }, + { + Name: "Subtracts finality depth if finality tag is disabled", + UseFinalityTag: false, + FinalityDepth: 10, + + ExpectedBlock: big.NewInt(90), + + RPCHead: evmtypes.Head{Number: 100}, + }, + } + + for _, testCase := range testCases { + client.On("BatchCallContext", mock.Anything, mock.Anything).Run(func(args mock.Arguments) { + reqs := args.Get(1).([]rpc.BatchElem) + blockReq := reqs[len(reqs)-1] + blockReq.Error = testCase.BlockError + reqHead := blockReq.Result.(*evmtypes.Head) + *reqHead = testCase.RPCHead + }).Return(testCase.ExpectedErr).Once() + block, _, _, err := txmClient.BatchGetReceiptsWithFinalizedHeight(testutils.Context(t), nil, testCase.UseFinalityTag, testCase.FinalityDepth) + if testCase.ExpectedErr != nil { + assert.Error(t, testCase.ExpectedErr, err) + } else { + assert.NoError(t, err) + } + + assert.Equal(t, testCase.ExpectedBlock, block) + } +} diff --git a/core/chains/evm/txmgr/confirmer_test.go b/core/chains/evm/txmgr/confirmer_test.go index 6cb14a8d618..03506db5c2d 100644 --- a/core/chains/evm/txmgr/confirmer_test.go +++ b/core/chains/evm/txmgr/confirmer_test.go @@ -22,6 +22,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/services/servicetest" + commonclient "github.com/smartcontractkit/chainlink/v2/common/client" commonfee "github.com/smartcontractkit/chainlink/v2/common/fee" txmgrcommon "github.com/smartcontractkit/chainlink/v2/common/txmgr" @@ -32,6 +33,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/chains/evm/gas" gasmocks "github.com/smartcontractkit/chainlink/v2/core/chains/evm/gas/mocks" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/txmgr" + txmgrmocks "github.com/smartcontractkit/chainlink/v2/core/chains/evm/txmgr/mocks" evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils" "github.com/smartcontractkit/chainlink/v2/core/internal/cltest" @@ -2653,17 +2655,20 @@ func TestEthConfirmer_EnsureConfirmedTransactionsInLongestChain(t *testing.T) { t.Parallel() db := pgtest.NewSqlxDB(t) - cfg := configtest.NewTestGeneralConfig(t) - txStore := cltest.NewTestTxStore(t, db, cfg.Database()) + generalCfg := configtest.NewGeneralConfig(t, func(config *chainlink.Config, _ *chainlink.Secrets) { + config.EVM[0].FinalityDepth = ptr(uint32(2)) + }) + txStore := cltest.NewTestTxStore(t, db, generalCfg.Database()) - ethKeyStore := cltest.NewKeyStore(t, db, cfg.Database()).Eth() + ethKeyStore := cltest.NewKeyStore(t, db, generalCfg.Database()).Eth() _, fromAddress := cltest.MustInsertRandomKeyReturningState(t, ethKeyStore) - ethClient := evmtest.NewEthClientMockWithDefaultChain(t) + evmCfg := evmtest.NewChainScopedConfig(t, generalCfg) + ec := newEthConfirmer(t, txStore, evmtest.NewEthClientMockWithDefaultChain(t), evmCfg, ethKeyStore, nil) - config := newTestChainScopedConfig(t) - ec := newEthConfirmer(t, txStore, ethClient, config, ethKeyStore, nil) + ethClient := txmgrmocks.NewEvmTxmClientWithDefaultChain(t) + ec.XXXTestSetClient(ethClient) head := evmtypes.Head{ Hash: utils.NewHash(), @@ -2694,9 +2699,23 @@ func TestEthConfirmer_EnsureConfirmedTransactionsInLongestChain(t *testing.T) { assert.Equal(t, txmgrcommon.TxUnconfirmed, etx.State) }) - t.Run("does nothing to confirmed transactions with receipts within head height of the chain and included in the chain", func(t *testing.T) { + t.Run("does nothing to confirmed missing receipt transactions", func(t *testing.T) { + missingReceipt := cltest.MustInsertConfirmedMissingReceiptEthTxWithLegacyAttempt(t, txStore, 1, 1, fromAddress) + + // Do the thing + require.NoError(t, ec.EnsureConfirmedTransactionsInLongestChain(testutils.Context(t), &head)) + + missingReceipt, err := txStore.FindTxWithAttempts(missingReceipt.ID) + require.NoError(t, err) + assert.Equal(t, txmgrcommon.TxConfirmedMissingReceipt, missingReceipt.State) + }) + + t.Run("does nothing to confirmed transactions with receipts within head height of the chain and included in the chain, but not finalized", func(t *testing.T) { + // clean up before running the test + require.NoError(t, txStore.DeleteAll(testutils.Context(t))) + etx := cltest.MustInsertConfirmedEthTxWithLegacyAttempt(t, txStore, 2, 1, fromAddress) - mustInsertEthReceipt(t, txStore, head.Number, head.Hash, etx.TxAttempts[0].Hash) + _ = mustInsertEthReceipt(t, txStore, head.Number, head.Hash, etx.TxAttempts[0].Hash) // Do the thing require.NoError(t, ec.EnsureConfirmedTransactionsInLongestChain(testutils.Context(t), &head)) @@ -2705,11 +2724,17 @@ func TestEthConfirmer_EnsureConfirmedTransactionsInLongestChain(t *testing.T) { require.NoError(t, err) assert.Equal(t, txmgrcommon.TxConfirmed, etx.State) }) + t.Run("does nothing to confirmed transactions with receipts older than head height of the chain that are present in RPC response", func(t *testing.T) { + // clean up before running the test + require.NoError(t, txStore.DeleteAll(testutils.Context(t))) - t.Run("does nothing to confirmed transactions that only have receipts older than the start of the chain", func(t *testing.T) { etx := cltest.MustInsertConfirmedEthTxWithLegacyAttempt(t, txStore, 3, 1, fromAddress) // Add receipt that is older than the lowest block of the chain - mustInsertEthReceipt(t, txStore, head.Parent.Parent.Number-1, utils.NewHash(), etx.TxAttempts[0].Hash) + receipt := mustInsertEthReceipt(t, txStore, head.Parent.Parent.Number-1, utils.NewHash(), etx.TxAttempts[0].Hash) + + ethClient.On("BatchGetReceiptsWithFinalizedHeight", mock.Anything, mock.Anything, + evmCfg.EVM().FinalityTagEnabled(), evmCfg.EVM().FinalityDepth(), + ).Return(big.NewInt(0), []*evmtypes.Receipt{&receipt.Receipt}, []error{nil}, nil).Once() // Do the thing require.NoError(t, ec.EnsureConfirmedTransactionsInLongestChain(testutils.Context(t), &head)) @@ -2718,32 +2743,86 @@ func TestEthConfirmer_EnsureConfirmedTransactionsInLongestChain(t *testing.T) { require.NoError(t, err) assert.Equal(t, txmgrcommon.TxConfirmed, etx.State) }) + t.Run("marks finalized confirmed transactions with receipts within head height of the chain and included in the finalized block", func(t *testing.T) { + // clean up before running the test + require.NoError(t, txStore.DeleteAll(testutils.Context(t))) - t.Run("unconfirms and rebroadcasts transactions that have receipts within head height of the chain but not included in the chain", func(t *testing.T) { - etx := cltest.MustInsertConfirmedEthTxWithLegacyAttempt(t, txStore, 4, 1, fromAddress) - attempt := etx.TxAttempts[0] - // Include one within head height but a different block hash - mustInsertEthReceipt(t, txStore, head.Parent.Number, utils.NewHash(), attempt.Hash) - - ethClient.On("SendTransactionReturnCode", mock.Anything, mock.MatchedBy(func(tx *types.Transaction) bool { - atx, err := txmgr.GetGethSignedTx(attempt.SignedRawTx) - require.NoError(t, err) - // Keeps gas price and nonce the same - return atx.GasPrice().Cmp(tx.GasPrice()) == 0 && atx.Nonce() == tx.Nonce() - }), fromAddress).Return(commonclient.Successful, nil).Once() + etx := cltest.MustInsertConfirmedEthTxWithLegacyAttempt(t, txStore, 2, 1, fromAddress) + _ = mustInsertEthReceipt(t, txStore, head.Parent.Parent.Number, head.Parent.Parent.Hash, etx.TxAttempts[0].Hash) // Do the thing require.NoError(t, ec.EnsureConfirmedTransactionsInLongestChain(testutils.Context(t), &head)) + etx, err := txStore.FindTxWithAttempts(etx.ID) + require.NoError(t, err) + assert.Equal(t, txmgrcommon.TxFinalized, etx.State) + }) + + assertRebroadcastedTx := func(t *testing.T, etx txmgr.Tx) { etx, err := txStore.FindTxWithAttempts(etx.ID) require.NoError(t, err) assert.Equal(t, txmgrcommon.TxUnconfirmed, etx.State) require.Len(t, etx.TxAttempts, 1) - attempt = etx.TxAttempts[0] + attempt := etx.TxAttempts[0] assert.Equal(t, txmgrtypes.TxAttemptBroadcast, attempt.State) + } + + t.Run("unconfirms and rebroadcasts confirmed transactions that only have db receipts but no chain receipts, even if db receipts are older than the start of the local chain", func(t *testing.T) { + // clean up before running the test + require.NoError(t, txStore.DeleteAll(testutils.Context(t))) + + etx := cltest.MustInsertConfirmedEthTxWithLegacyAttempt(t, txStore, 3, 1, fromAddress) + // Add receipt that is older than the lowest block of the chain + mustInsertEthReceipt(t, txStore, head.Parent.Parent.Number-1, utils.NewHash(), etx.TxAttempts[0].Hash) + + ethClient.On("BatchGetReceiptsWithFinalizedHeight", mock.Anything, mock.Anything, + evmCfg.EVM().FinalityTagEnabled(), evmCfg.EVM().FinalityDepth(), + ).Return(big.NewInt(0), []*evmtypes.Receipt{nil}, []error{nil}, nil).Once() + + attempt := etx.TxAttempts[0] + ethClient.On("SendTransactionReturnCode", mock.Anything, mock.Anything, mock.Anything, mock.Anything). + Run(func(args mock.Arguments) { + actualAttempt := args.Get(2).(txmgr.TxAttempt) + assert.Equal(t, attempt.SignedRawTx, actualAttempt.SignedRawTx) + }). + Return(commonclient.Successful, nil).Once() + + // Do the thing + require.NoError(t, ec.EnsureConfirmedTransactionsInLongestChain(testutils.Context(t), &head)) + + assertRebroadcastedTx(t, etx) + }) + + t.Run("unconfirms and rebroadcasts transactions that have receipts within head height of the local chain but not included in the HeadTracker's chain or have RPC receipts", func(t *testing.T) { + // clean up before running the test + require.NoError(t, txStore.DeleteAll(testutils.Context(t))) + + etx := cltest.MustInsertConfirmedEthTxWithLegacyAttempt(t, txStore, 4, 1, fromAddress) + attempt := etx.TxAttempts[0] + // Include one within head height but a different block hash + mustInsertEthReceipt(t, txStore, head.Parent.Number, utils.NewHash(), attempt.Hash) + + ethClient.On("BatchGetReceiptsWithFinalizedHeight", mock.Anything, mock.Anything, + evmCfg.EVM().FinalityTagEnabled(), evmCfg.EVM().FinalityDepth(), + ).Return(big.NewInt(0), []*evmtypes.Receipt{nil}, []error{nil}, nil).Once() + + ethClient.On("SendTransactionReturnCode", mock.Anything, mock.Anything, mock.Anything, mock.Anything). + Run(func(args mock.Arguments) { + actualAttempt := args.Get(2).(txmgr.TxAttempt) + assert.Equal(t, attempt.SignedRawTx, actualAttempt.SignedRawTx) + }). + Return(commonclient.Successful, nil).Once() + + // Do the thing + require.NoError(t, ec.EnsureConfirmedTransactionsInLongestChain(testutils.Context(t), &head)) + + assertRebroadcastedTx(t, etx) }) t.Run("unconfirms and rebroadcasts transactions that have receipts within head height of chain but not included in the chain even if a receipt exists older than the start of the chain", func(t *testing.T) { + // clean up before running the test + require.NoError(t, txStore.DeleteAll(testutils.Context(t))) + etx := cltest.MustInsertConfirmedEthTxWithLegacyAttempt(t, txStore, 5, 1, fromAddress) attempt := etx.TxAttempts[0] attemptHash := attempt.Hash @@ -2752,21 +2831,28 @@ func TestEthConfirmer_EnsureConfirmedTransactionsInLongestChain(t *testing.T) { // Include one within head height but a different block hash mustInsertEthReceipt(t, txStore, head.Parent.Number, utils.NewHash(), attemptHash) - ethClient.On("SendTransactionReturnCode", mock.Anything, mock.Anything, fromAddress).Return( - commonclient.Successful, nil).Once() + // none of the receipts are found + ethClient.On("BatchGetReceiptsWithFinalizedHeight", mock.Anything, mock.Anything, + evmCfg.EVM().FinalityTagEnabled(), evmCfg.EVM().FinalityDepth(), + ).Return(big.NewInt(0), []*evmtypes.Receipt{nil}, []error{nil}, nil).Once() + + ethClient.On("SendTransactionReturnCode", mock.Anything, mock.Anything, mock.Anything, mock.Anything). + Run(func(args mock.Arguments) { + actualAttempt := args.Get(2).(txmgr.TxAttempt) + assert.Equal(t, attempt.SignedRawTx, actualAttempt.SignedRawTx) + }). + Return(commonclient.Successful, nil).Once() // Do the thing require.NoError(t, ec.EnsureConfirmedTransactionsInLongestChain(testutils.Context(t), &head)) - etx, err := txStore.FindTxWithAttempts(etx.ID) - require.NoError(t, err) - assert.Equal(t, txmgrcommon.TxUnconfirmed, etx.State) - require.Len(t, etx.TxAttempts, 1) - attempt = etx.TxAttempts[0] - assert.Equal(t, txmgrtypes.TxAttemptBroadcast, attempt.State) + assertRebroadcastedTx(t, etx) }) t.Run("if more than one attempt has a receipt (should not be possible but isn't prevented by database constraints) unconfirms and rebroadcasts only the attempt with the highest gas price", func(t *testing.T) { + // clean up before running the test + require.NoError(t, txStore.DeleteAll(testutils.Context(t))) + etx := cltest.MustInsertConfirmedEthTxWithLegacyAttempt(t, txStore, 6, 1, fromAddress) require.Len(t, etx.TxAttempts, 1) // Sanity check to assert the included attempt has the lowest gas price @@ -2784,11 +2870,17 @@ func TestEthConfirmer_EnsureConfirmedTransactionsInLongestChain(t *testing.T) { // Receipt is within head height but a different block hash mustInsertEthReceipt(t, txStore, head.Parent.Number, utils.NewHash(), attempt3.Hash) - ethClient.On("SendTransactionReturnCode", mock.Anything, mock.MatchedBy(func(tx *types.Transaction) bool { - s, err := txmgr.GetGethSignedTx(attempt3.SignedRawTx) - require.NoError(t, err) - return tx.Hash() == s.Hash() - }), fromAddress).Return(commonclient.Successful, nil).Once() + // none of the receipts are found + ethClient.On("BatchGetReceiptsWithFinalizedHeight", mock.Anything, mock.Anything, + evmCfg.EVM().FinalityTagEnabled(), evmCfg.EVM().FinalityDepth(), + ).Return(big.NewInt(0), []*evmtypes.Receipt{nil, nil, nil}, []error{nil, nil, nil}, nil).Once() + + ethClient.On("SendTransactionReturnCode", mock.Anything, mock.Anything, mock.Anything, mock.Anything). + Run(func(args mock.Arguments) { + actualAttempt := args.Get(2).(txmgr.TxAttempt) + assert.Equal(t, attempt3.SignedRawTx, actualAttempt.SignedRawTx) + }). + Return(commonclient.Successful, nil).Once() // Do the thing require.NoError(t, ec.EnsureConfirmedTransactionsInLongestChain(testutils.Context(t), &head)) @@ -2806,6 +2898,9 @@ func TestEthConfirmer_EnsureConfirmedTransactionsInLongestChain(t *testing.T) { }) t.Run("if receipt has a block number that is in the future, does not mark for rebroadcast (the safe thing to do is simply wait until heads catches up)", func(t *testing.T) { + // clean up before running the test + require.NoError(t, txStore.DeleteAll(testutils.Context(t))) + etx := cltest.MustInsertConfirmedEthTxWithLegacyAttempt(t, txStore, 7, 1, fromAddress) attempt := etx.TxAttempts[0] // Add receipt that is higher than head @@ -2821,6 +2916,26 @@ func TestEthConfirmer_EnsureConfirmedTransactionsInLongestChain(t *testing.T) { assert.Equal(t, txmgrtypes.TxAttemptBroadcast, attempt.State) assert.Len(t, attempt.Receipts, 1) }) + t.Run("mark finalized confirmed transactions with receipts older than head height of the chain but present in finalized block received from the RPC", func(t *testing.T) { + // clean up before running the test + require.NoError(t, txStore.DeleteAll(testutils.Context(t))) + + etx := cltest.MustInsertConfirmedEthTxWithLegacyAttempt(t, txStore, 3, 1, fromAddress) + // Add receipt that is older than the lowest block of the chain + finalizedBlock := head.Parent.Parent.Number - 3 + receipt := mustInsertEthReceipt(t, txStore, finalizedBlock, utils.NewHash(), etx.TxAttempts[0].Hash) + + ethClient.On("BatchGetReceiptsWithFinalizedHeight", mock.Anything, mock.Anything, + evmCfg.EVM().FinalityTagEnabled(), evmCfg.EVM().FinalityDepth(), + ).Return(big.NewInt(finalizedBlock), []*evmtypes.Receipt{&receipt.Receipt}, []error{nil}, nil).Once() + + // Do the thing + require.NoError(t, ec.EnsureConfirmedTransactionsInLongestChain(testutils.Context(t), &head)) + + etx, err := txStore.FindTxWithAttempts(etx.ID) + require.NoError(t, err) + assert.Equal(t, txmgrcommon.TxFinalized, etx.State) + }) } func TestEthConfirmer_ForceRebroadcast(t *testing.T) { diff --git a/core/chains/evm/txmgr/evm_tx_store.go b/core/chains/evm/txmgr/evm_tx_store.go index ae986acee27..16fbbfd8d76 100644 --- a/core/chains/evm/txmgr/evm_tx_store.go +++ b/core/chains/evm/txmgr/evm_tx_store.go @@ -76,6 +76,7 @@ type TestEvmTxStore interface { CountTxesByStateAndSubject(ctx context.Context, state txmgrtypes.TxState, subject uuid.UUID) (count int, err error) FindTxesByFromAddressAndState(ctx context.Context, fromAddress common.Address, state string) (txes []*Tx, err error) UpdateTxAttemptBroadcastBeforeBlockNum(ctx context.Context, id int64, blockNum uint) error + DeleteAll(ctx context.Context) error } type evmTxStore struct { @@ -820,7 +821,7 @@ ORDER BY evm.txes.nonce ASC, evm.tx_attempts.gas_price DESC, evm.tx_attempts.gas return } -func (o *evmTxStore) SaveFetchedReceipts(ctx context.Context, r []*evmtypes.Receipt, chainID *big.Int) (err error) { +func (o *evmTxStore) saveFetchedReceipts(ctx context.Context, r []*evmtypes.Receipt, chainID *big.Int, txState txmgrtypes.TxState, attemptState txmgrtypes.TxAttemptState) (err error) { var cancel context.CancelFunc ctx, cancel = o.mergeContexts(ctx) defer cancel() @@ -884,20 +885,20 @@ func (o *evmTxStore) SaveFetchedReceipts(ctx context.Context, r []*evmtypes.Rece updated_eth_tx_attempts AS ( UPDATE evm.tx_attempts SET - state = 'broadcast', + state = '%s', broadcast_before_block_num = COALESCE(evm.tx_attempts.broadcast_before_block_num, inserted_receipts.block_number) FROM inserted_receipts WHERE inserted_receipts.tx_hash = evm.tx_attempts.hash RETURNING evm.tx_attempts.eth_tx_id ) UPDATE evm.txes - SET state = 'confirmed' + SET state = '%s' FROM updated_eth_tx_attempts WHERE updated_eth_tx_attempts.eth_tx_id = evm.txes.id AND evm_chain_id = ? ` - stmt := fmt.Sprintf(sql, strings.Join(valueStrs, ",")) + stmt := fmt.Sprintf(sql, strings.Join(valueStrs, ","), attemptState, txState) stmt = sqlx.Rebind(sqlx.DOLLAR, stmt) @@ -905,6 +906,36 @@ func (o *evmTxStore) SaveFetchedReceipts(ctx context.Context, r []*evmtypes.Rece return pkgerrors.Wrap(err, "SaveFetchedReceipts failed to save receipts") } +func (o *evmTxStore) SaveFetchedReceipts(ctx context.Context, r []*evmtypes.Receipt, chainID *big.Int) error { + return o.saveFetchedReceipts(ctx, r, chainID, "confirmed", txmgrtypes.TxAttemptBroadcast) +} + +func (o *evmTxStore) SaveFinalizedReceipts(ctx context.Context, r []*evmtypes.Receipt, chainID *big.Int) error { + return o.saveFetchedReceipts(ctx, r, chainID, "finalized", txmgrtypes.TxAttemptFinalized) +} + +func (o *evmTxStore) MarkFinalized(ctx context.Context, attempts []int64) error { + var cancel context.CancelFunc + ctx, cancel = o.mergeContexts(ctx) + defer cancel() + qq := o.q.WithOpts(pg.WithParentCtx(ctx)) + if len(attempts) == 0 { + return nil + } + + stmt := ` + WITH updated_eth_tx_attempts AS ( + UPDATE evm.tx_attempts SET state = 'finalized' WHERE id = ANY ($1) RETURNING evm.tx_attempts.eth_tx_id + ) + UPDATE evm.txes + SET state = 'finalized' + FROM updated_eth_tx_attempts + WHERE updated_eth_tx_attempts.eth_tx_id = evm.txes.id + ` + err := qq.ExecQ(stmt, pq.Array(attempts)) + return pkgerrors.Wrap(err, "MarkFinalized failed to update state") +} + // MarkAllConfirmedMissingReceipt // It is possible that we can fail to get a receipt for all evm.tx_attempts // even though a transaction with this nonce has long since been confirmed (we @@ -1109,7 +1140,7 @@ func (o *evmTxStore) UpdateTxForRebroadcast(ctx context.Context, etx Tx, etxAtte }) } -func (o *evmTxStore) FindTransactionsConfirmedInBlockRange(ctx context.Context, highBlockNumber, lowBlockNumber int64, chainID *big.Int) (etxs []*Tx, err error) { +func (o *evmTxStore) FindConfirmedTransactions(ctx context.Context, chainID *big.Int) (etxs []*Tx, err error) { var cancel context.CancelFunc ctx, cancel = o.mergeContexts(ctx) defer cancel() @@ -1120,21 +1151,21 @@ func (o *evmTxStore) FindTransactionsConfirmedInBlockRange(ctx context.Context, SELECT DISTINCT evm.txes.* FROM evm.txes INNER JOIN evm.tx_attempts ON evm.txes.id = evm.tx_attempts.eth_tx_id AND evm.tx_attempts.state = 'broadcast' INNER JOIN evm.receipts ON evm.receipts.tx_hash = evm.tx_attempts.hash -WHERE evm.txes.state IN ('confirmed', 'confirmed_missing_receipt') AND block_number BETWEEN $1 AND $2 AND evm_chain_id = $3 +WHERE evm.txes.state IN ('confirmed') AND evm_chain_id = $1 ORDER BY nonce ASC -`, lowBlockNumber, highBlockNumber, chainID.String()) +`, chainID.String()) if err != nil { - return pkgerrors.Wrap(err, "FindTransactionsConfirmedInBlockRange failed to load evm.txes") + return pkgerrors.Wrap(err, "FindConfirmedTransactions failed to load evm.txes") } etxs = make([]*Tx, len(dbEtxs)) dbEthTxsToEvmEthTxPtrs(dbEtxs, etxs) if err = o.LoadTxesAttempts(etxs, pg.WithParentCtx(ctx), pg.WithQueryer(tx)); err != nil { - return pkgerrors.Wrap(err, "FindTransactionsConfirmedInBlockRange failed to load evm.tx_attempts") + return pkgerrors.Wrap(err, "FindConfirmedTransactions failed to load evm.tx_attempts") } err = loadEthTxesAttemptsReceipts(tx, etxs) - return pkgerrors.Wrap(err, "FindTransactionsConfirmedInBlockRange failed to load evm.receipts") + return pkgerrors.Wrap(err, "FindConfirmedTransactions failed to load evm.receipts") }, pg.OptReadOnlyTx()) - return etxs, pkgerrors.Wrap(err, "FindTransactionsConfirmedInBlockRange failed") + return etxs, pkgerrors.Wrap(err, "FindConfirmedTransactions failed") } func (o *evmTxStore) FindEarliestUnconfirmedBroadcastTime(ctx context.Context, chainID *big.Int) (broadcastAt nullv4.Time, err error) { @@ -1170,25 +1201,6 @@ AND evm_chain_id = $1`, chainID.String()).Scan(&earliestUnconfirmedTxBlock) return earliestUnconfirmedTxBlock, err } -func (o *evmTxStore) IsTxFinalized(ctx context.Context, blockHeight int64, txID int64, chainID *big.Int) (finalized bool, err error) { - var cancel context.CancelFunc - ctx, cancel = o.mergeContexts(ctx) - defer cancel() - - var count int32 - qq := o.q.WithOpts(pg.WithParentCtx(ctx)) - err = qq.GetContext(ctx, &count, ` - SELECT COUNT(evm.receipts.receipt) FROM evm.txes - INNER JOIN evm.tx_attempts ON evm.txes.id = evm.tx_attempts.eth_tx_id - INNER JOIN evm.receipts ON evm.tx_attempts.hash = evm.receipts.tx_hash - WHERE evm.receipts.block_number <= ($1 - evm.txes.min_confirmations) - AND evm.txes.id = $2 AND evm.txes.evm_chain_id = $3`, blockHeight, txID, chainID.String()) - if err != nil { - return false, fmt.Errorf("failed to retrieve transaction reciepts: %w", err) - } - return count > 0, nil -} - func saveAttemptWithNewState(ctx context.Context, q pg.Queryer, logger logger.Logger, attempt TxAttempt, broadcastAt time.Time) error { var dbAttempt DbEthTxAttempt dbAttempt.FromTxAttempt(&attempt) @@ -1877,7 +1889,7 @@ id < ( return } -func (o *evmTxStore) ReapTxHistory(ctx context.Context, minBlockNumberToKeep int64, timeThreshold time.Time, chainID *big.Int) error { +func (o *evmTxStore) ReapTxHistory(ctx context.Context, timeThreshold time.Time, chainID *big.Int) error { var cancel context.CancelFunc ctx, cancel = o.mergeContexts(ctx) defer cancel() @@ -1887,19 +1899,18 @@ func (o *evmTxStore) ReapTxHistory(ctx context.Context, minBlockNumberToKeep int // the evm.tx_attempts and evm.receipts linked to every eth_tx err := pg.Batch(func(_, limit uint) (count uint, err error) { res, err := qq.Exec(` -WITH old_enough_receipts AS ( - SELECT tx_hash FROM evm.receipts - WHERE block_number < $1 - ORDER BY block_number ASC, id ASC - LIMIT $2 +WITH old_enough_finalized_txs AS ( + SELECT evm.txes.id FROM evm.txes + WHERE evm.txes.state = 'finalized' + AND evm_chain_id = $1 + AND evm.txes.created_at < $2 + ORDER BY evm.txes.id ASC + LIMIT $3 ) DELETE FROM evm.txes -USING old_enough_receipts, evm.tx_attempts -WHERE evm.tx_attempts.eth_tx_id = evm.txes.id -AND evm.tx_attempts.hash = old_enough_receipts.tx_hash -AND evm.txes.created_at < $3 -AND evm.txes.state = 'confirmed' -AND evm_chain_id = $4`, minBlockNumberToKeep, limit, timeThreshold, chainID.String()) +USING old_enough_finalized_txs +WHERE evm.txes.id = old_enough_finalized_txs.id +`, chainID.String(), timeThreshold, limit) if err != nil { return count, pkgerrors.Wrap(err, "ReapTxes failed to delete old confirmed evm.txes") } @@ -2083,3 +2094,13 @@ func (o *evmTxStore) mergeContexts(ctx context.Context) (context.Context, contex cancel(context.Canceled) } } + +// DeleteAll - removes all of the transactions and child entities. Only for tests +func (o *evmTxStore) DeleteAll(ctx context.Context) error { + var cancel context.CancelFunc + ctx, cancel = o.mergeContexts(ctx) + defer cancel() + qq := o.q.WithOpts(pg.WithParentCtx(ctx)) + _, err := qq.Exec(`DELETE FROM evm.txes`) + return err +} diff --git a/core/chains/evm/txmgr/evm_tx_store_test.go b/core/chains/evm/txmgr/evm_tx_store_test.go index 35d684727d1..05def8c9f6c 100644 --- a/core/chains/evm/txmgr/evm_tx_store_test.go +++ b/core/chains/evm/txmgr/evm_tx_store_test.go @@ -9,6 +9,7 @@ import ( commonconfig "github.com/smartcontractkit/chainlink-common/pkg/config" "github.com/smartcontractkit/chainlink-common/pkg/logger" + txmgrcommon "github.com/smartcontractkit/chainlink/v2/common/txmgr" txmgrtypes "github.com/smartcontractkit/chainlink/v2/common/txmgr/types" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/assets" @@ -498,7 +499,44 @@ func TestORM_FindTxAttemptsRequiringReceiptFetch(t *testing.T) { assert.Equal(t, etx0.TxAttempts[0].ID, attempts[0].ID) } -func TestORM_SaveFetchedReceipts(t *testing.T) { +func TestORM_MarkFinalized(t *testing.T) { + t.Parallel() + + db := pgtest.NewSqlxDB(t) + cfg := newTestChainScopedConfig(t) + txStore := cltest.NewTestTxStore(t, db, cfg.Database()) + ethKeyStore := cltest.NewKeyStore(t, db, cfg.Database()).Eth() + _, fromAddress := cltest.MustInsertRandomKeyReturningState(t, ethKeyStore) + + blockNum := int64(42) + etx0 := mustInsertConfirmedEthTxWithReceipt(t, txStore, fromAddress, 0, blockNum) + newAttempt := cltest.NewLegacyEthTxAttempt(t, etx0.ID) + newAttempt.BroadcastBeforeBlockNum = &blockNum + newAttempt.State = txmgrtypes.TxAttemptBroadcast + // ensure that gas prices are unique + newAttempt.TxFee.Legacy = newAttempt.TxFee.Legacy.Add(assets.NewWei(big.NewInt(10))) + require.NoError(t, txStore.InsertTxAttempt(&newAttempt)) + + err := txStore.MarkFinalized(testutils.Context(t), []int64{etx0.TxAttempts[0].ID}) + require.NoError(t, err) + + etx1 := mustInsertConfirmedEthTxWithReceipt(t, txStore, fromAddress, 1, blockNum) + + etx0, err = txStore.FindTxWithAttempts(etx0.ID) + require.NoError(t, err) + require.Len(t, etx0.TxAttempts, 2) + // as we've finalized attempt with lower price - it will be second + require.Equal(t, txmgrtypes.TxAttemptBroadcast, etx0.TxAttempts[0].State) + require.Equal(t, txmgrtypes.TxAttemptFinalized, etx0.TxAttempts[1].State) + require.Equal(t, txmgrcommon.TxFinalized, etx0.State) + + etx1, err = txStore.FindTxWithAttempts(etx1.ID) + require.NoError(t, err) + require.Len(t, etx1.TxAttempts, 1) + require.Equal(t, txmgrcommon.TxConfirmed, etx1.State) +} + +func TestORM_SaveReceipts(t *testing.T) { t.Parallel() db := pgtest.NewSqlxDB(t) @@ -530,6 +568,17 @@ func TestORM_SaveFetchedReceipts(t *testing.T) { require.Len(t, etx0.TxAttempts[0].Receipts, 1) require.Equal(t, txmReceipt.BlockHash, etx0.TxAttempts[0].Receipts[0].GetBlockHash()) require.Equal(t, txmgrcommon.TxConfirmed, etx0.State) + + err = txStore.SaveFinalizedReceipts(testutils.Context(t), []*evmtypes.Receipt{&txmReceipt}, ethClient.ConfiguredChainID()) + require.NoError(t, err) + + etx0, err = txStore.FindTxWithAttempts(etx0.ID) + require.NoError(t, err) + require.Len(t, etx0.TxAttempts, 1) + require.Equal(t, etx0.TxAttempts[0].State, txmgrtypes.TxAttemptFinalized) + require.Len(t, etx0.TxAttempts[0].Receipts, 1) + require.Equal(t, txmReceipt.BlockHash, etx0.TxAttempts[0].Receipts[0].GetBlockHash()) + require.Equal(t, txmgrcommon.TxFinalized, etx0.State) } func TestORM_MarkAllConfirmedMissingReceipt(t *testing.T) { @@ -785,32 +834,7 @@ func TestORM_UpdateTxForRebroadcast(t *testing.T) { }) } -func TestORM_IsTxFinalized(t *testing.T) { - t.Parallel() - - db := pgtest.NewSqlxDB(t) - cfg := newTestChainScopedConfig(t) - txStore := cltest.NewTestTxStore(t, db, cfg.Database()) - ethClient := evmtest.NewEthClientMockWithDefaultChain(t) - - t.Run("confirmed tx not past finality_depth", func(t *testing.T) { - confirmedAddr := cltest.MustGenerateRandomKey(t).Address - tx := mustInsertConfirmedEthTxWithReceipt(t, txStore, confirmedAddr, 123, 1) - finalized, err := txStore.IsTxFinalized(testutils.Context(t), 2, tx.ID, ethClient.ConfiguredChainID()) - require.NoError(t, err) - require.False(t, finalized) - }) - - t.Run("confirmed tx past finality_depth", func(t *testing.T) { - confirmedAddr := cltest.MustGenerateRandomKey(t).Address - tx := mustInsertConfirmedEthTxWithReceipt(t, txStore, confirmedAddr, 123, 1) - finalized, err := txStore.IsTxFinalized(testutils.Context(t), 10, tx.ID, ethClient.ConfiguredChainID()) - require.NoError(t, err) - require.True(t, finalized) - }) -} - -func TestORM_FindTransactionsConfirmedInBlockRange(t *testing.T) { +func TestORM_FindConfirmedTransactions(t *testing.T) { t.Parallel() db := pgtest.NewSqlxDB(t) @@ -820,25 +844,15 @@ func TestORM_FindTransactionsConfirmedInBlockRange(t *testing.T) { ethClient := evmtest.NewEthClientMockWithDefaultChain(t) _, fromAddress := cltest.MustInsertRandomKeyReturningState(t, ethKeyStore) - head := evmtypes.Head{ - Hash: utils.NewHash(), - Number: 10, - Parent: &evmtypes.Head{ - Hash: utils.NewHash(), - Number: 9, - Parent: &evmtypes.Head{ - Number: 8, - Hash: utils.NewHash(), - Parent: nil, - }, - }, - } - - t.Run("find all transactions confirmed in range", func(t *testing.T) { + t.Run("find all confirmed transactions", func(t *testing.T) { etx_8 := mustInsertConfirmedEthTxWithReceipt(t, txStore, fromAddress, 700, 8) etx_9 := mustInsertConfirmedEthTxWithReceipt(t, txStore, fromAddress, 777, 9) + _ = mustInsertFatalErrorEthTx(t, txStore, fromAddress) + _ = mustInsertConfirmedMissingReceiptEthTxWithLegacyAttempt(t, txStore, 699, 7, + time.Unix(1616509100, 0), fromAddress) + _ = mustInsertInProgressEthTxWithAttempt(t, txStore, 778, fromAddress) - etxes, err := txStore.FindTransactionsConfirmedInBlockRange(testutils.Context(t), head.Number, 8, ethClient.ConfiguredChainID()) + etxes, err := txStore.FindConfirmedTransactions(testutils.Context(t), ethClient.ConfiguredChainID()) require.NoError(t, err) assert.Len(t, etxes, 2) assert.Equal(t, etxes[0].Sequence, etx_8.Sequence) diff --git a/core/chains/evm/txmgr/mocks/client.go b/core/chains/evm/txmgr/mocks/client.go new file mode 100644 index 00000000000..4d6a84299e3 --- /dev/null +++ b/core/chains/evm/txmgr/mocks/client.go @@ -0,0 +1,26 @@ +package mocks + +import ( + "math/big" + "testing" + + gethCommon "github.com/ethereum/go-ethereum/common" + + txmgrtypesmocks "github.com/smartcontractkit/chainlink/v2/common/txmgr/types/mocks" + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/gas" + evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" +) + +type EvmTxmClient = txmgrtypesmocks.TxmClient[*big.Int, gethCommon.Address, gethCommon.Hash, gethCommon.Hash, *evmtypes.Receipt, evmtypes.Nonce, gas.EvmFee] + +func NewEvmTxmClient(t testing.TB) *EvmTxmClient { + return txmgrtypesmocks.NewTxmClient[*big.Int, gethCommon.Address, gethCommon.Hash, gethCommon.Hash, *evmtypes.Receipt, evmtypes.Nonce, gas.EvmFee](t) +} + +func NewEvmTxmClientWithDefaultChain(t testing.TB) *EvmTxmClient { + c := NewEvmTxmClient(t) + c.On("ConfiguredChainID").Return(testutils.FixtureChainID).Maybe() + c.On("IsL2").Return(false).Maybe() + return c +} diff --git a/core/chains/evm/txmgr/mocks/evm_tx_store.go b/core/chains/evm/txmgr/mocks/evm_tx_store.go index 9690bf9728d..a11b6ec71e6 100644 --- a/core/chains/evm/txmgr/mocks/evm_tx_store.go +++ b/core/chains/evm/txmgr/mocks/evm_tx_store.go @@ -199,6 +199,36 @@ func (_m *EvmTxStore) DeleteInProgressAttempt(ctx context.Context, attempt types return r0 } +// FindConfirmedTransactions provides a mock function with given fields: ctx, chainID +func (_m *EvmTxStore) FindConfirmedTransactions(ctx context.Context, chainID *big.Int) ([]*types.Tx[*big.Int, common.Address, common.Hash, common.Hash, evmtypes.Nonce, gas.EvmFee], error) { + ret := _m.Called(ctx, chainID) + + if len(ret) == 0 { + panic("no return value specified for FindConfirmedTransactions") + } + + var r0 []*types.Tx[*big.Int, common.Address, common.Hash, common.Hash, evmtypes.Nonce, gas.EvmFee] + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *big.Int) ([]*types.Tx[*big.Int, common.Address, common.Hash, common.Hash, evmtypes.Nonce, gas.EvmFee], error)); ok { + return rf(ctx, chainID) + } + if rf, ok := ret.Get(0).(func(context.Context, *big.Int) []*types.Tx[*big.Int, common.Address, common.Hash, common.Hash, evmtypes.Nonce, gas.EvmFee]); ok { + r0 = rf(ctx, chainID) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]*types.Tx[*big.Int, common.Address, common.Hash, common.Hash, evmtypes.Nonce, gas.EvmFee]) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, *big.Int) error); ok { + r1 = rf(ctx, chainID) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // FindEarliestUnconfirmedBroadcastTime provides a mock function with given fields: ctx, chainID func (_m *EvmTxStore) FindEarliestUnconfirmedBroadcastTime(ctx context.Context, chainID *big.Int) (null.Time, error) { ret := _m.Called(ctx, chainID) @@ -301,36 +331,6 @@ func (_m *EvmTxStore) FindNextUnstartedTransactionFromAddress(ctx context.Contex return r0 } -// FindTransactionsConfirmedInBlockRange provides a mock function with given fields: ctx, highBlockNumber, lowBlockNumber, chainID -func (_m *EvmTxStore) FindTransactionsConfirmedInBlockRange(ctx context.Context, highBlockNumber int64, lowBlockNumber int64, chainID *big.Int) ([]*types.Tx[*big.Int, common.Address, common.Hash, common.Hash, evmtypes.Nonce, gas.EvmFee], error) { - ret := _m.Called(ctx, highBlockNumber, lowBlockNumber, chainID) - - if len(ret) == 0 { - panic("no return value specified for FindTransactionsConfirmedInBlockRange") - } - - var r0 []*types.Tx[*big.Int, common.Address, common.Hash, common.Hash, evmtypes.Nonce, gas.EvmFee] - var r1 error - if rf, ok := ret.Get(0).(func(context.Context, int64, int64, *big.Int) ([]*types.Tx[*big.Int, common.Address, common.Hash, common.Hash, evmtypes.Nonce, gas.EvmFee], error)); ok { - return rf(ctx, highBlockNumber, lowBlockNumber, chainID) - } - if rf, ok := ret.Get(0).(func(context.Context, int64, int64, *big.Int) []*types.Tx[*big.Int, common.Address, common.Hash, common.Hash, evmtypes.Nonce, gas.EvmFee]); ok { - r0 = rf(ctx, highBlockNumber, lowBlockNumber, chainID) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).([]*types.Tx[*big.Int, common.Address, common.Hash, common.Hash, evmtypes.Nonce, gas.EvmFee]) - } - } - - if rf, ok := ret.Get(1).(func(context.Context, int64, int64, *big.Int) error); ok { - r1 = rf(ctx, highBlockNumber, lowBlockNumber, chainID) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - // FindTxAttempt provides a mock function with given fields: hash func (_m *EvmTxStore) FindTxAttempt(hash common.Hash) (*types.TxAttempt[*big.Int, common.Address, common.Hash, common.Hash, evmtypes.Nonce, gas.EvmFee], error) { ret := _m.Called(hash) @@ -957,34 +957,6 @@ func (_m *EvmTxStore) HasInProgressTransaction(ctx context.Context, account comm return r0, r1 } -// IsTxFinalized provides a mock function with given fields: ctx, blockHeight, txID, chainID -func (_m *EvmTxStore) IsTxFinalized(ctx context.Context, blockHeight int64, txID int64, chainID *big.Int) (bool, error) { - ret := _m.Called(ctx, blockHeight, txID, chainID) - - if len(ret) == 0 { - panic("no return value specified for IsTxFinalized") - } - - var r0 bool - var r1 error - if rf, ok := ret.Get(0).(func(context.Context, int64, int64, *big.Int) (bool, error)); ok { - return rf(ctx, blockHeight, txID, chainID) - } - if rf, ok := ret.Get(0).(func(context.Context, int64, int64, *big.Int) bool); ok { - r0 = rf(ctx, blockHeight, txID, chainID) - } else { - r0 = ret.Get(0).(bool) - } - - if rf, ok := ret.Get(1).(func(context.Context, int64, int64, *big.Int) error); ok { - r1 = rf(ctx, blockHeight, txID, chainID) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - // LoadTxAttempts provides a mock function with given fields: ctx, etx func (_m *EvmTxStore) LoadTxAttempts(ctx context.Context, etx *types.Tx[*big.Int, common.Address, common.Hash, common.Hash, evmtypes.Nonce, gas.EvmFee]) error { ret := _m.Called(ctx, etx) @@ -1021,6 +993,24 @@ func (_m *EvmTxStore) MarkAllConfirmedMissingReceipt(ctx context.Context, chainI return r0 } +// MarkFinalized provides a mock function with given fields: ctx, attempts +func (_m *EvmTxStore) MarkFinalized(ctx context.Context, attempts []int64) error { + ret := _m.Called(ctx, attempts) + + if len(ret) == 0 { + panic("no return value specified for MarkFinalized") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, []int64) error); ok { + r0 = rf(ctx, attempts) + } else { + r0 = ret.Error(0) + } + + return r0 +} + // MarkOldTxesMissingReceiptAsErrored provides a mock function with given fields: ctx, blockNum, finalityDepth, chainID func (_m *EvmTxStore) MarkOldTxesMissingReceiptAsErrored(ctx context.Context, blockNum int64, finalityDepth uint32, chainID *big.Int) error { ret := _m.Called(ctx, blockNum, finalityDepth, chainID) @@ -1087,17 +1077,17 @@ func (_m *EvmTxStore) PruneUnstartedTxQueue(ctx context.Context, queueSize uint3 return r0, r1 } -// ReapTxHistory provides a mock function with given fields: ctx, minBlockNumberToKeep, timeThreshold, chainID -func (_m *EvmTxStore) ReapTxHistory(ctx context.Context, minBlockNumberToKeep int64, timeThreshold time.Time, chainID *big.Int) error { - ret := _m.Called(ctx, minBlockNumberToKeep, timeThreshold, chainID) +// ReapTxHistory provides a mock function with given fields: ctx, timeThreshold, chainID +func (_m *EvmTxStore) ReapTxHistory(ctx context.Context, timeThreshold time.Time, chainID *big.Int) error { + ret := _m.Called(ctx, timeThreshold, chainID) if len(ret) == 0 { panic("no return value specified for ReapTxHistory") } var r0 error - if rf, ok := ret.Get(0).(func(context.Context, int64, time.Time, *big.Int) error); ok { - r0 = rf(ctx, minBlockNumberToKeep, timeThreshold, chainID) + if rf, ok := ret.Get(0).(func(context.Context, time.Time, *big.Int) error); ok { + r0 = rf(ctx, timeThreshold, chainID) } else { r0 = ret.Error(0) } @@ -1141,6 +1131,24 @@ func (_m *EvmTxStore) SaveFetchedReceipts(ctx context.Context, receipts []*evmty return r0 } +// SaveFinalizedReceipts provides a mock function with given fields: ctx, receipts, chainID +func (_m *EvmTxStore) SaveFinalizedReceipts(ctx context.Context, receipts []*evmtypes.Receipt, chainID *big.Int) error { + ret := _m.Called(ctx, receipts, chainID) + + if len(ret) == 0 { + panic("no return value specified for SaveFinalizedReceipts") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, []*evmtypes.Receipt, *big.Int) error); ok { + r0 = rf(ctx, receipts, chainID) + } else { + r0 = ret.Error(0) + } + + return r0 +} + // SaveInProgressAttempt provides a mock function with given fields: ctx, attempt func (_m *EvmTxStore) SaveInProgressAttempt(ctx context.Context, attempt *types.TxAttempt[*big.Int, common.Address, common.Hash, common.Hash, evmtypes.Nonce, gas.EvmFee]) error { ret := _m.Called(ctx, attempt) diff --git a/core/chains/evm/txmgr/reaper_test.go b/core/chains/evm/txmgr/reaper_test.go index a539f0ac8cb..fa257313bf0 100644 --- a/core/chains/evm/txmgr/reaper_test.go +++ b/core/chains/evm/txmgr/reaper_test.go @@ -6,10 +6,8 @@ import ( "time" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" "github.com/smartcontractkit/chainlink-common/pkg/logger" - "github.com/smartcontractkit/chainlink-common/pkg/utils" txmgrtypes "github.com/smartcontractkit/chainlink/v2/common/txmgr/types" txmgrmocks "github.com/smartcontractkit/chainlink/v2/common/txmgr/types/mocks" @@ -50,22 +48,22 @@ func TestReaper_ReapTxes(t *testing.T) { _, from := cltest.MustInsertRandomKey(t, ethKeyStore) var nonce int64 - oneDayAgo := time.Now().Add(-24 * time.Hour) t.Run("with nothing in the database, doesn't error", func(t *testing.T) { config := txmgrmocks.NewReaperConfig(t) - config.On("FinalityDepth").Return(uint32(10)) tc := &reaperConfig{reaperThreshold: 1 * time.Hour} r := newReaper(t, txStore, config, tc) - err := r.ReapTxes(42) + err := r.ReapTxes(time.Now()) assert.NoError(t, err) }) - // Confirmed in block number 5 - mustInsertConfirmedEthTxWithReceipt(t, txStore, from, nonce, 5) + // Finalized with confirmed receipt in block number 5 + mustInsertFinalizedEthTxWithReceipt(t, txStore, from, nonce, 5) + now := time.Now() + tomorrow := now.Add(24 * time.Hour) t.Run("skips if threshold=0", func(t *testing.T) { config := txmgrmocks.NewReaperConfig(t) @@ -74,7 +72,7 @@ func TestReaper_ReapTxes(t *testing.T) { r := newReaper(t, txStore, config, tc) - err := r.ReapTxes(42) + err := r.ReapTxes(tomorrow) assert.NoError(t, err) cltest.AssertCount(t, db, "evm.txes", 1) @@ -82,41 +80,34 @@ func TestReaper_ReapTxes(t *testing.T) { t.Run("doesn't touch ethtxes with different chain ID", func(t *testing.T) { config := txmgrmocks.NewReaperConfig(t) - config.On("FinalityDepth").Return(uint32(10)) tc := &reaperConfig{reaperThreshold: 1 * time.Hour} r := newReaperWithChainID(t, txStore, config, tc, big.NewInt(42)) - err := r.ReapTxes(42) + // use time in the future to ensure that just created txs match the filter + err := r.ReapTxes(tomorrow) assert.NoError(t, err) // Didn't delete because eth_tx has chain ID of 0 cltest.AssertCount(t, db, "evm.txes", 1) }) - t.Run("deletes confirmed evm.txes that exceed the age threshold with at least EVM.FinalityDepth blocks above their receipt", func(t *testing.T) { + t.Run("deletes finalized evm.txes that exceed the age threshold", func(t *testing.T) { config := txmgrmocks.NewReaperConfig(t) - config.On("FinalityDepth").Return(uint32(10)) tc := &reaperConfig{reaperThreshold: 1 * time.Hour} r := newReaper(t, txStore, config, tc) - err := r.ReapTxes(42) + err := r.ReapTxes(now) assert.NoError(t, err) // Didn't delete because eth_tx was not old enough cltest.AssertCount(t, db, "evm.txes", 1) - pgtest.MustExec(t, db, `UPDATE evm.txes SET created_at=$1`, oneDayAgo) - - err = r.ReapTxes(12) + // use time in the future to ensure that just created txs match the filter + err = r.ReapTxes(tomorrow) assert.NoError(t, err) - // Didn't delete because eth_tx although old enough, was still within EVM.FinalityDepth of the current head - cltest.AssertCount(t, db, "evm.txes", 1) - - err = r.ReapTxes(42) - assert.NoError(t, err) - // Now it deleted because the eth_tx was past EVM.FinalityDepth + // Now it deleted because the eth_tx was old enough cltest.AssertCount(t, db, "evm.txes", 0) }) @@ -124,22 +115,38 @@ func TestReaper_ReapTxes(t *testing.T) { t.Run("deletes errored evm.txes that exceed the age threshold", func(t *testing.T) { config := txmgrmocks.NewReaperConfig(t) - config.On("FinalityDepth").Return(uint32(10)) tc := &reaperConfig{reaperThreshold: 1 * time.Hour} r := newReaper(t, txStore, config, tc) - err := r.ReapTxes(42) + err := r.ReapTxes(now) assert.NoError(t, err) // Didn't delete because eth_tx was not old enough cltest.AssertCount(t, db, "evm.txes", 1) - require.NoError(t, utils.JustError(db.Exec(`UPDATE evm.txes SET created_at=$1`, oneDayAgo))) - - err = r.ReapTxes(42) + err = r.ReapTxes(tomorrow) assert.NoError(t, err) // Deleted because it is old enough now cltest.AssertCount(t, db, "evm.txes", 0) }) + t.Run("does not delete old confirmed txs", func(t *testing.T) { + mustInsertConfirmedEthTxWithReceipt(t, txStore, from, 2, 2) + + config := txmgrmocks.NewReaperConfig(t) + + tc := &reaperConfig{reaperThreshold: 1 * time.Hour} + + r := newReaper(t, txStore, config, tc) + + err := r.ReapTxes(now) + assert.NoError(t, err) + // Didn't delete because eth_tx not finalized + cltest.AssertCount(t, db, "evm.txes", 1) + + err = r.ReapTxes(tomorrow) + assert.NoError(t, err) + // Didn't delete old enough but not finalized + cltest.AssertCount(t, db, "evm.txes", 1) + }) } diff --git a/core/chains/evm/txmgr/tracker_test.go b/core/chains/evm/txmgr/tracker_test.go index d3083372789..c5f6ffcafd5 100644 --- a/core/chains/evm/txmgr/tracker_test.go +++ b/core/chains/evm/txmgr/tracker_test.go @@ -7,8 +7,11 @@ import ( "time" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/txmgr" + evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils" ubig "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big" "github.com/smartcontractkit/chainlink/v2/core/internal/cltest" + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/evmtest" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest" "github.com/smartcontractkit/chainlink/v2/core/logger" @@ -99,7 +102,7 @@ func TestEvmTracker_AddressTracking(t *testing.T) { t.Skip("BCI-2638 tracker disabled") tracker, txStore, _, _ := newTestEvmTrackerSetup(t) confirmedAddr := cltest.MustGenerateRandomKey(t).Address - _ = mustInsertConfirmedEthTxWithReceipt(t, txStore, confirmedAddr, 123, 1) + etx := mustInsertConfirmedEthTxWithReceipt(t, txStore, confirmedAddr, 123, 1) err := tracker.Start(context.Background()) require.NoError(t, err) @@ -108,15 +111,21 @@ func TestEvmTracker_AddressTracking(t *testing.T) { require.NoError(t, err) }(tracker) - addrs := tracker.GetAbandonedAddresses() - require.Contains(t, addrs, confirmedAddr) - - // deliver block past minConfirmations to finalize tx - tracker.XXXDeliverBlock(10) - time.Sleep(waitTime) - - addrs = tracker.GetAbandonedAddresses() - require.NotContains(t, addrs, confirmedAddr) + // nothing happens - still tracking confirmed tx + require.NoError(t, tracker.HandleTxesByState(testutils.Context(t), 10)) + require.Contains(t, tracker.GetAbandonedAddresses(), confirmedAddr) + + // mark finalized + require.NoError(t, txStore.SaveFinalizedReceipts(testutils.Context(t), []*evmtypes.Receipt{&evmtypes.Receipt{ + TxHash: etx.TxAttempts[0].Hash, + BlockHash: utils.NewHash(), + BlockNumber: big.NewInt(123), + TransactionIndex: uint(1), + }}, etx.ChainID)) + + // remove transaction as it's now finalized + require.NoError(t, tracker.HandleTxesByState(testutils.Context(t), 11)) + require.NotContains(t, tracker.GetAbandonedAddresses(), confirmedAddr) }) } diff --git a/core/chains/evm/txmgr/txmgr_test.go b/core/chains/evm/txmgr/txmgr_test.go index 0e28f2948ee..fee9956119a 100644 --- a/core/chains/evm/txmgr/txmgr_test.go +++ b/core/chains/evm/txmgr/txmgr_test.go @@ -628,6 +628,13 @@ func mustInsertConfirmedEthTxWithReceipt(t *testing.T, txStore txmgr.TestEvmTxSt return etx } +func mustInsertFinalizedEthTxWithReceipt(t *testing.T, txStore txmgr.TestEvmTxStore, fromAddress common.Address, nonce, blockNum int64) (etx txmgr.Tx) { + etx = mustInsertConfirmedEthTxWithReceipt(t, txStore, fromAddress, nonce, blockNum) + err := txStore.MarkFinalized(testutils.Context(t), []int64{etx.TxAttempts[0].ID}) + require.NoError(t, err) + return etx +} + func mustInsertConfirmedEthTxBySaveFetchedReceipts(t *testing.T, txStore txmgr.TestEvmTxStore, fromAddress common.Address, nonce int64, blockNum int64, chainID big.Int) (etx txmgr.Tx) { etx = cltest.MustInsertConfirmedEthTxWithLegacyAttempt(t, txStore, nonce, blockNum, fromAddress) receipt := evmtypes.Receipt{ diff --git a/core/internal/cltest/cltest.go b/core/internal/cltest/cltest.go index 332513b28d4..695c02f1d8c 100644 --- a/core/internal/cltest/cltest.go +++ b/core/internal/cltest/cltest.go @@ -1284,6 +1284,11 @@ func MockApplicationEthCalls(t *testing.T, app *TestApplication, ethClient *evmc ethClient.On("Close").Return().Maybe() } +func RequireBatchElemMatchesParams(t *testing.T, req rpc.BatchElem, method string, args ...interface{}) { + require.Equal(t, method, req.Method) + require.Equal(t, args, req.Args) +} + func BatchElemMatchesParams(req rpc.BatchElem, arg interface{}, method string) bool { return req.Method == method && len(req.Args) == 1 && req.Args[0] == arg diff --git a/core/internal/cltest/factories.go b/core/internal/cltest/factories.go index 804dbe2d088..d017552e379 100644 --- a/core/internal/cltest/factories.go +++ b/core/internal/cltest/factories.go @@ -199,6 +199,14 @@ func MustInsertUnconfirmedEthTxWithBroadcastLegacyAttempt(t *testing.T, txStore } func MustInsertConfirmedEthTxWithLegacyAttempt(t *testing.T, txStore txmgr.TestEvmTxStore, nonce int64, broadcastBeforeBlockNum int64, fromAddress common.Address) txmgr.Tx { + return mustInsertConfirmedEthTxWithLegacyAttempt(t, txStore, nonce, broadcastBeforeBlockNum, fromAddress, false) +} + +func MustInsertConfirmedMissingReceiptEthTxWithLegacyAttempt(t *testing.T, txStore txmgr.TestEvmTxStore, nonce int64, broadcastBeforeBlockNum int64, fromAddress common.Address) txmgr.Tx { + return mustInsertConfirmedEthTxWithLegacyAttempt(t, txStore, nonce, broadcastBeforeBlockNum, fromAddress, true) +} + +func mustInsertConfirmedEthTxWithLegacyAttempt(t *testing.T, txStore txmgr.TestEvmTxStore, nonce int64, broadcastBeforeBlockNum int64, fromAddress common.Address, isMissingReceipt bool) txmgr.Tx { timeNow := time.Now() etx := NewEthTx(fromAddress) @@ -207,6 +215,9 @@ func MustInsertConfirmedEthTxWithLegacyAttempt(t *testing.T, txStore txmgr.TestE n := evmtypes.Nonce(nonce) etx.Sequence = &n etx.State = txmgrcommon.TxConfirmed + if isMissingReceipt { + etx.State = txmgrcommon.TxConfirmedMissingReceipt + } etx.MinConfirmations.SetValid(6) require.NoError(t, txStore.InsertTx(&etx)) attempt := NewLegacyEthTxAttempt(t, etx.ID) diff --git a/core/store/migrate/migrations/0223_add_finalized_tx_state.sql b/core/store/migrate/migrations/0223_add_finalized_tx_state.sql new file mode 100644 index 00000000000..537b2c384bc --- /dev/null +++ b/core/store/migrate/migrations/0223_add_finalized_tx_state.sql @@ -0,0 +1,15 @@ +-- +goose NO TRANSACTION +-- +goose Up + +-- NOTE: see 0222_add_finalized_tx_state_part_2 for more details. + +ALTER TYPE eth_txes_state ADD VALUE IF NOT EXISTS 'finalized' AFTER 'confirmed'; + +ALTER TYPE eth_tx_attempts_state ADD VALUE IF NOT EXISTS 'finalized' AFTER 'broadcast'; + + + + +-- +goose Down + +-- removal is handled in 0222_add_finalized_tx_state_part_2 \ No newline at end of file diff --git a/core/store/migrate/migrations/0224_add_finalized_tx_state_part_2.sql b/core/store/migrate/migrations/0224_add_finalized_tx_state_part_2.sql new file mode 100644 index 00000000000..5ccab8f9482 --- /dev/null +++ b/core/store/migrate/migrations/0224_add_finalized_tx_state_part_2.sql @@ -0,0 +1,80 @@ +-- +goose Up + +-- NOTE: 0221 & 0222 supposed to be single migration, but new enum values must be committed before they can be used. See notes section https://www.postgresql.org/docs/16/sql-altertype.html + +-- drop constraints that we might have added on migrate down +ALTER TABLE evm.txes DROP CONSTRAINT IF EXISTS eth_txes_state_finalized_removed; +ALTER TABLE evm.tx_attempts DROP CONSTRAINT IF EXISTS eth_tx_attempts_state_finalized_removed; + +-- allow tx_attempts to use finalized state +ALTER TABLE evm.tx_attempts DROP CONSTRAINT chk_eth_tx_attempts_fsm; +ALTER TABLE evm.tx_attempts ADD CONSTRAINT chk_eth_tx_attempts_fsm CHECK + ( + ((state = ANY (ARRAY['in_progress'::public.eth_tx_attempts_state, 'insufficient_eth'::public.eth_tx_attempts_state])) + AND (broadcast_before_block_num IS NULL)) + OR (state = ANY (ARRAY['broadcast'::public.eth_tx_attempts_state, 'finalized'::public.eth_tx_attempts_state])) + ) NOT VALID; + +-- allow txes to use finalized state +ALTER TABLE evm.txes DROP CONSTRAINT chk_eth_txes_fsm; +ALTER TABLE evm.txes ADD CONSTRAINT chk_eth_txes_fsm CHECK ( + state = 'unstarted'::eth_txes_state AND nonce IS NULL AND error IS NULL AND broadcast_at IS NULL AND initial_broadcast_at IS NULL + OR + state = 'in_progress'::eth_txes_state AND nonce IS NOT NULL AND error IS NULL AND broadcast_at IS NULL AND initial_broadcast_at IS NULL + OR + state = 'fatal_error'::eth_txes_state AND nonce IS NULL AND error IS NOT NULL + OR + state = 'unconfirmed'::eth_txes_state AND nonce IS NOT NULL AND error IS NULL AND broadcast_at IS NOT NULL AND initial_broadcast_at IS NOT NULL + OR + state = 'confirmed'::eth_txes_state AND nonce IS NOT NULL AND error IS NULL AND broadcast_at IS NOT NULL AND initial_broadcast_at IS NOT NULL + OR + state = 'confirmed_missing_receipt'::eth_txes_state AND nonce IS NOT NULL AND error IS NULL AND broadcast_at IS NOT NULL AND initial_broadcast_at IS NOT NULL + OR + state = 'finalized'::eth_txes_state AND nonce IS NOT NULL AND error IS NULL AND broadcast_at IS NOT NULL AND initial_broadcast_at IS NOT NULL + ) NOT VALID; + +-- if the ReaperThreshold config value is high, we might endup with large number of 'confirmed' transactions. +-- Lets mark old transactions as finalized to prevent unnecessary RPC calls +UPDATE evm.txes set state = 'finalized'::eth_txes_state WHERE state = 'confirmed'::eth_txes_state AND created_at < (now() - interval '240 hours'); + +-- Mark the most recent attempts as finalized for old transactions to ensure that we do not violate an invariant that each finalized tx has a single finalized attempt. +WITH most_recent_attempts as ( + SELECT MAX(evm.tx_attempts.id) as id + FROM evm.tx_attempts JOIN evm.txes ON evm.tx_attempts.eth_tx_id = evm.txes.id + WHERE evm.txes.state = 'finalized'::eth_txes_state + GROUP BY evm.tx_attempts.eth_tx_id +) +UPDATE evm.tx_attempts set state = 'finalized'::eth_tx_attempts_state FROM most_recent_attempts WHERE evm.tx_attempts.id = most_recent_attempts.id; + +-- +goose Down + +-- it's not possible to remove label from the enum. The only option is to restrict it's usage; +UPDATE evm.txes set state = 'confirmed'::eth_txes_state WHERE state = 'finalized'::eth_txes_state; +ALTER TABLE evm.txes ADD CONSTRAINT eth_txes_state_finalized_removed CHECK (state <> 'finalized'::eth_txes_state); + +UPDATE evm.tx_attempts set state = 'broadcast'::eth_tx_attempts_state WHERE state = 'finalized'::eth_tx_attempts_state; +ALTER TABLE evm.tx_attempts ADD CONSTRAINT eth_tx_attempts_state_finalized_removed CHECK (state <> 'finalized'::eth_tx_attempts_state); + +-- rollback changes to the constraints +ALTER TABLE evm.tx_attempts DROP CONSTRAINT chk_eth_tx_attempts_fsm; +ALTER TABLE evm.tx_attempts ADD CONSTRAINT chk_eth_tx_attempts_fsm CHECK + (( + ((state = ANY (ARRAY['in_progress'::public.eth_tx_attempts_state, 'insufficient_eth'::public.eth_tx_attempts_state])) + AND (broadcast_before_block_num IS NULL)) + OR (state = 'broadcast'::public.eth_tx_attempts_state) + )) NOT VALID; + +ALTER TABLE evm.txes DROP CONSTRAINT chk_eth_txes_fsm; +ALTER TABLE evm.txes ADD CONSTRAINT chk_eth_txes_fsm CHECK ( + state = 'unstarted'::eth_txes_state AND nonce IS NULL AND error IS NULL AND broadcast_at IS NULL AND initial_broadcast_at IS NULL + OR + state = 'in_progress'::eth_txes_state AND nonce IS NOT NULL AND error IS NULL AND broadcast_at IS NULL AND initial_broadcast_at IS NULL + OR + state = 'fatal_error'::eth_txes_state AND nonce IS NULL AND error IS NOT NULL + OR + state = 'unconfirmed'::eth_txes_state AND nonce IS NOT NULL AND error IS NULL AND broadcast_at IS NOT NULL AND initial_broadcast_at IS NOT NULL + OR + state = 'confirmed'::eth_txes_state AND nonce IS NOT NULL AND error IS NULL AND broadcast_at IS NOT NULL AND initial_broadcast_at IS NOT NULL + OR + state = 'confirmed_missing_receipt'::eth_txes_state AND nonce IS NOT NULL AND error IS NULL AND broadcast_at IS NOT NULL AND initial_broadcast_at IS NOT NULL + ) NOT VALID; \ No newline at end of file diff --git a/core/web/evm_transactions_controller.go b/core/web/evm_transactions_controller.go index 2b2fd2d554f..ed381e5dd11 100644 --- a/core/web/evm_transactions_controller.go +++ b/core/web/evm_transactions_controller.go @@ -4,6 +4,7 @@ import ( "database/sql" "net/http" + txmgrtypes "github.com/smartcontractkit/chainlink/v2/common/txmgr/types" "github.com/smartcontractkit/chainlink/v2/core/services/chainlink" "github.com/smartcontractkit/chainlink/v2/core/web/presenters" @@ -22,8 +23,16 @@ func (tc *TransactionsController) Index(c *gin.Context, size, page, offset int) txs, count, err := tc.App.TxmStorageService().TransactionsWithAttempts(offset, size) ptxs := make([]presenters.EthTxResource, len(txs)) for i, tx := range txs { - tx.TxAttempts[0].Tx = tx - ptxs[i] = presenters.NewEthTxResourceFromAttempt(tx.TxAttempts[0]) + attempt := tx.TxAttempts[0] + // prefer finalized attempt to any other + for _, at := range tx.TxAttempts { + if at.State == txmgrtypes.TxAttemptFinalized { + attempt = at + break + } + } + attempt.Tx = tx + ptxs[i] = presenters.NewEthTxResourceFromAttempt(attempt) } paginatedResponse(c, "transactions", size, page, ptxs, count, err) } diff --git a/core/web/evm_transactions_controller_test.go b/core/web/evm_transactions_controller_test.go index 9135be432de..87a74e049a6 100644 --- a/core/web/evm_transactions_controller_test.go +++ b/core/web/evm_transactions_controller_test.go @@ -32,15 +32,21 @@ func TestTransactionsController_Index_Success(t *testing.T) { cltest.MustInsertConfirmedEthTxWithLegacyAttempt(t, txStore, 0, 1, from) // tx1 tx2 := cltest.MustInsertConfirmedEthTxWithLegacyAttempt(t, txStore, 3, 2, from) // tx2 - cltest.MustInsertConfirmedEthTxWithLegacyAttempt(t, txStore, 4, 4, from) // tx3 - - // add second tx attempt for tx2 - blockNum := int64(3) - attempt := cltest.NewLegacyEthTxAttempt(t, tx2.ID) - attempt.State = txmgrtypes.TxAttemptBroadcast - attempt.TxFee = gas.EvmFee{Legacy: assets.NewWeiI(3)} - attempt.BroadcastBeforeBlockNum = &blockNum - require.NoError(t, txStore.InsertTxAttempt(&attempt)) + + // add second tx attempt for tx2 with higher block num + newAttempt := func(txID int64, block int64) { + attempt := cltest.NewLegacyEthTxAttempt(t, txID) + attempt.State = txmgrtypes.TxAttemptBroadcast + attempt.TxFee = gas.EvmFee{Legacy: assets.NewWeiI(3)} + attempt.BroadcastBeforeBlockNum = &block + require.NoError(t, txStore.InsertTxAttempt(&attempt)) + } + newAttempt(tx2.ID, 3) + + tx3 := cltest.MustInsertConfirmedEthTxWithLegacyAttempt(t, txStore, 4, 4, from) // tx3 + // add second tx attempt for tx3 with higher block num, but finalized first one + newAttempt(tx3.ID, 5) + require.NoError(t, txStore.MarkFinalized(testutils.Context(t), []int64{tx3.TxAttempts[0].ID})) _, count, err := txStore.TransactionsWithAttempts(0, 100) require.NoError(t, err)