Skip to content

Commit

Permalink
Merge branch 'develop' into feat-difflayertool
Browse files Browse the repository at this point in the history
  • Loading branch information
owen-reorg authored Oct 14, 2024
2 parents 3bf57a4 + 0bd2d52 commit f62e9e6
Show file tree
Hide file tree
Showing 13 changed files with 204 additions and 54 deletions.
3 changes: 3 additions & 0 deletions accounts/abi/bind/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ func TestWaitDeployed(t *testing.T) {

// Send and mine the transaction.
backend.Client().SendTransaction(ctx, tx)
time.Sleep(500 * time.Millisecond) //wait for the tx to be mined
backend.Commit()

select {
Expand Down Expand Up @@ -117,6 +118,7 @@ func TestWaitDeployedCornerCases(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
backend.Client().SendTransaction(ctx, tx)
time.Sleep(500 * time.Millisecond) //wait for the tx to be mined
backend.Commit()
notContractCreation := errors.New("tx is not contract creation")
if _, err := bind.WaitDeployed(ctx, backend.Client(), tx); err.Error() != notContractCreation.Error() {
Expand All @@ -135,5 +137,6 @@ func TestWaitDeployedCornerCases(t *testing.T) {
}()

backend.Client().SendTransaction(ctx, tx)
time.Sleep(500 * time.Millisecond) //wait for the tx to be mined
cancel()
}
79 changes: 74 additions & 5 deletions core/txpool/legacypool/cache_for_miner.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ import (
"sync"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/txpool"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/metrics"
"github.com/holiman/uint256"
)

var (
Expand All @@ -20,12 +22,18 @@ type cacheForMiner struct {
pending map[common.Address]map[*types.Transaction]struct{}
locals map[common.Address]bool
addrLock sync.Mutex

allCache map[common.Address][]*txpool.LazyTransaction
filteredCache map[common.Address][]*txpool.LazyTransaction
cacheLock sync.Mutex
}

func newCacheForMiner() *cacheForMiner {
return &cacheForMiner{
pending: make(map[common.Address]map[*types.Transaction]struct{}),
locals: make(map[common.Address]bool),
pending: make(map[common.Address]map[*types.Transaction]struct{}),
locals: make(map[common.Address]bool),
allCache: make(map[common.Address][]*txpool.LazyTransaction),
filteredCache: make(map[common.Address][]*txpool.LazyTransaction),
}
}

Expand Down Expand Up @@ -67,8 +75,9 @@ func (pc *cacheForMiner) del(txs types.Transactions, signer types.Signer) {
}
}

func (pc *cacheForMiner) dump() map[common.Address]types.Transactions {
func (pc *cacheForMiner) sync2cache(pool txpool.LazyResolver, filter func(txs types.Transactions, addr common.Address) types.Transactions) {
pending := make(map[common.Address]types.Transactions)

pc.txLock.Lock()
for addr, txlist := range pc.pending {
pending[addr] = make(types.Transactions, 0, len(txlist))
Expand All @@ -77,11 +86,71 @@ func (pc *cacheForMiner) dump() map[common.Address]types.Transactions {
}
}
pc.txLock.Unlock()
for _, txs := range pending {

// convert pending to lazyTransactions
filteredLazy := make(map[common.Address][]*txpool.LazyTransaction)
allLazy := make(map[common.Address][]*txpool.LazyTransaction)
for addr, txs := range pending {
// sorted by nonce
sort.Sort(types.TxByNonce(txs))
filterd := filter(txs, addr)
if len(txs) > 0 {
lazies := make([]*txpool.LazyTransaction, len(txs))
for i, tx := range txs {
lazies[i] = &txpool.LazyTransaction{
Pool: pool,
Hash: tx.Hash(),
Tx: tx,
Time: tx.Time(),
GasFeeCap: uint256.MustFromBig(tx.GasFeeCap()),
GasTipCap: uint256.MustFromBig(tx.GasTipCap()),
Gas: tx.Gas(),
BlobGas: tx.BlobGas(),
}
}
allLazy[addr] = lazies
filteredLazy[addr] = lazies[:len(filterd)]
}
}

pc.cacheLock.Lock()
pc.filteredCache = filteredLazy
pc.allCache = allLazy
pc.cacheLock.Unlock()
}

func (pc *cacheForMiner) dump(filtered bool) map[common.Address][]*txpool.LazyTransaction {
pc.cacheLock.Lock()
pending := pc.allCache
if filtered {
pending = pc.filteredCache
}
pc.cacheLock.Unlock()
return pending

//pendingLazy := make(map[common.Address][]*txpool.LazyTransaction)
//var txnum = 0
//for addr, txs := range pending {
// // If the miner requests tip enforcement, cap the lists now
// if enforceTip && !pc.IsLocal(addr) {
// for i, tx := range txs {
// if tx.Tx.EffectiveGasTipIntCmp(gasPrice, baseFee) < 0 {
// txs = txs[:i]
// break
// }
// }
// }
// if len(txs) > 0 {
// lazies := make([]*txpool.LazyTransaction, len(txs))
// for i, tx := range txs {
// lazies[i] = tx
// txnum++
// }
// pendingLazy[addr] = lazies
// }
//}
//log.Info("cacheForMiner dump", "duration", time.Since(start), "accounts", len(pending), "txs", txnum)
//return pendingLazy
}

func (pc *cacheForMiner) markLocal(addr common.Address) {
Expand All @@ -91,7 +160,7 @@ func (pc *cacheForMiner) markLocal(addr common.Address) {
pc.locals[addr] = true
}

func (pc *cacheForMiner) isLocal(addr common.Address) bool {
func (pc *cacheForMiner) IsLocal(addr common.Address) bool {
pc.addrLock.Lock()
defer pc.addrLock.Unlock()
return pc.locals[addr]
Expand Down
102 changes: 60 additions & 42 deletions core/txpool/legacypool/legacypool.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,9 @@ func (pool *LegacyPool) Init(gasTip uint64, head *types.Header, reserve txpool.A
// Set the basic pool parameters
pool.gasTip.Store(uint256.NewInt(gasTip))

// set dumper
pool.pendingCache.sync2cache(pool, pool.createFilter(pool.gasTip.Load().ToBig(), head.BaseFee))

// Initialize the state with head block, or fallback to empty one in
// case the head state is not available (might occur when node is not
// fully synced).
Expand Down Expand Up @@ -383,9 +386,27 @@ func (pool *LegacyPool) Init(gasTip uint64, head *types.Header, reserve txpool.A
}
pool.wg.Add(1)
go pool.loop()
go pool.loopOfSync()
return nil
}

func (pool *LegacyPool) loopOfSync() {
ticker := time.NewTicker(200 * time.Millisecond)
for {
select {
case <-pool.reorgShutdownCh:
return
case <-ticker.C:
gasTip := pool.gasTip.Load()
currHead := pool.currentHead.Load()
if gasTip == nil || currHead == nil {
continue
}
pool.pendingCache.sync2cache(pool, pool.createFilter(gasTip.ToBig(), currHead.BaseFee))
}
}
}

// loop is the transaction pool's main event loop, waiting for and reacting to
// outside blockchain events as well as for various reporting and transaction
// eviction events.
Expand Down Expand Up @@ -624,57 +645,35 @@ func (pool *LegacyPool) ContentFrom(addr common.Address) ([]*types.Transaction,
// The transactions can also be pre-filtered by the dynamic fee components to
// reduce allocations and load on downstream subsystems.
func (pool *LegacyPool) Pending(filter txpool.PendingFilter) map[common.Address][]*txpool.LazyTransaction {
// TODO need to confirm
defer func(t0 time.Time) {
getPendingDurationTimer.Update(time.Since(t0))
}(time.Now())
empty := txpool.PendingFilter{}
if filter == empty {
// return all pending transactions, no filtering
return pool.pendingCache.dump(false)
}
// If only blob transactions are requested, this pool is unsuitable as it
// contains none, don't even bother.
if filter.OnlyBlobTxs {
return nil
}
defer func(t0 time.Time) {
getPendingDurationTimer.Update(time.Since(t0))
}(time.Now())
// It is a bit tricky here, we don't do the filtering here.
return pool.pendingCache.dump(true)
}

// Convert the new uint256.Int types to the old big.Int ones used by the legacy pool
var (
minTipBig *big.Int
baseFeeBig *big.Int
)
if filter.MinTip != nil {
minTipBig = filter.MinTip.ToBig()
}
if filter.BaseFee != nil {
baseFeeBig = filter.BaseFee.ToBig()
}
pending := make(map[common.Address][]*txpool.LazyTransaction, len(pool.pending))
for addr, txs := range pool.pendingCache.dump() {

// If the miner requests tip enforcement, cap the lists now
if minTipBig != nil && !pool.locals.contains(addr) {
func (pool *LegacyPool) createFilter(gasPrice, baseFee *big.Int) func(txs types.Transactions, addr common.Address) types.Transactions {
return func(txs types.Transactions, addr common.Address) types.Transactions {
if !pool.pendingCache.IsLocal(addr) {
for i, tx := range txs {
if tx.EffectiveGasTipIntCmp(minTipBig, baseFeeBig) < 0 {
if tx.EffectiveGasTipIntCmp(gasPrice, baseFee) < 0 {
txs = txs[:i]
break
}
}
}
if len(txs) > 0 {
lazies := make([]*txpool.LazyTransaction, len(txs))
for i := 0; i < len(txs); i++ {
lazies[i] = &txpool.LazyTransaction{
Pool: pool,
Hash: txs[i].Hash(),
Tx: txs[i],
Time: txs[i].Time(),
GasFeeCap: uint256.MustFromBig(txs[i].GasFeeCap()),
GasTipCap: uint256.MustFromBig(txs[i].GasTipCap()),
Gas: txs[i].Gas(),
BlobGas: txs[i].BlobGas(),
}
}
pending[addr] = lazies
}
return txs
}
return pending
}

// Locals retrieves the accounts currently considered local by the pool.
Expand Down Expand Up @@ -840,6 +839,16 @@ func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, e
}
// If the transaction pool is full, discard underpriced transactions
if uint64(pool.all.Slots()+numSlots(tx)) > pool.config.GlobalSlots+pool.config.GlobalQueue {
currHead := pool.currentHead.Load()
if currHead != nil && currHead.BaseFee != nil && pool.priced.NeedReheap(currHead) {
if pool.chainconfig.IsLondon(new(big.Int).Add(currHead.Number, big.NewInt(1))) {
baseFee := eip1559.CalcBaseFee(pool.chainconfig, currHead, currHead.Time+1)
pool.priced.SetBaseFee(baseFee)
}
pool.priced.Reheap()
pool.priced.currHead = currHead
}

// If the new transaction is underpriced, don't accept it
if !isLocal && pool.priced.Underpriced(tx) {
log.Trace("Discarding underpriced transaction", "hash", hash, "gasTipCap", tx.GasTipCap(), "gasFeeCap", tx.GasFeeCap())
Expand Down Expand Up @@ -1110,7 +1119,9 @@ func (pool *LegacyPool) addRemoteSync(tx *types.Transaction) error {
// to the add is finished. Only use this during tests for determinism!
func (pool *LegacyPool) Add(txs []*types.Transaction, local, sync bool) []error {
defer func(t0 time.Time) {
addTimer.UpdateSince(t0)
if len(txs) > 0 {
addTimer.Update(time.Since(t0) / time.Duration(len(txs)))
}
}(time.Now())
// Do not treat as local if local transactions have been disabled
local = local && !pool.config.NoLocals
Expand Down Expand Up @@ -1147,7 +1158,9 @@ func (pool *LegacyPool) Add(txs []*types.Transaction, local, sync bool) []error
pool.mu.Lock()
t0 := time.Now()
newErrs, dirtyAddrs := pool.addTxsLocked(news, local)
addWithLockTimer.UpdateSince(t0)
if len(news) > 0 {
addWithLockTimer.Update(time.Since(t0) / time.Duration(len(news)))
}
pool.mu.Unlock()

var nilSlot = 0
Expand Down Expand Up @@ -1403,6 +1416,9 @@ func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest,
reorgDurationTimer.Update(time.Since(t0))
if reset != nil {
reorgresetTimer.UpdateSince(t0)
if reset.newHead != nil {
log.Info("Transaction pool reorged", "from", reset.oldHead.Number.Uint64(), "to", reset.newHead.Number.Uint64())
}
}
}(time.Now())
defer close(done)
Expand Down Expand Up @@ -1451,10 +1467,12 @@ func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest,
if pool.chainconfig.IsLondon(new(big.Int).Add(reset.newHead.Number, big.NewInt(1))) {
pendingBaseFee = eip1559.CalcBaseFee(pool.chainconfig, reset.newHead, reset.newHead.Time+1)
pool.priced.SetBaseFee(pendingBaseFee)
} else {
pool.priced.Reheap()
}
}
gasTip, baseFee := pool.gasTip.Load(), pendingBaseFee
go func() {
pool.pendingCache.sync2cache(pool, pool.createFilter(gasTip.ToBig(), baseFee))
}()
// Update all accounts to the latest known pending nonce
nonces := make(map[common.Address]uint64, len(pool.pending))
for addr, list := range pool.pending {
Expand Down
1 change: 1 addition & 0 deletions core/txpool/legacypool/legacypool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2073,6 +2073,7 @@ func TestDualHeapEviction(t *testing.T) {
add(false)
for baseFee = 0; baseFee <= 1000; baseFee += 100 {
pool.priced.SetBaseFee(big.NewInt(int64(baseFee)))
pool.priced.Reheap()
add(true)
check(highCap, "fee cap")
add(false)
Expand Down
6 changes: 5 additions & 1 deletion core/txpool/legacypool/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -549,6 +549,7 @@ func (h *priceHeap) Pop() interface{} {
// better candidates for inclusion while in other cases (at the top of the baseFee peak)
// the floating heap is better. When baseFee is decreasing they behave similarly.
type pricedList struct {
currHead *types.Header // Current block header for effective tip calculation
// Number of stale price points to (re-heap trigger).
stales atomic.Int64

Expand Down Expand Up @@ -667,6 +668,10 @@ func (l *pricedList) Discard(slots int, force bool) (types.Transactions, bool) {
return drop, true
}

func (l *pricedList) NeedReheap(currHead *types.Header) bool {
return l.currHead == nil || currHead == nil || currHead.Hash().Cmp(l.currHead.Hash()) != 0
}

// Reheap forcibly rebuilds the heap based on the current remote transaction set.
func (l *pricedList) Reheap() {
l.reheapMu.Lock()
Expand Down Expand Up @@ -698,5 +703,4 @@ func (l *pricedList) Reheap() {
// necessary to call right before SetBaseFee when processing a new block.
func (l *pricedList) SetBaseFee(baseFee *big.Int) {
l.urgent.baseFee = baseFee
l.Reheap()
}
9 changes: 9 additions & 0 deletions eth/fetcher/tx_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,15 @@ func (f *TxFetcher) Stop() {
close(f.quit)
}

func (f *TxFetcher) IsWorking() (bool, error) {
select {
case <-f.quit:
return false, errTerminated
default:
return true, nil
}
}

func (f *TxFetcher) loop() {
var (
waitTimer = new(mclock.Timer)
Expand Down
Loading

0 comments on commit f62e9e6

Please sign in to comment.