Skip to content

Commit

Permalink
Revert "optimization: enqueue transactions in parallel from p2p (bnb-…
Browse files Browse the repository at this point in the history
…chain#173)"

This reverts commit 593869d.
  • Loading branch information
welkin22 committed Oct 28, 2024
1 parent fc9dc36 commit 5d616e1
Show file tree
Hide file tree
Showing 3 changed files with 2 additions and 50 deletions.
9 changes: 0 additions & 9 deletions eth/fetcher/tx_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,15 +400,6 @@ func (f *TxFetcher) Stop() {
close(f.quit)
}

func (f *TxFetcher) IsWorking() (bool, error) {
select {
case <-f.quit:
return false, errTerminated
default:
return true, nil
}
}

func (f *TxFetcher) loop() {
var (
waitTimer = new(mclock.Timer)
Expand Down
37 changes: 2 additions & 35 deletions eth/handler_eth.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,31 +25,10 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/eth/fetcher"
"github.com/ethereum/go-ethereum/eth/protocols/eth"
"github.com/ethereum/go-ethereum/p2p/enode"
)

// TxQueueSize is the size of the transaction queue used to enqueue transactions
const (
TxQueueSize = 16
)

// enqueueTx is a channel to enqueue transactions in parallel.
// It is used to improve the performance of transaction enqueued.
var enqueueTx = make(chan func(), TxQueueSize)

func init() {
// run the transaction enqueuing loop
for i := 0; i < TxQueueSize; i++ {
go func() {
for enqueue := range enqueueTx {
enqueue()
}
}()
}
}

// ethHandler implements the eth.Backend interface to handle the various network
// packets that are sent as replies or broadcasts.
type ethHandler handler
Expand Down Expand Up @@ -113,28 +92,16 @@ func (h *ethHandler) Handle(peer *eth.Peer, packet eth.Packet) error {
return errors.New("disallowed broadcast blob transaction")
}
}
return asyncEnqueueTx(peer, *packet, h.txFetcher, false)
return h.txFetcher.Enqueue(peer.ID(), *packet, false)

case *eth.PooledTransactionsResponse:
return asyncEnqueueTx(peer, *packet, h.txFetcher, true)
return h.txFetcher.Enqueue(peer.ID(), *packet, true)

default:
return fmt.Errorf("unexpected eth packet type: %T", packet)
}
}

func asyncEnqueueTx(peer *eth.Peer, txs []*types.Transaction, fetcher *fetcher.TxFetcher, directed bool) error {
if working, err := fetcher.IsWorking(); !working {
return err
}
enqueueTx <- func() {
if err := fetcher.Enqueue(peer.ID(), txs, directed); err != nil {
peer.Log().Warn("Failed to enqueue transaction", "err", err)
}
}
return nil
}

// handleBlockAnnounces is invoked from a peer's message handler when it transmits a
// batch of block announcements for the local node to process.
func (h *ethHandler) handleBlockAnnounces(peer *eth.Peer, hashes []common.Hash, numbers []uint64) error {
Expand Down
6 changes: 0 additions & 6 deletions eth/protocols/eth/broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@ const (
var (
txAnnounceAbandonMeter = metrics.NewRegisteredMeter("eth/fetcher/transaction/announces/abandon", nil)
txBroadcastAbandonMeter = metrics.NewRegisteredMeter("eth/fetcher/transaction/broadcasts/abandon", nil)
txP2PAnnQueueGauge = metrics.NewRegisteredGauge("eth/fetcher/transaction/p2p/ann/queue", nil)
txP2PBroadQueueGauge = metrics.NewRegisteredGauge("eth/fetcher/transaction/p2p/broad/queue", nil)
)

// safeGetPeerIP
Expand Down Expand Up @@ -135,8 +133,6 @@ func (p *Peer) broadcastTransactions() {
queue = queue[:copy(queue, queue[len(queue)-maxQueuedTxs:])]
}

txP2PBroadQueueGauge.Update(int64(len(queue)))

case <-done:
done = nil

Expand Down Expand Up @@ -212,8 +208,6 @@ func (p *Peer) announceTransactions() {
queue = queue[:copy(queue, queue[len(queue)-maxQueuedTxAnns:])]
}

txP2PAnnQueueGauge.Update(int64(len(queue)))

case <-done:
done = nil

Expand Down

0 comments on commit 5d616e1

Please sign in to comment.