From da30a83a946aa66c0f200de057e2afd9bfa0689b Mon Sep 17 00:00:00 2001 From: James Walker Date: Tue, 19 Dec 2023 16:22:07 -0500 Subject: [PATCH] implement more methods --- common/txmgr/inmemory_store.go | 413 ++++++++++++++++++++++++-- core/chains/evm/txmgr/evm_tx_store.go | 51 +++- 2 files changed, 428 insertions(+), 36 deletions(-) diff --git a/common/txmgr/inmemory_store.go b/common/txmgr/inmemory_store.go index 1b98a13a1ab..d0b17f9a482 100644 --- a/common/txmgr/inmemory_store.go +++ b/common/txmgr/inmemory_store.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" "math/big" + "sort" "sync" "time" @@ -945,44 +946,314 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Delet return nil } -func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTxsRequiringGasBump(ctx context.Context, address ADDR, blockNum, gasBumpThreshold, depth int64, chainID CHAIN_ID) (etxs []*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error) { - return ms.txStore.FindTxsRequiringGasBump(ctx, address, blockNum, gasBumpThreshold, depth, chainID) -} -func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTxsRequiringResubmissionDueToInsufficientFunds(ctx context.Context, address ADDR, chainID CHAIN_ID) (etxs []*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error) { - return ms.txStore.FindTxsRequiringResubmissionDueToInsufficientFunds(ctx, address, chainID) +func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTxsRequiringResubmissionDueToInsufficientFunds(_ context.Context, address ADDR, chainID CHAIN_ID) ([]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) { + if ms.chainID.String() != chainID.String() { + return nil, fmt.Errorf("find_txs_requiring_resubmission_due_to_insufficient_funds: %w", ErrInvalidChainID) + } + + as, ok := ms.addressStates[address] + if !ok { + return nil, fmt.Errorf("find_txs_requiring_resubmission_due_to_insufficient_funds: %w", ErrAddressNotFound) + } + + filter := func(tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool { + if tx.TxAttempts == nil || len(tx.TxAttempts) == 0 { + return false + } + attempt := tx.TxAttempts[0] + + return attempt.State == txmgrtypes.TxAttemptInsufficientFunds + } + states := []txmgrtypes.TxState{TxUnconfirmed} + txs := as.FetchTxs(states, filter) + // sort by sequence ASC + sort.Slice(txs, func(i, j int) bool { + return (*txs[i].Sequence).Int64() < (*txs[j].Sequence).Int64() + }) + + etxs := make([]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], len(txs)) + for i, tx := range txs { + etxs[i] = &tx + } + + return etxs, nil } -func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTxAttemptsRequiringResend(ctx context.Context, olderThan time.Time, maxInFlightTransactions uint32, chainID CHAIN_ID, address ADDR) (attempts []txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error) { - return ms.txStore.FindTxAttemptsRequiringResend(ctx, olderThan, maxInFlightTransactions, chainID, address) + +func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTxAttemptsRequiringResend(_ context.Context, olderThan time.Time, maxInFlightTransactions uint32, chainID CHAIN_ID, address ADDR) ([]txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) { + if ms.chainID.String() != chainID.String() { + return nil, fmt.Errorf("find_tx_attempts_requiring_resend: %w", ErrInvalidChainID) + } + + as, ok := ms.addressStates[address] + if !ok { + return nil, fmt.Errorf("find_tx_attempts_requiring_resend: %w", ErrAddressNotFound) + } + + filter := func(tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool { + if tx.TxAttempts == nil || len(tx.TxAttempts) == 0 { + return false + } + attempt := tx.TxAttempts[0] + if attempt.State == txmgrtypes.TxAttemptInProgress { + return false + } + if tx.BroadcastAt.After(olderThan) { + return false + } + + return false + } + states := []txmgrtypes.TxState{TxUnconfirmed, TxConfirmedMissingReceipt} + attempts := as.FetchTxAttempts(states, filter) + // sort by sequence ASC, gas_price DESC, gas_tip_cap DESC + // TODO + + // LIMIT by maxInFlightTransactions + if len(attempts) > int(maxInFlightTransactions) { + attempts = attempts[:maxInFlightTransactions] + } + + return attempts, nil } -func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTxWithSequence(ctx context.Context, fromAddress ADDR, seq SEQ) (etx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error) { - return ms.txStore.FindTxWithSequence(ctx, fromAddress, seq) + +func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTxWithSequence(_ context.Context, fromAddress ADDR, seq SEQ) (*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) { + as, ok := ms.addressStates[fromAddress] + if !ok { + return nil, fmt.Errorf("find_tx_with_sequence: %w", ErrAddressNotFound) + } + + filter := func(tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool { + if tx.Sequence == nil { + return false + } + + return (*tx.Sequence).String() == seq.String() + } + states := []txmgrtypes.TxState{TxConfirmed, TxConfirmedMissingReceipt, TxUnconfirmed} + txs := as.FetchTxs(states, filter) + if len(txs) == 0 { + return nil, nil + } + + return &txs[0], nil } -func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTransactionsConfirmedInBlockRange(ctx context.Context, highBlockNumber, lowBlockNumber int64, chainID CHAIN_ID) (etxs []*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error) { - return ms.txStore.FindTransactionsConfirmedInBlockRange(ctx, highBlockNumber, lowBlockNumber, chainID) + +func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTransactionsConfirmedInBlockRange(_ context.Context, highBlockNumber, lowBlockNumber int64, chainID CHAIN_ID) ([]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) { + if ms.chainID.String() != chainID.String() { + return nil, fmt.Errorf("find_transactions_confirmed_in_block_range: %w", ErrInvalidChainID) + } + + filter := func(tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool { + if tx.TxAttempts == nil || len(tx.TxAttempts) == 0 { + return false + } + attempt := tx.TxAttempts[0] + if attempt.State != txmgrtypes.TxAttemptBroadcast { + return false + } + if attempt.Receipts == nil || len(attempt.Receipts) == 0 { + return false + } + if attempt.Receipts[0].GetBlockNumber() == nil { + return false + } + blockNum := attempt.Receipts[0].GetBlockNumber().Int64() + return blockNum >= lowBlockNumber && blockNum <= highBlockNumber + } + states := []txmgrtypes.TxState{TxConfirmed, TxConfirmedMissingReceipt} + txs := []txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]{} + for _, as := range ms.addressStates { + txs = append(txs, as.FetchTxs(states, filter)...) + } + // sort by sequence ASC + sort.Slice(txs, func(i, j int) bool { + return (*txs[i].Sequence).Int64() < (*txs[j].Sequence).Int64() + }) + + etxs := make([]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], len(txs)) + for i, tx := range txs { + etxs[i] = &tx + } + + return etxs, nil } func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindEarliestUnconfirmedBroadcastTime(ctx context.Context, chainID CHAIN_ID) (null.Time, error) { - return ms.txStore.FindEarliestUnconfirmedBroadcastTime(ctx, chainID) + if ms.chainID.String() != chainID.String() { + return null.Time{}, fmt.Errorf("find_earliest_unconfirmed_broadcast_time: %w", ErrInvalidChainID) + } + + // TODO(jtw): this is super niave and might be slow + filter := func(tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool { + return tx.InitialBroadcastAt != nil + } + states := []txmgrtypes.TxState{TxUnconfirmed} + txs := []txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]{} + for _, as := range ms.addressStates { + txs = append(txs, as.FetchTxs(states, filter)...) + } + + var minInitialBroadcastAt time.Time + for _, tx := range txs { + if tx.InitialBroadcastAt.Before(minInitialBroadcastAt) { + minInitialBroadcastAt = *tx.InitialBroadcastAt + } + } + + return null.TimeFrom(minInitialBroadcastAt), nil } + func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindEarliestUnconfirmedTxAttemptBlock(ctx context.Context, chainID CHAIN_ID) (null.Int, error) { - return ms.txStore.FindEarliestUnconfirmedTxAttemptBlock(ctx, chainID) + if ms.chainID.String() != chainID.String() { + return null.Int{}, fmt.Errorf("find_earliest_unconfirmed_broadcast_time: %w", ErrInvalidChainID) + } + + // TODO(jtw): this is super niave and might be slow + filter := func(tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool { + if tx.TxAttempts == nil || len(tx.TxAttempts) == 0 { + return false + } + attempt := tx.TxAttempts[0] + return attempt.BroadcastBeforeBlockNum != nil + } + states := []txmgrtypes.TxState{TxUnconfirmed} + txs := []txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]{} + for _, as := range ms.addressStates { + txs = append(txs, as.FetchTxs(states, filter)...) + } + + var minBroadcastBeforeBlockNum int64 + for _, tx := range txs { + if *tx.TxAttempts[0].BroadcastBeforeBlockNum < minBroadcastBeforeBlockNum { + minBroadcastBeforeBlockNum = *tx.TxAttempts[0].BroadcastBeforeBlockNum + } + } + + return null.IntFrom(minBroadcastBeforeBlockNum), nil } -func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) GetInProgressTxAttempts(ctx context.Context, address ADDR, chainID CHAIN_ID) (attempts []txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error) { - return ms.txStore.GetInProgressTxAttempts(ctx, address, chainID) + +func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) GetInProgressTxAttempts(ctx context.Context, address ADDR, chainID CHAIN_ID) ([]txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) { + if ms.chainID.String() != chainID.String() { + return nil, fmt.Errorf("get_in_progress_tx_attempts: %w", ErrInvalidChainID) + } + + as, ok := ms.addressStates[address] + if !ok { + return nil, fmt.Errorf("get_in_progress_tx_attempts: %w", ErrAddressNotFound) + } + + filter := func(tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool { + if tx.TxAttempts == nil || len(tx.TxAttempts) == 0 { + return false + } + attempt := tx.TxAttempts[0] + return attempt.State == txmgrtypes.TxAttemptInProgress + } + states := []txmgrtypes.TxState{TxConfirmed, TxConfirmedMissingReceipt, TxUnconfirmed} + attempts := as.FetchTxAttempts(states, filter) + + return attempts, nil } -func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) GetNonFatalTransactions(ctx context.Context, chainID CHAIN_ID) (txs []*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error) { - return ms.txStore.GetNonFatalTransactions(ctx, chainID) + +func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) GetNonFatalTransactions(ctx context.Context, chainID CHAIN_ID) ([]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) { + if ms.chainID.String() != chainID.String() { + return nil, fmt.Errorf("get_non_fatal_transactions: %w", ErrInvalidChainID) + } + + // TODO(jtw): this is niave ... it might be better to just use all states excluding fatal + filter := func(tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool { + return tx.State != TxFatalError + } + txs := []txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]{} + for _, as := range ms.addressStates { + txs = append(txs, as.FetchTxs(nil, filter)...) + } + + etxs := make([]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], len(txs)) + for i, tx := range txs { + etxs[i] = &tx + } + + return etxs, nil } -func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) GetTxByID(ctx context.Context, id int64) (tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error) { - return ms.txStore.GetTxByID(ctx, id) + +func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) GetTxByID(_ context.Context, id int64) (*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) { + filter := func(tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool { + return tx.ID == id + } + for _, as := range ms.addressStates { + txs := as.FetchTxs(nil, filter, id) + if len(txs) > 0 { + return &txs[0], nil + } + } + + return nil, fmt.Errorf("failed to get tx with id: %v", id) + } -func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) HasInProgressTransaction(ctx context.Context, account ADDR, chainID CHAIN_ID) (exists bool, err error) { - return ms.txStore.HasInProgressTransaction(ctx, account, chainID) + +func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) HasInProgressTransaction(_ context.Context, account ADDR, chainID CHAIN_ID) (bool, error) { + if ms.chainID.String() != chainID.String() { + return false, fmt.Errorf("has_in_progress_transaction: %w", ErrInvalidChainID) + } + + as, ok := ms.addressStates[account] + if !ok { + return false, fmt.Errorf("has_in_progress_transaction: %w", ErrAddressNotFound) + } + + n := as.CountTransactionsByState(TxInProgress) + + return n > 0, nil } -func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) LoadTxAttempts(ctx context.Context, etx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error { - return ms.txStore.LoadTxAttempts(ctx, etx) + +func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) LoadTxAttempts(_ context.Context, etx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error { + as, ok := ms.addressStates[etx.FromAddress] + if !ok { + return fmt.Errorf("load_tx_attempts: %w", ErrAddressNotFound) + } + + filter := func(tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool { + return tx.ID == etx.ID + } + txAttempts := []txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]{} + for _, tx := range as.FetchTxs(nil, filter, etx.ID) { + txAttempts = append(txAttempts, tx.TxAttempts...) + } + etx.TxAttempts = txAttempts + + return nil } -func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MarkAllConfirmedMissingReceipt(ctx context.Context, chainID CHAIN_ID) (err error) { - return ms.txStore.MarkAllConfirmedMissingReceipt(ctx, chainID) +func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MarkAllConfirmedMissingReceipt(ctx context.Context, chainID CHAIN_ID) error { + if ms.chainID.String() != chainID.String() { + return fmt.Errorf("mark_all_confirmed_missing_receipt: %w", ErrInvalidChainID) + } + + // TODO(jtw): need to complete + + /* + // Persist to persistent storage + if err := ms.txStore.MarkAllConfirmedMissingReceipt(ctx, chainID); err != nil { + return err + } + + // Update in memory store + fn := func(tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) { + if tx.State != TxUnconfirmed { + return + } + if tx.Sequence >= maxSequence { + return + } + + tx.State = TxConfirmedMissingReceipt + } + states := []txmgrtypes.TxState{TxUnconfirmed} + for _, as := range ms.addressStates { + as.ApplyToTxs(states, fn) + } + */ + + return nil } func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MarkOldTxesMissingReceiptAsErrored(ctx context.Context, blockNum int64, finalityDepth uint32, chainID CHAIN_ID) error { return ms.txStore.MarkOldTxesMissingReceiptAsErrored(ctx, blockNum, finalityDepth, chainID) @@ -994,21 +1265,105 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) SaveC return ms.txStore.SaveConfirmedMissingReceiptAttempt(ctx, timeout, attempt, broadcastAt) } func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) SaveInProgressAttempt(ctx context.Context, attempt *txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error { - return ms.txStore.SaveInProgressAttempt(ctx, attempt) + _, ok := ms.addressStates[attempt.Tx.FromAddress] + if !ok { + return fmt.Errorf("save_in_progress_attempt: %w", ErrAddressNotFound) + } + if attempt.State != txmgrtypes.TxAttemptInProgress { + return fmt.Errorf("SaveInProgressAttempt failed: attempt state must be in_progress") + } + + // Persist to persistent storage + if err := ms.txStore.SaveInProgressAttempt(ctx, attempt); err != nil { + return err + } + + // Update in memory store + // TODO + + return nil } func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) SaveInsufficientFundsAttempt(ctx context.Context, timeout time.Duration, attempt *txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], broadcastAt time.Time) error { - return ms.txStore.SaveInsufficientFundsAttempt(ctx, timeout, attempt, broadcastAt) + as, ok := ms.addressStates[attempt.Tx.FromAddress] + if !ok { + return fmt.Errorf("save_insufficient_funds_attempt: %w", ErrAddressNotFound) + } + if !(attempt.State == txmgrtypes.TxAttemptInProgress || attempt.State == txmgrtypes.TxAttemptInsufficientFunds) { + return fmt.Errorf("expected state to be in_progress or insufficient_funds") + } + + // Persist to persistent storage + if err := ms.txStore.SaveInsufficientFundsAttempt(ctx, timeout, attempt, broadcastAt); err != nil { + return err + } + + // Update in memory store + fn := func(tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) { + if tx.ID != attempt.TxID { + return + } + if tx.TxAttempts == nil || len(tx.TxAttempts) == 0 { + return + } + if tx.BroadcastAt.Before(broadcastAt) { + tx.BroadcastAt = &broadcastAt + } + + tx.TxAttempts[0].State = txmgrtypes.TxAttemptInsufficientFunds + } + as.ApplyToTxs(nil, fn, attempt.TxID) + + return nil } func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) SaveSentAttempt(ctx context.Context, timeout time.Duration, attempt *txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], broadcastAt time.Time) error { - return ms.txStore.SaveSentAttempt(ctx, timeout, attempt, broadcastAt) + as, ok := ms.addressStates[attempt.Tx.FromAddress] + if !ok { + return fmt.Errorf("save_sent_attempt: %w", ErrAddressNotFound) + } + + if attempt.State != txmgrtypes.TxAttemptInProgress { + return fmt.Errorf("expected state to be in_progress") + } + + // Persist to persistent storage + if err := ms.txStore.SaveSentAttempt(ctx, timeout, attempt, broadcastAt); err != nil { + return err + } + + // Update in memory store + fn := func(tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) { + if tx.ID != attempt.TxID { + return + } + if tx.TxAttempts == nil || len(tx.TxAttempts) == 0 { + return + } + if tx.BroadcastAt.Before(broadcastAt) { + tx.BroadcastAt = &broadcastAt + } + + tx.TxAttempts[0].State = txmgrtypes.TxAttemptBroadcast + } + as.ApplyToTxs(nil, fn, attempt.TxID) + + return nil } func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) UpdateTxForRebroadcast(ctx context.Context, etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], etxAttempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error { return ms.txStore.UpdateTxForRebroadcast(ctx, etx, etxAttempt) } -func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) IsTxFinalized(ctx context.Context, blockHeight int64, txID int64, chainID CHAIN_ID) (finalized bool, err error) { +func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) IsTxFinalized(ctx context.Context, blockHeight int64, txID int64, chainID CHAIN_ID) (bool, error) { return ms.txStore.IsTxFinalized(ctx, blockHeight, txID, chainID) } +func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTxsRequiringGasBump(ctx context.Context, address ADDR, blockNum, gasBumpThreshold, depth int64, chainID CHAIN_ID) ([]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) { + if ms.chainID.String() != chainID.String() { + return nil, fmt.Errorf("find_txs_requiring_gas_bump: %w", ErrInvalidChainID) + } + + // TODO + return nil, nil +} + func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) deepCopyTx(tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] { etx := *tx etx.TxAttempts = make([]txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], len(tx.TxAttempts)) diff --git a/core/chains/evm/txmgr/evm_tx_store.go b/core/chains/evm/txmgr/evm_tx_store.go index 452b31349db..152834f3a5b 100644 --- a/core/chains/evm/txmgr/evm_tx_store.go +++ b/core/chains/evm/txmgr/evm_tx_store.go @@ -698,7 +698,11 @@ func (o *evmTxStore) LoadTxesAttempts(etxs []*Tx, qopts ...pg.QOpt) error { ethTxesM[etx.ID] = etxs[i] } var dbTxAttempts []DbEthTxAttempt - if err := qq.Select(&dbTxAttempts, `SELECT * FROM evm.tx_attempts WHERE eth_tx_id = ANY($1) ORDER BY evm.tx_attempts.gas_price DESC, evm.tx_attempts.gas_tip_cap DESC`, pq.Array(ethTxIDs)); err != nil { + if err := qq.Select(&dbTxAttempts, ` + SELECT * + FROM evm.tx_attempts + WHERE eth_tx_id = ANY($1) + ORDER BY evm.tx_attempts.gas_price DESC, evm.tx_attempts.gas_tip_cap DESC`, pq.Array(ethTxIDs)); err != nil { return pkgerrors.Wrap(err, "loadEthTxesAttempts failed to load evm.tx_attempts") } for _, dbAttempt := range dbTxAttempts { @@ -1037,7 +1041,8 @@ func (o *evmTxStore) GetInProgressTxAttempts(ctx context.Context, address common var dbAttempts []DbEthTxAttempt err = tx.Select(&dbAttempts, ` SELECT evm.tx_attempts.* FROM evm.tx_attempts -INNER JOIN evm.txes ON evm.txes.id = evm.tx_attempts.eth_tx_id AND evm.txes.state in ('confirmed', 'confirmed_missing_receipt', 'unconfirmed') +INNER JOIN evm.txes ON evm.txes.id = evm.tx_attempts.eth_tx_id + AND evm.txes.state in ('confirmed', 'confirmed_missing_receipt', 'unconfirmed') WHERE evm.tx_attempts.state = 'in_progress' AND evm.txes.from_address = $1 AND evm.txes.evm_chain_id = $2 `, address, chainID.String()) if err != nil { @@ -1214,7 +1219,11 @@ func (o *evmTxStore) FindEarliestUnconfirmedBroadcastTime(ctx context.Context, c defer cancel() qq := o.q.WithOpts(pg.WithParentCtx(ctx)) err = qq.Transaction(func(tx pg.Queryer) error { - if err = qq.QueryRowContext(ctx, `SELECT min(initial_broadcast_at) FROM evm.txes WHERE state = 'unconfirmed' AND evm_chain_id = $1`, chainID.String()).Scan(&broadcastAt); err != nil { + if err = qq.QueryRowContext(ctx, ` + SELECT + min(initial_broadcast_at) + FROM evm.txes + WHERE state = 'unconfirmed' AND evm_chain_id = $1`, chainID.String()).Scan(&broadcastAt); err != nil { return fmt.Errorf("failed to query for unconfirmed eth_tx count: %w", err) } return nil @@ -1443,9 +1452,30 @@ func (o *evmTxStore) FindTxsRequiringGasBump(ctx context.Context, address common err = qq.Transaction(func(tx pg.Queryer) error { stmt := ` SELECT evm.txes.* FROM evm.txes -LEFT JOIN evm.tx_attempts ON evm.txes.id = evm.tx_attempts.eth_tx_id AND (broadcast_before_block_num > $4 OR broadcast_before_block_num IS NULL OR evm.tx_attempts.state != 'broadcast') -WHERE evm.txes.state = 'unconfirmed' AND evm.tx_attempts.id IS NULL AND evm.txes.from_address = $1 AND evm.txes.evm_chain_id = $2 - AND (($3 = 0) OR (evm.txes.id IN (SELECT id FROM evm.txes WHERE state = 'unconfirmed' AND from_address = $1 ORDER BY nonce ASC LIMIT $3))) +LEFT JOIN evm.tx_attempts ON evm.txes.id = evm.tx_attempts.eth_tx_id + AND ( + broadcast_before_block_num > $4 + OR broadcast_before_block_num IS NULL + OR evm.tx_attempts.state != 'broadcast' + ) +WHERE + evm.txes.state = 'unconfirmed' + AND evm.tx_attempts.id IS NULL + AND evm.txes.from_address = $1 + AND evm.txes.evm_chain_id = $2 + AND ( + ($3 = 0) + OR ( + evm.txes.id IN ( + SELECT id + FROM evm.txes + WHERE + state = 'unconfirmed' + AND from_address = $1 + ORDER BY nonce ASC LIMIT $3 + ) + ) + ) ORDER BY nonce ASC ` var dbEtxs []DbEthTx @@ -1804,7 +1834,14 @@ func (o *evmTxStore) HasInProgressTransaction(ctx context.Context, account commo ctx, cancel = o.mergeContexts(ctx) defer cancel() qq := o.q.WithOpts(pg.WithParentCtx(ctx)) - err = qq.Get(&exists, `SELECT EXISTS(SELECT 1 FROM evm.txes WHERE state = 'in_progress' AND from_address = $1 AND evm_chain_id = $2)`, account, chainID.String()) + err = qq.Get(&exists, ` + SELECT EXISTS( + SELECT 1 + FROM evm.txes + WHERE + state = 'in_progress' + AND from_address = $1 + AND evm_chain_id = $2)`, account, chainID.String()) return exists, pkgerrors.Wrap(err, "hasInProgressTransaction failed") }