Skip to content
This repository has been archived by the owner on Jun 9, 2024. It is now read-only.

fix(evict): Switch to LRU to prevent removed transactions from being re-gossiped from a peer instantly. #1472

Merged
merged 3 commits into from
Jan 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion cosmos/runtime/txpool/ante.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ func (m *Mempool) AnteHandle(
if shouldEject := m.shouldEjectFromCometMempool(
ctx.BlockTime().Unix(), ethTx,
); shouldEject {
m.crc.DropRemoteTx(ethTx.Hash())
telemetry.IncrCounter(float32(1), MetricKeyAnteEjectedTxs)
return ctx, errors.New("eject from comet mempool")
}
Expand Down
39 changes: 15 additions & 24 deletions cosmos/runtime/txpool/comet.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,59 +21,50 @@
package txpool

import (
"sync"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/lru"
)

const (
defaultCacheSize = 4096
defaultCacheSize = 100000
)

// CometRemoteCache is used to mark which txs are added to our Polaris node remotely from
// Comet CheckTX and when.
type CometRemoteCache interface {
IsRemoteTx(txHash common.Hash) bool
MarkRemoteSeen(txHash common.Hash)
MarkRemoteSeen(txHash common.Hash) bool
TimeFirstSeen(txHash common.Hash) int64 // Unix timestamp
DropRemoteTx(txHash common.Hash)
}

// Thread-safe implementation of CometRemoteCache.
type cometRemoteCache struct {
timeInserted map[common.Hash]int64
timeInsertedMu sync.RWMutex
timeInserted *lru.Cache[common.Hash, int64]
}

func newCometRemoteCache() *cometRemoteCache {
return &cometRemoteCache{
timeInserted: make(map[common.Hash]int64, defaultCacheSize),

timeInserted: lru.NewCache[common.Hash, int64](defaultCacheSize),
}
}

func (crc *cometRemoteCache) IsRemoteTx(txHash common.Hash) bool {
crc.timeInsertedMu.RLock()
defer crc.timeInsertedMu.RUnlock()
_, ok := crc.timeInserted[txHash]
return ok
return crc.timeInserted.Contains(txHash)
}

// Record the time the tx was inserted from Comet successfully.
func (crc *cometRemoteCache) MarkRemoteSeen(txHash common.Hash) {
crc.timeInsertedMu.Lock()
crc.timeInserted[txHash] = time.Now().Unix()
crc.timeInsertedMu.Unlock()
func (crc *cometRemoteCache) MarkRemoteSeen(txHash common.Hash) bool {
if !crc.timeInserted.Contains(txHash) {
crc.timeInserted.Add(txHash, time.Now().Unix())
return true
}
return false
}

func (crc *cometRemoteCache) TimeFirstSeen(txHash common.Hash) int64 {
crc.timeInsertedMu.RLock()
defer crc.timeInsertedMu.RUnlock()
return crc.timeInserted[txHash]
}

func (crc *cometRemoteCache) DropRemoteTx(txHash common.Hash) {
crc.timeInsertedMu.Lock()
delete(crc.timeInserted, txHash)
crc.timeInsertedMu.Unlock()
i, _ := crc.timeInserted.Get(txHash)
return i
}
24 changes: 2 additions & 22 deletions cosmos/runtime/txpool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func (m *Mempool) Insert(ctx context.Context, sdkTx sdk.Tx) error {
}

// Add the eth tx to the remote cache.
m.crc.MarkRemoteSeen(ethTx.Hash())
_ = m.crc.MarkRemoteSeen(ethTx.Hash())

return nil
}
Expand All @@ -157,26 +157,6 @@ func (m *Mempool) Select(context.Context, [][]byte) mempool.Iterator {
}

// Remove is an intentional no-op as the eth txpool handles removals.
func (m *Mempool) Remove(tx sdk.Tx) error {
// Get the Eth payload envelope from the Cosmos transaction.
msgs := tx.GetMsgs()
if len(msgs) == 1 {
env, ok := utils.GetAs[*types.WrappedPayloadEnvelope](msgs[0])
if !ok {
return nil
}

// Unwrap the payload to unpack the individual eth transactions to remove from the txpool.
for _, txBz := range env.UnwrapPayload().ExecutionPayload.Transactions {
ethTx := new(ethtypes.Transaction)
if err := ethTx.UnmarshalBinary(txBz); err != nil {
continue
}
txHash := ethTx.Hash()

// Remove the eth tx from comet seen tx cache.
m.crc.DropRemoteTx(txHash)
}
}
func (m *Mempool) Remove(_ sdk.Tx) error {
return nil
}
Loading