Skip to content

Commit

Permalink
implement DeleteInProgressAttempt
Browse files Browse the repository at this point in the history
  • Loading branch information
poopoothegorilla committed Dec 19, 2023
1 parent af5442a commit 12209c0
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 13 deletions.
25 changes: 18 additions & 7 deletions common/txmgr/address_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,17 +181,28 @@ func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) close(
clear(as.idempotencyKeyToTx)
}

func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) UnstartedCount() int {
func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) CountTransactionsByState(txState txmgrtypes.TxState) int {
as.RLock()
defer as.RUnlock()

return as.unstarted.Len()
}
func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) UnconfirmedCount() int {
as.RLock()
defer as.RUnlock()
switch txState {
case TxUnstarted:
return as.unstarted.Len()
case TxInProgress:
if as.inprogress != nil {
return 1
}
case TxUnconfirmed:
return len(as.unconfirmed)
case TxConfirmedMissingReceipt:
return len(as.confirmedMissingReceipt)
case TxConfirmed:
return len(as.confirmed)
case TxFatalError:
return len(as.fatalErrored)
}

return len(as.unconfirmed)
return 0
}

func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTxWithIdempotencyKey(key string) *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] {
Expand Down
51 changes: 45 additions & 6 deletions common/txmgr/inmemory_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Check
return fmt.Errorf("check_tx_queue_capacity: %w", ErrAddressNotFound)
}

count := uint64(as.UnstartedCount())
count := uint64(as.CountTransactionsByState(TxUnstarted))
if count >= maxQueuedTransactions {
return fmt.Errorf("check_tx_queue_capacity: cannot create transaction; too many unstarted transactions in the queue (%v/%v). %s", count, maxQueuedTransactions, label.MaxQueuedTransactionsWarning)
}
Expand Down Expand Up @@ -203,7 +203,7 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Count
return 0, fmt.Errorf("count_unstarted_transactions: %w", ErrAddressNotFound)
}

return uint32(as.UnconfirmedCount()), nil
return uint32(as.CountTransactionsByState(TxUnconfirmed)), nil
}

// CountUnstartedTransactions returns the number of unstarted transactions for a given address.
Expand All @@ -218,7 +218,7 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Count
return 0, fmt.Errorf("count_unstarted_transactions: %w", ErrAddressNotFound)
}

return uint32(as.UnstartedCount()), nil
return uint32(as.CountTransactionsByState(TxUnstarted)), nil
}

// UpdateTxUnstartedToInProgress updates a transaction from unstarted to in_progress.
Expand Down Expand Up @@ -900,12 +900,51 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) ReapT

return nil
}
func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) CountTransactionsByState(ctx context.Context, state txmgrtypes.TxState, chainID CHAIN_ID) (uint32, error) {
return ms.txStore.CountTransactionsByState(ctx, state, chainID)
func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) CountTransactionsByState(_ context.Context, state txmgrtypes.TxState, chainID CHAIN_ID) (uint32, error) {
if ms.chainID.String() != chainID.String() {
return 0, fmt.Errorf("count_transactions_by_state: %w", ErrInvalidChainID)
}

var total int
for _, as := range ms.addressStates {
total += as.CountTransactionsByState(state)
}

return uint32(total), nil
}

func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) DeleteInProgressAttempt(ctx context.Context, attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error {
return ms.txStore.DeleteInProgressAttempt(ctx, attempt)
if attempt.State != txmgrtypes.TxAttemptInProgress {
return fmt.Errorf("delete_in_progress_attempt: expected attempt to be in_progress")
}
if attempt.ID == 0 {
return fmt.Errorf("delete_in_progress_attempt: expected attempt to have an ID")
}

// Check if fromaddress enabled
as, ok := ms.addressStates[attempt.Tx.FromAddress]
if !ok {
return fmt.Errorf("delete_in_progress_attempt: %w", ErrAddressNotFound)
}

// Persist to persistent storage
if err := ms.txStore.DeleteInProgressAttempt(ctx, attempt); err != nil {
return fmt.Errorf("delete_in_progress_attempt: %w", err)
}

// Update in memory store
filter := func(tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool {
if tx.TxAttempts == nil || len(tx.TxAttempts) == 0 {
return false
}

return tx.TxAttempts[0].ID == attempt.ID
}
as.DeleteTxs(as.FetchTxs(nil, filter)...)

return nil
}

func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTxsRequiringGasBump(ctx context.Context, address ADDR, blockNum, gasBumpThreshold, depth int64, chainID CHAIN_ID) (etxs []*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error) {
return ms.txStore.FindTxsRequiringGasBump(ctx, address, blockNum, gasBumpThreshold, depth, chainID)
}
Expand Down

0 comments on commit 12209c0

Please sign in to comment.