From a5b91d03f2e6bce44a22b13aeb439c68b7c72377 Mon Sep 17 00:00:00 2001 From: Devon Bear Date: Wed, 31 Jan 2024 15:27:39 -0500 Subject: [PATCH] fix(evict): Switch to LRU to prevent removed transactions from being re-gossiped from a peer instantly. (#1472) ## Summary by CodeRabbit - **Refactor** - Improved transaction handling efficiency in the network. - **Performance Improvements** - Enhanced cache management for better performance. --- cosmos/runtime/txpool/ante.go | 1 - cosmos/runtime/txpool/comet.go | 39 ++++++++++++-------------------- cosmos/runtime/txpool/mempool.go | 24 ++------------------ 3 files changed, 17 insertions(+), 47 deletions(-) diff --git a/cosmos/runtime/txpool/ante.go b/cosmos/runtime/txpool/ante.go index 18ea5905c..00215f04b 100644 --- a/cosmos/runtime/txpool/ante.go +++ b/cosmos/runtime/txpool/ante.go @@ -53,7 +53,6 @@ func (m *Mempool) AnteHandle( if shouldEject := m.shouldEjectFromCometMempool( ctx.BlockTime().Unix(), ethTx, ); shouldEject { - m.crc.DropRemoteTx(ethTx.Hash()) telemetry.IncrCounter(float32(1), MetricKeyAnteEjectedTxs) return ctx, errors.New("eject from comet mempool") } diff --git a/cosmos/runtime/txpool/comet.go b/cosmos/runtime/txpool/comet.go index c40809db9..4e5c5c68d 100644 --- a/cosmos/runtime/txpool/comet.go +++ b/cosmos/runtime/txpool/comet.go @@ -21,59 +21,50 @@ package txpool import ( - "sync" "time" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/lru" ) const ( - defaultCacheSize = 4096 + defaultCacheSize = 100000 ) // CometRemoteCache is used to mark which txs are added to our Polaris node remotely from // Comet CheckTX and when. type CometRemoteCache interface { IsRemoteTx(txHash common.Hash) bool - MarkRemoteSeen(txHash common.Hash) + MarkRemoteSeen(txHash common.Hash) bool TimeFirstSeen(txHash common.Hash) int64 // Unix timestamp - DropRemoteTx(txHash common.Hash) } // Thread-safe implementation of CometRemoteCache. type cometRemoteCache struct { - timeInserted map[common.Hash]int64 - timeInsertedMu sync.RWMutex + timeInserted *lru.Cache[common.Hash, int64] } func newCometRemoteCache() *cometRemoteCache { return &cometRemoteCache{ - timeInserted: make(map[common.Hash]int64, defaultCacheSize), + + timeInserted: lru.NewCache[common.Hash, int64](defaultCacheSize), } } func (crc *cometRemoteCache) IsRemoteTx(txHash common.Hash) bool { - crc.timeInsertedMu.RLock() - defer crc.timeInsertedMu.RUnlock() - _, ok := crc.timeInserted[txHash] - return ok + return crc.timeInserted.Contains(txHash) } // Record the time the tx was inserted from Comet successfully. -func (crc *cometRemoteCache) MarkRemoteSeen(txHash common.Hash) { - crc.timeInsertedMu.Lock() - crc.timeInserted[txHash] = time.Now().Unix() - crc.timeInsertedMu.Unlock() +func (crc *cometRemoteCache) MarkRemoteSeen(txHash common.Hash) bool { + if !crc.timeInserted.Contains(txHash) { + crc.timeInserted.Add(txHash, time.Now().Unix()) + return true + } + return false } func (crc *cometRemoteCache) TimeFirstSeen(txHash common.Hash) int64 { - crc.timeInsertedMu.RLock() - defer crc.timeInsertedMu.RUnlock() - return crc.timeInserted[txHash] -} - -func (crc *cometRemoteCache) DropRemoteTx(txHash common.Hash) { - crc.timeInsertedMu.Lock() - delete(crc.timeInserted, txHash) - crc.timeInsertedMu.Unlock() + i, _ := crc.timeInserted.Get(txHash) + return i } diff --git a/cosmos/runtime/txpool/mempool.go b/cosmos/runtime/txpool/mempool.go index cb63403fe..0d58c8b2a 100644 --- a/cosmos/runtime/txpool/mempool.go +++ b/cosmos/runtime/txpool/mempool.go @@ -140,7 +140,7 @@ func (m *Mempool) Insert(ctx context.Context, sdkTx sdk.Tx) error { } // Add the eth tx to the remote cache. - m.crc.MarkRemoteSeen(ethTx.Hash()) + _ = m.crc.MarkRemoteSeen(ethTx.Hash()) return nil } @@ -157,26 +157,6 @@ func (m *Mempool) Select(context.Context, [][]byte) mempool.Iterator { } // Remove is an intentional no-op as the eth txpool handles removals. -func (m *Mempool) Remove(tx sdk.Tx) error { - // Get the Eth payload envelope from the Cosmos transaction. - msgs := tx.GetMsgs() - if len(msgs) == 1 { - env, ok := utils.GetAs[*types.WrappedPayloadEnvelope](msgs[0]) - if !ok { - return nil - } - - // Unwrap the payload to unpack the individual eth transactions to remove from the txpool. - for _, txBz := range env.UnwrapPayload().ExecutionPayload.Transactions { - ethTx := new(ethtypes.Transaction) - if err := ethTx.UnmarshalBinary(txBz); err != nil { - continue - } - txHash := ethTx.Hash() - - // Remove the eth tx from comet seen tx cache. - m.crc.DropRemoteTx(txHash) - } - } +func (m *Mempool) Remove(_ sdk.Tx) error { return nil }