diff --git a/common/txmgr/address_state.go b/common/txmgr/address_state.go new file mode 100644 index 00000000000..9c2a4a52ece --- /dev/null +++ b/common/txmgr/address_state.go @@ -0,0 +1,712 @@ +package txmgr + +import ( + "fmt" + "sync" + "time" + + "github.com/smartcontractkit/chainlink-common/pkg/logger" + feetypes "github.com/smartcontractkit/chainlink/v2/common/fee/types" + txmgrtypes "github.com/smartcontractkit/chainlink/v2/common/txmgr/types" + "github.com/smartcontractkit/chainlink/v2/common/types" + "gopkg.in/guregu/null.v4" +) + +// AddressState is the state of a given from address +type AddressState[ + CHAIN_ID types.ID, + ADDR, TX_HASH, BLOCK_HASH types.Hashable, + R txmgrtypes.ChainReceipt[TX_HASH, BLOCK_HASH], + SEQ types.Sequence, + FEE feetypes.Fee, +] struct { + lggr logger.SugaredLogger + chainID CHAIN_ID + fromAddress ADDR + + sync.RWMutex + idempotencyKeyToTx map[string]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] + unstarted *TxPriorityQueue[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE] + inprogress *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] + // NOTE: currently the unconfirmed map's key is the transaction ID that is assigned via the postgres DB + unconfirmed map[int64]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] + confirmedMissingReceipt map[int64]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] + confirmed map[int64]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] + allTransactions map[int64]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] + fatalErrored map[int64]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] + // TODO: FINISH populate attemptHashToTxAttempt + // TODO: ANY NEW ATTEMPTS NEED TO BE ADDED TO THIS MAP + attemptHashToTxAttempt map[TX_HASH]txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] +} + +// NewAddressState returns a new AddressState instance +func NewAddressState[ + CHAIN_ID types.ID, + ADDR, TX_HASH, BLOCK_HASH types.Hashable, + R txmgrtypes.ChainReceipt[TX_HASH, BLOCK_HASH], + SEQ types.Sequence, + FEE feetypes.Fee, +]( + lggr logger.SugaredLogger, + chainID CHAIN_ID, + fromAddress ADDR, + maxUnstarted int, + txs []txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], +) (*AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE], error) { + // Count the number of transactions in each state to reduce the number of map resizes + counts := map[txmgrtypes.TxState]int{ + TxUnstarted: 0, + TxInProgress: 0, + TxUnconfirmed: 0, + TxConfirmedMissingReceipt: 0, + TxConfirmed: 0, + TxFatalError: 0, + } + var idempotencyKeysCount int + var txAttemptCount int + for _, tx := range txs { + counts[tx.State]++ + if tx.IdempotencyKey != nil { + idempotencyKeysCount++ + } + if tx.State == TxUnconfirmed { + txAttemptCount += len(tx.TxAttempts) + } + } + + // TODO: MAKE BETTER + // nit: probably not a big deal but not all txs have an idempotency key so we're probably initializing this map bigger than it needs to be here. + as := AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]{ + lggr: lggr, + chainID: chainID, + fromAddress: fromAddress, + + idempotencyKeyToTx: make(map[string]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], idempotencyKeysCount), + unstarted: NewTxPriorityQueue[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE](maxUnstarted), + inprogress: nil, + unconfirmed: make(map[int64]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], counts[TxUnconfirmed]), + confirmedMissingReceipt: make(map[int64]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], counts[TxConfirmedMissingReceipt]), + confirmed: make(map[int64]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], counts[TxConfirmed]), + allTransactions: make(map[int64]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], len(txs)), + fatalErrored: make(map[int64]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], counts[TxFatalError]), + attemptHashToTxAttempt: make(map[TX_HASH]txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], txAttemptCount), + } + + // Load all transactions supplied + for i := 0; i < len(txs); i++ { + tx := txs[i] + switch tx.State { + case TxUnstarted: + as.unstarted.AddTx(&tx) + case TxInProgress: + as.inprogress = &tx + case TxUnconfirmed: + as.unconfirmed[tx.ID] = &tx + case TxConfirmedMissingReceipt: + as.confirmedMissingReceipt[tx.ID] = &tx + case TxConfirmed: + as.confirmed[tx.ID] = &tx + case TxFatalError: + as.fatalErrored[tx.ID] = &tx + } + as.allTransactions[tx.ID] = &tx + if tx.IdempotencyKey != nil { + as.idempotencyKeyToTx[*tx.IdempotencyKey] = &tx + } + for _, txAttempt := range tx.TxAttempts { + as.attemptHashToTxAttempt[txAttempt.Hash] = txAttempt + } + } + + return &as, nil +} + +func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) close() { + clear(as.idempotencyKeyToTx) + + as.unstarted.Close() + as.unstarted = nil + as.inprogress = nil + + clear(as.unconfirmed) + clear(as.confirmedMissingReceipt) + clear(as.confirmed) + clear(as.allTransactions) + clear(as.fatalErrored) + + as.idempotencyKeyToTx = nil + as.unconfirmed = nil + as.confirmedMissingReceipt = nil + as.confirmed = nil + as.allTransactions = nil + as.fatalErrored = nil +} + +func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) CountTransactionsByState(txState txmgrtypes.TxState) int { + as.RLock() + defer as.RUnlock() + + switch txState { + case TxUnstarted: + return as.unstarted.Len() + case TxInProgress: + if as.inprogress != nil { + return 1 + } + case TxUnconfirmed: + return len(as.unconfirmed) + case TxConfirmedMissingReceipt: + return len(as.confirmedMissingReceipt) + case TxConfirmed: + return len(as.confirmed) + case TxFatalError: + return len(as.fatalErrored) + } + + return 0 +} + +func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTxWithIdempotencyKey(key string) *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] { + as.RLock() + defer as.RUnlock() + + return as.idempotencyKeyToTx[key] +} + +func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) ApplyToTxsByState( + txStates []txmgrtypes.TxState, + fn func(*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]), + txIDs ...int64, +) { + as.Lock() + defer as.Unlock() + + // if txStates is empty then apply the filter to only the as.allTransactions map + if len(txStates) == 0 { + as.applyToTxs(as.allTransactions, fn, txIDs...) + return + } + + for _, txState := range txStates { + switch txState { + case TxInProgress: + if as.inprogress != nil { + fn(as.inprogress) + } + case TxUnconfirmed: + as.applyToTxs(as.unconfirmed, fn, txIDs...) + case TxConfirmedMissingReceipt: + as.applyToTxs(as.confirmedMissingReceipt, fn, txIDs...) + case TxConfirmed: + as.applyToTxs(as.confirmed, fn, txIDs...) + case TxFatalError: + as.applyToTxs(as.fatalErrored, fn, txIDs...) + } + } +} + +func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) applyToTxs( + txIDsToTx map[int64]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], + fn func(*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]), + txIDs ...int64, +) { + // if txIDs is not empty then only apply the filter to those transactions + if len(txIDs) > 0 { + for _, txID := range txIDs { + tx := txIDsToTx[txID] + if tx != nil { + fn(tx) + } + } + return + } + + // if txIDs is empty then apply the filter to all transactions + for _, tx := range txIDsToTx { + fn(tx) + } +} + +func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FetchTxAttempts( + txStates []txmgrtypes.TxState, + txFilter func(*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool, + txAttemptFilter func(*txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool, + txIDs ...int64, +) []txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] { + as.RLock() + defer as.RUnlock() + + // if txStates is empty then apply the filter to only the as.allTransactions map + if len(txStates) == 0 { + return as.fetchTxAttempts(as.allTransactions, txFilter, txAttemptFilter, txIDs...) + } + + var txAttempts []txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] + for _, txState := range txStates { + switch txState { + case TxInProgress: + if as.inprogress != nil && txFilter(as.inprogress) { + for _, txAttempt := range as.inprogress.TxAttempts { + if txAttemptFilter(&txAttempt) { + txAttempts = append(txAttempts, txAttempt) + } + } + } + case TxUnconfirmed: + txAttempts = append(txAttempts, as.fetchTxAttempts(as.unconfirmed, txFilter, txAttemptFilter, txIDs...)...) + case TxConfirmedMissingReceipt: + txAttempts = append(txAttempts, as.fetchTxAttempts(as.confirmedMissingReceipt, txFilter, txAttemptFilter, txIDs...)...) + case TxConfirmed: + txAttempts = append(txAttempts, as.fetchTxAttempts(as.confirmed, txFilter, txAttemptFilter, txIDs...)...) + case TxFatalError: + txAttempts = append(txAttempts, as.fetchTxAttempts(as.fatalErrored, txFilter, txAttemptFilter, txIDs...)...) + } + } + + return txAttempts +} + +func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) fetchTxAttempts( + txIDsToTx map[int64]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], + txFilter func(*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool, + txAttemptFilter func(*txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool, + txIDs ...int64, +) []txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] { + as.RLock() + defer as.RUnlock() + + var txAttempts []txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] + // if txIDs is not empty then only apply the filter to those transactions + if len(txIDs) > 0 { + for _, txID := range txIDs { + tx := txIDsToTx[txID] + if tx != nil && txFilter(tx) { + for _, txAttempt := range tx.TxAttempts { + if txAttemptFilter(&txAttempt) { + txAttempts = append(txAttempts, txAttempt) + } + } + } + } + return txAttempts + } + + // if txIDs is empty then apply the filter to all transactions + for _, tx := range txIDsToTx { + if txFilter(tx) { + for _, txAttempt := range tx.TxAttempts { + if txAttemptFilter(&txAttempt) { + txAttempts = append(txAttempts, txAttempt) + } + } + } + } + + return txAttempts +} + +func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FetchTxs( + txStates []txmgrtypes.TxState, + filter func(*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool, + txIDs ...int64, +) []txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] { + as.RLock() + defer as.RUnlock() + + // if txStates is empty then apply the filter to only the as.allTransactions map + if len(txStates) == 0 { + return as.fetchTxs(as.allTransactions, filter, txIDs...) + } + + var txs []txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] + for _, txState := range txStates { + switch txState { + case TxInProgress: + if as.inprogress != nil && filter(as.inprogress) { + txs = append(txs, *as.inprogress) + } + case TxUnconfirmed: + txs = append(txs, as.fetchTxs(as.unconfirmed, filter, txIDs...)...) + case TxConfirmedMissingReceipt: + txs = append(txs, as.fetchTxs(as.confirmedMissingReceipt, filter, txIDs...)...) + case TxConfirmed: + txs = append(txs, as.fetchTxs(as.confirmed, filter, txIDs...)...) + case TxFatalError: + txs = append(txs, as.fetchTxs(as.fatalErrored, filter, txIDs...)...) + } + } + + return txs +} + +func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) fetchTxs( + txIDsToTx map[int64]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], + filter func(*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool, + txIDs ...int64, +) []txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] { + as.RLock() + defer as.RUnlock() + + var txs []txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] + // if txIDs is not empty then only apply the filter to those transactions + if len(txIDs) > 0 { + for _, txID := range txIDs { + tx := txIDsToTx[txID] + if tx != nil && filter(tx) { + txs = append(txs, *tx) + } + } + return txs + } + + // if txIDs is empty then apply the filter to all transactions + for _, tx := range txIDsToTx { + if filter(tx) { + txs = append(txs, *tx) + } + } + + return txs +} + +func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) PruneUnstartedTxQueue(ids []int64) { + as.Lock() + defer as.Unlock() + + txs := as.unstarted.PruneByTxIDs(ids) + as.deleteTxs(txs...) +} + +func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) DeleteTxs(txs ...txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) { + as.Lock() + defer as.Unlock() + + as.deleteTxs(txs...) +} + +func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) deleteTxs(txs ...txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) { + for _, tx := range txs { + if tx.IdempotencyKey != nil { + delete(as.idempotencyKeyToTx, *tx.IdempotencyKey) + } + txID := tx.ID + if as.inprogress != nil && as.inprogress.ID == txID { + as.inprogress = nil + } + delete(as.allTransactions, txID) + delete(as.unconfirmed, txID) + delete(as.confirmedMissingReceipt, txID) + delete(as.confirmed, txID) + delete(as.fatalErrored, txID) + as.unstarted.RemoveTxByID(txID) + } +} + +func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) PeekNextUnstartedTx() (*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) { + as.RLock() + defer as.RUnlock() + + tx := as.unstarted.PeekNextTx() + if tx == nil { + return nil, fmt.Errorf("peek_next_unstarted_tx: %w (address: %s)", ErrTxnNotFound, as.fromAddress) + } + + return tx, nil +} + +func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) PeekInProgressTx() (*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) { + as.RLock() + defer as.RUnlock() + + tx := as.inprogress + if tx == nil { + return nil, fmt.Errorf("peek_in_progress_tx: %w (address: %s)", ErrTxnNotFound, as.fromAddress) + } + + return tx, nil +} + +func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) AddTxToUnstarted(tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error { + as.Lock() + defer as.Unlock() + + if as.unstarted.Len() >= as.unstarted.Cap() { + return fmt.Errorf("move_tx_to_unstarted: address %s unstarted queue capactiry has been reached", as.fromAddress) + } + + as.unstarted.AddTx(tx) + as.allTransactions[tx.ID] = tx + if tx.IdempotencyKey != nil { + as.idempotencyKeyToTx[*tx.IdempotencyKey] = tx + } + + return nil +} + +func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MoveUnstartedToInProgress( + etx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], + txAttempt *txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], +) error { + as.Lock() + defer as.Unlock() + + if as.inprogress != nil { + return fmt.Errorf("move_unstarted_to_in_progress: address %s already has a transaction in progress", as.fromAddress) + } + + tx := as.unstarted.RemoveTxByID(etx.ID) + if tx == nil { + return fmt.Errorf("move_unstarted_to_in_progress: no unstarted transaction to move to in_progress") + } + tx.State = TxInProgress + tx.TxAttempts = []txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]{*txAttempt} + tx.Sequence = etx.Sequence + tx.BroadcastAt = etx.BroadcastAt + tx.InitialBroadcastAt = etx.InitialBroadcastAt + + as.inprogress = tx + + return nil +} + +func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MoveConfirmedMissingReceiptToUnconfirmed( + txID int64, +) error { + as.Lock() + defer as.Unlock() + + tx, ok := as.confirmedMissingReceipt[txID] + if !ok || tx == nil { + return fmt.Errorf("move_confirmed_missing_receipt_to_unconfirmed: no confirmed_missing_receipt transaction with ID %d: %w", txID, ErrTxnNotFound) + } + + tx.State = TxUnconfirmed + as.unconfirmed[tx.ID] = tx + delete(as.confirmedMissingReceipt, tx.ID) + + return nil +} + +func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MoveInProgressToUnconfirmed( + etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], + txAttempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], +) error { + as.Lock() + defer as.Unlock() + + tx := as.inprogress + if tx == nil { + return fmt.Errorf("move_in_progress_to_unconfirmed: no transaction in progress") + } + + tx.State = TxUnconfirmed + tx.Error = etx.Error + tx.BroadcastAt = etx.BroadcastAt + tx.InitialBroadcastAt = etx.InitialBroadcastAt + txAttempt.State = txmgrtypes.TxAttemptBroadcast + txAttempt.TxID = tx.ID + + for i := 0; i < len(tx.TxAttempts); i++ { + if tx.TxAttempts[i].ID == txAttempt.ID { + tx.TxAttempts[i] = txAttempt + break + } + } + + as.unconfirmed[tx.ID] = tx + as.inprogress = nil + + return nil +} + +func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MoveUnconfirmedToConfirmed( + receipt txmgrtypes.ChainReceipt[TX_HASH, BLOCK_HASH], +) error { + as.Lock() + defer as.Unlock() + + txAttempt, ok := as.attemptHashToTxAttempt[receipt.GetTxHash()] + if !ok { + return fmt.Errorf("move_unconfirmed_to_confirmed: no unconfirmed transaction with receipt %v: %w", receipt, ErrTxnNotFound) + } + // TODO(jtw): not sure how to set blocknumber, transactionindex, and receipt on conflict + txAttempt.Receipts = []txmgrtypes.ChainReceipt[TX_HASH, BLOCK_HASH]{receipt} + txAttempt.State = txmgrtypes.TxAttemptBroadcast + if txAttempt.BroadcastBeforeBlockNum == nil { + blockNum := receipt.GetBlockNumber().Int64() + txAttempt.BroadcastBeforeBlockNum = &blockNum + } + tx, ok := as.unconfirmed[txAttempt.TxID] + if !ok { + // TODO: WHAT SHOULD WE DO HERE? + // THIS WOULD BE A BIG BUG + return fmt.Errorf("move_unconfirmed_to_confirmed: no unconfirmed transaction with ID %d: %w", txAttempt.TxID, ErrTxnNotFound) + } + tx.State = TxConfirmed + + return nil +} + +func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MoveUnstartedToFatalError( + etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], + txError null.String, +) error { + as.Lock() + defer as.Unlock() + + tx := as.unstarted.RemoveTxByID(etx.ID) + if tx == nil { + return fmt.Errorf("move_unstarted_to_fatal_error: no unstarted transaction with ID %d", etx.ID) + } + + tx.State = TxFatalError + tx.Sequence = nil + tx.TxAttempts = nil + tx.InitialBroadcastAt = nil + tx.Error = txError + as.fatalErrored[tx.ID] = tx + + return nil +} + +func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MoveInProgressToFatalError(txError null.String) error { + as.Lock() + defer as.Unlock() + + tx := as.inprogress + if tx == nil { + return fmt.Errorf("move_in_progress_to_fatal_error: no transaction in progress") + } + + tx.State = TxFatalError + tx.Sequence = nil + tx.TxAttempts = nil + tx.InitialBroadcastAt = nil + tx.Error = txError + as.fatalErrored[tx.ID] = tx + as.inprogress = nil + + return nil +} +func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MoveConfirmedMissingReceiptToFatalError( + etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], + txError null.String, +) error { + as.Lock() + defer as.Unlock() + + tx, ok := as.confirmedMissingReceipt[etx.ID] + if !ok || tx == nil { + return fmt.Errorf("move_confirmed_missing_receipt_to_fatal_error: no confirmed_missing_receipt transaction with ID %d: %w", etx.ID, ErrTxnNotFound) + } + + tx.State = TxFatalError + tx.Sequence = nil + tx.TxAttempts = nil + tx.InitialBroadcastAt = nil + tx.Error = txError + as.fatalErrored[tx.ID] = tx + delete(as.confirmedMissingReceipt, tx.ID) + + return nil +} + +func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MoveUnconfirmedToConfirmedMissingReceipt(attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], broadcastAt time.Time) error { + as.Lock() + defer as.Unlock() + + tx, ok := as.unconfirmed[attempt.TxID] + if !ok || tx == nil { + return fmt.Errorf("move_unconfirmed_to_confirmed_missing_receipt: no unconfirmed transaction with ID %d: %w", attempt.TxID, ErrTxnNotFound) + } + if tx.BroadcastAt.Before(broadcastAt) { + tx.BroadcastAt = &broadcastAt + } + tx.State = TxConfirmedMissingReceipt + tx.TxAttempts = []txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]{attempt} + tx.TxAttempts[0].State = txmgrtypes.TxAttemptBroadcast + + as.confirmedMissingReceipt[tx.ID] = tx + delete(as.unconfirmed, tx.ID) + + return nil +} +func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MoveInProgressToConfirmedMissingReceipt(attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], broadcastAt time.Time) error { + as.Lock() + defer as.Unlock() + + tx := as.inprogress + if tx == nil { + return fmt.Errorf("move_in_progress_to_confirmed_missing_receipt: no transaction in progress") + } + if tx.BroadcastAt.Before(broadcastAt) { + tx.BroadcastAt = &broadcastAt + } + tx.State = TxConfirmedMissingReceipt + tx.TxAttempts = []txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]{attempt} + tx.TxAttempts[0].State = txmgrtypes.TxAttemptBroadcast + + as.confirmedMissingReceipt[tx.ID] = tx + as.inprogress = nil + + return nil +} +func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MoveConfirmedToUnconfirmed(attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error { + as.Lock() + defer as.Unlock() + + if attempt.State != txmgrtypes.TxAttemptBroadcast { + return fmt.Errorf("move_confirmed_to_unconfirmed: attempt must be in broadcast state") + } + + tx, ok := as.confirmed[attempt.TxID] + if !ok || tx == nil { + return fmt.Errorf("move_confirmed_to_unconfirmed: no confirmed transaction with ID %d: %w", attempt.TxID, ErrTxnNotFound) + } + tx.State = TxUnconfirmed + + // Delete the receipt from the attempt + attempt.Receipts = nil + // Reset the broadcast information for the attempt + attempt.State = txmgrtypes.TxAttemptInProgress + attempt.BroadcastBeforeBlockNum = nil + tx.TxAttempts = []txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]{attempt} + + as.unconfirmed[tx.ID] = tx + delete(as.confirmed, tx.ID) + + return nil +} + +func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) abandon() { + as.Lock() + defer as.Unlock() + + for as.unstarted.Len() > 0 { + tx := as.unstarted.RemoveNextTx() + as.abandonTx(tx) + } + + if as.inprogress != nil { + tx := as.inprogress + as.abandonTx(tx) + as.inprogress = nil + } + for _, tx := range as.unconfirmed { + as.abandonTx(tx) + } + + clear(as.unconfirmed) +} + +func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) abandonTx(tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) { + if tx == nil { + return + } + + tx.State = TxFatalError + tx.Sequence = nil + tx.Error = null.NewString("abandoned", true) + + as.fatalErrored[tx.ID] = tx +} diff --git a/common/txmgr/inmemory_store.go b/common/txmgr/inmemory_store.go new file mode 100644 index 00000000000..c62fb3070ff --- /dev/null +++ b/common/txmgr/inmemory_store.go @@ -0,0 +1,1894 @@ +package txmgr + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "math/big" + "sort" + "sync" + "time" + + "github.com/google/uuid" + "github.com/smartcontractkit/chainlink-common/pkg/logger" + feetypes "github.com/smartcontractkit/chainlink/v2/common/fee/types" + txmgrtypes "github.com/smartcontractkit/chainlink/v2/common/txmgr/types" + "github.com/smartcontractkit/chainlink/v2/common/types" + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/label" + "gopkg.in/guregu/null.v4" +) + +var ( + // ErrInvalidChainID is returned when the chain ID is invalid + ErrInvalidChainID = errors.New("invalid chain ID") + // ErrTxnNotFound is returned when a transaction is not found + ErrTxnNotFound = errors.New("transaction not found") + // ErrExistingIdempotencyKey is returned when a transaction with the same idempotency key already exists + ErrExistingIdempotencyKey = errors.New("transaction with idempotency key already exists") + // ErrAddressNotFound is returned when an address is not found + ErrAddressNotFound = errors.New("address not found") + // ErrSequenceNotFound is returned when a sequence is not found + ErrSequenceNotFound = errors.New("sequence not found") + // ErrCouldNotGetReceipt is the error string we save if we reach our finality depth for a confirmed transaction without ever getting a receipt + // This most likely happened because an external wallet used the account for this nonce + ErrCouldNotGetReceipt = errors.New("could not get receipt") +) + +type InMemoryStore[ + CHAIN_ID types.ID, + ADDR, TX_HASH, BLOCK_HASH types.Hashable, + R txmgrtypes.ChainReceipt[TX_HASH, BLOCK_HASH], + SEQ types.Sequence, + FEE feetypes.Fee, +] struct { + lggr logger.SugaredLogger + chainID CHAIN_ID + + keyStore txmgrtypes.KeyStore[ADDR, CHAIN_ID, SEQ] + txStore txmgrtypes.TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE] + + addressStatesLock sync.RWMutex + addressStates map[ADDR]*AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE] +} + +// NewInMemoryStore returns a new InMemoryStore +func NewInMemoryStore[ + CHAIN_ID types.ID, + ADDR, TX_HASH, BLOCK_HASH types.Hashable, + R txmgrtypes.ChainReceipt[TX_HASH, BLOCK_HASH], + SEQ types.Sequence, + FEE feetypes.Fee, +]( + ctx context.Context, + lggr logger.SugaredLogger, + chainID CHAIN_ID, + keyStore txmgrtypes.KeyStore[ADDR, CHAIN_ID, SEQ], + txStore txmgrtypes.TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE], +) (*InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE], error) { + ms := InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]{ + lggr: lggr, + chainID: chainID, + keyStore: keyStore, + txStore: txStore, + + addressStates: map[ADDR]*AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]{}, + } + + maxUnstarted := 50 + addresses, err := keyStore.EnabledAddressesForChain(chainID) + if err != nil { + return nil, fmt.Errorf("new_in_memory_store: %w", err) + } + for _, fromAddr := range addresses { + txs, err := txStore.AllTransactions(ctx, fromAddr, chainID) + if err != nil { + return nil, fmt.Errorf("address_state: initialization: %w", err) + } + as, err := NewAddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE](lggr, chainID, fromAddr, maxUnstarted, txs) + if err != nil { + return nil, fmt.Errorf("new_in_memory_store: %w", err) + } + + ms.addressStates[fromAddr] = as + } + + return &ms, nil +} + +// CreateTransaction creates a new transaction for a given txRequest. +func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) CreateTransaction( + ctx context.Context, + txRequest txmgrtypes.TxRequest[ADDR, TX_HASH], + chainID CHAIN_ID, +) ( + tx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], + err error, +) { + if ms.chainID.String() != chainID.String() { + return tx, fmt.Errorf("create_transaction: %w", ErrInvalidChainID) + } + + ms.addressStatesLock.RLock() + defer ms.addressStatesLock.RUnlock() + as, ok := ms.addressStates[txRequest.FromAddress] + if !ok { + return tx, fmt.Errorf("create_transaction: %w", ErrAddressNotFound) + } + + // Persist Transaction to persistent storage + tx, err = ms.txStore.CreateTransaction(ctx, txRequest, chainID) + if err != nil { + return tx, fmt.Errorf("create_transaction: %w", err) + } + + // Update in memory store + // Add the request to the Unstarted channel to be processed by the Broadcaster + if err := as.AddTxToUnstarted(&tx); err != nil { + return *ms.deepCopyTx(tx), fmt.Errorf("create_transaction: %w", err) + } + + return *ms.deepCopyTx(tx), nil +} + +// FindTxWithIdempotencyKey returns a transaction with the given idempotency key +func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTxWithIdempotencyKey(ctx context.Context, idempotencyKey string, chainID CHAIN_ID) (*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) { + if ms.chainID.String() != chainID.String() { + return nil, fmt.Errorf("find_tx_with_idempotency_key: %w", ErrInvalidChainID) + } + + // Check if the transaction is in the pending queue of all address states + ms.addressStatesLock.RLock() + defer ms.addressStatesLock.RUnlock() + for _, as := range ms.addressStates { + if tx := as.FindTxWithIdempotencyKey(idempotencyKey); tx != nil { + return ms.deepCopyTx(*tx), nil + } + } + + return nil, fmt.Errorf("find_tx_with_idempotency_key: %w", ErrTxnNotFound) + +} + +// CheckTxQueueCapacity checks if the queue capacity has been reached for a given address +func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) CheckTxQueueCapacity(ctx context.Context, fromAddress ADDR, maxQueuedTransactions uint64, chainID CHAIN_ID) error { + if maxQueuedTransactions == 0 { + return nil + } + if ms.chainID.String() != chainID.String() { + return fmt.Errorf("check_tx_queue_capacity: %w", ErrInvalidChainID) + } + + ms.addressStatesLock.RLock() + defer ms.addressStatesLock.RUnlock() + as, ok := ms.addressStates[fromAddress] + if !ok { + return fmt.Errorf("check_tx_queue_capacity: %w", ErrAddressNotFound) + } + + count := uint64(as.CountTransactionsByState(TxUnstarted)) + if count >= maxQueuedTransactions { + return fmt.Errorf("check_tx_queue_capacity: cannot create transaction; too many unstarted transactions in the queue (%v/%v). %s", count, maxQueuedTransactions, label.MaxQueuedTransactionsWarning) + } + + return nil +} + +// FindLatestSequence returns the latest sequence number for a given address +// It is used to initialize the in-memory sequence map in the broadcaster +// TODO(jtw): this is until we have a abstracted Sequencer Component which can be used instead +func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindLatestSequence(ctx context.Context, fromAddress ADDR, chainID CHAIN_ID) (SEQ, error) { + // Query the persistent store + return ms.txStore.FindLatestSequence(ctx, fromAddress, chainID) +} + +// CountUnconfirmedTransactions returns the number of unconfirmed transactions for a given address. +// Unconfirmed transactions are transactions that have been broadcast but not confirmed on-chain. +// NOTE(jtw): used to calculate total inflight transactions +func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) CountUnconfirmedTransactions(ctx context.Context, fromAddress ADDR, chainID CHAIN_ID) (uint32, error) { + if ms.chainID.String() != chainID.String() { + return 0, fmt.Errorf("count_unstarted_transactions: %w", ErrInvalidChainID) + } + + ms.addressStatesLock.RLock() + defer ms.addressStatesLock.RUnlock() + as, ok := ms.addressStates[fromAddress] + if !ok { + return 0, fmt.Errorf("count_unstarted_transactions: %w", ErrAddressNotFound) + } + + return uint32(as.CountTransactionsByState(TxUnconfirmed)), nil +} + +// CountUnstartedTransactions returns the number of unstarted transactions for a given address. +// Unstarted transactions are transactions that have not been broadcast yet. +// NOTE(jtw): used to calculate total inflight transactions +func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) CountUnstartedTransactions(ctx context.Context, fromAddress ADDR, chainID CHAIN_ID) (uint32, error) { + if ms.chainID.String() != chainID.String() { + return 0, fmt.Errorf("count_unstarted_transactions: %w", ErrInvalidChainID) + } + + ms.addressStatesLock.RLock() + defer ms.addressStatesLock.RUnlock() + as, ok := ms.addressStates[fromAddress] + if !ok { + return 0, fmt.Errorf("count_unstarted_transactions: %w", ErrAddressNotFound) + } + + return uint32(as.CountTransactionsByState(TxUnstarted)), nil +} + +// UpdateTxUnstartedToInProgress updates a transaction from unstarted to in_progress. +func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) UpdateTxUnstartedToInProgress( + ctx context.Context, + tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], + attempt *txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], +) error { + if tx.Sequence == nil { + return fmt.Errorf("update_tx_unstarted_to_in_progress: in_progress transaction must have a sequence number") + } + if tx.State != TxUnstarted { + return fmt.Errorf("update_tx_unstarted_to_in_progress: can only transition to in_progress from unstarted, transaction is currently %s", tx.State) + } + if attempt.State != txmgrtypes.TxAttemptInProgress { + return fmt.Errorf("update_tx_unstarted_to_in_progress: attempt state must be in_progress") + } + + ms.addressStatesLock.RLock() + defer ms.addressStatesLock.RUnlock() + as, ok := ms.addressStates[tx.FromAddress] + if !ok { + return fmt.Errorf("update_tx_unstarted_to_in_progress: %w", ErrAddressNotFound) + } + + // Persist to persistent storage + if err := ms.txStore.UpdateTxUnstartedToInProgress(ctx, tx, attempt); err != nil { + return fmt.Errorf("update_tx_unstarted_to_in_progress: %w", err) + } + + // TODO: REDO THIS AND TAKE SOME OF THE LOGIC OUT OF MOVE UNSTARTED TO IN PROGRESS + // Update in address state in memory + if err := as.MoveUnstartedToInProgress(tx, attempt); err != nil { + return fmt.Errorf("update_tx_unstarted_to_in_progress: %w", err) + } + + return nil +} + +// GetTxInProgress returns the in_progress transaction for a given address. +func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) GetTxInProgress(ctx context.Context, fromAddress ADDR) (*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) { + ms.addressStatesLock.RLock() + defer ms.addressStatesLock.RUnlock() + as, ok := ms.addressStates[fromAddress] + if !ok { + return nil, fmt.Errorf("get_tx_in_progress: %w", ErrAddressNotFound) + } + + tx, err := as.PeekInProgressTx() + if tx == nil { + return nil, fmt.Errorf("get_tx_in_progress: %w", err) + } + + if len(tx.TxAttempts) != 1 || tx.TxAttempts[0].State != txmgrtypes.TxAttemptInProgress { + return nil, fmt.Errorf("get_tx_in_progress: invariant violation: expected in_progress transaction %v to have exactly one unsent attempt. "+ + "Your database is in an inconsistent state and this node will not function correctly until the problem is resolved", tx.ID) + } + + return ms.deepCopyTx(*tx), nil +} + +// UpdateTxAttemptInProgressToBroadcast updates a transaction attempt from in_progress to broadcast. +// It also updates the transaction state to unconfirmed. +func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) UpdateTxAttemptInProgressToBroadcast( + ctx context.Context, + tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], + attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], + newAttemptState txmgrtypes.TxAttemptState, +) error { + if tx.BroadcastAt == nil { + return fmt.Errorf("update_tx_attempt_in_progress_to_broadcast: unconfirmed transaction must have broadcast_at time") + } + if tx.InitialBroadcastAt == nil { + return fmt.Errorf("update_tx_attempt_in_progress_to_broadcast: unconfirmed transaction must have initial_broadcast_at time") + } + if tx.State != TxInProgress { + return fmt.Errorf("update_tx_attempt_in_progress_to_broadcast: can only transition to unconfirmed from in_progress, transaction is currently %s", tx.State) + } + if attempt.State != txmgrtypes.TxAttemptInProgress { + return fmt.Errorf("update_tx_attempt_in_progress_to_broadcast: attempt must be in in_progress state") + } + if newAttemptState != txmgrtypes.TxAttemptBroadcast { + return fmt.Errorf("update_tx_attempt_in_progress_to_broadcast: new attempt state must be broadcast, got: %s", newAttemptState) + } + + ms.addressStatesLock.RLock() + defer ms.addressStatesLock.RUnlock() + as, ok := ms.addressStates[tx.FromAddress] + if !ok { + return fmt.Errorf("update_tx_attempt_in_progress_to_broadcast: %w", ErrAddressNotFound) + } + + // Persist to persistent storage + if err := ms.txStore.UpdateTxAttemptInProgressToBroadcast(ctx, tx, attempt, newAttemptState); err != nil { + return fmt.Errorf("update_tx_attempt_in_progress_to_broadcast: %w", err) + } + + // Update in memory store + if err := as.MoveInProgressToUnconfirmed(*tx, attempt); err != nil { + return fmt.Errorf("update_tx_attempt_in_progress_to_broadcast: %w", err) + } + + return nil +} + +// FindNextUnstartedTransactionFromAddress returns the next unstarted transaction for a given address. +func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindNextUnstartedTransactionFromAddress(_ context.Context, tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], fromAddress ADDR, chainID CHAIN_ID) error { + if ms.chainID.String() != chainID.String() { + return fmt.Errorf("find_next_unstarted_transaction_from_address: %w", ErrInvalidChainID) + } + ms.addressStatesLock.RLock() + defer ms.addressStatesLock.RUnlock() + as, ok := ms.addressStates[fromAddress] + if !ok { + return fmt.Errorf("find_next_unstarted_transaction_from_address: %w", ErrAddressNotFound) + } + + etx, err := as.PeekNextUnstartedTx() + if err != nil || etx == nil { + return fmt.Errorf("find_next_unstarted_transaction_from_address: %w", err) + } + tx = ms.deepCopyTx(*etx) + + return nil +} + +// SaveReplacementInProgressAttempt saves a replacement attempt for a transaction that is in_progress. +func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) SaveReplacementInProgressAttempt( + ctx context.Context, + oldAttempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], + replacementAttempt *txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], +) error { + if oldAttempt.State != txmgrtypes.TxAttemptInProgress || replacementAttempt.State != txmgrtypes.TxAttemptInProgress { + return fmt.Errorf("save_replacement_in_progress_attempt: expected attempts to be in_progress") + } + if oldAttempt.ID == 0 { + return fmt.Errorf("save_replacement_in_progress_attempt: expected oldattempt to have an ID") + } + + ms.addressStatesLock.RLock() + defer ms.addressStatesLock.RUnlock() + as, ok := ms.addressStates[oldAttempt.Tx.FromAddress] + if !ok { + return fmt.Errorf("save_replacement_in_progress_attempt: %w", ErrAddressNotFound) + } + + // Persist to persistent storage + if err := ms.txStore.SaveReplacementInProgressAttempt(ctx, oldAttempt, replacementAttempt); err != nil { + return fmt.Errorf("save_replacement_in_progress_attempt: %w", err) + } + + // Update in memory store + tx, err := as.PeekInProgressTx() + if tx == nil { + return fmt.Errorf("save_replacement_in_progress_attempt: %w", err) + } + + var found bool + for i := 0; i < len(tx.TxAttempts); i++ { + if tx.TxAttempts[i].ID == oldAttempt.ID { + tx.TxAttempts[i] = *replacementAttempt + found = true + } + } + if !found { + tx.TxAttempts = append(tx.TxAttempts, *replacementAttempt) + } + + return nil +} + +// UpdateTxFatalError updates a transaction to fatal_error. +func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) UpdateTxFatalError(ctx context.Context, tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error { + if tx.State != TxInProgress && tx.State != TxUnstarted { + return fmt.Errorf("update_tx_fatal_error: can only transition to fatal_error from in_progress, transaction is currently %s", tx.State) + } + if !tx.Error.Valid { + return fmt.Errorf("update_tx_fatal_error: expected error field to be set") + } + + ms.addressStatesLock.RLock() + defer ms.addressStatesLock.RUnlock() + as, ok := ms.addressStates[tx.FromAddress] + if !ok { + return fmt.Errorf("update_tx_fatal_error: %w", ErrAddressNotFound) + } + + // Persist to persistent storage + if err := ms.txStore.UpdateTxFatalError(ctx, tx); err != nil { + return fmt.Errorf("update_tx_fatal_error: %w", err) + } + + // Update in memory store + switch tx.State { + case TxInProgress: + if err := as.MoveInProgressToFatalError(tx.Error); err != nil { + return fmt.Errorf("update_tx_fatal_error: %w", err) + } + case TxUnstarted: + if err := as.MoveUnstartedToFatalError(*tx, tx.Error); err != nil { + return fmt.Errorf("update_tx_fatal_error: %w", err) + } + } + + return nil +} + +// Close closes the InMemoryStore +func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Close() { + // Close the event recorder + ms.txStore.Close() + + // Clear all address states + ms.addressStatesLock.Lock() + for _, as := range ms.addressStates { + as.close() + } + clear(ms.addressStates) + ms.addressStatesLock.Unlock() +} + +// Abandon removes all transactions for a given address +func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Abandon(ctx context.Context, chainID CHAIN_ID, addr ADDR) error { + if ms.chainID.String() != chainID.String() { + return fmt.Errorf("abandon: %w", ErrInvalidChainID) + } + + // Mark all persisted transactions as abandoned + if err := ms.txStore.Abandon(ctx, chainID, addr); err != nil { + return err + } + + // check that the address exists in the unstarted transactions + ms.addressStatesLock.RLock() + defer ms.addressStatesLock.RUnlock() + as, ok := ms.addressStates[addr] + if !ok { + return fmt.Errorf("abandon: %w", ErrAddressNotFound) + } + as.abandon() + + return nil +} + +// SetBroadcastBeforeBlockNum sets the broadcast_before_block_num for a given chain ID +func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) SetBroadcastBeforeBlockNum(ctx context.Context, blockNum int64, chainID CHAIN_ID) error { + if ms.chainID.String() != chainID.String() { + return fmt.Errorf("set_broadcast_before_block_num: %w", ErrInvalidChainID) + } + + // Persist to persistent storage + if err := ms.txStore.SetBroadcastBeforeBlockNum(ctx, blockNum, chainID); err != nil { + return fmt.Errorf("set_broadcast_before_block_num: %w", err) + } + + fn := func(tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) { + if tx.TxAttempts == nil || len(tx.TxAttempts) == 0 { + return + } + + for i := 0; i < len(tx.TxAttempts); i++ { + attempt := tx.TxAttempts[i] + if attempt.State == txmgrtypes.TxAttemptBroadcast && attempt.BroadcastBeforeBlockNum == nil { + tx.TxAttempts[i].BroadcastBeforeBlockNum = &blockNum + } + } + } + ms.addressStatesLock.RLock() + defer ms.addressStatesLock.RUnlock() + for _, as := range ms.addressStates { + as.ApplyToTxsByState(nil, fn) + } + + return nil +} + +// FindTxAttemptsConfirmedMissingReceipt returns all transactions that are confirmed but missing a receipt +func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTxAttemptsConfirmedMissingReceipt(ctx context.Context, chainID CHAIN_ID) ( + []txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], + error, +) { + if ms.chainID.String() != chainID.String() { + return nil, fmt.Errorf("find_next_unstarted_transaction_from_address: %w", ErrInvalidChainID) + } + + txFilter := func(tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool { + return tx.TxAttempts != nil && len(tx.TxAttempts) > 0 + } + txAttemptFilter := func(attempt *txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool { + return true + } + states := []txmgrtypes.TxState{TxConfirmedMissingReceipt} + attempts := []txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]{} + ms.addressStatesLock.RLock() + defer ms.addressStatesLock.RUnlock() + for _, as := range ms.addressStates { + attempts = append(attempts, as.FetchTxAttempts(states, txFilter, txAttemptFilter)...) + } + // TODO: FINISH THIS + // sort by tx_id ASC, gas_price DESC, gas_tip_cap DESC + sort.SliceStable(attempts, func(i, j int) bool { + /* + if attempts[i].TxID == attempts[j].TxID { + // sort by gas_price DESC + } + */ + + return attempts[i].TxID < attempts[j].TxID + }) + + // deep copy the attempts + var eAttempts []txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] + for _, attempt := range attempts { + eAttempts = append(eAttempts, ms.deepCopyTxAttempt(attempt.Tx, attempt)) + } + + return eAttempts, nil +} + +// UpdateBroadcastAts updates the broadcast_at time for a given set of attempts +func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) UpdateBroadcastAts(ctx context.Context, now time.Time, txIDs []int64) error { + // Persist to persistent storage + if err := ms.txStore.UpdateBroadcastAts(ctx, now, txIDs); err != nil { + return err + } + + // Update in memory store + fn := func(tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) { + if tx.BroadcastAt != nil && tx.BroadcastAt.Before(now) { + tx.BroadcastAt = &now + } + } + + ms.addressStatesLock.RLock() + defer ms.addressStatesLock.RUnlock() + for _, as := range ms.addressStates { + as.ApplyToTxsByState(nil, fn, txIDs...) + } + + return nil +} + +// UpdateTxsUnconfirmed updates the unconfirmed transactions for a given set of ids +func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) UpdateTxsUnconfirmed(ctx context.Context, txIDs []int64) error { + // Persist to persistent storage + if err := ms.txStore.UpdateTxsUnconfirmed(ctx, txIDs); err != nil { + return err + } + + // Update in memory store + ms.addressStatesLock.RLock() + defer ms.addressStatesLock.RUnlock() + wg := sync.WaitGroup{} + for _, as := range ms.addressStates { + wg.Add(1) + go func(as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) { + for _, txID := range txIDs { + if err := as.MoveConfirmedMissingReceiptToUnconfirmed(txID); err != nil { + continue + } + } + wg.Done() + }(as) + } + wg.Wait() + + return nil +} + +// FindTxAttemptsRequiringReceiptFetch returns all transactions that are missing a receipt +func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTxAttemptsRequiringReceiptFetch(ctx context.Context, chainID CHAIN_ID) ( + attempts []txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], + err error, +) { + if ms.chainID.String() != chainID.String() { + return attempts, fmt.Errorf("find_tx_attempts_requiring_receipt_fetch: %w", ErrInvalidChainID) + } + + txFilterFn := func(tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool { + return tx.TxAttempts != nil && len(tx.TxAttempts) > 0 + } + txAttemptFilterFn := func(attempt *txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool { + return attempt.State != txmgrtypes.TxAttemptInsufficientFunds + } + states := []txmgrtypes.TxState{TxUnconfirmed, TxConfirmedMissingReceipt} + attempts = []txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]{} + ms.addressStatesLock.RLock() + defer ms.addressStatesLock.RUnlock() + for _, as := range ms.addressStates { + attempts = append(attempts, as.FetchTxAttempts(states, txFilterFn, txAttemptFilterFn)...) + } + // sort by sequence ASC, gas_price DESC, gas_tip_cap DESC + sort.Slice(attempts, func(i, j int) bool { + return (*attempts[i].Tx.Sequence).Int64() < (*attempts[j].Tx.Sequence).Int64() + }) + + // deep copy the attempts + var eAttempts []txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] + for _, attempt := range attempts { + eAttempts = append(eAttempts, ms.deepCopyTxAttempt(attempt.Tx, attempt)) + } + + return eAttempts, nil +} + +func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTxesPendingCallback(ctx context.Context, blockNum int64, chainID CHAIN_ID) ( + []txmgrtypes.ReceiptPlus[R], + error, +) { + if ms.chainID.String() != chainID.String() { + return nil, fmt.Errorf("find_txes_pending_callback: %w", ErrInvalidChainID) + } + + filterFn := func(tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool { + if tx.TxAttempts == nil || len(tx.TxAttempts) == 0 { + return false + } + + // TODO: loop through all attempts since any of them can have a receipt + if tx.TxAttempts[0].Receipts == nil || len(tx.TxAttempts[0].Receipts) == 0 { + return false + } + + if tx.PipelineTaskRunID.Valid && tx.SignalCallback && !tx.CallbackCompleted && + tx.TxAttempts[0].Receipts[0].GetBlockNumber() != nil && + big.NewInt(blockNum-int64(tx.MinConfirmations.Uint32)).Cmp(tx.TxAttempts[0].Receipts[0].GetBlockNumber()) > 0 { + return true + } + + return false + + } + states := []txmgrtypes.TxState{TxConfirmed} + txs := []txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]{} + ms.addressStatesLock.RLock() + defer ms.addressStatesLock.RUnlock() + for _, as := range ms.addressStates { + txs = append(txs, as.FetchTxs(states, filterFn)...) + } + + receiptsPlus := make([]txmgrtypes.ReceiptPlus[R], len(txs)) + meta := map[string]interface{}{} + for i, tx := range txs { + if err := json.Unmarshal(json.RawMessage(*tx.Meta), &meta); err != nil { + return nil, err + } + failOnRevert := false + if v, ok := meta["FailOnRevert"].(bool); ok { + failOnRevert = v + } + + receiptsPlus[i] = txmgrtypes.ReceiptPlus[R]{ + ID: tx.PipelineTaskRunID.UUID, + Receipt: (tx.TxAttempts[0].Receipts[0]).(R), + FailOnRevert: failOnRevert, + } + clear(meta) + } + + return receiptsPlus, nil +} + +func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) UpdateTxCallbackCompleted(ctx context.Context, pipelineTaskRunRid uuid.UUID, chainId CHAIN_ID) error { + if ms.chainID.String() != chainId.String() { + return fmt.Errorf("update_tx_callback_completed: %w", ErrInvalidChainID) + } + + // Persist to persistent storage + if err := ms.txStore.UpdateTxCallbackCompleted(ctx, pipelineTaskRunRid, chainId); err != nil { + return err + } + + // Update in memory store + fn := func(tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) { + if tx.PipelineTaskRunID.UUID == pipelineTaskRunRid { + tx.CallbackCompleted = true + } + } + wg := sync.WaitGroup{} + ms.addressStatesLock.RLock() + defer ms.addressStatesLock.RUnlock() + for _, as := range ms.addressStates { + wg.Add(1) + go func(as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) { + as.ApplyToTxsByState(nil, fn) + wg.Done() + }(as) + } + wg.Wait() + + return nil +} + +func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) SaveFetchedReceipts(ctx context.Context, receipts []R, chainID CHAIN_ID) error { + if ms.chainID.String() != chainID.String() { + return fmt.Errorf("save_fetched_receipts: %w", ErrInvalidChainID) + } + + // Persist to persistent storage + if err := ms.txStore.SaveFetchedReceipts(ctx, receipts, chainID); err != nil { + return err + } + + // Update in memory store + errsLock := sync.Mutex{} + var errs error + wg := sync.WaitGroup{} + ms.addressStatesLock.RLock() + defer ms.addressStatesLock.RUnlock() + for _, as := range ms.addressStates { + wg.Add(1) + go func(as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) { + for _, receipt := range receipts { + if err := as.MoveUnconfirmedToConfirmed(receipt); err != nil { + errsLock.Lock() + errs = errors.Join(errs, err) + errsLock.Unlock() + } + } + wg.Done() + }(as) + } + wg.Wait() + + return errs +} + +func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTxesByMetaFieldAndStates(ctx context.Context, metaField string, metaValue string, states []txmgrtypes.TxState, chainID *big.Int) ( + []*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], + error, +) { + if ms.chainID.String() != chainID.String() { + return nil, fmt.Errorf("find_txes_by_meta_field_and_states: %w", ErrInvalidChainID) + } + + filterFn := func(tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool { + if tx.Meta == nil { + return false + } + meta := map[string]interface{}{} + if err := json.Unmarshal(json.RawMessage(*tx.Meta), &meta); err != nil { + return false + } + if v, ok := meta[metaField].(string); ok { + return v == metaValue + } + + return false + } + txsLock := sync.Mutex{} + txs := []*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]{} + wg := sync.WaitGroup{} + ms.addressStatesLock.RLock() + defer ms.addressStatesLock.RUnlock() + for _, as := range ms.addressStates { + wg.Add(1) + go func(as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) { + for _, tx := range as.FetchTxs(states, filterFn) { + etx := ms.deepCopyTx(tx) + txsLock.Lock() + txs = append(txs, etx) + txsLock.Unlock() + } + wg.Done() + }(as) + } + wg.Wait() + + return txs, nil +} +func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTxesWithMetaFieldByStates(ctx context.Context, metaField string, states []txmgrtypes.TxState, chainID *big.Int) ([]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) { + if ms.chainID.String() != chainID.String() { + return nil, fmt.Errorf("find_txes_with_meta_field_by_states: %w", ErrInvalidChainID) + } + + filterFn := func(tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool { + if tx.Meta == nil { + return false + } + meta := map[string]interface{}{} + if err := json.Unmarshal(json.RawMessage(*tx.Meta), &meta); err != nil { + return false + } + if _, ok := meta[metaField]; ok { + return true + } + + return false + } + + txsLock := sync.Mutex{} + txs := []*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]{} + wg := sync.WaitGroup{} + ms.addressStatesLock.RLock() + defer ms.addressStatesLock.RUnlock() + for _, as := range ms.addressStates { + wg.Add(1) + go func(as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) { + for _, tx := range as.FetchTxs(states, filterFn) { + etx := ms.deepCopyTx(tx) + txsLock.Lock() + txs = append(txs, etx) + txsLock.Unlock() + } + wg.Done() + }(as) + } + wg.Wait() + + return txs, nil +} + +func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTxesWithMetaFieldByReceiptBlockNum(ctx context.Context, metaField string, blockNum int64, chainID *big.Int) ([]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) { + if ms.chainID.String() != chainID.String() { + return nil, fmt.Errorf("find_txes_with_meta_field_by_receipt_block_num: %w", ErrInvalidChainID) + } + + filterFn := func(tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool { + if tx.Meta == nil { + return false + } + meta := map[string]interface{}{} + if err := json.Unmarshal(json.RawMessage(*tx.Meta), &meta); err != nil { + return false + } + if _, ok := meta[metaField]; !ok { + return false + } + if tx.TxAttempts == nil || len(tx.TxAttempts) == 0 { + return false + } + + for _, attempt := range tx.TxAttempts { + if attempt.Receipts == nil || len(attempt.Receipts) == 0 { + continue + } + if attempt.Receipts[0].GetBlockNumber() == nil { + continue + } + return attempt.Receipts[0].GetBlockNumber().Int64() >= blockNum + } + + return false + } + + txsLock := sync.Mutex{} + txs := []*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]{} + wg := sync.WaitGroup{} + ms.addressStatesLock.RLock() + defer ms.addressStatesLock.RUnlock() + for _, as := range ms.addressStates { + wg.Add(1) + go func(as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) { + for _, tx := range as.FetchTxs(nil, filterFn) { + etx := ms.deepCopyTx(tx) + txsLock.Lock() + txs = append(txs, etx) + txsLock.Unlock() + } + wg.Done() + }(as) + } + wg.Wait() + + return txs, nil +} + +func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTxesWithAttemptsAndReceiptsByIdsAndState(ctx context.Context, ids []big.Int, states []txmgrtypes.TxState, chainID *big.Int) (tx []*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error) { + if ms.chainID.String() != chainID.String() { + return nil, fmt.Errorf("find_txes_with_attempts_and_receipts_by_ids_and_state: %w", ErrInvalidChainID) + } + + filterFn := func(tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool { + return true + } + + txIDs := make([]int64, len(ids)) + for i, id := range ids { + txIDs[i] = id.Int64() + } + + txsLock := sync.Mutex{} + txs := []*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]{} + wg := sync.WaitGroup{} + ms.addressStatesLock.RLock() + defer ms.addressStatesLock.RUnlock() + for _, as := range ms.addressStates { + wg.Add(1) + go func(as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) { + for _, tx := range as.FetchTxs(states, filterFn, txIDs...) { + etx := ms.deepCopyTx(tx) + txsLock.Lock() + txs = append(txs, etx) + txsLock.Unlock() + } + wg.Done() + }(as) + } + wg.Wait() + + return txs, nil +} + +func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) PruneUnstartedTxQueue(ctx context.Context, queueSize uint32, subject uuid.UUID) ([]int64, error) { + // Persist to persistent storage + ids, err := ms.txStore.PruneUnstartedTxQueue(ctx, queueSize, subject) + if err != nil { + return ids, err + } + + // Update in memory store + ms.addressStatesLock.RLock() + defer ms.addressStatesLock.RUnlock() + for _, as := range ms.addressStates { + as.PruneUnstartedTxQueue(ids) + } + + return ids, nil +} + +func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) ReapTxHistory(ctx context.Context, minBlockNumberToKeep int64, timeThreshold time.Time, chainID CHAIN_ID) error { + if ms.chainID.String() != chainID.String() { + return fmt.Errorf("reap_tx_history: %w", ErrInvalidChainID) + } + + // Persist to persistent storage + if err := ms.txStore.ReapTxHistory(ctx, minBlockNumberToKeep, timeThreshold, chainID); err != nil { + return err + } + + // Update in memory store + states := []txmgrtypes.TxState{TxConfirmed} + filterFn := func(tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool { + if tx.TxAttempts == nil || len(tx.TxAttempts) == 0 { + return false + } + for _, attempt := range tx.TxAttempts { + if attempt.Receipts == nil || len(attempt.Receipts) == 0 { + continue + } + if attempt.Receipts[0].GetBlockNumber() == nil { + continue + } + if attempt.Receipts[0].GetBlockNumber().Int64() >= minBlockNumberToKeep { + continue + } + if tx.CreatedAt.After(timeThreshold) { + continue + } + return tx.State == TxConfirmed + } + return false + } + + wg := sync.WaitGroup{} + ms.addressStatesLock.RLock() + defer ms.addressStatesLock.RUnlock() + for _, as := range ms.addressStates { + wg.Add(1) + go func(as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) { + as.DeleteTxs(as.FetchTxs(states, filterFn)...) + wg.Done() + }(as) + } + wg.Wait() + + filterFn = func(tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool { + return tx.State == TxFatalError && tx.CreatedAt.Before(timeThreshold) + } + states = []txmgrtypes.TxState{TxFatalError} + ms.addressStatesLock.RLock() + defer ms.addressStatesLock.RUnlock() + for _, as := range ms.addressStates { + wg.Add(1) + go func(as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) { + as.DeleteTxs(as.FetchTxs(states, filterFn)...) + wg.Done() + }(as) + } + wg.Wait() + + return nil +} +func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) CountTransactionsByState(_ context.Context, state txmgrtypes.TxState, chainID CHAIN_ID) (uint32, error) { + if ms.chainID.String() != chainID.String() { + return 0, fmt.Errorf("count_transactions_by_state: %w", ErrInvalidChainID) + } + + var total int + ms.addressStatesLock.RLock() + defer ms.addressStatesLock.RUnlock() + for _, as := range ms.addressStates { + total += as.CountTransactionsByState(state) + } + + return uint32(total), nil +} + +func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) DeleteInProgressAttempt(ctx context.Context, attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error { + if attempt.State != txmgrtypes.TxAttemptInProgress { + return fmt.Errorf("delete_in_progress_attempt: expected attempt to be in_progress") + } + if attempt.ID == 0 { + return fmt.Errorf("delete_in_progress_attempt: expected attempt to have an ID") + } + + // Check if fromaddress enabled + ms.addressStatesLock.RLock() + defer ms.addressStatesLock.RUnlock() + as, ok := ms.addressStates[attempt.Tx.FromAddress] + if !ok { + return fmt.Errorf("delete_in_progress_attempt: %w", ErrAddressNotFound) + } + + // Persist to persistent storage + if err := ms.txStore.DeleteInProgressAttempt(ctx, attempt); err != nil { + return fmt.Errorf("delete_in_progress_attempt: %w", err) + } + + // Update in memory store + filter := func(tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool { + if tx.TxAttempts == nil || len(tx.TxAttempts) == 0 { + return false + } + + for _, a := range tx.TxAttempts { + if a.ID == attempt.ID { + return true + } + } + return false + } + as.DeleteTxs(as.FetchTxs(nil, filter)...) + + return nil +} + +func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTxsRequiringResubmissionDueToInsufficientFunds(_ context.Context, address ADDR, chainID CHAIN_ID) ([]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) { + if ms.chainID.String() != chainID.String() { + return nil, fmt.Errorf("find_txs_requiring_resubmission_due_to_insufficient_funds: %w", ErrInvalidChainID) + } + + ms.addressStatesLock.RLock() + defer ms.addressStatesLock.RUnlock() + as, ok := ms.addressStates[address] + if !ok { + return nil, fmt.Errorf("find_txs_requiring_resubmission_due_to_insufficient_funds: %w", ErrAddressNotFound) + } + + filter := func(tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool { + if tx.TxAttempts == nil || len(tx.TxAttempts) == 0 { + return false + } + for _, attempt := range tx.TxAttempts { + if attempt.State == txmgrtypes.TxAttemptInsufficientFunds { + return true + } + } + return false + } + states := []txmgrtypes.TxState{TxUnconfirmed} + txs := as.FetchTxs(states, filter) + // sort by sequence ASC + sort.Slice(txs, func(i, j int) bool { + return (*txs[i].Sequence).Int64() < (*txs[j].Sequence).Int64() + }) + + etxs := make([]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], len(txs)) + for i, tx := range txs { + etxs[i] = ms.deepCopyTx(tx) + } + + return etxs, nil +} + +func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTxAttemptsRequiringResend(_ context.Context, olderThan time.Time, maxInFlightTransactions uint32, chainID CHAIN_ID, address ADDR) ([]txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) { + if ms.chainID.String() != chainID.String() { + return nil, fmt.Errorf("find_tx_attempts_requiring_resend: %w", ErrInvalidChainID) + } + + ms.addressStatesLock.RLock() + defer ms.addressStatesLock.RUnlock() + as, ok := ms.addressStates[address] + if !ok { + return nil, fmt.Errorf("find_tx_attempts_requiring_resend: %w", ErrAddressNotFound) + } + + txFilter := func(tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool { + if tx.TxAttempts == nil || len(tx.TxAttempts) == 0 { + return false + } + return tx.BroadcastAt.Before(olderThan) || tx.BroadcastAt.Equal(olderThan) + } + txAttemptFilter := func(attempt *txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool { + return attempt.State != txmgrtypes.TxAttemptInProgress + } + states := []txmgrtypes.TxState{TxUnconfirmed, TxConfirmedMissingReceipt} + attempts := as.FetchTxAttempts(states, txFilter, txAttemptFilter) + // sort by sequence ASC, gas_price DESC, gas_tip_cap DESC + sort.Slice(attempts, func(i, j int) bool { + return (*attempts[i].Tx.Sequence).Int64() < (*attempts[j].Tx.Sequence).Int64() + }) + // LIMIT by maxInFlightTransactions + if len(attempts) > int(maxInFlightTransactions) { + attempts = attempts[:maxInFlightTransactions] + } + + // deep copy the attempts + var eAttempts []txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] + for _, attempt := range attempts { + eAttempts = append(eAttempts, ms.deepCopyTxAttempt(attempt.Tx, attempt)) + } + + return eAttempts, nil +} + +func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTxWithSequence(_ context.Context, fromAddress ADDR, seq SEQ) (*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) { + ms.addressStatesLock.RLock() + defer ms.addressStatesLock.RUnlock() + as, ok := ms.addressStates[fromAddress] + if !ok { + return nil, fmt.Errorf("find_tx_with_sequence: %w", ErrAddressNotFound) + } + + filter := func(tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool { + if tx.Sequence == nil { + return false + } + + return (*tx.Sequence).String() == seq.String() + } + states := []txmgrtypes.TxState{TxConfirmed, TxConfirmedMissingReceipt, TxUnconfirmed} + txs := as.FetchTxs(states, filter) + if len(txs) == 0 { + return nil, nil + } + + return ms.deepCopyTx(txs[0]), nil +} + +func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTransactionsConfirmedInBlockRange(_ context.Context, highBlockNumber, lowBlockNumber int64, chainID CHAIN_ID) ([]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) { + if ms.chainID.String() != chainID.String() { + return nil, fmt.Errorf("find_transactions_confirmed_in_block_range: %w", ErrInvalidChainID) + } + + filter := func(tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool { + if tx.TxAttempts == nil || len(tx.TxAttempts) == 0 { + return false + } + for _, attempt := range tx.TxAttempts { + if attempt.State != txmgrtypes.TxAttemptBroadcast { + continue + } + if len(attempt.Receipts) == 0 { + continue + } + if attempt.Receipts[0].GetBlockNumber() == nil { + continue + } + blockNum := attempt.Receipts[0].GetBlockNumber().Int64() + if blockNum >= lowBlockNumber && blockNum <= highBlockNumber { + return true + } + } + + return false + } + states := []txmgrtypes.TxState{TxConfirmed, TxConfirmedMissingReceipt} + txsLock := sync.Mutex{} + txs := []txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]{} + wg := sync.WaitGroup{} + ms.addressStatesLock.RLock() + defer ms.addressStatesLock.RUnlock() + for _, as := range ms.addressStates { + wg.Add(1) + go func(as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) { + ts := as.FetchTxs(states, filter) + txsLock.Lock() + txs = append(txs, ts...) + txsLock.Unlock() + wg.Done() + }(as) + } + wg.Wait() + // sort by sequence ASC + sort.Slice(txs, func(i, j int) bool { + return (*txs[i].Sequence).Int64() < (*txs[j].Sequence).Int64() + }) + + etxs := make([]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], len(txs)) + for i, tx := range txs { + etxs[i] = ms.deepCopyTx(tx) + } + + return etxs, nil +} +func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindEarliestUnconfirmedBroadcastTime(ctx context.Context, chainID CHAIN_ID) (null.Time, error) { + if ms.chainID.String() != chainID.String() { + return null.Time{}, fmt.Errorf("find_earliest_unconfirmed_broadcast_time: %w", ErrInvalidChainID) + } + + filter := func(tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool { + return tx.InitialBroadcastAt != nil + } + states := []txmgrtypes.TxState{TxUnconfirmed} + txsLock := sync.Mutex{} + txs := []txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]{} + wg := sync.WaitGroup{} + ms.addressStatesLock.RLock() + defer ms.addressStatesLock.RUnlock() + for _, as := range ms.addressStates { + wg.Add(1) + go func(as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) { + etxs := as.FetchTxs(states, filter) + txsLock.Lock() + txs = append(txs, etxs...) + txsLock.Unlock() + wg.Done() + }(as) + } + wg.Wait() + + var minInitialBroadcastAt time.Time + for _, tx := range txs { + if tx.InitialBroadcastAt.Before(minInitialBroadcastAt) { + minInitialBroadcastAt = *tx.InitialBroadcastAt + } + } + + return null.TimeFrom(minInitialBroadcastAt), nil +} + +func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindEarliestUnconfirmedTxAttemptBlock(ctx context.Context, chainID CHAIN_ID) (null.Int, error) { + if ms.chainID.String() != chainID.String() { + return null.Int{}, fmt.Errorf("find_earliest_unconfirmed_broadcast_time: %w", ErrInvalidChainID) + } + + filter := func(tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool { + if tx.TxAttempts == nil || len(tx.TxAttempts) == 0 { + return false + } + + for _, attempt := range tx.TxAttempts { + if attempt.BroadcastBeforeBlockNum != nil { + return true + } + } + + return false + } + states := []txmgrtypes.TxState{TxUnconfirmed} + txsLock := sync.Mutex{} + txs := []txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]{} + wg := sync.WaitGroup{} + ms.addressStatesLock.RLock() + defer ms.addressStatesLock.RUnlock() + for _, as := range ms.addressStates { + wg.Add(1) + go func(as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) { + etxs := as.FetchTxs(states, filter) + txsLock.Lock() + txs = append(txs, etxs...) + txsLock.Unlock() + wg.Done() + }(as) + } + wg.Wait() + + var minBroadcastBeforeBlockNum int64 + for _, tx := range txs { + if *tx.TxAttempts[0].BroadcastBeforeBlockNum < minBroadcastBeforeBlockNum { + minBroadcastBeforeBlockNum = *tx.TxAttempts[0].BroadcastBeforeBlockNum + } + } + + return null.IntFrom(minBroadcastBeforeBlockNum), nil +} + +func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) GetInProgressTxAttempts(ctx context.Context, address ADDR, chainID CHAIN_ID) ([]txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) { + if ms.chainID.String() != chainID.String() { + return nil, fmt.Errorf("get_in_progress_tx_attempts: %w", ErrInvalidChainID) + } + + ms.addressStatesLock.RLock() + defer ms.addressStatesLock.RUnlock() + as, ok := ms.addressStates[address] + if !ok { + return nil, fmt.Errorf("get_in_progress_tx_attempts: %w", ErrAddressNotFound) + } + + txFilter := func(tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool { + return tx.TxAttempts != nil && len(tx.TxAttempts) > 0 + } + txAttemptFilter := func(attempt *txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool { + return attempt.State == txmgrtypes.TxAttemptInProgress + } + states := []txmgrtypes.TxState{TxConfirmed, TxConfirmedMissingReceipt, TxUnconfirmed} + attempts := as.FetchTxAttempts(states, txFilter, txAttemptFilter) + + // deep copy the attempts + var eAttempts []txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] + for _, attempt := range attempts { + eAttempts = append(eAttempts, ms.deepCopyTxAttempt(attempt.Tx, attempt)) + } + + return eAttempts, nil +} + +func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) GetNonFatalTransactions(ctx context.Context, chainID CHAIN_ID) ([]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) { + if ms.chainID.String() != chainID.String() { + return nil, fmt.Errorf("get_non_fatal_transactions: %w", ErrInvalidChainID) + } + + filter := func(tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool { + return tx.State != TxFatalError + } + txsLock := sync.Mutex{} + txs := []txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]{} + wg := sync.WaitGroup{} + ms.addressStatesLock.RLock() + defer ms.addressStatesLock.RUnlock() + for _, as := range ms.addressStates { + wg.Add(1) + go func(as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) { + etxs := as.FetchTxs(nil, filter) + txsLock.Lock() + txs = append(txs, etxs...) + txsLock.Unlock() + wg.Done() + }(as) + } + wg.Wait() + + etxs := make([]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], len(txs)) + for i, tx := range txs { + etxs[i] = ms.deepCopyTx(tx) + } + + return etxs, nil +} + +func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) GetTxByID(_ context.Context, id int64) (*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) { + filter := func(tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool { + return tx.ID == id + } + ms.addressStatesLock.RLock() + defer ms.addressStatesLock.RUnlock() + for _, as := range ms.addressStates { + txs := as.FetchTxs(nil, filter, id) + if len(txs) > 0 { + return ms.deepCopyTx(txs[0]), nil + } + } + + return nil, fmt.Errorf("failed to get tx with id: %v", id) + +} + +func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) HasInProgressTransaction(_ context.Context, account ADDR, chainID CHAIN_ID) (bool, error) { + if ms.chainID.String() != chainID.String() { + return false, fmt.Errorf("has_in_progress_transaction: %w", ErrInvalidChainID) + } + + ms.addressStatesLock.RLock() + defer ms.addressStatesLock.RUnlock() + as, ok := ms.addressStates[account] + if !ok { + return false, fmt.Errorf("has_in_progress_transaction: %w", ErrAddressNotFound) + } + + n := as.CountTransactionsByState(TxInProgress) + + return n > 0, nil +} + +func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) LoadTxAttempts(_ context.Context, etx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error { + ms.addressStatesLock.RLock() + defer ms.addressStatesLock.RUnlock() + as, ok := ms.addressStates[etx.FromAddress] + if !ok { + return fmt.Errorf("load_tx_attempts: %w", ErrAddressNotFound) + } + + filter := func(tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool { + return tx.ID == etx.ID + } + txAttempts := []txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]{} + for _, tx := range as.FetchTxs(nil, filter, etx.ID) { + for _, txAttempt := range tx.TxAttempts { + txAttempts = append(txAttempts, ms.deepCopyTxAttempt(*etx, txAttempt)) + } + } + etx.TxAttempts = txAttempts + + return nil +} +func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) PreloadTxes(_ context.Context, attempts []txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error { + if len(attempts) == 0 { + return nil + } + + ms.addressStatesLock.RLock() + defer ms.addressStatesLock.RUnlock() + as, ok := ms.addressStates[attempts[0].Tx.FromAddress] + if !ok { + return fmt.Errorf("preload_txes: %w", ErrAddressNotFound) + } + + txIDs := make([]int64, len(attempts)) + for i, attempt := range attempts { + txIDs[i] = attempt.TxID + } + filter := func(tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool { + return true + } + txs := as.FetchTxs(nil, filter, txIDs...) + for i, attempt := range attempts { + for _, tx := range txs { + if tx.ID == attempt.TxID { + attempts[i].Tx = *ms.deepCopyTx(tx) + } + } + } + + return nil +} +func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) SaveConfirmedMissingReceiptAttempt(ctx context.Context, timeout time.Duration, attempt *txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], broadcastAt time.Time) error { + ms.addressStatesLock.RLock() + defer ms.addressStatesLock.RUnlock() + as, ok := ms.addressStates[attempt.Tx.FromAddress] + if !ok { + return fmt.Errorf("save_confirmed_missing_receipt_attempt: %w", ErrAddressNotFound) + } + if attempt.State != txmgrtypes.TxAttemptInProgress { + return fmt.Errorf("expected state to be in_progress") + } + + // Persist to persistent storage + if err := ms.txStore.SaveConfirmedMissingReceiptAttempt(ctx, timeout, attempt, broadcastAt); err != nil { + return err + } + + // Update in memory store + return as.MoveInProgressToConfirmedMissingReceipt(*attempt, broadcastAt) +} +func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) SaveInProgressAttempt(ctx context.Context, attempt *txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error { + ms.addressStatesLock.RLock() + defer ms.addressStatesLock.RUnlock() + as, ok := ms.addressStates[attempt.Tx.FromAddress] + if !ok { + return fmt.Errorf("save_in_progress_attempt: %w", ErrAddressNotFound) + } + if attempt.State != txmgrtypes.TxAttemptInProgress { + return fmt.Errorf("SaveInProgressAttempt failed: attempt state must be in_progress") + } + + // Persist to persistent storage + if err := ms.txStore.SaveInProgressAttempt(ctx, attempt); err != nil { + return err + } + + // Update in memory store + fn := func(tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) { + if tx.ID != attempt.TxID { + return + } + if tx.TxAttempts != nil && len(tx.TxAttempts) > 0 { + for i := 0; i < len(tx.TxAttempts); i++ { + if tx.TxAttempts[i].ID == attempt.ID { + tx.TxAttempts[i].State = txmgrtypes.TxAttemptInProgress + tx.TxAttempts[i].BroadcastBeforeBlockNum = attempt.BroadcastBeforeBlockNum + return + } + } + } + tx.TxAttempts = []txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]{*attempt} + } + as.ApplyToTxsByState(nil, fn, attempt.TxID) + + return nil +} +func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) SaveInsufficientFundsAttempt(ctx context.Context, timeout time.Duration, attempt *txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], broadcastAt time.Time) error { + ms.addressStatesLock.RLock() + defer ms.addressStatesLock.RUnlock() + as, ok := ms.addressStates[attempt.Tx.FromAddress] + if !ok { + return fmt.Errorf("save_insufficient_funds_attempt: %w", ErrAddressNotFound) + } + if !(attempt.State == txmgrtypes.TxAttemptInProgress || attempt.State == txmgrtypes.TxAttemptInsufficientFunds) { + return fmt.Errorf("expected state to be in_progress or insufficient_funds") + } + + // Persist to persistent storage + if err := ms.txStore.SaveInsufficientFundsAttempt(ctx, timeout, attempt, broadcastAt); err != nil { + return err + } + + // Update in memory store + fn := func(tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) { + if tx.ID != attempt.TxID { + return + } + if tx.TxAttempts == nil || len(tx.TxAttempts) == 0 { + return + } + if tx.BroadcastAt.Before(broadcastAt) { + tx.BroadcastAt = &broadcastAt + } + + tx.TxAttempts[0].State = txmgrtypes.TxAttemptInsufficientFunds + } + as.ApplyToTxsByState(nil, fn, attempt.TxID) + + return nil +} +func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) SaveSentAttempt(ctx context.Context, timeout time.Duration, attempt *txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], broadcastAt time.Time) error { + ms.addressStatesLock.RLock() + defer ms.addressStatesLock.RUnlock() + as, ok := ms.addressStates[attempt.Tx.FromAddress] + if !ok { + return fmt.Errorf("save_sent_attempt: %w", ErrAddressNotFound) + } + + if attempt.State != txmgrtypes.TxAttemptInProgress { + return fmt.Errorf("expected state to be in_progress") + } + + // Persist to persistent storage + if err := ms.txStore.SaveSentAttempt(ctx, timeout, attempt, broadcastAt); err != nil { + return err + } + + // Update in memory store + fn := func(tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) { + if tx.ID != attempt.TxID { + return + } + if tx.TxAttempts == nil || len(tx.TxAttempts) == 0 { + return + } + if tx.BroadcastAt.Before(broadcastAt) { + tx.BroadcastAt = &broadcastAt + } + + tx.TxAttempts[0].State = txmgrtypes.TxAttemptBroadcast + } + as.ApplyToTxsByState(nil, fn, attempt.TxID) + + return nil +} +func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) UpdateTxForRebroadcast(ctx context.Context, etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], etxAttempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error { + ms.addressStatesLock.RLock() + defer ms.addressStatesLock.RUnlock() + as, ok := ms.addressStates[etx.FromAddress] + if !ok { + return fmt.Errorf("update_tx_for_rebroadcast: %w", ErrAddressNotFound) + } + + // Persist to persistent storage + if err := ms.txStore.UpdateTxForRebroadcast(ctx, etx, etxAttempt); err != nil { + return err + } + + // Update in memory store + return as.MoveConfirmedToUnconfirmed(etxAttempt) +} +func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) IsTxFinalized(ctx context.Context, blockHeight int64, txID int64, chainID CHAIN_ID) (bool, error) { + if ms.chainID.String() != chainID.String() { + return false, fmt.Errorf("is_tx_finalized: %w", ErrInvalidChainID) + } + + txFilter := func(tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool { + if tx.ID != txID { + return false + } + + for _, attempt := range tx.TxAttempts { + if attempt.Receipts == nil || len(attempt.Receipts) == 0 { + continue + } + // there can only be one receipt per attempt + if attempt.Receipts[0].GetBlockNumber() == nil { + continue + } + + return attempt.Receipts[0].GetBlockNumber().Int64() <= (blockHeight - int64(tx.MinConfirmations.Uint32)) + } + + return false + } + txAttemptFilter := func(attempt *txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool { + return attempt.Receipts != nil && len(attempt.Receipts) > 0 + } + ms.addressStatesLock.RLock() + defer ms.addressStatesLock.RUnlock() + for _, as := range ms.addressStates { + txas := as.FetchTxAttempts(nil, txFilter, txAttemptFilter, txID) + if len(txas) > 0 { + return true, nil + } + } + + return false, nil +} + +func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTxsRequiringGasBump(ctx context.Context, address ADDR, blockNum, gasBumpThreshold, depth int64, chainID CHAIN_ID) ([]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) { + if ms.chainID.String() != chainID.String() { + return nil, fmt.Errorf("find_txs_requiring_gas_bump: %w", ErrInvalidChainID) + } + if gasBumpThreshold == 0 { + return nil, nil + } + + ms.addressStatesLock.RLock() + defer ms.addressStatesLock.RUnlock() + as, ok := ms.addressStates[address] + if !ok { + return nil, fmt.Errorf("find_txs_requiring_gas_bump: %w", ErrAddressNotFound) + } + + filter := func(tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool { + if tx.TxAttempts == nil || len(tx.TxAttempts) == 0 { + return false + } + attempt := tx.TxAttempts[0] + if *attempt.BroadcastBeforeBlockNum <= blockNum || + attempt.State == txmgrtypes.TxAttemptBroadcast { + return false + } + + if tx.State != TxUnconfirmed || + attempt.ID != 0 { + return false + } + + return true + } + states := []txmgrtypes.TxState{TxUnconfirmed} + txs := as.FetchTxs(states, filter) + // sort by sequence ASC + sort.Slice(txs, func(i, j int) bool { + return (*txs[i].Sequence).Int64() < (*txs[j].Sequence).Int64() + }) + + if depth > 0 { + // LIMIT by depth + if len(txs) > int(depth) { + txs = txs[:depth] + } + } + + etxs := make([]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], len(txs)) + for i, tx := range txs { + etxs[i] = ms.deepCopyTx(tx) + } + + return etxs, nil +} +func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MarkAllConfirmedMissingReceipt(ctx context.Context, chainID CHAIN_ID) error { + if ms.chainID.String() != chainID.String() { + return fmt.Errorf("mark_all_confirmed_missing_receipt: %w", ErrInvalidChainID) + } + + // Persist to persistent storage + if err := ms.txStore.MarkAllConfirmedMissingReceipt(ctx, chainID); err != nil { + return err + } + + // Update in memory store + wg := sync.WaitGroup{} + errsLock := sync.Mutex{} + var errs error + ms.addressStatesLock.RLock() + defer ms.addressStatesLock.RUnlock() + for _, as := range ms.addressStates { + wg.Add(1) + go func(as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) { + // TODO(jtw): THIS IS EVM SPECIFIC THIS SHOULD BE GENERALIZED + // Get the max confirmed sequence + filter := func(tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool { return true } + states := []txmgrtypes.TxState{TxConfirmed} + txs := as.FetchTxs(states, filter) + var maxConfirmedSequence SEQ + for _, tx := range txs { + if tx.Sequence == nil { + continue + } + if (*tx.Sequence).Int64() > maxConfirmedSequence.Int64() { + maxConfirmedSequence = *tx.Sequence + } + } + + // Mark all unconfirmed txs with a sequence less than the max confirmed sequence as confirmed_missing_receipt + filter = func(tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool { + if tx.Sequence == nil { + return false + } + if tx.State != TxUnconfirmed { + return false + } + + return (*tx.Sequence).Int64() < maxConfirmedSequence.Int64() + } + states = []txmgrtypes.TxState{TxUnconfirmed} + txs = as.FetchTxs(states, filter) + for _, tx := range txs { + attempt := tx.TxAttempts[0] + + if err := as.MoveUnconfirmedToConfirmedMissingReceipt(attempt, *tx.BroadcastAt); err != nil { + err = fmt.Errorf("mark_all_confirmed_missing_receipt: address: %s: %w", as.fromAddress, err) + errsLock.Lock() + errs = errors.Join(errs, err) + errsLock.Unlock() + } + } + wg.Done() + }(as) + } + wg.Wait() + + return errs +} +func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MarkOldTxesMissingReceiptAsErrored(ctx context.Context, blockNum int64, finalityDepth uint32, chainID CHAIN_ID) error { + if ms.chainID.String() != chainID.String() { + return fmt.Errorf("mark_old_txes_missing_receipt_as_errored: %w", ErrInvalidChainID) + } + + // Persist to persistent storage + if err := ms.txStore.MarkOldTxesMissingReceiptAsErrored(ctx, blockNum, finalityDepth, chainID); err != nil { + return err + } + + // Update in memory store + type result struct { + ID int64 + Sequence SEQ + FromAddress ADDR + MaxBroadcastBeforeBlockNum int64 + TxHashes []TX_HASH + } + var resultsLock sync.Mutex + var results []result + wg := sync.WaitGroup{} + errsLock := sync.Mutex{} + var errs error + ms.addressStatesLock.RLock() + defer ms.addressStatesLock.RUnlock() + for _, as := range ms.addressStates { + wg.Add(1) + go func(as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) { + filter := func(tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool { + if tx.TxAttempts == nil || len(tx.TxAttempts) == 0 { + return false + } + if tx.State != TxConfirmedMissingReceipt { + return false + } + attempt := tx.TxAttempts[0] + if attempt.BroadcastBeforeBlockNum == nil { + return false + } + + return *attempt.BroadcastBeforeBlockNum < blockNum-int64(finalityDepth) + } + states := []txmgrtypes.TxState{TxConfirmedMissingReceipt} + txs := as.FetchTxs(states, filter) + for _, tx := range txs { + if err := as.MoveConfirmedMissingReceiptToFatalError(tx, null.StringFrom(ErrCouldNotGetReceipt.Error())); err != nil { + err = fmt.Errorf("mark_old_txes_missing_receipt_as_errored: address: %s: %w", as.fromAddress, err) + errsLock.Lock() + errs = errors.Join(errs, err) + errsLock.Unlock() + continue + } + hashes := make([]TX_HASH, len(tx.TxAttempts)) + maxBroadcastBeforeBlockNum := int64(0) + for i, attempt := range tx.TxAttempts { + hashes[i] = attempt.Hash + if attempt.BroadcastBeforeBlockNum != nil { + if *attempt.BroadcastBeforeBlockNum > maxBroadcastBeforeBlockNum { + maxBroadcastBeforeBlockNum = *attempt.BroadcastBeforeBlockNum + } + } + } + rr := result{ + ID: tx.ID, + Sequence: *tx.Sequence, + FromAddress: tx.FromAddress, + MaxBroadcastBeforeBlockNum: maxBroadcastBeforeBlockNum, + TxHashes: hashes, + } + resultsLock.Lock() + results = append(results, rr) + resultsLock.Unlock() + } + wg.Done() + }(as) + } + wg.Wait() + + for _, r := range results { + ms.lggr.Criticalw(fmt.Sprintf("eth_tx with ID %v expired without ever getting a receipt for any of our attempts. "+ + "Current block height is %v, transaction was broadcast before block height %v. This transaction may not have not been sent and will be marked as fatally errored. "+ + "This can happen if there is another instance of chainlink running that is using the same private key, or if "+ + "an external wallet has been used to send a transaction from account %s with nonce %v."+ + " Please note that Chainlink requires exclusive ownership of it's private keys and sharing keys across multiple"+ + " chainlink instances, or using the chainlink keys with an external wallet is NOT SUPPORTED and WILL lead to missed transactions", + r.ID, blockNum, r.MaxBroadcastBeforeBlockNum, r.FromAddress, r.Sequence), "ethTxID", r.ID, "sequence", r.Sequence, "fromAddress", r.FromAddress, "txHashes", r.TxHashes) + } + + return errs +} + +func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) deepCopyTx( + tx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], +) *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] { + copyTx := txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]{ + ID: tx.ID, + IdempotencyKey: tx.IdempotencyKey, + Sequence: tx.Sequence, + FromAddress: tx.FromAddress, + ToAddress: tx.ToAddress, + EncodedPayload: make([]byte, len(tx.EncodedPayload)), + Value: *new(big.Int).Set(&tx.Value), + FeeLimit: tx.FeeLimit, + Error: tx.Error, + BroadcastAt: tx.BroadcastAt, + InitialBroadcastAt: tx.InitialBroadcastAt, + CreatedAt: tx.CreatedAt, + State: tx.State, + TxAttempts: make([]txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], len(tx.TxAttempts)), + Meta: tx.Meta, + Subject: tx.Subject, + ChainID: tx.ChainID, + PipelineTaskRunID: tx.PipelineTaskRunID, + MinConfirmations: tx.MinConfirmations, + TransmitChecker: tx.TransmitChecker, + SignalCallback: tx.SignalCallback, + CallbackCompleted: tx.CallbackCompleted, + } + + // Copy the EncodedPayload + copy(copyTx.EncodedPayload, tx.EncodedPayload) + + // Copy the TxAttempts + for i, attempt := range tx.TxAttempts { + copyTx.TxAttempts[i] = ms.deepCopyTxAttempt(copyTx, attempt) + } + + return ©Tx +} + +func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) deepCopyTxAttempt( + tx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], + attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], +) txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] { + copyAttempt := txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]{ + ID: attempt.ID, + TxID: attempt.TxID, + Tx: tx, + TxFee: attempt.TxFee, + ChainSpecificFeeLimit: attempt.ChainSpecificFeeLimit, + SignedRawTx: make([]byte, len(attempt.SignedRawTx)), + Hash: attempt.Hash, + CreatedAt: attempt.CreatedAt, + BroadcastBeforeBlockNum: attempt.BroadcastBeforeBlockNum, + State: attempt.State, + Receipts: make([]txmgrtypes.ChainReceipt[TX_HASH, BLOCK_HASH], len(attempt.Receipts)), + TxType: attempt.TxType, + } + + copy(copyAttempt.SignedRawTx, attempt.SignedRawTx) + copy(copyAttempt.Receipts, attempt.Receipts) + + return copyAttempt +} diff --git a/common/txmgr/tx_priority_queue.go b/common/txmgr/tx_priority_queue.go new file mode 100644 index 00000000000..aeb07969a70 --- /dev/null +++ b/common/txmgr/tx_priority_queue.go @@ -0,0 +1,133 @@ +package txmgr + +import ( + "container/heap" + "sync" + + feetypes "github.com/smartcontractkit/chainlink/v2/common/fee/types" + txmgrtypes "github.com/smartcontractkit/chainlink/v2/common/txmgr/types" + "github.com/smartcontractkit/chainlink/v2/common/types" +) + +// TODO: HIDE THE HEAP INTERFACE FROM THE USER + +// TxPriorityQueue is a priority queue of transactions prioritized by creation time. The oldest transaction is at the front of the queue. +type TxPriorityQueue[ + CHAIN_ID types.ID, + ADDR, TX_HASH, BLOCK_HASH types.Hashable, + R txmgrtypes.ChainReceipt[TX_HASH, BLOCK_HASH], + SEQ types.Sequence, + FEE feetypes.Fee, +] struct { + sync.RWMutex + txs []*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] + idToIndex map[int64]int +} + +// NewTxPriorityQueue returns a new TxPriorityQueue instance +func NewTxPriorityQueue[ + CHAIN_ID types.ID, + ADDR, TX_HASH, BLOCK_HASH types.Hashable, + R txmgrtypes.ChainReceipt[TX_HASH, BLOCK_HASH], + SEQ types.Sequence, + FEE feetypes.Fee, +](maxUnstarted int) *TxPriorityQueue[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE] { + pq := TxPriorityQueue[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]{ + txs: make([]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], 0, maxUnstarted), + idToIndex: make(map[int64]int), + } + + return &pq +} + +// AddTx adds a transaction to the queue +func (pq *TxPriorityQueue[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) AddTx(tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) { + pq.Lock() + defer pq.Unlock() + + heap.Push(pq, tx) +} + +// RemoveNextTx removes the next transaction to be processed from the queue +func (pq *TxPriorityQueue[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) RemoveNextTx() *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] { + pq.Lock() + defer pq.Unlock() + + return heap.Pop(pq).(*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) +} + +// RemoveTxByID removes the transaction with the given ID from the queue +func (pq *TxPriorityQueue[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) RemoveTxByID(id int64) *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] { + pq.Lock() + defer pq.Unlock() + + if i, ok := pq.idToIndex[id]; ok { + return heap.Remove(pq, i).(*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) + } + + return nil +} + +// PruneByTxIDs removes the transactions with the given IDs from the queue +func (pq *TxPriorityQueue[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) PruneByTxIDs(ids []int64) []txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] { + pq.Lock() + defer pq.Unlock() + + removed := []txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]{} + for _, id := range ids { + if tx := pq.RemoveTxByID(id); tx != nil { + removed = append(removed, *tx) + } + } + + return removed +} + +// PeekNextTx returns the next transaction to be processed without removing it from the queue +func (pq *TxPriorityQueue[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) PeekNextTx() *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] { + pq.Lock() + defer pq.Unlock() + + if len(pq.txs) == 0 { + return nil + } + return pq.txs[0] +} + +// Close clears the queue +func (pq *TxPriorityQueue[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Close() { + pq.Lock() + defer pq.Unlock() + + clear(pq.txs) +} + +// Cap returns the capacity of the queue +func (pq *TxPriorityQueue[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Cap() int { + return cap(pq.txs) +} + +// Len, Less, Swap, Push, and Pop methods implement the heap interface +func (pq *TxPriorityQueue[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Len() int { + return len(pq.txs) +} +func (pq *TxPriorityQueue[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Less(i, j int) bool { + // We want Pop to give us the oldest, not newest, transaction based on creation time + return pq.txs[i].CreatedAt.Before(pq.txs[j].CreatedAt) +} +func (pq *TxPriorityQueue[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Swap(i, j int) { + pq.txs[i], pq.txs[j] = pq.txs[j], pq.txs[i] + pq.idToIndex[pq.txs[i].ID] = j + pq.idToIndex[pq.txs[j].ID] = i +} +func (pq *TxPriorityQueue[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Push(tx any) { + pq.txs = append(pq.txs, tx.(*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE])) +} +func (pq *TxPriorityQueue[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Pop() any { + old := pq.txs + n := len(old) + tx := old[n-1] + old[n-1] = nil // avoid memory leak + pq.txs = old[0 : n-1] + return tx +} diff --git a/common/txmgr/types/mocks/tx_store.go b/common/txmgr/types/mocks/tx_store.go index 353f398316d..368ded723b1 100644 --- a/common/txmgr/types/mocks/tx_store.go +++ b/common/txmgr/types/mocks/tx_store.go @@ -43,6 +43,36 @@ func (_m *TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Abandon(ctx return r0 } +// AllTransactions provides a mock function with given fields: ctx, fromAddress, chainID +func (_m *TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) AllTransactions(ctx context.Context, fromAddress ADDR, chainID CHAIN_ID) ([]txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) { + ret := _m.Called(ctx, fromAddress, chainID) + + if len(ret) == 0 { + panic("no return value specified for AllTransactions") + } + + var r0 []txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, ADDR, CHAIN_ID) ([]txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error)); ok { + return rf(ctx, fromAddress, chainID) + } + if rf, ok := ret.Get(0).(func(context.Context, ADDR, CHAIN_ID) []txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]); ok { + r0 = rf(ctx, fromAddress, chainID) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, ADDR, CHAIN_ID) error); ok { + r1 = rf(ctx, fromAddress, chainID) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // CheckTxQueueCapacity provides a mock function with given fields: ctx, fromAddress, maxQueuedTransactions, chainID func (_m *TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) CheckTxQueueCapacity(ctx context.Context, fromAddress ADDR, maxQueuedTransactions uint64, chainID CHAIN_ID) error { ret := _m.Called(ctx, fromAddress, maxQueuedTransactions, chainID) diff --git a/common/txmgr/types/tx_store.go b/common/txmgr/types/tx_store.go index 742a1740033..06560ba42c6 100644 --- a/common/txmgr/types/tx_store.go +++ b/common/txmgr/types/tx_store.go @@ -34,6 +34,7 @@ type TxStore[ UnstartedTxQueuePruner TxHistoryReaper[CHAIN_ID] TransactionStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, SEQ, FEE] + InMemoryInitializer[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE] // Find confirmed txes beyond the minConfirmations param that require callback but have not yet been signaled FindTxesPendingCallback(ctx context.Context, blockNum int64, chainID CHAIN_ID) (receiptsPlus []ReceiptPlus[R], err error) @@ -55,6 +56,19 @@ type TxStore[ FindTxesWithAttemptsAndReceiptsByIdsAndState(ctx context.Context, ids []big.Int, states []TxState, chainID *big.Int) (tx []*Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error) } +// InMemoryInitializer encapsulates the methods that are used by the txmgr to initialize the in memory tx store. +type InMemoryInitializer[ + ADDR types.Hashable, + CHAIN_ID types.ID, + TX_HASH types.Hashable, + BLOCK_HASH types.Hashable, + R ChainReceipt[TX_HASH, BLOCK_HASH], + SEQ types.Sequence, + FEE feetypes.Fee, +] interface { + AllTransactions(ctx context.Context, fromAddress ADDR, chainID CHAIN_ID) ([]Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) +} + // TransactionStore contains the persistence layer methods needed to manage Txs and TxAttempts type TransactionStore[ ADDR types.Hashable, diff --git a/core/chains/evm/txmgr/evm_tx_store.go b/core/chains/evm/txmgr/evm_tx_store.go index ae986acee27..7dadc45e840 100644 --- a/core/chains/evm/txmgr/evm_tx_store.go +++ b/core/chains/evm/txmgr/evm_tx_store.go @@ -48,6 +48,7 @@ type EvmTxStore interface { // redeclare TxStore for mockery txmgrtypes.TxStore[common.Address, *big.Int, common.Hash, common.Hash, *evmtypes.Receipt, evmtypes.Nonce, gas.EvmFee] TxStoreWebApi + TxStoreInMemory } // TxStoreWebApi encapsulates the methods that are not used by the txmgr and only used by the various web controllers and readers @@ -61,6 +62,11 @@ type TxStoreWebApi interface { FindTxWithAttempts(etxID int64) (etx Tx, err error) } +// TxStoreInMemory encapsulates the methods that are used by the txmgr to initialize the in memory tx store. +type TxStoreInMemory interface { + AllTransactions(ctx context.Context, fromAddress common.Address, chainID *big.Int) (txs []Tx, err error) +} + type TestEvmTxStore interface { EvmTxStore @@ -465,6 +471,22 @@ func (o *evmTxStore) TransactionsWithAttempts(offset, limit int) (txs []Tx, coun return } +// AllTransactions returns all eth transactions +func (o *evmTxStore) AllTransactions(ctx context.Context, fromAddress common.Address, chainID *big.Int) (txs []Tx, err error) { + var cancel context.CancelFunc + ctx, cancel = o.mergeContexts(ctx) + defer cancel() + qq := o.q.WithOpts(pg.WithParentCtx(ctx)) + var dbEtxs []DbEthTx + sql := `SELECT * FROM evm.txes WHERE from_address = $1 AND evm_chain_id = $2 ORDER BY id desc` + if err = qq.Select(&dbEtxs, sql, fromAddress, chainID.String()); err != nil { + return + } + txs = dbEthTxsToEvmEthTxs(dbEtxs) + err = o.preloadTxAttempts(txs) + return +} + // TxAttempts returns the last tx attempts sorted by created_at descending. func (o *evmTxStore) TxAttempts(offset, limit int) (txs []TxAttempt, count int, err error) { sql := `SELECT count(*) FROM evm.tx_attempts` @@ -1018,8 +1040,11 @@ func (o *evmTxStore) FindLatestSequence(ctx context.Context, fromAddress common. ctx, cancel = o.mergeContexts(ctx) defer cancel() qq := o.q.WithOpts(pg.WithParentCtx(ctx)) - sql := `SELECT nonce FROM evm.txes WHERE from_address = $1 AND evm_chain_id = $2 AND nonce IS NOT NULL ORDER BY nonce DESC LIMIT 1` - err = qq.Get(&nonce, sql, fromAddress, chainId.String()) + stmt := `SELECT nonce FROM evm.txes WHERE from_address = $1 AND evm_chain_id = $2 AND nonce IS NOT NULL ORDER BY nonce DESC LIMIT 1` + err = qq.Get(&nonce, stmt, fromAddress, chainId.String()) + if errors.Is(err, sql.ErrNoRows) { + return nonce, txmgr.ErrSequenceNotFound + } return } @@ -1552,8 +1577,14 @@ func (o *evmTxStore) FindNextUnstartedTransactionFromAddress(ctx context.Context qq := o.q.WithOpts(pg.WithParentCtx(ctx)) var dbEtx DbEthTx err := qq.Get(&dbEtx, `SELECT * FROM evm.txes WHERE from_address = $1 AND state = 'unstarted' AND evm_chain_id = $2 ORDER BY value ASC, created_at ASC, id ASC`, fromAddress, chainID.String()) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + return sql.ErrNoRows + } + return pkgerrors.Wrap(err, "failed to FindNextUnstartedTransactionFromAddress") + } dbEtx.ToTx(etx) - return pkgerrors.Wrap(err, "failed to FindNextUnstartedTransactionFromAddress") + return nil } func (o *evmTxStore) UpdateTxFatalError(ctx context.Context, etx *Tx) error { diff --git a/core/chains/evm/txmgr/mocks/evm_tx_store.go b/core/chains/evm/txmgr/mocks/evm_tx_store.go index 9690bf9728d..06de8e0d0cd 100644 --- a/core/chains/evm/txmgr/mocks/evm_tx_store.go +++ b/core/chains/evm/txmgr/mocks/evm_tx_store.go @@ -46,6 +46,36 @@ func (_m *EvmTxStore) Abandon(ctx context.Context, id *big.Int, addr common.Addr return r0 } +// AllTransactions provides a mock function with given fields: ctx, fromAddress, chainID +func (_m *EvmTxStore) AllTransactions(ctx context.Context, fromAddress common.Address, chainID *big.Int) ([]types.Tx[*big.Int, common.Address, common.Hash, common.Hash, evmtypes.Nonce, gas.EvmFee], error) { + ret := _m.Called(ctx, fromAddress, chainID) + + if len(ret) == 0 { + panic("no return value specified for AllTransactions") + } + + var r0 []types.Tx[*big.Int, common.Address, common.Hash, common.Hash, evmtypes.Nonce, gas.EvmFee] + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, common.Address, *big.Int) ([]types.Tx[*big.Int, common.Address, common.Hash, common.Hash, evmtypes.Nonce, gas.EvmFee], error)); ok { + return rf(ctx, fromAddress, chainID) + } + if rf, ok := ret.Get(0).(func(context.Context, common.Address, *big.Int) []types.Tx[*big.Int, common.Address, common.Hash, common.Hash, evmtypes.Nonce, gas.EvmFee]); ok { + r0 = rf(ctx, fromAddress, chainID) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]types.Tx[*big.Int, common.Address, common.Hash, common.Hash, evmtypes.Nonce, gas.EvmFee]) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, common.Address, *big.Int) error); ok { + r1 = rf(ctx, fromAddress, chainID) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // CheckTxQueueCapacity provides a mock function with given fields: ctx, fromAddress, maxQueuedTransactions, chainID func (_m *EvmTxStore) CheckTxQueueCapacity(ctx context.Context, fromAddress common.Address, maxQueuedTransactions uint64, chainID *big.Int) error { ret := _m.Called(ctx, fromAddress, maxQueuedTransactions, chainID)