From 12209c07688ea04b5415654f6bb90ae3539e3919 Mon Sep 17 00:00:00 2001 From: James Walker Date: Tue, 19 Dec 2023 14:29:57 -0500 Subject: [PATCH] implement DeleteInProgressAttempt --- common/txmgr/address_state.go | 25 ++++++++++++----- common/txmgr/inmemory_store.go | 51 ++++++++++++++++++++++++++++++---- 2 files changed, 63 insertions(+), 13 deletions(-) diff --git a/common/txmgr/address_state.go b/common/txmgr/address_state.go index 144346e9223..d2e206dd264 100644 --- a/common/txmgr/address_state.go +++ b/common/txmgr/address_state.go @@ -181,17 +181,28 @@ func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) close( clear(as.idempotencyKeyToTx) } -func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) UnstartedCount() int { +func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) CountTransactionsByState(txState txmgrtypes.TxState) int { as.RLock() defer as.RUnlock() - return as.unstarted.Len() -} -func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) UnconfirmedCount() int { - as.RLock() - defer as.RUnlock() + switch txState { + case TxUnstarted: + return as.unstarted.Len() + case TxInProgress: + if as.inprogress != nil { + return 1 + } + case TxUnconfirmed: + return len(as.unconfirmed) + case TxConfirmedMissingReceipt: + return len(as.confirmedMissingReceipt) + case TxConfirmed: + return len(as.confirmed) + case TxFatalError: + return len(as.fatalErrored) + } - return len(as.unconfirmed) + return 0 } func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTxWithIdempotencyKey(key string) *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] { diff --git a/common/txmgr/inmemory_store.go b/common/txmgr/inmemory_store.go index e180de1a2b6..1b98a13a1ab 100644 --- a/common/txmgr/inmemory_store.go +++ b/common/txmgr/inmemory_store.go @@ -157,7 +157,7 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Check return fmt.Errorf("check_tx_queue_capacity: %w", ErrAddressNotFound) } - count := uint64(as.UnstartedCount()) + count := uint64(as.CountTransactionsByState(TxUnstarted)) if count >= maxQueuedTransactions { return fmt.Errorf("check_tx_queue_capacity: cannot create transaction; too many unstarted transactions in the queue (%v/%v). %s", count, maxQueuedTransactions, label.MaxQueuedTransactionsWarning) } @@ -203,7 +203,7 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Count return 0, fmt.Errorf("count_unstarted_transactions: %w", ErrAddressNotFound) } - return uint32(as.UnconfirmedCount()), nil + return uint32(as.CountTransactionsByState(TxUnconfirmed)), nil } // CountUnstartedTransactions returns the number of unstarted transactions for a given address. @@ -218,7 +218,7 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Count return 0, fmt.Errorf("count_unstarted_transactions: %w", ErrAddressNotFound) } - return uint32(as.UnstartedCount()), nil + return uint32(as.CountTransactionsByState(TxUnstarted)), nil } // UpdateTxUnstartedToInProgress updates a transaction from unstarted to in_progress. @@ -900,12 +900,51 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) ReapT return nil } -func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) CountTransactionsByState(ctx context.Context, state txmgrtypes.TxState, chainID CHAIN_ID) (uint32, error) { - return ms.txStore.CountTransactionsByState(ctx, state, chainID) +func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) CountTransactionsByState(_ context.Context, state txmgrtypes.TxState, chainID CHAIN_ID) (uint32, error) { + if ms.chainID.String() != chainID.String() { + return 0, fmt.Errorf("count_transactions_by_state: %w", ErrInvalidChainID) + } + + var total int + for _, as := range ms.addressStates { + total += as.CountTransactionsByState(state) + } + + return uint32(total), nil } + func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) DeleteInProgressAttempt(ctx context.Context, attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error { - return ms.txStore.DeleteInProgressAttempt(ctx, attempt) + if attempt.State != txmgrtypes.TxAttemptInProgress { + return fmt.Errorf("delete_in_progress_attempt: expected attempt to be in_progress") + } + if attempt.ID == 0 { + return fmt.Errorf("delete_in_progress_attempt: expected attempt to have an ID") + } + + // Check if fromaddress enabled + as, ok := ms.addressStates[attempt.Tx.FromAddress] + if !ok { + return fmt.Errorf("delete_in_progress_attempt: %w", ErrAddressNotFound) + } + + // Persist to persistent storage + if err := ms.txStore.DeleteInProgressAttempt(ctx, attempt); err != nil { + return fmt.Errorf("delete_in_progress_attempt: %w", err) + } + + // Update in memory store + filter := func(tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool { + if tx.TxAttempts == nil || len(tx.TxAttempts) == 0 { + return false + } + + return tx.TxAttempts[0].ID == attempt.ID + } + as.DeleteTxs(as.FetchTxs(nil, filter)...) + + return nil } + func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTxsRequiringGasBump(ctx context.Context, address ADDR, blockNum, gasBumpThreshold, depth int64, chainID CHAIN_ID) (etxs []*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error) { return ms.txStore.FindTxsRequiringGasBump(ctx, address, blockNum, gasBumpThreshold, depth, chainID) }