Skip to content

Commit

Permalink
Add nonce validation after broadcast for Hedera (#13957)
Browse files Browse the repository at this point in the history
* Added post-broadcast nonce validation for Hedera

* Added changeset

* Updated chaintype docs for Hedera

* Fixed lint errors

* Added condition to handle on-chain seq less than tx seq and updated comment
  • Loading branch information
amit-momin authored Aug 1, 2024
1 parent 610a516 commit 20dbba8
Show file tree
Hide file tree
Showing 12 changed files with 213 additions and 37 deletions.
5 changes: 5 additions & 0 deletions .changeset/friendly-impalas-sniff.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": minor
---

Added nonce validation immediately after broadcast for Hedera #internal
92 changes: 74 additions & 18 deletions common/txmgr/broadcaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,13 @@ const (
// TransmitCheckTimeout controls the maximum amount of time that will be
// spent on the transmit check.
TransmitCheckTimeout = 2 * time.Second

// maxBroadcastRetries is the number of times a transaction broadcast is retried when the sequence fails to increment on Hedera
maxHederaBroadcastRetries = 3

// hederaChainType is the string representation of the Hedera chain type
// Temporary solution until the Broadcaster is moved to the EVM code base
hederaChainType = "hedera"
)

var (
Expand Down Expand Up @@ -114,6 +121,7 @@ type Broadcaster[
sequenceTracker txmgrtypes.SequenceTracker[ADDR, SEQ]
resumeCallback ResumeCallback
chainID CHAIN_ID
chainType string
config txmgrtypes.BroadcasterChainConfig
feeConfig txmgrtypes.BroadcasterFeeConfig
txConfig txmgrtypes.BroadcasterTransactionsConfig
Expand Down Expand Up @@ -163,6 +171,7 @@ func NewBroadcaster[
lggr logger.Logger,
checkerFactory TransmitCheckerFactory[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE],
autoSyncSequence bool,
chainType string,
) *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] {
lggr = logger.Named(lggr, "Broadcaster")
b := &Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]{
Expand All @@ -171,6 +180,7 @@ func NewBroadcaster[
client: client,
TxAttemptBuilder: txAttemptBuilder,
chainID: client.ConfiguredChainID(),
chainType: chainType,
config: config,
feeConfig: feeConfig,
txConfig: txConfig,
Expand Down Expand Up @@ -411,7 +421,7 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) hand
return fmt.Errorf("handleAnyInProgressTx failed: %w", err), true
}
if etx != nil {
if err, retryable := eb.handleInProgressTx(ctx, *etx, etx.TxAttempts[0], etx.CreatedAt); err != nil {
if err, retryable := eb.handleInProgressTx(ctx, *etx, etx.TxAttempts[0], etx.CreatedAt, 0); err != nil {
return fmt.Errorf("handleAnyInProgressTx failed: %w", err), retryable
}
}
Expand Down Expand Up @@ -464,12 +474,12 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) hand
return fmt.Errorf("processUnstartedTxs failed on UpdateTxUnstartedToInProgress: %w", err), true
}

return eb.handleInProgressTx(ctx, *etx, attempt, time.Now())
return eb.handleInProgressTx(ctx, *etx, attempt, time.Now(), 0)
}

// There can be at most one in_progress transaction per address.
// Here we complete the job that we didn't finish last time.
func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) handleInProgressTx(ctx context.Context, etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], initialBroadcastAt time.Time) (error, bool) {
func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) handleInProgressTx(ctx context.Context, etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], initialBroadcastAt time.Time, retryCount int) (error, bool) {
if etx.State != TxInProgress {
return fmt.Errorf("invariant violation: expected transaction %v to be in_progress, it was %s", etx.ID, etx.State), false
}
Expand All @@ -478,6 +488,11 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) hand
lgr.Infow("Sending transaction", "txAttemptID", attempt.ID, "txHash", attempt.Hash, "meta", etx.Meta, "feeLimit", attempt.ChainSpecificFeeLimit, "callerProvidedFeeLimit", etx.FeeLimit, "attempt", attempt, "etx", etx)
errType, err := eb.client.SendTransactionReturnCode(ctx, etx, attempt, lgr)

// The validation below is only applicable to Hedera because it has instant finality and a unique sequence behavior
if eb.chainType == hederaChainType {
errType, err = eb.validateOnChainSequence(ctx, lgr, errType, err, etx, retryCount)
}

if errType != client.Fatal {
etx.InitialBroadcastAt = &initialBroadcastAt
etx.BroadcastAt = &initialBroadcastAt
Expand Down Expand Up @@ -538,7 +553,7 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) hand
eb.sequenceTracker.GenerateNextSequence(etx.FromAddress, *etx.Sequence)
return err, true
case client.Underpriced:
return eb.tryAgainBumpingGas(ctx, lgr, err, etx, attempt, initialBroadcastAt)
return eb.tryAgainBumpingGas(ctx, lgr, err, etx, attempt, initialBroadcastAt, retryCount+1)
case client.InsufficientFunds:
// NOTE: This bails out of the entire cycle and essentially "blocks" on
// any transaction that gets insufficient_funds. This is OK if a
Expand Down Expand Up @@ -600,6 +615,44 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) hand
}
}

func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) validateOnChainSequence(ctx context.Context, lgr logger.SugaredLogger, errType client.SendTxReturnCode, err error, etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], retryCount int) (client.SendTxReturnCode, error) {
// Only check if sequence was incremented if broadcast was successful, otherwise return the existing err type
if errType != client.Successful {
return errType, err
}
// Transaction sequence cannot be nil here since a sequence is required to broadcast
txSeq := *etx.Sequence
// Retrieve the latest mined sequence from on-chain
nextSeqOnChain, err := eb.client.SequenceAt(ctx, etx.FromAddress, nil)
if err != nil {
return errType, err
}

// Check that the transaction count has incremented on-chain to include the broadcasted transaction
// Insufficient transaction fee is a common scenario in which the sequence is not incremented by the chain even though we got a successful response
// If the sequence failed to increment and hasn't reached the max retries, return the Underpriced error to try again with a bumped attempt
if nextSeqOnChain.Int64() == txSeq.Int64() && retryCount < maxHederaBroadcastRetries {
return client.Underpriced, nil
}

// If the transaction reaches the retry limit and fails to get included, mark it as fatally errored
// Some unknown error other than insufficient tx fee could be the cause
if nextSeqOnChain.Int64() == txSeq.Int64() && retryCount >= maxHederaBroadcastRetries {
err := fmt.Errorf("failed to broadcast transaction on %s after %d retries", hederaChainType, retryCount)
lgr.Error(err.Error())
return client.Fatal, err
}

// Belts and braces approach to detect and handle sqeuence gaps if the broadcast is considered successful
if nextSeqOnChain.Int64() < txSeq.Int64() {
err := fmt.Errorf("next expected sequence on-chain (%s) is less than the broadcasted transaction's sequence (%s)", nextSeqOnChain.String(), txSeq.String())
lgr.Criticalw("Sequence gap has been detected and needs to be filled", "error", err)
return client.Fatal, err
}

return client.Successful, nil
}

// Finds next transaction in the queue, assigns a sequence, and moves it to "in_progress" state ready for broadcast.
// Returns nil if no transactions are in queue
func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) nextUnstartedTransactionWithSequence(fromAddress ADDR) (*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) {
Expand All @@ -622,23 +675,26 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) next
return etx, nil
}

func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) tryAgainBumpingGas(ctx context.Context, lgr logger.Logger, txError error, etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], initialBroadcastAt time.Time) (err error, retryable bool) {
logger.With(lgr,
"sendError", txError,
"attemptFee", attempt.TxFee,
"maxGasPriceConfig", eb.feeConfig.MaxFeePrice(),
).Errorf("attempt fee %v was rejected by the node for being too low. "+
"Node returned: '%s'. "+
"Will bump and retry. ACTION REQUIRED: This is a configuration error. "+
"Consider increasing FeeEstimator.PriceDefault (current value: %s)",
attempt.TxFee, txError.Error(), eb.feeConfig.FeePriceDefault())
func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) tryAgainBumpingGas(ctx context.Context, lgr logger.Logger, txError error, etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], initialBroadcastAt time.Time, retry int) (err error, retryable bool) {
// This log error is not applicable to Hedera since the action required would not be needed for its gas estimator
if eb.chainType != hederaChainType {
logger.With(lgr,
"sendError", txError,
"attemptFee", attempt.TxFee,
"maxGasPriceConfig", eb.feeConfig.MaxFeePrice(),
).Errorf("attempt fee %v was rejected by the node for being too low. "+
"Node returned: '%s'. "+
"Will bump and retry. ACTION REQUIRED: This is a configuration error. "+
"Consider increasing FeeEstimator.PriceDefault (current value: %s)",
attempt.TxFee, txError.Error(), eb.feeConfig.FeePriceDefault())
}

replacementAttempt, bumpedFee, bumpedFeeLimit, retryable, err := eb.NewBumpTxAttempt(ctx, etx, attempt, nil, lgr)
if err != nil {
return fmt.Errorf("tryAgainBumpFee failed: %w", err), retryable
}

return eb.saveTryAgainAttempt(ctx, lgr, etx, attempt, replacementAttempt, initialBroadcastAt, bumpedFee, bumpedFeeLimit)
return eb.saveTryAgainAttempt(ctx, lgr, etx, attempt, replacementAttempt, initialBroadcastAt, bumpedFee, bumpedFeeLimit, retry)
}

func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) tryAgainWithNewEstimation(ctx context.Context, lgr logger.Logger, txError error, etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], initialBroadcastAt time.Time) (err error, retryable bool) {
Expand All @@ -655,15 +711,15 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) tryA
lgr.Warnw("L2 rejected transaction due to incorrect fee, re-estimated and will try again",
"etxID", etx.ID, "err", err, "newGasPrice", fee, "newGasLimit", feeLimit)

return eb.saveTryAgainAttempt(ctx, lgr, etx, attempt, replacementAttempt, initialBroadcastAt, fee, feeLimit)
return eb.saveTryAgainAttempt(ctx, lgr, etx, attempt, replacementAttempt, initialBroadcastAt, fee, feeLimit, 0)
}

func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) saveTryAgainAttempt(ctx context.Context, lgr logger.Logger, etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], replacementAttempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], initialBroadcastAt time.Time, newFee FEE, newFeeLimit uint64) (err error, retyrable bool) {
func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) saveTryAgainAttempt(ctx context.Context, lgr logger.Logger, etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], replacementAttempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], initialBroadcastAt time.Time, newFee FEE, newFeeLimit uint64, retry int) (err error, retyrable bool) {
if err = eb.txStore.SaveReplacementInProgressAttempt(ctx, attempt, &replacementAttempt); err != nil {
return fmt.Errorf("tryAgainWithNewFee failed: %w", err), true
}
lgr.Debugw("Bumped fee on initial send", "oldFee", attempt.TxFee.String(), "newFee", newFee.String(), "newFeeLimit", newFeeLimit)
return eb.handleInProgressTx(ctx, etx, replacementAttempt, initialBroadcastAt)
return eb.handleInProgressTx(ctx, etx, replacementAttempt, initialBroadcastAt, retry)
}

func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) saveFatallyErroredTransaction(lgr logger.Logger, etx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error {
Expand Down
12 changes: 8 additions & 4 deletions common/txmgr/confirmer.go
Original file line number Diff line number Diff line change
Expand Up @@ -664,11 +664,15 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) bat
}

if receipt.GetStatus() == 0 {
rpcError, errExtract := ec.client.CallContract(ctx, attempt, receipt.GetBlockNumber())
if errExtract == nil {
l.Warnw("transaction reverted on-chain", "hash", receipt.GetTxHash(), "rpcError", rpcError.String())
if receipt.GetRevertReason() != nil {
l.Warnw("transaction reverted on-chain", "hash", receipt.GetTxHash(), "revertReason", *receipt.GetRevertReason())
} else {
l.Warnw("transaction reverted on-chain unable to extract revert reason", "hash", receipt.GetTxHash(), "err", err)
rpcError, errExtract := ec.client.CallContract(ctx, attempt, receipt.GetBlockNumber())
if errExtract == nil {
l.Warnw("transaction reverted on-chain", "hash", receipt.GetTxHash(), "rpcError", rpcError.String())
} else {
l.Warnw("transaction reverted on-chain unable to extract revert reason", "hash", receipt.GetTxHash(), "err", err)
}
}
// This might increment more than once e.g. in case of re-orgs going back and forth we might re-fetch the same receipt
promRevertedTxCount.WithLabelValues(ec.chainID.String()).Add(1)
Expand Down
1 change: 1 addition & 0 deletions common/txmgr/types/tx_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,4 +132,5 @@ type ChainReceipt[TX_HASH, BLOCK_HASH types.Hashable] interface {
GetFeeUsed() uint64
GetTransactionIndex() uint
GetBlockHash() BLOCK_HASH
GetRevertReason() *string
}
10 changes: 5 additions & 5 deletions core/chains/evm/config/chaintype/chaintype.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ const (
ChainArbitrum ChainType = "arbitrum"
ChainCelo ChainType = "celo"
ChainGnosis ChainType = "gnosis"
ChainHedera ChainType = "hedera"
ChainKroma ChainType = "kroma"
ChainMetis ChainType = "metis"
ChainOptimismBedrock ChainType = "optimismBedrock"
Expand All @@ -19,7 +20,6 @@ const (
ChainXLayer ChainType = "xlayer"
ChainZkEvm ChainType = "zkevm"
ChainZkSync ChainType = "zksync"
ChainHedera ChainType = "hedera"
)

// IsL2 returns true if this chain is a Layer 2 chain. Notably:
Expand All @@ -36,7 +36,7 @@ func (c ChainType) IsL2() bool {

func (c ChainType) IsValid() bool {
switch c {
case "", ChainArbitrum, ChainCelo, ChainGnosis, ChainKroma, ChainMetis, ChainOptimismBedrock, ChainScroll, ChainWeMix, ChainXLayer, ChainZkEvm, ChainZkSync, ChainHedera:
case "", ChainArbitrum, ChainCelo, ChainGnosis, ChainHedera, ChainKroma, ChainMetis, ChainOptimismBedrock, ChainScroll, ChainWeMix, ChainXLayer, ChainZkEvm, ChainZkSync:
return true
}
return false
Expand All @@ -50,6 +50,8 @@ func ChainTypeFromSlug(slug string) ChainType {
return ChainCelo
case "gnosis":
return ChainGnosis
case "hedera":
return ChainHedera
case "kroma":
return ChainKroma
case "metis":
Expand All @@ -66,8 +68,6 @@ func ChainTypeFromSlug(slug string) ChainType {
return ChainZkEvm
case "zksync":
return ChainZkSync
case "hedera":
return ChainHedera
default:
return ChainType(slug)
}
Expand Down Expand Up @@ -123,6 +123,7 @@ var ErrInvalidChainType = fmt.Errorf("must be one of %s or omitted", strings.Joi
string(ChainArbitrum),
string(ChainCelo),
string(ChainGnosis),
string(ChainHedera),
string(ChainKroma),
string(ChainMetis),
string(ChainOptimismBedrock),
Expand All @@ -131,5 +132,4 @@ var ErrInvalidChainType = fmt.Errorf("must be one of %s or omitted", strings.Joi
string(ChainXLayer),
string(ChainZkEvm),
string(ChainZkSync),
string(ChainHedera),
}, ", "))
Loading

0 comments on commit 20dbba8

Please sign in to comment.