diff --git a/core/chains/evm/config/toml/config.go b/core/chains/evm/config/toml/config.go index 0f8b1eceee5..36c7d0f052b 100644 --- a/core/chains/evm/config/toml/config.go +++ b/core/chains/evm/config/toml/config.go @@ -6,6 +6,7 @@ import ( "net/url" "slices" "strconv" + "time" "github.com/ethereum/go-ethereum/core/txpool/legacypool" "github.com/pelletier/go-toml/v2" @@ -451,6 +452,20 @@ func (c *Chain) ValidateConfig() (err error) { err = multierr.Append(err, commonconfig.ErrInvalid{Name: "GasEstimator.BumpThreshold", Value: 0, Msg: fmt.Sprintf("cannot be 0 if Transactions.AutoPurge.MinAttempts is set for %s", chainType)}) } } + case chaintype.ChainDualBroadcast: + if c.Transactions.AutoPurge.DetectionApiUrl == nil { + err = multierr.Append(err, commonconfig.ErrMissing{Name: "Transactions.AutoPurge.DetectionApiUrl", Msg: fmt.Sprintf("must be set for %s", chainType)}) + } + if c.Transactions.AutoPurge.Threshold == nil { + err = multierr.Append(err, commonconfig.ErrMissing{Name: "Transactions.AutoPurge.Threshold", Msg: fmt.Sprintf("needs to be set if auto-purge feature is enabled for %s", chainType)}) + } else if *c.Transactions.AutoPurge.Threshold == 0 { + err = multierr.Append(err, commonconfig.ErrInvalid{Name: "Transactions.AutoPurge.Threshold", Value: 0, Msg: fmt.Sprintf("cannot be 0 if auto-purge feature is enabled for %s", chainType)}) + } + if c.TxmV2.Enabled != nil && *c.TxmV2.Enabled { + if c.TxmV2.CustomURL == nil { + err = multierr.Append(err, commonconfig.ErrMissing{Name: "TxmV2.CustomURL", Msg: fmt.Sprintf("must be set for %s", chainType)}) + } + } default: // Bump Threshold is required because the stuck tx heuristic relies on a minimum number of bump attempts to exist if c.GasEstimator.BumpThreshold == nil { @@ -494,6 +509,18 @@ func (t *TxmV2) setFrom(f *TxmV2) { } } +func (t *TxmV2) ValidateConfig() (err error) { + if t.Enabled != nil && *t.Enabled { + if t.BlockTime == nil { + err = multierr.Append(err, commonconfig.ErrMissing{Name: "TxmV2.BlockTime", Msg: "must be set if txmv2 feature is enabled"}) + } + if t.BlockTime.Duration() < 2*time.Second { + err = multierr.Append(err, commonconfig.ErrInvalid{Name: "TxmV2.BlockTime", Msg: "must be equal to or greater than 2 seconds"}) + } + } + return +} + type Transactions struct { ForwardersEnabled *bool MaxInFlight *uint32 diff --git a/core/chains/evm/txm/attempt_builder.go b/core/chains/evm/txm/attempt_builder.go index fd23bf867e7..16ed0f1a86a 100644 --- a/core/chains/evm/txm/attempt_builder.go +++ b/core/chains/evm/txm/attempt_builder.go @@ -20,23 +20,23 @@ type AttemptBuilderKeystore interface { } type attemptBuilder struct { - chainID *big.Int - priceMax *assets.Wei gas.EvmFeeEstimator - keystore AttemptBuilderKeystore + chainID *big.Int + priceMaxKey func(common.Address) *assets.Wei + keystore AttemptBuilderKeystore } -func NewAttemptBuilder(chainID *big.Int, priceMax *assets.Wei, estimator gas.EvmFeeEstimator, keystore AttemptBuilderKeystore) *attemptBuilder { +func NewAttemptBuilder(chainID *big.Int, priceMaxKey func(common.Address) *assets.Wei, estimator gas.EvmFeeEstimator, keystore AttemptBuilderKeystore) *attemptBuilder { return &attemptBuilder{ chainID: chainID, - priceMax: priceMax, + priceMaxKey: priceMaxKey, EvmFeeEstimator: estimator, keystore: keystore, } } func (a *attemptBuilder) NewAttempt(ctx context.Context, lggr logger.Logger, tx *types.Transaction, dynamic bool) (*types.Attempt, error) { - fee, estimatedGasLimit, err := a.EvmFeeEstimator.GetFee(ctx, tx.Data, tx.SpecifiedGasLimit, a.priceMax, &tx.FromAddress, &tx.ToAddress) + fee, estimatedGasLimit, err := a.EvmFeeEstimator.GetFee(ctx, tx.Data, tx.SpecifiedGasLimit, a.priceMaxKey(tx.FromAddress), &tx.FromAddress, &tx.ToAddress) if err != nil { return nil, err } @@ -48,7 +48,7 @@ func (a *attemptBuilder) NewAttempt(ctx context.Context, lggr logger.Logger, tx } func (a *attemptBuilder) NewBumpAttempt(ctx context.Context, lggr logger.Logger, tx *types.Transaction, previousAttempt types.Attempt) (*types.Attempt, error) { - bumpedFee, bumpedFeeLimit, err := a.EvmFeeEstimator.BumpFee(ctx, previousAttempt.Fee, tx.SpecifiedGasLimit, a.priceMax, nil) + bumpedFee, bumpedFeeLimit, err := a.EvmFeeEstimator.BumpFee(ctx, previousAttempt.Fee, tx.SpecifiedGasLimit, a.priceMaxKey(tx.FromAddress), nil) if err != nil { return nil, err } @@ -114,6 +114,7 @@ func (a *attemptBuilder) newLegacyAttempt(ctx context.Context, tx *types.Transac Fee: gas.EvmFee{GasPrice: gasPrice}, Hash: signedTx.Hash(), GasLimit: estimatedGasLimit, + Type: evmtypes.LegacyTxType, SignedTransaction: signedTx, } diff --git a/core/chains/evm/txm/clientwrappers/dual_broadcast_client.go b/core/chains/evm/txm/clientwrappers/dual_broadcast_client.go index 0bbd2530765..77111c2d48c 100644 --- a/core/chains/evm/txm/clientwrappers/dual_broadcast_client.go +++ b/core/chains/evm/txm/clientwrappers/dual_broadcast_client.go @@ -42,7 +42,7 @@ func (d *DualBroadcastClient) NonceAt(ctx context.Context, address common.Addres } func (d *DualBroadcastClient) PendingNonceAt(ctx context.Context, address common.Address) (uint64, error) { - body := []byte(fmt.Sprintf(`{"jsonrpc":"2.0","method":"eth_getTransactionCount","params":["%s","pending"]}`, address.String())) + body := []byte(fmt.Sprintf(`{"jsonrpc":"2.0","method":"eth_getTransactionCount","params":["%s","pending"], "id":1}`, address.String())) response, err := d.signAndPostMessage(ctx, address, body, "") if err != nil { return 0, err @@ -70,7 +70,7 @@ func (d *DualBroadcastClient) SendTransaction(ctx context.Context, tx *types.Tra if meta.DualBroadcastParams != nil { params = *meta.DualBroadcastParams } - body := []byte(fmt.Sprintf(`{"jsonrpc":"2.0","method":"eth_sendRawTransaction","params":["%s"]}`, hexutil.Encode(data))) + body := []byte(fmt.Sprintf(`{"jsonrpc":"2.0","method":"eth_sendRawTransaction","params":["%s"], "id":1}`, hexutil.Encode(data))) if _, err = d.signAndPostMessage(ctx, tx.FromAddress, body, params); err != nil { return err } diff --git a/core/chains/evm/txm/metrics.go b/core/chains/evm/txm/metrics.go new file mode 100644 index 00000000000..5ccc711ef09 --- /dev/null +++ b/core/chains/evm/txm/metrics.go @@ -0,0 +1,93 @@ +package txm + +import ( + "context" + "fmt" + "math/big" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "go.opentelemetry.io/otel/metric" + + "github.com/smartcontractkit/chainlink-common/pkg/beholder" + "github.com/smartcontractkit/chainlink-common/pkg/metrics" +) + +var ( + promNumBroadcastedTxs = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "txm_num_broadcasted_transactions", + Help: "Total number of successful broadcasted transactions.", + }, []string{"chainID"}) + promNumConfirmedTxs = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "txm_num_confirmed_transactions", + Help: "Total number of confirmed transactions. Note that this can happen multiple times per transaction in the case of re-orgs or when filling the nonce for untracked transactions.", + }, []string{"chainID"}) + promNumNonceGaps = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "txm_num_nonce_gaps", + Help: "Total number of nonce gaps created that the transaction manager had to fill.", + }, []string{"chainID"}) + promTimeUntilTxConfirmed = promauto.NewHistogramVec(prometheus.HistogramOpts{ + Name: "txm_time_until_tx_confirmed", + Help: "The amount of time elapsed from a transaction being broadcast to being included in a block.", + }, []string{"chainID"}) +) + +type txmMetrics struct { + metrics.Labeler + chainID *big.Int + numBroadcastedTxs metric.Int64Counter + numConfirmedTxs metric.Int64Counter + numNonceGaps metric.Int64Counter + timeUntilTxConfirmed metric.Float64Histogram +} + +func NewTxmMetrics(chainID *big.Int) (*txmMetrics, error) { + numBroadcastedTxs, err := beholder.GetMeter().Int64Counter("txm_num_broadcasted_transactions") + if err != nil { + return nil, fmt.Errorf("failed to register broadcasted txs number: %w", err) + } + + numConfirmedTxs, err := beholder.GetMeter().Int64Counter("txm_num_confirmed_transactions") + if err != nil { + return nil, fmt.Errorf("failed to register confirmed txs number: %w", err) + } + + numNonceGaps, err := beholder.GetMeter().Int64Counter("txm_num_nonce_gaps") + if err != nil { + return nil, fmt.Errorf("failed to register nonce gaps number: %w", err) + } + + timeUntilTxConfirmed, err := beholder.GetMeter().Float64Histogram("txm_time_until_tx_confirmed") + if err != nil { + return nil, fmt.Errorf("failed to register time until tx confirmed: %w", err) + } + + return &txmMetrics{ + chainID: chainID, + Labeler: metrics.NewLabeler().With("chainID", chainID.String()), + numBroadcastedTxs: numBroadcastedTxs, + numConfirmedTxs: numConfirmedTxs, + numNonceGaps: numNonceGaps, + timeUntilTxConfirmed: timeUntilTxConfirmed, + }, nil +} + +func (m *txmMetrics) IncrementNumBroadcastedTxs(ctx context.Context) { + promNumBroadcastedTxs.WithLabelValues(m.chainID.String()).Add(float64(1)) + m.numBroadcastedTxs.Add(ctx, 1) +} + +func (m *txmMetrics) IncrementNumConfirmedTxs(ctx context.Context, confirmedTransactions int) { + promNumConfirmedTxs.WithLabelValues(m.chainID.String()).Add(float64(confirmedTransactions)) + m.numConfirmedTxs.Add(ctx, int64(confirmedTransactions)) +} + +func (m *txmMetrics) IncrementNumNonceGaps(ctx context.Context) { + promNumNonceGaps.WithLabelValues(m.chainID.String()).Add(float64(1)) + m.numNonceGaps.Add(ctx, 1) +} + +func (m *txmMetrics) RecordTimeUntilTxConfirmed(ctx context.Context, duration float64) { + promTimeUntilTxConfirmed.WithLabelValues(m.chainID.String()).Observe(duration) + m.timeUntilTxConfirmed.Record(ctx, duration) +} diff --git a/core/chains/evm/txm/orchestrator.go b/core/chains/evm/txm/orchestrator.go index 8915a534253..3b5a4a18ce1 100644 --- a/core/chains/evm/txm/orchestrator.go +++ b/core/chains/evm/txm/orchestrator.go @@ -31,10 +31,11 @@ import ( type OrchestratorTxStore interface { Add(addresses ...common.Address) error FetchUnconfirmedTransactionAtNonceWithCount(context.Context, uint64, common.Address) (*txmtypes.Transaction, int, error) - FindTxWithIdempotencyKey(context.Context, *string) (*txmtypes.Transaction, error) + FindTxWithIdempotencyKey(context.Context, string) (*txmtypes.Transaction, error) } type OrchestratorKeystore interface { + CheckEnabled(ctx context.Context, address common.Address, chainID *big.Int) error EnabledAddressesForChain(ctx context.Context, chainID *big.Int) (addresses []common.Address, err error) } @@ -120,15 +121,15 @@ func (o *Orchestrator[BLOCK_HASH, HEAD]) Close() (merr error) { merr = errors.Join(merr, fmt.Errorf("Orchestrator failed to stop ForwarderManager: %w", err)) } } + if err := o.txm.Close(); err != nil { + merr = errors.Join(merr, fmt.Errorf("Orchestrator failed to stop Txm: %w", err)) + } if err := o.attemptBuilder.Close(); err != nil { // TODO: hacky fix for DualBroadcast if !strings.Contains(err.Error(), "already been stopped") { merr = errors.Join(merr, fmt.Errorf("Orchestrator failed to stop AttemptBuilder: %w", err)) } } - if err := o.txm.Close(); err != nil { - merr = errors.Join(merr, fmt.Errorf("Orchestrator failed to stop Txm: %w", err)) - } return merr }) } @@ -172,14 +173,20 @@ func (o *Orchestrator[BLOCK_HASH, HEAD]) OnNewLongestChain(ctx context.Context, func (o *Orchestrator[BLOCK_HASH, HEAD]) CreateTransaction(ctx context.Context, request txmgrtypes.TxRequest[common.Address, common.Hash]) (tx txmgrtypes.Tx[*big.Int, common.Address, common.Hash, common.Hash, evmtypes.Nonce, gas.EvmFee], err error) { var wrappedTx *txmtypes.Transaction - wrappedTx, err = o.txStore.FindTxWithIdempotencyKey(ctx, request.IdempotencyKey) - if err != nil { - return + if request.IdempotencyKey != nil { + wrappedTx, err = o.txStore.FindTxWithIdempotencyKey(ctx, *request.IdempotencyKey) + if err != nil { + return + } } if wrappedTx != nil { o.lggr.Infof("Found Tx with IdempotencyKey: %v. Returning existing Tx without creating a new one.", *wrappedTx.IdempotencyKey) } else { + if kErr := o.keystore.CheckEnabled(ctx, request.FromAddress, o.chainID); kErr != nil { + return tx, fmt.Errorf("cannot send transaction from %s on chain ID %s: %w", request.FromAddress, o.chainID.String(), kErr) + } + var pipelineTaskRunID uuid.NullUUID if request.PipelineTaskRunID != nil { pipelineTaskRunID.UUID = *request.PipelineTaskRunID @@ -324,7 +331,7 @@ func (o *Orchestrator[BLOCK_HASH, HEAD]) GetForwarderForEOAOCR2Feeds(ctx context func (o *Orchestrator[BLOCK_HASH, HEAD]) GetTransactionStatus(ctx context.Context, transactionID string) (status commontypes.TransactionStatus, err error) { // Loads attempts and receipts in the transaction - tx, err := o.txStore.FindTxWithIdempotencyKey(ctx, &transactionID) + tx, err := o.txStore.FindTxWithIdempotencyKey(ctx, transactionID) if err != nil || tx == nil { return status, fmt.Errorf("failed to find transaction with IdempotencyKey %s: %w", transactionID, err) } diff --git a/core/chains/evm/txm/storage/inmemory_store.go b/core/chains/evm/txm/storage/inmemory_store.go index 013f7844a44..2f5cde53a56 100644 --- a/core/chains/evm/txm/storage/inmemory_store.go +++ b/core/chains/evm/txm/storage/inmemory_store.go @@ -42,19 +42,24 @@ func NewInMemoryStore(lggr logger.Logger, address common.Address, chainID *big.I lggr: logger.Named(lggr, "InMemoryStore"), address: address, chainID: chainID, + UnstartedTransactions: make([]*types.Transaction, 0, maxQueuedTransactions), UnconfirmedTransactions: make(map[uint64]*types.Transaction), - ConfirmedTransactions: make(map[uint64]*types.Transaction), + ConfirmedTransactions: make(map[uint64]*types.Transaction, maxQueuedTransactions), Transactions: make(map[uint64]*types.Transaction), } } func (m *InMemoryStore) AbandonPendingTransactions() { + // TODO: append existing fatal transactions and cap the size m.Lock() defer m.Unlock() for _, tx := range m.UnstartedTransactions { tx.State = types.TxFatalError } + for _, tx := range m.FatalTransactions { + delete(m.Transactions, tx.ID) + } m.FatalTransactions = m.UnstartedTransactions m.UnstartedTransactions = []*types.Transaction{} @@ -97,7 +102,6 @@ func (m *InMemoryStore) CreateEmptyUnconfirmedTransaction(nonce uint64, gasLimit m.Lock() defer m.Unlock() - m.txIDCount++ emptyTx := &types.Transaction{ ID: m.txIDCount, ChainID: m.chainID, @@ -114,10 +118,11 @@ func (m *InMemoryStore) CreateEmptyUnconfirmedTransaction(nonce uint64, gasLimit return nil, fmt.Errorf("an unconfirmed tx with the same nonce already exists: %v", m.UnconfirmedTransactions[nonce]) } - if _, exists := m.Transactions[nonce]; exists { - return nil, fmt.Errorf("a tx with the same nonce already exists: %v", m.Transactions[nonce]) + if _, exists := m.ConfirmedTransactions[nonce]; exists { + return nil, fmt.Errorf("a confirmed tx with the same nonce already exists: %v", m.ConfirmedTransactions[nonce]) } + m.txIDCount++ m.UnconfirmedTransactions[nonce] = emptyTx m.Transactions[emptyTx.ID] = emptyTx @@ -128,8 +133,6 @@ func (m *InMemoryStore) CreateTransaction(txRequest *types.TxRequest) *types.Tra m.Lock() defer m.Unlock() - m.txIDCount++ - tx := &types.Transaction{ ID: m.txIDCount, IdempotencyKey: txRequest.IdempotencyKey, @@ -147,13 +150,17 @@ func (m *InMemoryStore) CreateTransaction(txRequest *types.TxRequest) *types.Tra SignalCallback: txRequest.SignalCallback, } - if len(m.UnstartedTransactions) == maxQueuedTransactions { - m.lggr.Warnf("Unstarted transactions queue for address: %v reached max limit of: %d. Dropping oldest transaction: %v.", - m.address, maxQueuedTransactions, m.UnstartedTransactions[0]) - delete(m.Transactions, m.UnstartedTransactions[0].ID) - m.UnstartedTransactions = m.UnstartedTransactions[1:maxQueuedTransactions] + uLen := len(m.UnstartedTransactions) + if uLen >= maxQueuedTransactions { + m.lggr.Warnw(fmt.Sprintf("Unstarted transactions queue for address: %v reached max limit of: %d. Dropping oldest transactions", m.address, maxQueuedTransactions), + "txs", m.UnstartedTransactions[0:uLen-maxQueuedTransactions+1]) // need to make room for the new tx + for _, tx := range m.UnstartedTransactions[0 : uLen-maxQueuedTransactions+1] { + delete(m.Transactions, tx.ID) + } + m.UnstartedTransactions = m.UnstartedTransactions[uLen-maxQueuedTransactions+1:] } + m.txIDCount++ txCopy := tx.DeepCopy() m.Transactions[txCopy.ID] = txCopy m.UnstartedTransactions = append(m.UnstartedTransactions, txCopy) @@ -181,6 +188,11 @@ func (m *InMemoryStore) MarkConfirmedAndReorgedTransactions(latestNonce uint64) if tx.Nonce == nil { return nil, nil, fmt.Errorf("nonce for txID: %v is empty", tx.ID) } + existingTx, exists := m.ConfirmedTransactions[*tx.Nonce] + if exists { + m.lggr.Errorw("Another confirmed transaction with the same nonce exists. Transaction will overwritten.", + "existingTx", existingTx, "newTx", tx) + } if *tx.Nonce < latestNonce { tx.State = types.TxConfirmed confirmedTransactions = append(confirmedTransactions, tx.DeepCopy()) @@ -194,19 +206,24 @@ func (m *InMemoryStore) MarkConfirmedAndReorgedTransactions(latestNonce uint64) if tx.Nonce == nil { return nil, nil, fmt.Errorf("nonce for txID: %v is empty", tx.ID) } + existingTx, exists := m.UnconfirmedTransactions[*tx.Nonce] + if exists { + m.lggr.Errorw("Another unconfirmed transaction with the same nonce exists. Transaction will overwritten.", + "existingTx", existingTx, "newTx", tx) + } if *tx.Nonce >= latestNonce { tx.State = types.TxUnconfirmed - tx.LastBroadcastAt = time.Time{} // Mark reorged transaction as if it wasn't broadcasted before + tx.LastBroadcastAt = nil // Mark reorged transaction as if it wasn't broadcasted before unconfirmedTransactionIDs = append(unconfirmedTransactionIDs, tx.ID) m.UnconfirmedTransactions[*tx.Nonce] = tx delete(m.ConfirmedTransactions, *tx.Nonce) } } - if len(m.ConfirmedTransactions) >= maxQueuedTransactions { + if len(m.ConfirmedTransactions) > maxQueuedTransactions { prunedTxIDs := m.pruneConfirmedTransactions() - m.lggr.Debugf("Confirmed transactions map for address: %v reached max limit of: %d. Pruned 1/3 of the oldest confirmed transactions. TxIDs: %v", - m.address, maxQueuedTransactions, prunedTxIDs) + m.lggr.Debugf("Confirmed transactions map for address: %v reached max limit of: %d. Pruned 1/%d of the oldest confirmed transactions. TxIDs: %v", + m.address, maxQueuedTransactions, pruneSubset, prunedTxIDs) } sort.Slice(confirmedTransactions, func(i, j int) bool { return confirmedTransactions[i].ID < confirmedTransactions[j].ID }) sort.Slice(unconfirmedTransactionIDs, func(i, j int) bool { return unconfirmedTransactionIDs[i] < unconfirmedTransactionIDs[j] }) @@ -238,15 +255,15 @@ func (m *InMemoryStore) UpdateTransactionBroadcast(txID uint64, txNonce uint64, // Set the same time for both the tx and its attempt now := time.Now() - unconfirmedTx.LastBroadcastAt = now - if unconfirmedTx.InitialBroadcastAt.IsZero() { - unconfirmedTx.InitialBroadcastAt = now + unconfirmedTx.LastBroadcastAt = &now + if unconfirmedTx.InitialBroadcastAt == nil { + unconfirmedTx.InitialBroadcastAt = &now } a, err := unconfirmedTx.FindAttemptByHash(attemptHash) if err != nil { - return err + return fmt.Errorf("UpdateTransactionBroadcast failed to find attempt. %w", err) } - a.BroadcastAt = now + a.BroadcastAt = &now return nil } @@ -264,10 +281,6 @@ func (m *InMemoryStore) UpdateUnstartedTransactionWithNonce(nonce uint64) (*type return nil, fmt.Errorf("an unconfirmed tx with the same nonce already exists: %v", m.UnconfirmedTransactions[nonce]) } - if _, exists := m.Transactions[nonce]; exists { - return nil, fmt.Errorf("a tx with the same nonce already exists: %v", m.Transactions[nonce]) - } - tx := m.UnstartedTransactions[0] tx.Nonce = &nonce tx.State = types.TxUnconfirmed @@ -328,15 +341,13 @@ func (m *InMemoryStore) MarkTxFatal(*types.Transaction) error { } // Orchestrator -func (m *InMemoryStore) FindTxWithIdempotencyKey(idempotencyKey *string) *types.Transaction { +func (m *InMemoryStore) FindTxWithIdempotencyKey(idempotencyKey string) *types.Transaction { m.RLock() defer m.RUnlock() - if idempotencyKey != nil { - for _, tx := range m.Transactions { - if tx.IdempotencyKey != nil && tx.IdempotencyKey == idempotencyKey { - return tx.DeepCopy() - } + for _, tx := range m.Transactions { + if tx.IdempotencyKey != nil && *tx.IdempotencyKey == idempotencyKey { + return tx.DeepCopy() } } diff --git a/core/chains/evm/txm/storage/inmemory_store_manager.go b/core/chains/evm/txm/storage/inmemory_store_manager.go index 7e0871c3a7f..86abaf4b7cc 100644 --- a/core/chains/evm/txm/storage/inmemory_store_manager.go +++ b/core/chains/evm/txm/storage/inmemory_store_manager.go @@ -6,6 +6,7 @@ import ( "math/big" "github.com/ethereum/go-ethereum/common" + "go.uber.org/multierr" "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/txm/types" @@ -35,14 +36,14 @@ func (m *InMemoryStoreManager) AbandonPendingTransactions(_ context.Context, fro return fmt.Errorf(StoreNotFoundForAddress, fromAddress) } -func (m *InMemoryStoreManager) Add(addresses ...common.Address) error { +func (m *InMemoryStoreManager) Add(addresses ...common.Address) (err error) { for _, address := range addresses { if _, exists := m.InMemoryStoreMap[address]; exists { - return fmt.Errorf("address %v already exists in store manager", address) + err = multierr.Append(err, fmt.Errorf("address %v already exists in store manager", address)) } m.InMemoryStoreMap[address] = NewInMemoryStore(m.lggr, address, m.chainID) } - return nil + return } func (m *InMemoryStoreManager) AppendAttemptToTransaction(_ context.Context, txNonce uint64, fromAddress common.Address, attempt *types.Attempt) error { @@ -124,7 +125,7 @@ func (m *InMemoryStoreManager) MarkTxFatal(_ context.Context, tx *types.Transact return fmt.Errorf(StoreNotFoundForAddress, fromAddress) } -func (m *InMemoryStoreManager) FindTxWithIdempotencyKey(_ context.Context, idempotencyKey *string) (*types.Transaction, error) { +func (m *InMemoryStoreManager) FindTxWithIdempotencyKey(_ context.Context, idempotencyKey string) (*types.Transaction, error) { for _, store := range m.InMemoryStoreMap { tx := store.FindTxWithIdempotencyKey(idempotencyKey) if tx != nil { diff --git a/core/chains/evm/txm/storage/inmemory_store_test.go b/core/chains/evm/txm/storage/inmemory_store_test.go index b2a3a068018..919a36dde50 100644 --- a/core/chains/evm/txm/storage/inmemory_store_test.go +++ b/core/chains/evm/txm/storage/inmemory_store_test.go @@ -8,8 +8,10 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.uber.org/zap" "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink-common/pkg/utils/tests" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/testutils" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/txm/types" @@ -19,8 +21,8 @@ func TestAbandonPendingTransactions(t *testing.T) { t.Parallel() fromAddress := testutils.NewAddress() - m := NewInMemoryStore(logger.Test(t), fromAddress, testutils.FixtureChainID) t.Run("abandons unstarted and unconfirmed transactions", func(t *testing.T) { + m := NewInMemoryStore(logger.Test(t), fromAddress, testutils.FixtureChainID) // Unstarted tx1 := insertUnstartedTransaction(m) tx2 := insertUnstartedTransaction(m) @@ -40,6 +42,7 @@ func TestAbandonPendingTransactions(t *testing.T) { }) t.Run("skips all types apart from unstarted and unconfirmed transactions", func(t *testing.T) { + m := NewInMemoryStore(logger.Test(t), fromAddress, testutils.FixtureChainID) // Fatal tx1 := insertFataTransaction(m) tx2 := insertFataTransaction(m) @@ -56,6 +59,7 @@ func TestAbandonPendingTransactions(t *testing.T) { assert.Equal(t, types.TxFatalError, tx2.State) assert.Equal(t, types.TxConfirmed, tx3.State) assert.Equal(t, types.TxConfirmed, tx4.State) + assert.Len(t, m.Transactions, 2) // tx1, tx2 were dropped }) } @@ -65,33 +69,39 @@ func TestAppendAttemptToTransaction(t *testing.T) { fromAddress := testutils.NewAddress() m := NewInMemoryStore(logger.Test(t), fromAddress, testutils.FixtureChainID) - _, err := insertUnconfirmedTransaction(m, 0) // txID = 1 + _, err := insertUnconfirmedTransaction(m, 10) // txID = 1, nonce = 10 require.NoError(t, err) - _, err = insertConfirmedTransaction(m, 2) // txID = 1 + _, err = insertConfirmedTransaction(m, 2) // txID = 2, nonce = 2 require.NoError(t, err) t.Run("fails if corresponding unconfirmed transaction for attempt was not found", func(t *testing.T) { var nonce uint64 = 1 - newAttempt := &types.Attempt{ - TxID: 1, - } - require.Error(t, m.AppendAttemptToTransaction(nonce, newAttempt)) + newAttempt := &types.Attempt{} + err := m.AppendAttemptToTransaction(nonce, newAttempt) + require.Error(t, err) + require.ErrorContains(t, err, "unconfirmed tx was not found") }) - t.Run("fails if unconfirmed transaction was found but has doesn't match the txID", func(t *testing.T) { - var nonce uint64 + t.Run("fails if unconfirmed transaction was found but doesn't match the txID", func(t *testing.T) { + var nonce uint64 = 10 newAttempt := &types.Attempt{ TxID: 2, } - require.Error(t, m.AppendAttemptToTransaction(nonce, newAttempt)) + err := m.AppendAttemptToTransaction(nonce, newAttempt) + require.Error(t, err) + require.ErrorContains(t, err, "attempt points to a different txID") }) t.Run("appends attempt to transaction", func(t *testing.T) { - var nonce uint64 + var nonce uint64 = 10 newAttempt := &types.Attempt{ TxID: 1, } require.NoError(t, m.AppendAttemptToTransaction(nonce, newAttempt)) + tx, _ := m.FetchUnconfirmedTransactionAtNonceWithCount(10) + assert.Len(t, tx.Attempts, 1) + assert.Equal(t, uint16(1), tx.AttemptCount) + assert.False(t, tx.Attempts[0].CreatedAt.IsZero()) }) } @@ -105,6 +115,10 @@ func TestCountUnstartedTransactions(t *testing.T) { insertUnstartedTransaction(m) assert.Equal(t, 1, m.CountUnstartedTransactions()) + + _, err := insertConfirmedTransaction(m, 10) + require.NoError(t, err) + assert.Equal(t, 1, m.CountUnstartedTransactions()) } func TestCreateEmptyUnconfirmedTransaction(t *testing.T) { @@ -112,16 +126,23 @@ func TestCreateEmptyUnconfirmedTransaction(t *testing.T) { fromAddress := testutils.NewAddress() m := NewInMemoryStore(logger.Test(t), fromAddress, testutils.FixtureChainID) - _, err := insertUnconfirmedTransaction(m, 0) + _, err := insertUnconfirmedTransaction(m, 1) + require.NoError(t, err) + _, err = insertConfirmedTransaction(m, 0) require.NoError(t, err) t.Run("fails if unconfirmed transaction with the same nonce exists", func(t *testing.T) { + _, err := m.CreateEmptyUnconfirmedTransaction(1, 0) + require.Error(t, err) + }) + + t.Run("fails if confirmed transaction with the same nonce exists", func(t *testing.T) { _, err := m.CreateEmptyUnconfirmedTransaction(0, 0) require.Error(t, err) }) t.Run("creates a new empty unconfirmed transaction", func(t *testing.T) { - tx, err := m.CreateEmptyUnconfirmedTransaction(1, 0) + tx, err := m.CreateEmptyUnconfirmedTransaction(2, 0) require.NoError(t, err) assert.Equal(t, types.TxUnconfirmed, tx.State) }) @@ -138,11 +159,11 @@ func TestCreateTransaction(t *testing.T) { txR1 := &types.TxRequest{} txR2 := &types.TxRequest{} tx1 := m.CreateTransaction(txR1) - assert.Equal(t, uint64(1), tx1.ID) + assert.Equal(t, uint64(0), tx1.ID) assert.LessOrEqual(t, now, tx1.CreatedAt) tx2 := m.CreateTransaction(txR2) - assert.Equal(t, uint64(2), tx2.ID) + assert.Equal(t, uint64(1), tx2.ID) assert.LessOrEqual(t, now, tx2.CreatedAt) assert.Equal(t, 2, m.CountUnstartedTransactions()) @@ -151,7 +172,7 @@ func TestCreateTransaction(t *testing.T) { t.Run("prunes oldest unstarted transactions if limit is reached", func(t *testing.T) { m := NewInMemoryStore(logger.Test(t), fromAddress, testutils.FixtureChainID) overshot := 5 - for i := 1; i < maxQueuedTransactions+overshot; i++ { + for i := 0; i < maxQueuedTransactions+overshot; i++ { r := &types.TxRequest{} tx := m.CreateTransaction(r) //nolint:gosec // this won't overflow @@ -185,7 +206,7 @@ func TestFetchUnconfirmedTransactionAtNonceWithCount(t *testing.T) { assert.Equal(t, 1, count) } -func TestMarkTransactionsConfirmed(t *testing.T) { +func TestMarkConfirmedAndReorgedTransactions(t *testing.T) { t.Parallel() fromAddress := testutils.NewAddress() @@ -245,17 +266,33 @@ func TestMarkTransactionsConfirmed(t *testing.T) { assert.Equal(t, utxs[0], ctx2.ID) assert.Empty(t, ctxs) }) + + t.Run("logs an error during confirmation if a transaction with the same nonce already exists", func(t *testing.T) { + lggr, observedLogs := logger.TestObserved(t, zap.DebugLevel) + m := NewInMemoryStore(lggr, fromAddress, testutils.FixtureChainID) + _, err := insertConfirmedTransaction(m, 0) + require.NoError(t, err) + _, err = insertUnconfirmedTransaction(m, 0) + require.NoError(t, err) + + _, _, err = m.MarkConfirmedAndReorgedTransactions(1) + require.NoError(t, err) + tests.AssertLogEventually(t, observedLogs, "Another confirmed transaction with the same nonce exists") + }) + t.Run("prunes confirmed transactions map if it reaches the limit", func(t *testing.T) { m := NewInMemoryStore(logger.Test(t), fromAddress, testutils.FixtureChainID) - for i := 0; i < maxQueuedTransactions; i++ { + overshot := 5 + for i := 0; i < maxQueuedTransactions+overshot; i++ { //nolint:gosec // this won't overflow _, err := insertConfirmedTransaction(m, uint64(i)) require.NoError(t, err) } - assert.Len(t, m.ConfirmedTransactions, maxQueuedTransactions) - _, _, err := m.MarkConfirmedAndReorgedTransactions(maxQueuedTransactions) + assert.Len(t, m.ConfirmedTransactions, maxQueuedTransactions+overshot) + //nolint:gosec // this won't overflow + _, _, err := m.MarkConfirmedAndReorgedTransactions(uint64(maxQueuedTransactions + overshot)) require.NoError(t, err) - assert.Len(t, m.ConfirmedTransactions, (maxQueuedTransactions - maxQueuedTransactions/pruneSubset)) + assert.Len(t, m.ConfirmedTransactions, 170) }) } @@ -389,6 +426,23 @@ func TestDeleteAttemptForUnconfirmedTx(t *testing.T) { }) } +func TestFindTxWithIdempotencyKey(t *testing.T) { + t.Parallel() + fromAddress := testutils.NewAddress() + m := NewInMemoryStore(logger.Test(t), fromAddress, testutils.FixtureChainID) + tx, err := insertConfirmedTransaction(m, 0) + require.NoError(t, err) + + ik := "IK" + tx.IdempotencyKey = &ik + itx := m.FindTxWithIdempotencyKey(ik) + assert.Equal(t, ik, *itx.IdempotencyKey) + + uik := "Unknown" + itx = m.FindTxWithIdempotencyKey(uik) + assert.Nil(t, itx) +} + func TestPruneConfirmedTransactions(t *testing.T) { t.Parallel() fromAddress := testutils.NewAddress() @@ -424,6 +478,7 @@ func insertUnstartedTransaction(m *InMemoryStore) *types.Transaction { } m.UnstartedTransactions = append(m.UnstartedTransactions, tx) + m.Transactions[tx.ID] = tx return tx } @@ -449,6 +504,7 @@ func insertUnconfirmedTransaction(m *InMemoryStore, nonce uint64) (*types.Transa } m.UnconfirmedTransactions[nonce] = tx + m.Transactions[tx.ID] = tx return tx, nil } @@ -474,6 +530,7 @@ func insertConfirmedTransaction(m *InMemoryStore, nonce uint64) (*types.Transact } m.ConfirmedTransactions[nonce] = tx + m.Transactions[tx.ID] = tx return tx, nil } @@ -496,5 +553,6 @@ func insertFataTransaction(m *InMemoryStore) *types.Transaction { } m.FatalTransactions = append(m.FatalTransactions, tx) + m.Transactions[tx.ID] = tx return tx } diff --git a/core/chains/evm/txm/stuck_tx_detector.go b/core/chains/evm/txm/stuck_tx_detector.go index 68d8caf0ed1..33905ead80c 100644 --- a/core/chains/evm/txm/stuck_tx_detector.go +++ b/core/chains/evm/txm/stuck_tx_detector.go @@ -23,22 +23,23 @@ type StuckTxDetectorConfig struct { } type stuckTxDetector struct { - lggr logger.Logger - chainType chaintype.ChainType - config StuckTxDetectorConfig + lggr logger.Logger + chainType chaintype.ChainType + config StuckTxDetectorConfig + lastPurgeMap map[common.Address]time.Time } func NewStuckTxDetector(lggr logger.Logger, chaintype chaintype.ChainType, config StuckTxDetectorConfig) *stuckTxDetector { return &stuckTxDetector{ - lggr: lggr, - chainType: chaintype, - config: config, + lggr: lggr, + chainType: chaintype, + config: config, + lastPurgeMap: make(map[common.Address]time.Time), } } func (s *stuckTxDetector) DetectStuckTransaction(ctx context.Context, tx *types.Transaction) (bool, error) { switch s.chainType { - // TODO: rename case chaintype.ChainDualBroadcast: result, err := s.dualBroadcastDetection(ctx, tx) if result || err != nil { @@ -50,11 +51,20 @@ func (s *stuckTxDetector) DetectStuckTransaction(ctx context.Context, tx *types. } } +// timeBasedDetection marks a transaction if all the following conditions are met: +// - LastBroadcastAt is not nil +// - Time since last broadcast is above the threshold +// - Time since last purge is above threshold +// +// NOTE: Potentially we can use a subset of threhsold for last purge check, because the transaction would have already been broadcasted to the mempool +// so it is more likely to be picked up compared to a transaction that hasn't been broadcasted before. This would avoid slowing down TXM for sebsequent transactions +// in case the current one is stuck. func (s *stuckTxDetector) timeBasedDetection(tx *types.Transaction) bool { threshold := (s.config.BlockTime * time.Duration(s.config.StuckTxBlockThreshold)) - if time.Since(tx.LastBroadcastAt) > threshold && !tx.LastBroadcastAt.IsZero() { - s.lggr.Debugf("TxID: %v last broadcast was: %v which is more than the max configured duration: %v. Transaction is now considered stuck and will be purged.", - tx.ID, tx.LastBroadcastAt, threshold) + if tx.LastBroadcastAt != nil && min(time.Since(*tx.LastBroadcastAt), time.Since(s.lastPurgeMap[tx.FromAddress])) > threshold { + s.lggr.Debugf("TxID: %v last broadcast was: %v and last purge: %v which is more than the max configured duration: %v. Transaction is now considered stuck and will be purged.", + tx.ID, tx.LastBroadcastAt, s.lastPurgeMap[tx.FromAddress], threshold) + s.lastPurgeMap[tx.FromAddress] = time.Now() return true } return false diff --git a/core/chains/evm/txm/stuck_tx_detector_test.go b/core/chains/evm/txm/stuck_tx_detector_test.go new file mode 100644 index 00000000000..af5a765dcdb --- /dev/null +++ b/core/chains/evm/txm/stuck_tx_detector_test.go @@ -0,0 +1,80 @@ +package txm + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "github.com/smartcontractkit/chainlink-common/pkg/logger" + + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/testutils" + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/txm/types" +) + +func TestTimeBasedDetection(t *testing.T) { + t.Parallel() + + t.Run("returns false if transaction is not stuck", func(t *testing.T) { + config := StuckTxDetectorConfig{ + BlockTime: 10 * time.Second, + StuckTxBlockThreshold: 5, + } + fromAddress := testutils.NewAddress() + s := NewStuckTxDetector(logger.Test(t), "", config) + + // No previous broadcast + tx := &types.Transaction{ + ID: 1, + LastBroadcastAt: nil, + FromAddress: fromAddress, + } + assert.False(t, s.timeBasedDetection(tx)) + // Not enough time has passed since last broadcast + now := time.Now() + tx.LastBroadcastAt = &now + assert.False(t, s.timeBasedDetection(tx)) + // Not enough time has passed since last purge + tx.LastBroadcastAt = &time.Time{} + s.lastPurgeMap[fromAddress] = now + assert.False(t, s.timeBasedDetection(tx)) + }) + + t.Run("returns true if transaction is stuck", func(t *testing.T) { + config := StuckTxDetectorConfig{ + BlockTime: 10 * time.Second, + StuckTxBlockThreshold: 5, + } + fromAddress := testutils.NewAddress() + s := NewStuckTxDetector(logger.Test(t), "", config) + + tx := &types.Transaction{ + ID: 1, + LastBroadcastAt: &time.Time{}, + FromAddress: fromAddress, + } + assert.True(t, s.timeBasedDetection(tx)) + }) + + t.Run("marks first tx as stuck, updates purge time for address, and returns false for the second tx with the same broadcast time", func(t *testing.T) { + config := StuckTxDetectorConfig{ + BlockTime: 1 * time.Second, + StuckTxBlockThreshold: 10, + } + fromAddress := testutils.NewAddress() + s := NewStuckTxDetector(logger.Test(t), "", config) + + tx1 := &types.Transaction{ + ID: 1, + LastBroadcastAt: &time.Time{}, + FromAddress: fromAddress, + } + tx2 := &types.Transaction{ + ID: 2, + LastBroadcastAt: &time.Time{}, + FromAddress: fromAddress, + } + assert.True(t, s.timeBasedDetection(tx1)) + assert.False(t, s.timeBasedDetection(tx2)) + }) +} diff --git a/core/chains/evm/txm/txm.go b/core/chains/evm/txm/txm.go index c37099d3783..bf53e00e81a 100644 --- a/core/chains/evm/txm/txm.go +++ b/core/chains/evm/txm/txm.go @@ -9,8 +9,6 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/jpillora/backoff" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/services" @@ -22,7 +20,7 @@ import ( const ( broadcastInterval time.Duration = 30 * time.Second maxInFlightTransactions int = 16 - maxInFlightSubset int = 3 + maxInFlightSubset int = 5 maxAllowedAttempts uint16 = 10 pendingNonceDefaultTimeout time.Duration = 30 * time.Second pendingNonceRecheckInterval time.Duration = 1 * time.Second @@ -67,25 +65,6 @@ type Keystore interface { EnabledAddressesForChain(ctx context.Context, chainID *big.Int) (addresses []common.Address, err error) } -var ( - promNumBroadcastedTxs = promauto.NewCounterVec(prometheus.CounterOpts{ - Name: "txm_num_broadcasted_transactions", - Help: "Total number of successful broadcasted transactions.", - }, []string{"chainID"}) - promNumConfirmedTxs = promauto.NewCounterVec(prometheus.CounterOpts{ - Name: "txm_num_confirmed_transactions", - Help: "Total number of confirmed transactions. Note that this can happen multiple times per transaction in the case of re-orgs or when filling the nonce for untracked transactions.", - }, []string{"chainID"}) - promNumNonceGaps = promauto.NewCounterVec(prometheus.CounterOpts{ - Name: "txm_num_nonce_gaps", - Help: "Total number of nonce gaps created that the transaction manager had to fill.", - }, []string{"chainID"}) - promTimeUntilTxConfirmed = promauto.NewHistogramVec(prometheus.HistogramOpts{ - Name: "txm_time_until_tx_confirmed", - Help: "The amount of time elapsed from a transaction being broadcast to being included in a block.", - }, []string{"chainID"}) -) - type Config struct { EIP1559 bool BlockTime time.Duration @@ -104,6 +83,7 @@ type Txm struct { txStore TxStore keystore Keystore config Config + metrics *txmMetrics nonceMapMu sync.Mutex nonceMap map[common.Address]uint64 @@ -130,6 +110,11 @@ func NewTxm(lggr logger.Logger, chainID *big.Int, client Client, attemptBuilder func (t *Txm) Start(ctx context.Context) error { return t.StartOnce("Txm", func() error { + tm, err := NewTxmMetrics(t.chainID) + if err != nil { + return err + } + t.metrics = tm t.stopCh = make(chan struct{}) addresses, err := t.keystore.EnabledAddressesForChain(ctx, t.chainID) @@ -137,45 +122,39 @@ func (t *Txm) Start(ctx context.Context) error { return err } for _, address := range addresses { - err := t.startAddress(ctx, address) - if err != nil { - return err - } + t.startAddress(address) } return nil }) } -func (t *Txm) startAddress(ctx context.Context, address common.Address) error { +func (t *Txm) startAddress(address common.Address) { triggerCh := make(chan struct{}, 1) t.triggerCh[address] = triggerCh - pendingNonce, err := t.pollForPendingNonce(ctx, address) - if err != nil { - return err - } - t.setNonce(address, pendingNonce) t.wg.Add(2) go t.broadcastLoop(address, triggerCh) go t.backfillLoop(address) - return nil } -func (t *Txm) pollForPendingNonce(ctx context.Context, address common.Address) (pendingNonce uint64, err error) { +func (t *Txm) initializeNonce(ctx context.Context, address common.Address) { ctxWithTimeout, cancel := context.WithTimeout(ctx, pendingNonceDefaultTimeout) defer cancel() for { - pendingNonce, err = t.client.PendingNonceAt(ctxWithTimeout, address) + pendingNonce, err := t.client.PendingNonceAt(ctxWithTimeout, address) if err != nil { - t.lggr.Errorw("Error when fetching initial pending nonce", "address", address, "err", err) + t.lggr.Errorw("Error when fetching initial nonce", "address", address, "err", err) select { case <-time.After(pendingNonceRecheckInterval): case <-ctx.Done(): - return 0, context.Cause(ctx) + t.lggr.Errorw("context error", "err", context.Cause(ctx)) + return } continue } - return pendingNonce, nil + t.setNonce(address, pendingNonce) + t.lggr.Debugf("Set initial nonce for address: %v to %d", address, pendingNonce) + return } } @@ -208,6 +187,7 @@ func (t *Txm) Trigger(address common.Address) { } func (t *Txm) Abandon(address common.Address) error { + // TODO: restart txm t.lggr.Infof("Dropping unstarted and unconfirmed transactions for address: %v", address) return t.txStore.AbandonPendingTransactions(context.TODO(), address) } @@ -239,6 +219,8 @@ func (t *Txm) broadcastLoop(address common.Address, triggerCh chan struct{}) { broadcastWithBackoff := newBackoff(1 * time.Second) var broadcastCh <-chan time.Time + t.initializeNonce(ctx, address) + for { start := time.Now() bo, err := t.broadcastTransaction(ctx, address) @@ -300,11 +282,11 @@ func (t *Txm) broadcastTransaction(ctx context.Context, address common.Address) return false, err } - // Optimistically send up to 1/maxInFlightSubset of the maxInFlightTransactions. After that threshold, broadcast more cautiously - // by checking the pending nonce so no more than maxInFlightTransactions/3 can get stuck simultaneously i.e. due + // Optimistically send up to maxInFlightSubset of the maxInFlightTransactions. After that threshold, broadcast more cautiously + // by checking the pending nonce so no more than maxInFlightSubset can get stuck simultaneously i.e. due // to insufficient balance. We're making this trade-off to avoid storing stuck transactions and making unnecessary // RPC calls. The upper limit is always maxInFlightTransactions regardless of the pending nonce. - if unconfirmedCount >= maxInFlightTransactions/maxInFlightSubset { + if unconfirmedCount >= maxInFlightSubset { if unconfirmedCount > maxInFlightTransactions { t.lggr.Warnf("Reached transaction limit: %d for unconfirmed transactions", maxInFlightTransactions) return true, nil @@ -332,7 +314,7 @@ func (t *Txm) broadcastTransaction(ctx context.Context, address common.Address) t.setNonce(address, nonce+1) if err := t.createAndSendAttempt(ctx, tx, address); err != nil { - return true, err + return false, err } } } @@ -360,7 +342,7 @@ func (t *Txm) sendTransactionWithError(ctx context.Context, tx *types.Transactio start := time.Now() txErr := t.client.SendTransaction(ctx, tx, attempt) tx.AttemptCount++ - t.lggr.Infow("Broadcasted attempt", "tx", tx.PrettyPrint(), "attempt", attempt.PrettyPrint(), "duration", time.Since(start), "txErr: ", txErr) + t.lggr.Infow("Broadcasted attempt", "tx", tx, "attempt", attempt, "duration", time.Since(start), "txErr: ", txErr) if txErr != nil && t.errorHandler != nil { if err = t.errorHandler.HandleError(tx, txErr, t.attemptBuilder, t.client, t.txStore, t.setNonce, false); err != nil { return @@ -371,12 +353,11 @@ func (t *Txm) sendTransactionWithError(ctx context.Context, tx *types.Transactio return err } if pendingNonce <= *tx.Nonce { - t.lggr.Debugf("Pending nonce for txID: %v didn't increase. PendingNonce: %d, TxNonce: %d", tx.ID, pendingNonce, *tx.Nonce) - return nil + return fmt.Errorf("Pending nonce for txID: %v didn't increase. PendingNonce: %d, TxNonce: %d. TxErr: %w", tx.ID, pendingNonce, *tx.Nonce, txErr) } } - promNumBroadcastedTxs.WithLabelValues(t.chainID.String()).Add(float64(1)) + t.metrics.IncrementNumBroadcastedTxs(ctx) return t.txStore.UpdateTransactionBroadcast(ctx, attempt.TxID, *tx.Nonce, attempt.Hash, address) } @@ -391,8 +372,8 @@ func (t *Txm) backfillTransactions(ctx context.Context, address common.Address) return false, err } if len(confirmedTransactions) > 0 || len(unconfirmedTransactionIDs) > 0 { - promNumConfirmedTxs.WithLabelValues(t.chainID.String()).Add(float64(len(confirmedTransactions))) - confirmedTransactionIDs := extractMetrics(confirmedTransactions, t.chainID) + t.metrics.IncrementNumConfirmedTxs(ctx, len(confirmedTransactions)) + confirmedTransactionIDs := t.extractMetrics(ctx, confirmedTransactions) t.lggr.Infof("Confirmed transaction IDs: %v . Re-orged transaction IDs: %v", confirmedTransactionIDs, unconfirmedTransactionIDs) } @@ -407,7 +388,7 @@ func (t *Txm) backfillTransactions(ctx context.Context, address common.Address) if tx == nil || *tx.Nonce != latestNonce { t.lggr.Warnf("Nonce gap at nonce: %d - address: %v. Creating a new transaction\n", latestNonce, address) - promNumNonceGaps.WithLabelValues(t.chainID.String()).Add(float64(1)) + t.metrics.IncrementNumNonceGaps(ctx) return false, t.createAndSendEmptyTx(ctx, latestNonce, address) } else { //nolint:revive //linter nonsense if !tx.IsPurgeable && t.stuckTxDetector != nil { @@ -430,10 +411,10 @@ func (t *Txm) backfillTransactions(ctx context.Context, address common.Address) return true, fmt.Errorf("reached max allowed attempts for txID: %d. TXM won't broadcast any more attempts."+ "If this error persists, it means the transaction won't be confirmed and the TXM needs to be restarted."+ "Look for any error messages from previous broadcasted attempts that may indicate why this happened, i.e. wallet is out of funds. Tx: %v", tx.ID, - tx.PrettyPrintWithAttempts()) + tx.PrintWithAttempts()) } - if time.Since(tx.LastBroadcastAt) > (t.config.BlockTime*time.Duration(t.config.RetryBlockThreshold)) || tx.LastBroadcastAt.IsZero() { + if tx.LastBroadcastAt == nil || time.Since(*tx.LastBroadcastAt) > (t.config.BlockTime*time.Duration(t.config.RetryBlockThreshold)) { // TODO: add optional graceful bumping strategy t.lggr.Info("Rebroadcasting attempt for txID: ", tx.ID) return false, t.createAndSendAttempt(ctx, tx, address) @@ -450,12 +431,12 @@ func (t *Txm) createAndSendEmptyTx(ctx context.Context, latestNonce uint64, addr return t.createAndSendAttempt(ctx, tx, address) } -func extractMetrics(txs []*types.Transaction, chainID *big.Int) []uint64 { +func (t *Txm) extractMetrics(ctx context.Context, txs []*types.Transaction) []uint64 { confirmedTxIDs := make([]uint64, 0, len(txs)) for _, tx := range txs { confirmedTxIDs = append(confirmedTxIDs, tx.ID) - if !tx.InitialBroadcastAt.IsZero() { - promTimeUntilTxConfirmed.WithLabelValues(chainID.String()).Observe(float64(time.Since(tx.InitialBroadcastAt))) + if tx.InitialBroadcastAt != nil { + t.metrics.RecordTimeUntilTxConfirmed(ctx, float64(time.Since(*tx.InitialBroadcastAt))) } } return confirmedTxIDs diff --git a/core/chains/evm/txm/txm_test.go b/core/chains/evm/txm/txm_test.go index fef90f9c344..458c0ca97ef 100644 --- a/core/chains/evm/txm/txm_test.go +++ b/core/chains/evm/txm/txm_test.go @@ -2,6 +2,7 @@ package txm import ( "errors" + "fmt" "testing" "time" @@ -38,12 +39,14 @@ func TestLifecycle(t *testing.T) { lggr, observedLogs := logger.TestObserved(t, zap.DebugLevel) config := Config{BlockTime: 1 * time.Minute} txStore := storage.NewInMemoryStoreManager(lggr, testutils.FixtureChainID) + require.NoError(t, txStore.Add(address1)) keystore.On("EnabledAddressesForChain", mock.Anything, mock.Anything).Return([]common.Address{address1}, nil).Once() txm := NewTxm(lggr, testutils.FixtureChainID, client, nil, txStore, nil, config, keystore) client.On("PendingNonceAt", mock.Anything, address1).Return(uint64(0), errors.New("error")).Once() - client.On("PendingNonceAt", mock.Anything, address1).Return(uint64(0), nil).Once() + client.On("PendingNonceAt", mock.Anything, address1).Return(uint64(100), nil).Once() require.NoError(t, txm.Start(tests.Context(t))) - tests.AssertLogEventually(t, observedLogs, "Error when fetching initial pending nonce") + tests.AssertLogEventually(t, observedLogs, "Error when fetching initial nonce") + tests.AssertLogEventually(t, observedLogs, fmt.Sprintf("Set initial nonce for address: %v to %d", address1, 100)) }) t.Run("tests lifecycle successfully without any transactions", func(t *testing.T) { @@ -89,7 +92,7 @@ func TestTrigger(t *testing.T) { txm := NewTxm(lggr, testutils.FixtureChainID, client, ab, txStore, nil, config, keystore) var nonce uint64 // Start - client.On("PendingNonceAt", mock.Anything, address).Return(nonce, nil).Once() + client.On("PendingNonceAt", mock.Anything, address).Return(nonce, nil).Maybe() servicetest.Run(t, txm) txm.Trigger(address) }) @@ -126,12 +129,12 @@ func TestBroadcastTransaction(t *testing.T) { tests.AssertLogEventually(t, observedLogs, "Reached transaction limit") }) - t.Run("checks pending nonce if unconfirmed transactions are more than 1/3 of maxInFlightTransactions", func(t *testing.T) { + t.Run("checks pending nonce if unconfirmed transactions are equal or more than maxInFlightSubset", func(t *testing.T) { lggr, observedLogs := logger.TestObserved(t, zap.DebugLevel) mTxStore := mocks.NewTxStore(t) txm := NewTxm(lggr, testutils.FixtureChainID, client, ab, mTxStore, nil, config, keystore) txm.setNonce(address, 1) - mTxStore.On("FetchUnconfirmedTransactionAtNonceWithCount", mock.Anything, mock.Anything, mock.Anything).Return(nil, maxInFlightTransactions/3, nil).Twice() + mTxStore.On("FetchUnconfirmedTransactionAtNonceWithCount", mock.Anything, mock.Anything, mock.Anything).Return(nil, maxInFlightSubset, nil).Twice() client.On("PendingNonceAt", mock.Anything, address).Return(uint64(0), nil).Once() // LocalNonce: 1, PendingNonce: 0 bo, err := txm.broadcastTransaction(ctx, address) @@ -174,9 +177,12 @@ func TestBroadcastTransaction(t *testing.T) { require.NoError(t, txStore.Add(address)) txm := NewTxm(lggr, testutils.FixtureChainID, client, ab, txStore, nil, config, keystore) txm.setNonce(address, 8) + metrics, err := NewTxmMetrics(testutils.FixtureChainID) + require.NoError(t, err) + txm.metrics = metrics IDK := "IDK" txRequest := &types.TxRequest{ - Data: []byte{100}, + Data: []byte{100, 200}, IdempotencyKey: &IDK, ChainID: testutils.FixtureChainID, FromAddress: address, @@ -197,13 +203,13 @@ func TestBroadcastTransaction(t *testing.T) { require.NoError(t, err) assert.False(t, bo) assert.Equal(t, uint64(9), txm.getNonce(address)) - tx, err = txStore.FindTxWithIdempotencyKey(tests.Context(t), &IDK) + tx, err = txStore.FindTxWithIdempotencyKey(tests.Context(t), IDK) require.NoError(t, err) assert.Len(t, tx.Attempts, 1) var zeroTime time.Time - assert.Greater(t, tx.LastBroadcastAt, zeroTime) - assert.Greater(t, tx.Attempts[0].BroadcastAt, zeroTime) - assert.Greater(t, tx.InitialBroadcastAt, zeroTime) + assert.Greater(t, *tx.LastBroadcastAt, zeroTime) + assert.Greater(t, *tx.Attempts[0].BroadcastAt, zeroTime) + assert.Greater(t, *tx.InitialBroadcastAt, zeroTime) }) } diff --git a/core/chains/evm/txm/types/transaction.go b/core/chains/evm/txm/types/transaction.go index 0c1d2861daa..c2be729e509 100644 --- a/core/chains/evm/txm/types/transaction.go +++ b/core/chains/evm/txm/types/transaction.go @@ -4,7 +4,7 @@ import ( "encoding/json" "fmt" "math/big" - "strconv" + "reflect" "time" "github.com/google/uuid" @@ -42,8 +42,8 @@ type Transaction struct { SpecifiedGasLimit uint64 CreatedAt time.Time - InitialBroadcastAt time.Time - LastBroadcastAt time.Time + InitialBroadcastAt *time.Time + LastBroadcastAt *time.Time State TxState IsPurgeable bool @@ -60,29 +60,30 @@ type Transaction struct { CallbackCompleted bool } -func (t *Transaction) PrettyPrint() string { - idk, nonce := "", "" - if t.IdempotencyKey != nil { - idk = *t.IdempotencyKey - } - if t.Nonce != nil { - nonce = strconv.FormatUint(*t.Nonce, 10) - } +func (t *Transaction) String() string { return fmt.Sprintf(`{txID:%d, IdempotencyKey:%v, ChainID:%v, Nonce:%s, FromAddress:%v, ToAddress:%v, Value:%v, `+ - `Data:%s, SpecifiedGasLimit:%d, CreatedAt:%v, InitialBroadcastAt:%v, LastBroadcastAt:%v, State:%v, IsPurgeable:%v, AttemptCount:%d, `+ + `Data:%X, SpecifiedGasLimit:%d, CreatedAt:%v, InitialBroadcastAt:%v, LastBroadcastAt:%v, State:%v, IsPurgeable:%v, AttemptCount:%d, `+ `Meta:%v, Subject:%v}`, - t.ID, idk, t.ChainID, nonce, t.FromAddress, t.ToAddress, t.Value, t.Data, t.SpecifiedGasLimit, t.CreatedAt, t.InitialBroadcastAt, - t.LastBroadcastAt, t.State, t.IsPurgeable, t.AttemptCount, t.Meta, t.Subject) + t.ID, stringOrNull(t.IdempotencyKey), t.ChainID, stringOrNull(t.Nonce), t.FromAddress, t.ToAddress, t.Value, + reflect.ValueOf(&t.Data).Elem(), t.SpecifiedGasLimit, t.CreatedAt, stringOrNull(t.InitialBroadcastAt), stringOrNull(t.LastBroadcastAt), + t.State, t.IsPurgeable, t.AttemptCount, t.Meta, t.Subject) +} + +func stringOrNull[T any](t *T) string { + if t != nil { + return fmt.Sprintf("%v", *t) + } + return "null" } -func (t *Transaction) PrettyPrintWithAttempts() string { +func (t *Transaction) PrintWithAttempts() string { attempts := " Attempts: [" for _, a := range t.Attempts { - attempts += a.PrettyPrint() + ", " + attempts += a.String() + ", " } attempts += "]" - return t.PrettyPrint() + attempts + return t.String() + attempts } func (t *Transaction) FindAttemptByHash(attemptHash common.Hash) (*Attempt, error) { @@ -125,7 +126,7 @@ type Attempt struct { SignedTransaction *types.Transaction CreatedAt time.Time - BroadcastAt time.Time + BroadcastAt *time.Time } func (a *Attempt) DeepCopy() *Attempt { @@ -136,9 +137,9 @@ func (a *Attempt) DeepCopy() *Attempt { return &txCopy } -func (a *Attempt) PrettyPrint() string { +func (a *Attempt) String() string { return fmt.Sprintf(`{ID:%d, TxID:%d, Hash:%v, Fee:%v, GasLimit:%d, Type:%v, CreatedAt:%v, BroadcastAt:%v}`, - a.ID, a.TxID, a.Hash, a.Fee, a.GasLimit, a.Type, a.CreatedAt, a.BroadcastAt) + a.ID, a.TxID, a.Hash, a.Fee, a.GasLimit, a.Type, a.CreatedAt, stringOrNull(a.BroadcastAt)) } type TxRequest struct { diff --git a/core/chains/evm/txmgr/builder.go b/core/chains/evm/txmgr/builder.go index b5be9c829f2..421d8f0eb2f 100644 --- a/core/chains/evm/txmgr/builder.go +++ b/core/chains/evm/txmgr/builder.go @@ -127,7 +127,7 @@ func NewTxmV2( stuckTxDetector = txm.NewStuckTxDetector(lggr, chainConfig.ChainType(), stuckTxDetectorConfig) } - attemptBuilder := txm.NewAttemptBuilder(chainID, fCfg.PriceMax(), estimator, keyStore) + attemptBuilder := txm.NewAttemptBuilder(chainID, fCfg.PriceMaxKey, estimator, keyStore) inMemoryStoreManager := storage.NewInMemoryStoreManager(lggr, chainID) config := txm.Config{ EIP1559: fCfg.EIP1559DynamicFees(),