From 31cbbbbb91e637b32e00baa5771710faa03480ca Mon Sep 17 00:00:00 2001 From: andyzhang2023 Date: Fri, 12 Jul 2024 11:14:10 +0800 Subject: [PATCH 1/7] add debug log for txpool txpool pending cache optimize --- core/txpool/legacypool/cache_for_miner.go | 79 +++++++++++++++++++++-- core/txpool/legacypool/legacypool.go | 71 +++++++++----------- miner/worker.go | 2 +- 3 files changed, 107 insertions(+), 45 deletions(-) diff --git a/core/txpool/legacypool/cache_for_miner.go b/core/txpool/legacypool/cache_for_miner.go index 4d1ed2628d..9a2e2b0ba4 100644 --- a/core/txpool/legacypool/cache_for_miner.go +++ b/core/txpool/legacypool/cache_for_miner.go @@ -5,8 +5,10 @@ import ( "sync" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/txpool" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/metrics" + "github.com/holiman/uint256" ) var ( @@ -20,12 +22,18 @@ type cacheForMiner struct { pending map[common.Address]map[*types.Transaction]struct{} locals map[common.Address]bool addrLock sync.Mutex + + allCache map[common.Address][]*txpool.LazyTransaction + filteredCache map[common.Address][]*txpool.LazyTransaction + cacheLock sync.Mutex } func newCacheForMiner() *cacheForMiner { return &cacheForMiner{ - pending: make(map[common.Address]map[*types.Transaction]struct{}), - locals: make(map[common.Address]bool), + pending: make(map[common.Address]map[*types.Transaction]struct{}), + locals: make(map[common.Address]bool), + allCache: make(map[common.Address][]*txpool.LazyTransaction), + filteredCache: make(map[common.Address][]*txpool.LazyTransaction), } } @@ -67,8 +75,9 @@ func (pc *cacheForMiner) del(txs types.Transactions, signer types.Signer) { } } -func (pc *cacheForMiner) dump() map[common.Address]types.Transactions { +func (pc *cacheForMiner) sync2cache(pool txpool.LazyResolver, filter func(txs types.Transactions, addr common.Address) types.Transactions) { pending := make(map[common.Address]types.Transactions) + pc.txLock.Lock() for addr, txlist := range pc.pending { pending[addr] = make(types.Transactions, 0, len(txlist)) @@ -77,11 +86,71 @@ func (pc *cacheForMiner) dump() map[common.Address]types.Transactions { } } pc.txLock.Unlock() - for _, txs := range pending { + + // convert pending to lazyTransactions + filteredLazy := make(map[common.Address][]*txpool.LazyTransaction) + allLazy := make(map[common.Address][]*txpool.LazyTransaction) + for addr, txs := range pending { // sorted by nonce sort.Sort(types.TxByNonce(txs)) + filterd := filter(txs, addr) + if len(txs) > 0 { + lazies := make([]*txpool.LazyTransaction, len(txs)) + for i, tx := range txs { + lazies[i] = &txpool.LazyTransaction{ + Pool: pool, + Hash: tx.Hash(), + Tx: tx, + Time: tx.Time(), + GasFeeCap: uint256.MustFromBig(tx.GasFeeCap()), + GasTipCap: uint256.MustFromBig(tx.GasTipCap()), + Gas: tx.Gas(), + BlobGas: tx.BlobGas(), + } + } + allLazy[addr] = lazies + filteredLazy[addr] = lazies[:len(filterd)] + } } + + pc.cacheLock.Lock() + pc.filteredCache = filteredLazy + pc.allCache = allLazy + pc.cacheLock.Unlock() +} + +func (pc *cacheForMiner) dump(filtered bool) map[common.Address][]*txpool.LazyTransaction { + pc.cacheLock.Lock() + pending := pc.allCache + if filtered { + pending = pc.filteredCache + } + pc.cacheLock.Unlock() return pending + + //pendingLazy := make(map[common.Address][]*txpool.LazyTransaction) + //var txnum = 0 + //for addr, txs := range pending { + // // If the miner requests tip enforcement, cap the lists now + // if enforceTip && !pc.IsLocal(addr) { + // for i, tx := range txs { + // if tx.Tx.EffectiveGasTipIntCmp(gasPrice, baseFee) < 0 { + // txs = txs[:i] + // break + // } + // } + // } + // if len(txs) > 0 { + // lazies := make([]*txpool.LazyTransaction, len(txs)) + // for i, tx := range txs { + // lazies[i] = tx + // txnum++ + // } + // pendingLazy[addr] = lazies + // } + //} + //log.Info("cacheForMiner dump", "duration", time.Since(start), "accounts", len(pending), "txs", txnum) + //return pendingLazy } func (pc *cacheForMiner) markLocal(addr common.Address) { @@ -91,7 +160,7 @@ func (pc *cacheForMiner) markLocal(addr common.Address) { pc.locals[addr] = true } -func (pc *cacheForMiner) isLocal(addr common.Address) bool { +func (pc *cacheForMiner) IsLocal(addr common.Address) bool { pc.addrLock.Lock() defer pc.addrLock.Unlock() return pc.locals[addr] diff --git a/core/txpool/legacypool/legacypool.go b/core/txpool/legacypool/legacypool.go index a539253fa3..0e07a1603c 100644 --- a/core/txpool/legacypool/legacypool.go +++ b/core/txpool/legacypool/legacypool.go @@ -349,6 +349,9 @@ func (pool *LegacyPool) Init(gasTip uint64, head *types.Header, reserve txpool.A // Set the basic pool parameters pool.gasTip.Store(uint256.NewInt(gasTip)) + // set dumper + pool.pendingCache.sync2cache(pool, pool.createFilter(gasTip, head.BaseFee)) + // Initialize the state with head block, or fallback to empty one in // case the head state is not available (might occur when node is not // fully synced). @@ -383,9 +386,27 @@ func (pool *LegacyPool) Init(gasTip uint64, head *types.Header, reserve txpool.A } pool.wg.Add(1) go pool.loop() + go pool.loopOfSync() return nil } +func (pool *LegacyPool) loopOfSync() { + ticker := time.NewTicker(200 * time.Second) + for { + select { + case <-pool.reorgShutdownCh: + return + case <-ticker.C: + gasTip := pool.gasTip.Load() + currHead := pool.currentHead.Load() + if gasTip == nil || currHead == nil { + continue + } + pool.pendingCache.sync2cache(pool, pool.createFilter(gasTip, currHead.BaseFee)) + } + } +} + // loop is the transaction pool's main event loop, waiting for and reacting to // outside blockchain events as well as for various reporting and transaction // eviction events. @@ -628,53 +649,21 @@ func (pool *LegacyPool) Pending(filter txpool.PendingFilter) map[common.Address] defer func(t0 time.Time) { getPendingDurationTimer.Update(time.Since(t0)) }(time.Now()) - // If only blob transactions are requested, this pool is unsuitable as it - // contains none, don't even bother. - if filter.OnlyBlobTxs { - return nil - } - - // Convert the new uint256.Int types to the old big.Int ones used by the legacy pool - var ( - minTipBig *big.Int - baseFeeBig *big.Int - ) - if filter.MinTip != nil { - minTipBig = filter.MinTip.ToBig() - } - if filter.BaseFee != nil { - baseFeeBig = filter.BaseFee.ToBig() - } - pending := make(map[common.Address][]*txpool.LazyTransaction, len(pool.pending)) - for addr, txs := range pool.pendingCache.dump() { + return pool.pendingCache.dump(enforceTips) +} - // If the miner requests tip enforcement, cap the lists now - if minTipBig != nil && !pool.locals.contains(addr) { +func (pool *LegacyPool) createFilter(gasPrice, baseFee *big.Int) func(txs types.Transactions, addr common.Address) types.Transactions { + return func(txs types.Transactions, addr common.Address) types.Transactions { + if !pool.pendingCache.IsLocal(addr) { for i, tx := range txs { - if tx.EffectiveGasTipIntCmp(minTipBig, baseFeeBig) < 0 { + if tx.EffectiveGasTipIntCmp(gasPrice, baseFee) < 0 { txs = txs[:i] break } } } - if len(txs) > 0 { - lazies := make([]*txpool.LazyTransaction, len(txs)) - for i := 0; i < len(txs); i++ { - lazies[i] = &txpool.LazyTransaction{ - Pool: pool, - Hash: txs[i].Hash(), - Tx: txs[i], - Time: txs[i].Time(), - GasFeeCap: uint256.MustFromBig(txs[i].GasFeeCap()), - GasTipCap: uint256.MustFromBig(txs[i].GasTipCap()), - Gas: txs[i].Gas(), - BlobGas: txs[i].BlobGas(), - } - } - pending[addr] = lazies - } + return txs } - return pending } // Locals retrieves the accounts currently considered local by the pool. @@ -1455,6 +1444,10 @@ func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest, pool.priced.Reheap() } } + gasTip, baseFee := pool.gasTip.Load(), pendingBaseFee + go func() { + pool.pendingCache.sync2cache(pool, pool.createFilter(gasTip.ToBig(), baseFee)) + }() // Update all accounts to the latest known pending nonce nonces := make(map[common.Address]uint64, len(pool.pending)) for addr, list := range pool.pending { diff --git a/miner/worker.go b/miner/worker.go index c5686f4d5d..e42d955656 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -1196,7 +1196,7 @@ func (w *worker) fillTransactions(interrupt *atomic.Int32, env *environment) err pendingBlobTxs := w.eth.TxPool().Pending(filter) packFromTxpoolTimer.UpdateSince(start) - log.Debug("packFromTxpoolTimer", "duration", common.PrettyDuration(time.Since(start)), "hash", env.header.Hash()) + log.Debug("packFromTxpoolTimer", "duration", common.PrettyDuration(time.Since(start)), "hash", env.header.Hash(), "txs", len(pending)) // Split the pending transactions into locals and remotes. localPlainTxs, remotePlainTxs := make(map[common.Address][]*txpool.LazyTransaction), pendingPlainTxs From 10243691eebbd1a36fb47269dcd0914779c59707 Mon Sep 17 00:00:00 2001 From: andyzhang2023 Date: Fri, 13 Sep 2024 09:58:59 +0800 Subject: [PATCH 2/7] adapt to upstream changes --- core/txpool/legacypool/legacypool.go | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/core/txpool/legacypool/legacypool.go b/core/txpool/legacypool/legacypool.go index 0e07a1603c..25ead4ba2a 100644 --- a/core/txpool/legacypool/legacypool.go +++ b/core/txpool/legacypool/legacypool.go @@ -350,7 +350,7 @@ func (pool *LegacyPool) Init(gasTip uint64, head *types.Header, reserve txpool.A pool.gasTip.Store(uint256.NewInt(gasTip)) // set dumper - pool.pendingCache.sync2cache(pool, pool.createFilter(gasTip, head.BaseFee)) + pool.pendingCache.sync2cache(pool, pool.createFilter(pool.gasTip.Load().ToBig(), head.BaseFee)) // Initialize the state with head block, or fallback to empty one in // case the head state is not available (might occur when node is not @@ -402,7 +402,7 @@ func (pool *LegacyPool) loopOfSync() { if gasTip == nil || currHead == nil { continue } - pool.pendingCache.sync2cache(pool, pool.createFilter(gasTip, currHead.BaseFee)) + pool.pendingCache.sync2cache(pool, pool.createFilter(gasTip.ToBig(), currHead.BaseFee)) } } } @@ -645,11 +645,21 @@ func (pool *LegacyPool) ContentFrom(addr common.Address) ([]*types.Transaction, // The transactions can also be pre-filtered by the dynamic fee components to // reduce allocations and load on downstream subsystems. func (pool *LegacyPool) Pending(filter txpool.PendingFilter) map[common.Address][]*txpool.LazyTransaction { - // TODO need to confirm + empty := txpool.PendingFilter{} + if filter == empty { + // return all pending transactions, no filtering + return pool.pendingCache.dump(false) + } + // If only blob transactions are requested, this pool is unsuitable as it + // contains none, don't even bother. + if filter.OnlyBlobTxs { + return nil + } defer func(t0 time.Time) { getPendingDurationTimer.Update(time.Since(t0)) }(time.Now()) - return pool.pendingCache.dump(enforceTips) + // It is a bit tricky here, we don't do the filtering here. + return pool.pendingCache.dump(true) } func (pool *LegacyPool) createFilter(gasPrice, baseFee *big.Int) func(txs types.Transactions, addr common.Address) types.Transactions { From d0f20cc75ff6c61254c7f14094c70c2df566f9ef Mon Sep 17 00:00:00 2001 From: andyzhang2023 Date: Thu, 19 Sep 2024 09:42:46 +0800 Subject: [PATCH 3/7] fix warning --- miner/worker.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/miner/worker.go b/miner/worker.go index e42d955656..c3ff52d5c0 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -20,12 +20,13 @@ import ( "context" "errors" "fmt" - mapset "github.com/deckarep/golang-set/v2" "math/big" "sync" "sync/atomic" "time" + mapset "github.com/deckarep/golang-set/v2" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/consensus" "github.com/ethereum/go-ethereum/consensus/misc" @@ -1196,7 +1197,7 @@ func (w *worker) fillTransactions(interrupt *atomic.Int32, env *environment) err pendingBlobTxs := w.eth.TxPool().Pending(filter) packFromTxpoolTimer.UpdateSince(start) - log.Debug("packFromTxpoolTimer", "duration", common.PrettyDuration(time.Since(start)), "hash", env.header.Hash(), "txs", len(pending)) + log.Debug("packFromTxpoolTimer", "duration", common.PrettyDuration(time.Since(start)), "hash", env.header.Hash(), "txs", len(pendingPlainTxs)) // Split the pending transactions into locals and remotes. localPlainTxs, remotePlainTxs := make(map[common.Address][]*txpool.LazyTransaction), pendingPlainTxs From 0b873e21fd27210da9be518c51638fdbda968ba9 Mon Sep 17 00:00:00 2001 From: andyzhang2023 Date: Thu, 19 Sep 2024 14:04:02 +0800 Subject: [PATCH 4/7] fix pending cache interval --- core/txpool/legacypool/legacypool.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/txpool/legacypool/legacypool.go b/core/txpool/legacypool/legacypool.go index 25ead4ba2a..5b7404d5b8 100644 --- a/core/txpool/legacypool/legacypool.go +++ b/core/txpool/legacypool/legacypool.go @@ -391,7 +391,7 @@ func (pool *LegacyPool) Init(gasTip uint64, head *types.Header, reserve txpool.A } func (pool *LegacyPool) loopOfSync() { - ticker := time.NewTicker(200 * time.Second) + ticker := time.NewTicker(200 * time.Millisecond) for { select { case <-pool.reorgShutdownCh: From 4ba1d241feae34309f9643debd79ead8653e95ab Mon Sep 17 00:00:00 2001 From: andyzhang2023 Date: Wed, 9 Oct 2024 11:07:59 +0800 Subject: [PATCH 5/7] fix ut --- ethclient/simulated/backend_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/ethclient/simulated/backend_test.go b/ethclient/simulated/backend_test.go index a8fd7913c3..9307e2105a 100644 --- a/ethclient/simulated/backend_test.go +++ b/ethclient/simulated/backend_test.go @@ -214,6 +214,7 @@ func TestForkResendTx(t *testing.T) { t.Fatalf("could not create transaction: %v", err) } client.SendTransaction(ctx, tx) + time.Sleep(1 * time.Second) sim.Commit() // 3. From 06d810d5bd1a67dca8f040dba0d77408709f2de9 Mon Sep 17 00:00:00 2001 From: andyzhang2023 Date: Wed, 9 Oct 2024 11:56:23 +0800 Subject: [PATCH 6/7] fix ut --- accounts/abi/bind/util_test.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/accounts/abi/bind/util_test.go b/accounts/abi/bind/util_test.go index 592465f2ac..87917d43fa 100644 --- a/accounts/abi/bind/util_test.go +++ b/accounts/abi/bind/util_test.go @@ -83,6 +83,7 @@ func TestWaitDeployed(t *testing.T) { // Send and mine the transaction. backend.Client().SendTransaction(ctx, tx) + time.Sleep(500 * time.Millisecond) //wait for the tx to be mined backend.Commit() select { @@ -117,6 +118,7 @@ func TestWaitDeployedCornerCases(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() backend.Client().SendTransaction(ctx, tx) + time.Sleep(500 * time.Millisecond) //wait for the tx to be mined backend.Commit() notContractCreation := errors.New("tx is not contract creation") if _, err := bind.WaitDeployed(ctx, backend.Client(), tx); err.Error() != notContractCreation.Error() { @@ -135,5 +137,6 @@ func TestWaitDeployedCornerCases(t *testing.T) { }() backend.Client().SendTransaction(ctx, tx) + time.Sleep(500 * time.Millisecond) //wait for the tx to be mined cancel() } From 8d4302bb55c0fba86a317d20e960de9fdc1e50f6 Mon Sep 17 00:00:00 2001 From: andyzhang2023 Date: Wed, 9 Oct 2024 14:54:17 +0800 Subject: [PATCH 7/7] fix ut --- ethclient/simulated/backend.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/ethclient/simulated/backend.go b/ethclient/simulated/backend.go index 1df0a73150..5c137d4079 100644 --- a/ethclient/simulated/backend.go +++ b/ethclient/simulated/backend.go @@ -178,6 +178,8 @@ func (n *Backend) Close() error { // Commit seals a block and moves the chain forward to a new empty block. func (n *Backend) Commit() common.Hash { + // wait for the transactions to be sync into cache + time.Sleep(350 * time.Millisecond) return n.beacon.Commit() }