Skip to content

Commit

Permalink
optimization: enqueue transactions in parallel from p2p (#173)
Browse files Browse the repository at this point in the history
Co-authored-by: Owen <103096885+owen-reorg@users.noreply.github.com>
Co-authored-by: andyzhang2023 <andyzhang2023@gmail.com>
  • Loading branch information
3 people authored Oct 10, 2024
1 parent e3170ae commit 593869d
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 2 deletions.
9 changes: 9 additions & 0 deletions eth/fetcher/tx_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,15 @@ func (f *TxFetcher) Stop() {
close(f.quit)
}

func (f *TxFetcher) IsWorking() (bool, error) {
select {
case <-f.quit:
return false, errTerminated
default:
return true, nil
}
}

func (f *TxFetcher) loop() {
var (
waitTimer = new(mclock.Timer)
Expand Down
37 changes: 35 additions & 2 deletions eth/handler_eth.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,31 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/eth/fetcher"
"github.com/ethereum/go-ethereum/eth/protocols/eth"
"github.com/ethereum/go-ethereum/p2p/enode"
)

// TxQueueSize is the size of the transaction queue used to enqueue transactions
const (
TxQueueSize = 16
)

// enqueueTx is a channel to enqueue transactions in parallel.
// It is used to improve the performance of transaction enqueued.
var enqueueTx = make(chan func(), TxQueueSize)

func init() {
// run the transaction enqueuing loop
for i := 0; i < TxQueueSize; i++ {
go func() {
for enqueue := range enqueueTx {
enqueue()
}
}()
}
}

// ethHandler implements the eth.Backend interface to handle the various network
// packets that are sent as replies or broadcasts.
type ethHandler handler
Expand Down Expand Up @@ -92,16 +113,28 @@ func (h *ethHandler) Handle(peer *eth.Peer, packet eth.Packet) error {
return errors.New("disallowed broadcast blob transaction")
}
}
return h.txFetcher.Enqueue(peer.ID(), *packet, false)
return asyncEnqueueTx(peer, *packet, h.txFetcher, false)

case *eth.PooledTransactionsResponse:
return h.txFetcher.Enqueue(peer.ID(), *packet, true)
return asyncEnqueueTx(peer, *packet, h.txFetcher, true)

default:
return fmt.Errorf("unexpected eth packet type: %T", packet)
}
}

func asyncEnqueueTx(peer *eth.Peer, txs []*types.Transaction, fetcher *fetcher.TxFetcher, directed bool) error {
if working, err := fetcher.IsWorking(); !working {
return err
}
enqueueTx <- func() {
if err := fetcher.Enqueue(peer.ID(), txs, directed); err != nil {
peer.Log().Warn("Failed to enqueue transaction", "err", err)
}
}
return nil
}

// handleBlockAnnounces is invoked from a peer's message handler when it transmits a
// batch of block announcements for the local node to process.
func (h *ethHandler) handleBlockAnnounces(peer *eth.Peer, hashes []common.Hash, numbers []uint64) error {
Expand Down
6 changes: 6 additions & 0 deletions eth/protocols/eth/broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ const (
var (
txAnnounceAbandonMeter = metrics.NewRegisteredMeter("eth/fetcher/transaction/announces/abandon", nil)
txBroadcastAbandonMeter = metrics.NewRegisteredMeter("eth/fetcher/transaction/broadcasts/abandon", nil)
txP2PAnnQueueGauge = metrics.NewRegisteredGauge("eth/fetcher/transaction/p2p/ann/queue", nil)
txP2PBroadQueueGauge = metrics.NewRegisteredGauge("eth/fetcher/transaction/p2p/broad/queue", nil)
)

// safeGetPeerIP
Expand Down Expand Up @@ -133,6 +135,8 @@ func (p *Peer) broadcastTransactions() {
queue = queue[:copy(queue, queue[len(queue)-maxQueuedTxs:])]
}

txP2PBroadQueueGauge.Update(int64(len(queue)))

case <-done:
done = nil

Expand Down Expand Up @@ -208,6 +212,8 @@ func (p *Peer) announceTransactions() {
queue = queue[:copy(queue, queue[len(queue)-maxQueuedTxAnns:])]
}

txP2PAnnQueueGauge.Update(int64(len(queue)))

case <-done:
done = nil

Expand Down

0 comments on commit 593869d

Please sign in to comment.