diff --git a/eth/backend.go b/eth/backend.go index b690938e87..b1356ed4c3 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -21,12 +21,13 @@ import ( "context" "errors" "fmt" - "github.com/ethereum/go-ethereum/core/txpool/bundlepool" "math/big" "runtime" "sync" "time" + "github.com/ethereum/go-ethereum/core/txpool/bundlepool" + "github.com/ethereum/go-ethereum/accounts" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" @@ -308,6 +309,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { // Permit the downloader to use the trie cache allowance during fast sync cacheLimit := cacheConfig.TrieCleanLimit + cacheConfig.TrieDirtyLimit + cacheConfig.SnapshotLimit if eth.handler, err = newHandler(&handlerConfig{ + StaticNodes: stack.Config().P2P.StaticNodes, Database: chainDb, Chain: eth.blockchain, TxPool: eth.txPool, diff --git a/eth/handler.go b/eth/handler.go index db8f0ed5cd..31d851e03d 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -41,6 +41,7 @@ import ( "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/triedb/pathdb" ) @@ -88,6 +89,7 @@ type txPool interface { // handlerConfig is the collection of initialization parameters to create a full // node network handler. type handlerConfig struct { + StaticNodes []*enode.Node Database ethdb.Database // Database for direct sync insertions Chain *core.BlockChain // Blockchain to serve data from TxPool txPool // Transaction pool to propagate from @@ -137,6 +139,8 @@ type handler struct { handlerStartCh chan struct{} handlerDoneCh chan struct{} + + staticNodes map[string]struct{} } // newHandler returns a handler for all Ethereum chain management protocol. @@ -159,7 +163,12 @@ func newHandler(config *handlerConfig) (*handler, error) { quitSync: make(chan struct{}), handlerDoneCh: make(chan struct{}), handlerStartCh: make(chan struct{}), + staticNodes: make(map[string]struct{}), + } + for _, node := range config.StaticNodes { + h.staticNodes[node.ID().String()] = struct{}{} } + if config.Sync == downloader.FullSync { // The database seems empty as the current block is the genesis. Yet the snap // block is ahead, so snap sync was enabled for this node at a certain point. @@ -642,6 +651,7 @@ func (h *handler) BroadcastTransactions(txs types.Transactions) { peers := h.peers.peersWithoutTransaction(tx.Hash()) var numDirect int + var direct, announce []*ethPeer = nil, peers switch { case tx.Type() == types.BlobTxType: blobTxs++ @@ -649,14 +659,34 @@ func (h *handler) BroadcastTransactions(txs types.Transactions) { largeTxs++ default: numDirect = int(math.Sqrt(float64(len(peers)))) + // Split the peers into direct-peers and announce-peers + // we send the tx directly to direct-peers; all static nodes are direct-peers + // we announce the tx to announce-peers + direct = make([]*ethPeer, 0, numDirect) + announce = make([]*ethPeer, 0, len(peers)-numDirect) + for _, peer := range peers { + if _, ok := h.staticNodes[peer.ID()]; ok { + direct = append(direct, peer) + } else { + announce = append(announce, peer) + } + } + + // if directly-peers are not enough, move some announce-peers into directly pool + for len(direct) < numDirect && len(announce) > 0 { + // shift one peer to trusted + direct = append(direct, announce[0]) + announce = announce[1:] + } } + // Send the tx unconditionally to a subset of our peers - for _, peer := range peers[:numDirect] { + for _, peer := range direct { txset[peer] = append(txset[peer], tx.Hash()) log.Trace("Broadcast transaction", "peer", peer.ID(), "hash", tx.Hash()) } // For the remaining peers, send announcement only - for _, peer := range peers[numDirect:] { + for _, peer := range announce { annos[peer] = append(annos[peer], tx.Hash()) log.Trace("Announce transaction", "peer", peer.ID(), "hash", tx.Hash()) }