From e53b785125ba077a073cccaa9e1b11ab84e425f7 Mon Sep 17 00:00:00 2001 From: andyzhang2023 Date: Tue, 20 Aug 2024 15:48:46 +0800 Subject: [PATCH 1/2] optimize p2p broadcasting: send tx directly to static nodes --- eth/backend.go | 4 +++- eth/handler.go | 34 ++++++++++++++++++++++++++++++++-- 2 files changed, 35 insertions(+), 3 deletions(-) diff --git a/eth/backend.go b/eth/backend.go index b690938e87..b1356ed4c3 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -21,12 +21,13 @@ import ( "context" "errors" "fmt" - "github.com/ethereum/go-ethereum/core/txpool/bundlepool" "math/big" "runtime" "sync" "time" + "github.com/ethereum/go-ethereum/core/txpool/bundlepool" + "github.com/ethereum/go-ethereum/accounts" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" @@ -308,6 +309,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { // Permit the downloader to use the trie cache allowance during fast sync cacheLimit := cacheConfig.TrieCleanLimit + cacheConfig.TrieDirtyLimit + cacheConfig.SnapshotLimit if eth.handler, err = newHandler(&handlerConfig{ + StaticNodes: stack.Config().P2P.StaticNodes, Database: chainDb, Chain: eth.blockchain, TxPool: eth.txPool, diff --git a/eth/handler.go b/eth/handler.go index db8f0ed5cd..64bfa14376 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -41,6 +41,7 @@ import ( "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/triedb/pathdb" ) @@ -88,6 +89,7 @@ type txPool interface { // handlerConfig is the collection of initialization parameters to create a full // node network handler. type handlerConfig struct { + StaticNodes []*enode.Node Database ethdb.Database // Database for direct sync insertions Chain *core.BlockChain // Blockchain to serve data from TxPool txPool // Transaction pool to propagate from @@ -137,6 +139,8 @@ type handler struct { handlerStartCh chan struct{} handlerDoneCh chan struct{} + + staticNodes map[string]struct{} } // newHandler returns a handler for all Ethereum chain management protocol. @@ -159,7 +163,12 @@ func newHandler(config *handlerConfig) (*handler, error) { quitSync: make(chan struct{}), handlerDoneCh: make(chan struct{}), handlerStartCh: make(chan struct{}), + staticNodes: make(map[string]struct{}), + } + for _, node := range config.StaticNodes { + h.staticNodes[node.ID().String()] = struct{}{} } + if config.Sync == downloader.FullSync { // The database seems empty as the current block is the genesis. Yet the snap // block is ahead, so snap sync was enabled for this node at a certain point. @@ -642,6 +651,7 @@ func (h *handler) BroadcastTransactions(txs types.Transactions) { peers := h.peers.peersWithoutTransaction(tx.Hash()) var numDirect int + var direct, announce []*ethPeer = nil, peers switch { case tx.Type() == types.BlobTxType: blobTxs++ @@ -649,14 +659,34 @@ func (h *handler) BroadcastTransactions(txs types.Transactions) { largeTxs++ default: numDirect = int(math.Sqrt(float64(len(peers)))) + // Split the peers into trusted and strangers + // trusted peers we send the tx directly; all static nodes are trusted + // strangers we announce the tx + direct = make([]*ethPeer, 0, numDirect) + announce = make([]*ethPeer, 0, len(peers)-numDirect) + for _, peer := range peers { + if _, ok := h.staticNodes[peer.ID()]; ok { + direct = append(direct, peer) + } else { + announce = append(announce, peer) + } + } + + // if tructed peers are not enough, move some strangers into trusted + for len(direct) < numDirect && len(announce) > 0 { + // shift one peer to trusted + direct = append(direct, announce[0]) + announce = announce[1:] + } } + // Send the tx unconditionally to a subset of our peers - for _, peer := range peers[:numDirect] { + for _, peer := range direct { txset[peer] = append(txset[peer], tx.Hash()) log.Trace("Broadcast transaction", "peer", peer.ID(), "hash", tx.Hash()) } // For the remaining peers, send announcement only - for _, peer := range peers[numDirect:] { + for _, peer := range announce { annos[peer] = append(annos[peer], tx.Hash()) log.Trace("Announce transaction", "peer", peer.ID(), "hash", tx.Hash()) } From 87d9ccaedbfbf0cff9a3318e7332c7bf98aa0303 Mon Sep 17 00:00:00 2001 From: andyzhang2023 Date: Wed, 11 Sep 2024 09:28:03 +0800 Subject: [PATCH 2/2] fix some comments --- eth/handler.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/eth/handler.go b/eth/handler.go index 64bfa14376..31d851e03d 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -659,9 +659,9 @@ func (h *handler) BroadcastTransactions(txs types.Transactions) { largeTxs++ default: numDirect = int(math.Sqrt(float64(len(peers)))) - // Split the peers into trusted and strangers - // trusted peers we send the tx directly; all static nodes are trusted - // strangers we announce the tx + // Split the peers into direct-peers and announce-peers + // we send the tx directly to direct-peers; all static nodes are direct-peers + // we announce the tx to announce-peers direct = make([]*ethPeer, 0, numDirect) announce = make([]*ethPeer, 0, len(peers)-numDirect) for _, peer := range peers { @@ -672,7 +672,7 @@ func (h *handler) BroadcastTransactions(txs types.Transactions) { } } - // if tructed peers are not enough, move some strangers into trusted + // if directly-peers are not enough, move some announce-peers into directly pool for len(direct) < numDirect && len(announce) > 0 { // shift one peer to trusted direct = append(direct, announce[0])