Skip to content
This repository has been archived by the owner on Jun 9, 2024. It is now read-only.

Commit

Permalink
fix(evict): Switch to LRU to prevent removed transactions from being …
Browse files Browse the repository at this point in the history
…re-gossiped from a peer instantly. (#1472)

<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->

## Summary by CodeRabbit

- **Refactor**
	- Improved transaction handling efficiency in the network.
- **Performance Improvements**
	- Enhanced cache management for better performance.

<!-- end of auto-generated comment: release notes by coderabbit.ai -->
  • Loading branch information
itsdevbear authored Jan 31, 2024
1 parent 6c003aa commit a5b91d0
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 47 deletions.
1 change: 0 additions & 1 deletion cosmos/runtime/txpool/ante.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ func (m *Mempool) AnteHandle(
if shouldEject := m.shouldEjectFromCometMempool(
ctx.BlockTime().Unix(), ethTx,
); shouldEject {
m.crc.DropRemoteTx(ethTx.Hash())
telemetry.IncrCounter(float32(1), MetricKeyAnteEjectedTxs)
return ctx, errors.New("eject from comet mempool")
}
Expand Down
39 changes: 15 additions & 24 deletions cosmos/runtime/txpool/comet.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,59 +21,50 @@
package txpool

import (
"sync"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/lru"
)

const (
defaultCacheSize = 4096
defaultCacheSize = 100000
)

// CometRemoteCache is used to mark which txs are added to our Polaris node remotely from
// Comet CheckTX and when.
type CometRemoteCache interface {
IsRemoteTx(txHash common.Hash) bool
MarkRemoteSeen(txHash common.Hash)
MarkRemoteSeen(txHash common.Hash) bool
TimeFirstSeen(txHash common.Hash) int64 // Unix timestamp
DropRemoteTx(txHash common.Hash)
}

// Thread-safe implementation of CometRemoteCache.
type cometRemoteCache struct {
timeInserted map[common.Hash]int64
timeInsertedMu sync.RWMutex
timeInserted *lru.Cache[common.Hash, int64]
}

func newCometRemoteCache() *cometRemoteCache {
return &cometRemoteCache{
timeInserted: make(map[common.Hash]int64, defaultCacheSize),

timeInserted: lru.NewCache[common.Hash, int64](defaultCacheSize),
}
}

func (crc *cometRemoteCache) IsRemoteTx(txHash common.Hash) bool {
crc.timeInsertedMu.RLock()
defer crc.timeInsertedMu.RUnlock()
_, ok := crc.timeInserted[txHash]
return ok
return crc.timeInserted.Contains(txHash)
}

// Record the time the tx was inserted from Comet successfully.
func (crc *cometRemoteCache) MarkRemoteSeen(txHash common.Hash) {
crc.timeInsertedMu.Lock()
crc.timeInserted[txHash] = time.Now().Unix()
crc.timeInsertedMu.Unlock()
func (crc *cometRemoteCache) MarkRemoteSeen(txHash common.Hash) bool {
if !crc.timeInserted.Contains(txHash) {
crc.timeInserted.Add(txHash, time.Now().Unix())
return true
}
return false
}

func (crc *cometRemoteCache) TimeFirstSeen(txHash common.Hash) int64 {
crc.timeInsertedMu.RLock()
defer crc.timeInsertedMu.RUnlock()
return crc.timeInserted[txHash]
}

func (crc *cometRemoteCache) DropRemoteTx(txHash common.Hash) {
crc.timeInsertedMu.Lock()
delete(crc.timeInserted, txHash)
crc.timeInsertedMu.Unlock()
i, _ := crc.timeInserted.Get(txHash)
return i
}
24 changes: 2 additions & 22 deletions cosmos/runtime/txpool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func (m *Mempool) Insert(ctx context.Context, sdkTx sdk.Tx) error {
}

// Add the eth tx to the remote cache.
m.crc.MarkRemoteSeen(ethTx.Hash())
_ = m.crc.MarkRemoteSeen(ethTx.Hash())

return nil
}
Expand All @@ -157,26 +157,6 @@ func (m *Mempool) Select(context.Context, [][]byte) mempool.Iterator {
}

// Remove is an intentional no-op as the eth txpool handles removals.
func (m *Mempool) Remove(tx sdk.Tx) error {
// Get the Eth payload envelope from the Cosmos transaction.
msgs := tx.GetMsgs()
if len(msgs) == 1 {
env, ok := utils.GetAs[*types.WrappedPayloadEnvelope](msgs[0])
if !ok {
return nil
}

// Unwrap the payload to unpack the individual eth transactions to remove from the txpool.
for _, txBz := range env.UnwrapPayload().ExecutionPayload.Transactions {
ethTx := new(ethtypes.Transaction)
if err := ethTx.UnmarshalBinary(txBz); err != nil {
continue
}
txHash := ethTx.Hash()

// Remove the eth tx from comet seen tx cache.
m.crc.DropRemoteTx(txHash)
}
}
func (m *Mempool) Remove(_ sdk.Tx) error {
return nil
}

0 comments on commit a5b91d0

Please sign in to comment.