diff --git a/eth/fetcher/tx_fetcher.go b/eth/fetcher/tx_fetcher.go index e24cbaf25e..7dc4cd5f37 100644 --- a/eth/fetcher/tx_fetcher.go +++ b/eth/fetcher/tx_fetcher.go @@ -400,15 +400,6 @@ func (f *TxFetcher) Stop() { close(f.quit) } -func (f *TxFetcher) IsWorking() (bool, error) { - select { - case <-f.quit: - return false, errTerminated - default: - return true, nil - } -} - func (f *TxFetcher) loop() { var ( waitTimer = new(mclock.Timer) diff --git a/eth/handler_eth.go b/eth/handler_eth.go index 7f36ae722d..6a11bf3689 100644 --- a/eth/handler_eth.go +++ b/eth/handler_eth.go @@ -25,31 +25,10 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/eth/fetcher" "github.com/ethereum/go-ethereum/eth/protocols/eth" "github.com/ethereum/go-ethereum/p2p/enode" ) -// TxQueueSize is the size of the transaction queue used to enqueue transactions -const ( - TxQueueSize = 16 -) - -// enqueueTx is a channel to enqueue transactions in parallel. -// It is used to improve the performance of transaction enqueued. -var enqueueTx = make(chan func(), TxQueueSize) - -func init() { - // run the transaction enqueuing loop - for i := 0; i < TxQueueSize; i++ { - go func() { - for enqueue := range enqueueTx { - enqueue() - } - }() - } -} - // ethHandler implements the eth.Backend interface to handle the various network // packets that are sent as replies or broadcasts. type ethHandler handler @@ -113,28 +92,16 @@ func (h *ethHandler) Handle(peer *eth.Peer, packet eth.Packet) error { return errors.New("disallowed broadcast blob transaction") } } - return asyncEnqueueTx(peer, *packet, h.txFetcher, false) + return h.txFetcher.Enqueue(peer.ID(), *packet, false) case *eth.PooledTransactionsResponse: - return asyncEnqueueTx(peer, *packet, h.txFetcher, true) + return h.txFetcher.Enqueue(peer.ID(), *packet, true) default: return fmt.Errorf("unexpected eth packet type: %T", packet) } } -func asyncEnqueueTx(peer *eth.Peer, txs []*types.Transaction, fetcher *fetcher.TxFetcher, directed bool) error { - if working, err := fetcher.IsWorking(); !working { - return err - } - enqueueTx <- func() { - if err := fetcher.Enqueue(peer.ID(), txs, directed); err != nil { - peer.Log().Warn("Failed to enqueue transaction", "err", err) - } - } - return nil -} - // handleBlockAnnounces is invoked from a peer's message handler when it transmits a // batch of block announcements for the local node to process. func (h *ethHandler) handleBlockAnnounces(peer *eth.Peer, hashes []common.Hash, numbers []uint64) error { diff --git a/eth/protocols/eth/broadcast.go b/eth/protocols/eth/broadcast.go index 89c848a429..002d222e64 100644 --- a/eth/protocols/eth/broadcast.go +++ b/eth/protocols/eth/broadcast.go @@ -34,8 +34,6 @@ const ( var ( txAnnounceAbandonMeter = metrics.NewRegisteredMeter("eth/fetcher/transaction/announces/abandon", nil) txBroadcastAbandonMeter = metrics.NewRegisteredMeter("eth/fetcher/transaction/broadcasts/abandon", nil) - txP2PAnnQueueGauge = metrics.NewRegisteredGauge("eth/fetcher/transaction/p2p/ann/queue", nil) - txP2PBroadQueueGauge = metrics.NewRegisteredGauge("eth/fetcher/transaction/p2p/broad/queue", nil) ) // safeGetPeerIP @@ -135,8 +133,6 @@ func (p *Peer) broadcastTransactions() { queue = queue[:copy(queue, queue[len(queue)-maxQueuedTxs:])] } - txP2PBroadQueueGauge.Update(int64(len(queue))) - case <-done: done = nil @@ -212,8 +208,6 @@ func (p *Peer) announceTransactions() { queue = queue[:copy(queue, queue[len(queue)-maxQueuedTxAnns:])] } - txP2PAnnQueueGauge.Update(int64(len(queue))) - case <-done: done = nil