From 18066a38a8481e323846a8d0194327dfb663aa11 Mon Sep 17 00:00:00 2001 From: Bui Quang Minh Date: Wed, 20 Nov 2024 10:45:34 +0700 Subject: [PATCH] eth/fetcher: don't skip block/header when parent is not found Currently, we simply skip importing block/header when parent block/header is not found. However, since multiple blocks can be imported in parallel, the not found parent might be due to the fact that the parent import does not finish yet. This leads to a suitation that the correct block in canonical chain is skipped and the node gets stuck until the peer timeout. We observe this behavior when there are reorgs and block import is time consuming. This commit fixes it by not skipping these blocks (by not marking them as done) and re-queueing these for future processing when finding that the parent is in the queue already. --- eth/fetcher/block_fetcher.go | 71 ++++++++++++++++++++++++++++++++---- 1 file changed, 64 insertions(+), 7 deletions(-) diff --git a/eth/fetcher/block_fetcher.go b/eth/fetcher/block_fetcher.go index fd547ebce..46f98e2b3 100644 --- a/eth/fetcher/block_fetcher.go +++ b/eth/fetcher/block_fetcher.go @@ -20,6 +20,7 @@ package fetcher import ( "errors" "math/rand" + "sync" "time" "github.com/ethereum/go-ethereum/common" @@ -179,9 +180,11 @@ type BlockFetcher struct { completing map[common.Hash]*blockAnnounce // Blocks with headers, currently body-completing // Block cache - queue *prque.Prque // Queue containing the import operations (block number sorted) - queues map[string]int // Per peer block counts to prevent memory exhaustion - queued map[common.Hash]*blockOrHeaderInject // Set of already queued blocks (to dedup imports) + queueLock sync.Mutex // The lock to protect queue and queued from concurrent use + queue *prque.Prque // Queue containing the import operations (block number sorted) (must hold queueLock when accessing) + queued map[common.Hash]*blockOrHeaderInject // Set of already queued blocks (to dedup imports) (must hold queueLock when accessing) + + queues map[string]int // Per peer block counts to prevent memory exhaustion // Callbacks getHeader HeaderRetrievalFn // Retrieves a header from the local chain @@ -361,6 +364,8 @@ func (f *BlockFetcher) loop() { } // Import any queued blocks that could potentially fit height := f.chainHeight() + var insertedOps []*blockOrHeaderInject + f.queueLock.Lock() for !f.queue.Empty() { op := f.queue.PopItem().(*blockOrHeaderInject) hash := op.hash() @@ -381,6 +386,11 @@ func (f *BlockFetcher) loop() { f.forgetBlock(hash) continue } + insertedOps = append(insertedOps, op) + } + f.queueLock.Unlock() + + for _, op := range insertedOps { if f.light { f.importHeaders(op.origin, op.header) } else { @@ -442,7 +452,9 @@ func (f *BlockFetcher) loop() { case hash := <-f.done: // A pending import finished, remove all traces of the notification f.forgetHash(hash) + f.queueLock.Lock() f.forgetBlock(hash) + f.queueLock.Unlock() case <-fetchTimer.C: // At least one block's timer ran out, check for needing retrieval @@ -533,8 +545,12 @@ func (f *BlockFetcher) loop() { for _, header := range task.headers { hash := header.Hash() + f.queueLock.Lock() + isNotQueued := f.queued[hash] == nil + f.queueLock.Unlock() + // Filter fetcher-requested headers from other synchronisation algorithms - if announce := f.fetching[hash]; announce != nil && announce.origin == task.peer && f.fetched[hash] == nil && f.completing[hash] == nil && f.queued[hash] == nil { + if announce := f.fetching[hash]; announce != nil && announce.origin == task.peer && f.fetched[hash] == nil && f.completing[hash] == nil && isNotQueued { // If the delivered header does not match the promised number, drop the announcer if header.Number.Uint64() != announce.number { log.Trace("Invalid block number fetched", "peer", announce.origin, "hash", header.Hash(), "announced", announce.number, "provided", header.Number) @@ -628,7 +644,11 @@ func (f *BlockFetcher) loop() { txnHash common.Hash // calculated lazily and reused ) for hash, announce := range f.completing { - if f.queued[hash] != nil || announce.origin != task.peer { + f.queueLock.Lock() + isQueued := f.queued[hash] != nil + f.queueLock.Unlock() + + if isQueued || announce.origin != task.peer { continue } if uncleHash == (common.Hash{}) { @@ -754,6 +774,7 @@ func (f *BlockFetcher) enqueue(peer string, header *types.Header, block *types.B return } // Schedule the block for future importing + f.queueLock.Lock() if _, ok := f.queued[hash]; !ok { op := &blockOrHeaderInject{origin: peer} if header != nil { @@ -770,6 +791,7 @@ func (f *BlockFetcher) enqueue(peer string, header *types.Header, block *types.B } log.Debug("Queued delivered header or block", "peer", peer, "number", number, "hash", hash, "queued", f.queue.Size()) } + f.queueLock.Unlock() } // importHeaders spawns a new goroutine to run a header insertion into the chain. @@ -780,13 +802,30 @@ func (f *BlockFetcher) importHeaders(peer string, header *types.Header) { log.Debug("Importing propagated header", "peer", peer, "number", header.Number, "hash", hash) go func() { - defer func() { f.done <- hash }() + f.queueLock.Lock() + parentHash := header.ParentHash + _, isParentQueued := f.queued[parentHash] + // If the parent's unknown, abort insertion parent := f.getHeader(header.ParentHash) if parent == nil { log.Debug("Unknown parent of propagated header", "peer", peer, "number", header.Number, "hash", hash, "parent", header.ParentHash) + // This can be still a valid header but maybe the parent header parallel insert does not finish yet, + // put this header back to the queue for future processing + if isParentQueued { + op := &blockOrHeaderInject{ + origin: peer, + header: header, + } + f.queue.Push(op, -int64(header.Number.Uint64())) + f.queueLock.Unlock() + } return } + f.queueLock.Unlock() + + defer func() { f.done <- hash }() + // Validate the header and if something went wrong, drop the peer if err := f.verifyHeader(header); err != nil && err != consensus.ErrFutureBlock { log.Debug("Propagated header verification failed", "peer", peer, "number", header.Number, "hash", hash, "err", err) @@ -814,14 +853,31 @@ func (f *BlockFetcher) importBlocks(peer string, block *types.Block, sidecars [] // Run the import on a new thread log.Debug("Importing propagated block", "peer", peer, "number", block.Number(), "hash", hash) go func() { - defer func() { f.done <- hash }() + f.queueLock.Lock() + parentHash := block.ParentHash() + _, isParentQueued := f.queued[parentHash] // If the parent's unknown, abort insertion parent := f.getBlock(block.ParentHash()) if parent == nil { log.Debug("Unknown parent of propagated block", "peer", peer, "number", block.Number(), "hash", hash, "parent", block.ParentHash()) + // This can be still a valid block but maybe the parent block parallel insert does not finish yet, + // put this block back to the queue for future processing + if isParentQueued { + op := &blockOrHeaderInject{ + origin: peer, + block: block, + sidecars: sidecars, + } + f.queue.Push(op, -int64(block.NumberU64())) + } + f.queueLock.Unlock() return } + f.queueLock.Unlock() + + defer func() { f.done <- hash }() + // Quickly validate the header and propagate the block if it passes err := f.verifyHeader(block.Header()) if err == nil { @@ -906,6 +962,7 @@ func (f *BlockFetcher) forgetHash(hash common.Hash) { // forgetBlock removes all traces of a queued block from the fetcher's internal // state. +// The caller must hold the queueLock. func (f *BlockFetcher) forgetBlock(hash common.Hash) { if insert := f.queued[hash]; insert != nil { f.queues[insert.origin]--