Skip to content

Commit

Permalink
refactor func call args
Browse files Browse the repository at this point in the history
  • Loading branch information
huangzhen1997 committed Aug 27, 2024
1 parent aa7f9e2 commit 6f490e3
Showing 1 changed file with 52 additions and 11 deletions.
63 changes: 52 additions & 11 deletions common/txmgr/broadcaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,26 @@ var ErrTxRemoved = errors.New("tx removed")

type ProcessUnstartedTxs[ADDR types.Hashable] func(ctx context.Context, fromAddress ADDR) (retryable bool, err error)

type tryAgainAttemptParam[
CHAIN_ID types.ID,
HEAD types.Head[BLOCK_HASH],
ADDR types.Hashable,
TX_HASH types.Hashable,
BLOCK_HASH types.Hashable,
SEQ types.Sequence,
FEE feetypes.Fee,
] struct {
etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]
attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]
replacementAttempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]
initialBroadcastAt time.Time
newFee FEE
newFeeLimit uint64
retry int
txError error
errType client.SendTxReturnCode
}

// TransmitCheckerFactory creates a transmit checker based on a spec.
type TransmitCheckerFactory[
CHAIN_ID types.ID,
Expand Down Expand Up @@ -558,7 +578,7 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) hand
eb.sequenceTracker.GenerateNextSequence(etx.FromAddress, *etx.Sequence)
return err, true
case client.Underpriced:
return eb.tryAgainBumpingGas(ctx, lgr, err, etx, attempt, initialBroadcastAt, retryCount+1)
return eb.tryAgainBumpingGas(ctx, lgr, err, errType, etx, attempt, initialBroadcastAt, retryCount+1)
case client.InsufficientFunds:
// NOTE:
// This can potentially happen during gas spike.
Expand Down Expand Up @@ -679,7 +699,7 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) next
return etx, nil
}

func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) tryAgainBumpingGas(ctx context.Context, lgr logger.Logger, txError error, etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], initialBroadcastAt time.Time, retry int) (err error, retryable bool) {
func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) tryAgainBumpingGas(ctx context.Context, lgr logger.Logger, txError error, errType client.SendTxReturnCode, etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], initialBroadcastAt time.Time, retry int) (err error, retryable bool) {
// This log error is not applicable to Hedera since the action required would not be needed for its gas estimator
if eb.chainType != hederaChainType {
logger.With(lgr,
Expand All @@ -697,8 +717,18 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) tryA
if err != nil {
return fmt.Errorf("tryAgainBumpFee failed: %w", err), retryable
}

return eb.saveTryAgainAttempt(ctx, lgr, etx, attempt, replacementAttempt, initialBroadcastAt, bumpedFee, bumpedFeeLimit, retry, txError, client.Underpriced)
params := tryAgainAttemptParam[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]{
etx: etx,
attempt: attempt,
replacementAttempt: replacementAttempt,
initialBroadcastAt: initialBroadcastAt,
newFee: bumpedFee,
newFeeLimit: bumpedFeeLimit,
retry: 0,
txError: txError,
errType: errType,
}
return eb.saveTryAgainAttempt(ctx, lgr, params)
}

func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) tryAgainWithNewEstimation(ctx context.Context, lgr logger.Logger, txError error, errType client.SendTxReturnCode, etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], initialBroadcastAt time.Time) (err error, retryable bool) {
Expand All @@ -709,21 +739,32 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) tryA
lgr.Warnw("L2 rejected transaction due to incorrect fee, re-estimated and will try again",
"etxID", etx.ID, "err", err, "newGasPrice", fee, "newGasLimit", feeLimit)

return eb.saveTryAgainAttempt(ctx, lgr, etx, attempt, replacementAttempt, initialBroadcastAt, fee, feeLimit, 0, txError, errType)
params := tryAgainAttemptParam[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]{
etx: etx,
attempt: attempt,
replacementAttempt: replacementAttempt,
initialBroadcastAt: initialBroadcastAt,
newFee: fee,
newFeeLimit: feeLimit,
retry: 0,
txError: txError,
errType: errType,
}
return eb.saveTryAgainAttempt(ctx, lgr, params)
}

func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) saveTryAgainAttempt(ctx context.Context, lgr logger.Logger, etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], replacementAttempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], initialBroadcastAt time.Time, newFee FEE, newFeeLimit uint64, retry int, txError error, errType client.SendTxReturnCode) (err error, retyrable bool) {
if err = eb.txStore.SaveReplacementInProgressAttempt(ctx, attempt, &replacementAttempt); err != nil {
func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) saveTryAgainAttempt(ctx context.Context, lgr logger.Logger, param tryAgainAttemptParam[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) (err error, retyrable bool) {
if err = eb.txStore.SaveReplacementInProgressAttempt(ctx, param.attempt, &param.replacementAttempt); err != nil {
return fmt.Errorf("tryAgainWithNewFee failed: %w", err), true
}
lgr.Debugw("Bumped fee on initial send", "oldFee", attempt.TxFee.String(), "newFee", newFee.String(), "newFeeLimit", newFeeLimit)
lgr.Debugw("Bumped fee on initial send", "oldFee", param.attempt.TxFee.String(), "newFee", param.newFee.String(), "newFeeLimit", param.newFeeLimit)

// this avoids re-estimated insufficient fund tx gets processed immediately, we want to back off the tx when gas spikes
if errType == client.InsufficientFunds {
return txError, true
if param.errType == client.InsufficientFunds {
return param.txError, true
}

return eb.handleInProgressTx(ctx, etx, replacementAttempt, initialBroadcastAt, retry)
return eb.handleInProgressTx(ctx, param.etx, param.replacementAttempt, param.initialBroadcastAt, param.retry)
}

func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) saveFatallyErroredTransaction(lgr logger.Logger, etx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error {
Expand Down

0 comments on commit 6f490e3

Please sign in to comment.