Skip to content

Commit

Permalink
refine: simplify fix logic
Browse files Browse the repository at this point in the history
  • Loading branch information
krish-nr committed Nov 5, 2024
1 parent d0fedcb commit 487b722
Show file tree
Hide file tree
Showing 9 changed files with 52 additions and 127 deletions.
68 changes: 40 additions & 28 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,16 +242,17 @@ type BlockChain struct {
chainConfig *params.ChainConfig // Chain & network configuration
cacheConfig *CacheConfig // Cache configuration for pruning

db ethdb.Database // Low level persistent database to store final content in
snaps *snapshot.Tree // Snapshot tree for fast trie leaf access
triegc *prque.Prque[int64, common.Hash] // Priority queue mapping block numbers to tries to gc
gcproc time.Duration // Accumulates canonical block processing for trie dumping
lastWrite uint64 // Last block when the state was flushed
flushInterval atomic.Int64 // Time interval (processing time) after which to flush a state
triedb *triedb.Database // The database handler for maintaining trie nodes.
stateCache state.Database // State database to reuse between imports (contains state cache)
proofKeeper *ProofKeeper // Store/Query op-proposal proof to ensure consistent.
txIndexer *txIndexer // Transaction indexer, might be nil if not enabled
db ethdb.Database // Low level persistent database to store final content in
snaps *snapshot.Tree // Snapshot tree for fast trie leaf access
triegc *prque.Prque[int64, common.Hash] // Priority queue mapping block numbers to tries to gc
gcproc time.Duration // Accumulates canonical block processing for trie dumping
lastWrite uint64 // Last block when the state was flushed
flushInterval atomic.Int64 // Time interval (processing time) after which to flush a state
triedb *triedb.Database // The database handler for maintaining trie nodes.
stateCache state.Database // State database to reuse between imports (contains state cache)
proofKeeper *ProofKeeper // Store/Query op-proposal proof to ensure consistent.
txIndexer *txIndexer // Transaction indexer, might be nil if not enabled
stateRecoveringStatus bool

hc *HeaderChain
rmLogsFeed event.Feed
Expand Down Expand Up @@ -337,24 +338,25 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis
}

bc := &BlockChain{
chainConfig: chainConfig,
cacheConfig: cacheConfig,
db: db,
triedb: triedb,
triegc: prque.New[int64, common.Hash](nil),
quit: make(chan struct{}),
chainmu: syncx.NewClosableMutex(),
bodyCache: lru.NewCache[common.Hash, *types.Body](bodyCacheLimit),
bodyRLPCache: lru.NewCache[common.Hash, rlp.RawValue](bodyCacheLimit),
receiptsCache: lru.NewCache[common.Hash, []*types.Receipt](receiptsCacheLimit),
blockCache: lru.NewCache[common.Hash, *types.Block](blockCacheLimit),
txLookupCache: lru.NewCache[common.Hash, txLookup](txLookupCacheLimit),
miningReceiptsCache: lru.NewCache[common.Hash, []*types.Receipt](receiptsCacheLimit),
miningTxLogsCache: lru.NewCache[common.Hash, []*types.Log](txLogsCacheLimit),
miningStateCache: lru.NewCache[common.Hash, *state.StateDB](miningStateCacheLimit),
futureBlocks: lru.NewCache[common.Hash, *types.Block](maxFutureBlocks),
engine: engine,
vmConfig: vmConfig,
chainConfig: chainConfig,
cacheConfig: cacheConfig,
db: db,
triedb: triedb,
triegc: prque.New[int64, common.Hash](nil),
quit: make(chan struct{}),
chainmu: syncx.NewClosableMutex(),
bodyCache: lru.NewCache[common.Hash, *types.Body](bodyCacheLimit),
bodyRLPCache: lru.NewCache[common.Hash, rlp.RawValue](bodyCacheLimit),
receiptsCache: lru.NewCache[common.Hash, []*types.Receipt](receiptsCacheLimit),
blockCache: lru.NewCache[common.Hash, *types.Block](blockCacheLimit),
txLookupCache: lru.NewCache[common.Hash, txLookup](txLookupCacheLimit),
miningReceiptsCache: lru.NewCache[common.Hash, []*types.Receipt](receiptsCacheLimit),
miningTxLogsCache: lru.NewCache[common.Hash, []*types.Log](txLogsCacheLimit),
miningStateCache: lru.NewCache[common.Hash, *state.StateDB](miningStateCacheLimit),
futureBlocks: lru.NewCache[common.Hash, *types.Block](maxFutureBlocks),
engine: engine,
vmConfig: vmConfig,
stateRecoveringStatus: false,
}
bc.flushInterval.Store(int64(cacheConfig.TrieTimeLimit))
bc.forker = NewForkChoice(bc, shouldPreserve)
Expand Down Expand Up @@ -2232,6 +2234,16 @@ func (bc *BlockChain) RecoverStateAndSetHead(block *types.Block) (common.Hash, e
// recoverAncestors is only used post-merge.
// We return the hash of the latest block that we could correctly validate.
func (bc *BlockChain) recoverAncestors(block *types.Block) (common.Hash, error) {
if bc.stateRecoveringStatus {
log.Warn("recover is already in progress, skipping", "block", block.Hash())
return common.Hash{}, errors.New("state recover in progress")
}

bc.stateRecoveringStatus = true
defer func() {
bc.stateRecoveringStatus = false
}()

// Gather all the sidechain hashes (full blocks may be memory heavy)
var (
hashes []common.Hash
Expand Down
4 changes: 0 additions & 4 deletions eth/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -1847,7 +1847,3 @@ func (d *Downloader) reportSnapSyncProgress(force bool) {
log.Info("Syncing: chain download in progress", "synced", progress, "chain", syncedBytes, "headers", headers, "bodies", bodies, "receipts", receipts, "eta", common.PrettyDuration(eta))
d.syncLogTime = time.Now()
}

func (d *Downloader) GetAllPeers() []*peerConnection {
return d.peers.AllPeers()
}
14 changes: 0 additions & 14 deletions eth/downloader/fetchers.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package downloader

import (
"fmt"
"time"

"github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -114,16 +113,3 @@ func (d *Downloader) fetchHeadersByNumber(p *peerConnection, number uint64, amou
return *res.Res.(*eth.BlockHeadersRequest), res.Meta.([]common.Hash), nil
}
}

func (d *Downloader) GetHeaderByHashFromPeer(peer *peerConnection, blockHash common.Hash) (*types.Header, error) {
headers, _, err := d.fetchHeadersByHash(peer, blockHash, 1, 0, false)
if err != nil {
return nil, fmt.Errorf("failed to fetch header from peer: %v", err)
}

if len(headers) == 0 {
return nil, fmt.Errorf("no headers returned for hash: %v", blockHash)
}

return headers[0], nil
}
42 changes: 4 additions & 38 deletions miner/fix_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,18 @@ import (

"github.com/ethereum/go-ethereum/beacon/engine"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/eth/downloader"
"github.com/ethereum/go-ethereum/log"
)

// StateFixManager manages the fix operation state and notification mechanism.
type StateFixManager struct {
mutex sync.Mutex // Protects access to fix state
isFixInProgress bool // Tracks if a fix operation is in progress
downloader *downloader.Downloader // Used to trigger BeaconSync operations
mutex sync.Mutex // Protects access to fix state
isFixInProgress bool // Tracks if a fix operation is in progress
}

// NewFixManager initializes a FixManager with required dependencies
func NewFixManager(downloader *downloader.Downloader) *StateFixManager {
return &StateFixManager{
downloader: downloader,
}
func NewFixManager() *StateFixManager {
return &StateFixManager{}
}

// StartFix launches a goroutine to manage the fix process and tracks the fix state.
Expand Down Expand Up @@ -68,32 +63,3 @@ func (fm *StateFixManager) RecoverFromLocal(w *worker, blockHash common.Hash) er
log.Info("Recovered states up to block", "latestValid", latestValid)
return nil
}

// RecoverFromPeer attempts to retrieve the block header from peers and triggers BeaconSync if successful.
//
// blockHash: The latest header(unsafe block) hash of the block to recover.
func (fm *StateFixManager) RecoverFromPeer(blockHash common.Hash) error {
peers := fm.downloader.GetAllPeers()
if len(peers) == 0 {
return fmt.Errorf("no peers available")
}

var header *types.Header
var err error
for _, peer := range peers {
header, err = fm.downloader.GetHeaderByHashFromPeer(peer, blockHash)
if err == nil && header != nil {
break
}
log.Warn("Failed to retrieve header from peer", "err", err)
}

if header == nil {
return fmt.Errorf("failed to retrieve header from all valid peers")
}

log.Info("Successfully retrieved header from peer", "blockHash", blockHash)

fm.downloader.BeaconSync(downloader.FullSync, header, nil)
return nil
}
6 changes: 0 additions & 6 deletions miner/miner.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@ var DefaultMevConfig = MevConfig{
type Backend interface {
BlockChain() *core.BlockChain
TxPool() *txpool.TxPool
Downloader() *downloader.Downloader
}

type BackendWithHistoricalState interface {
Expand Down Expand Up @@ -308,11 +307,6 @@ func (miner *Miner) BuildPayload(args *BuildPayloadArgs) (*Payload, error) {
return miner.worker.buildPayload(args)
}

// Worker builds the payload according to the provided parameters.
func (miner *Miner) Worker() *worker {
return miner.worker
}

func (miner *Miner) SimulateBundle(bundle *types.Bundle) (*big.Int, error) {

env, err := miner.prepareSimulationEnv()
Expand Down
4 changes: 0 additions & 4 deletions miner/miner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,6 @@ func (m *mockBackend) TxPool() *txpool.TxPool {
return m.txPool
}

func (m *mockBackend) Downloader() *downloader.Downloader {
return nil
}

func (m *mockBackend) StateAtBlock(block *types.Block, reexec uint64, base *state.StateDB, checkLive bool, preferDisk bool) (statedb *state.StateDB, err error) {
return nil, errors.New("not supported")
}
Expand Down
30 changes: 3 additions & 27 deletions miner/payload_building.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,38 +269,16 @@ func (payload *Payload) stopBuilding() {
}

// fix attempts to recover and repair the block and its associated data (such as MPT)
// either from the local blockchain or from peers.
//
// In most cases, the block can be recovered from the local node's data. However,
// there is a corner case where this may not be possible: If the sequencer
// broadcasts a block but the local node crashes before fully writing the block to its local
// storage, the local chain might be lagging behind by one block compared to peers.
// In such cases, we need to recover the missing block data from peers.
//
// The function first tries to recover the block using the local blockchain via the
// fixManager.RecoverFromLocal method. If local recovery fails (e.g., due to the node
// missing the block), it attempts to retrieve the block header from peers and triggers
//
// from the local blockchain
// blockHash: The hash of the latest block that needs to be recovered and fixed.
func (w *worker) fix(blockHash common.Hash) error {
log.Info("Fix operation started")

// Try to recover from local data
err := w.stateFixManager.RecoverFromLocal(w, blockHash)
if err != nil {
// Only proceed to peer recovery if the error is "block not found in local chain"
if strings.Contains(err.Error(), "block not found") {
log.Warn("Local recovery failed, trying to recover from peers", "err", err)

// Try to recover from peers
err = w.stateFixManager.RecoverFromPeer(blockHash)
if err != nil {
return err
}
} else {
log.Error("Failed to recover from local data", "err", err)
return err
}
log.Error("Failed to recover from local data", "err", err)
return err
}

log.Info("Fix operation completed successfully")
Expand Down Expand Up @@ -406,10 +384,8 @@ func (w *worker) buildPayload(args *BuildPayloadArgs) (*Payload, error) {
start := time.Now()
// getSealingBlock is interrupted by shared interrupt
r := w.getSealingBlock(fullParams)

dur := time.Since(start)
// update handles error case

payload.update(r, dur, func() {
w.cacheMiningBlock(r.block, r.env)
})
Expand Down
2 changes: 1 addition & 1 deletion miner/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus
resubmitIntervalCh: make(chan time.Duration),
resubmitAdjustCh: make(chan *intervalAdjust, resubmitAdjustChanSize),
bundleCache: NewBundleCache(),
stateFixManager: NewFixManager(eth.Downloader()),
stateFixManager: NewFixManager(),
}
// Subscribe for transaction insertion events (whether from network or resurrects)
worker.txsSub = eth.TxPool().SubscribeTransactions(worker.txsCh, true)
Expand Down
9 changes: 4 additions & 5 deletions miner/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"testing"
"time"

"github.com/holiman/uint256"

"github.com/ethereum/go-ethereum/accounts"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus"
Expand All @@ -34,11 +36,9 @@ import (
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/eth/downloader"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/params"
"github.com/holiman/uint256"
)

const (
Expand Down Expand Up @@ -146,9 +146,8 @@ func newTestWorkerBackend(t *testing.T, chainConfig *params.ChainConfig, engine
}
}

func (b *testWorkerBackend) BlockChain() *core.BlockChain { return b.chain }
func (b *testWorkerBackend) TxPool() *txpool.TxPool { return b.txPool }
func (b *testWorkerBackend) Downloader() *downloader.Downloader { return nil }
func (b *testWorkerBackend) BlockChain() *core.BlockChain { return b.chain }
func (b *testWorkerBackend) TxPool() *txpool.TxPool { return b.txPool }

func (b *testWorkerBackend) newRandomTx(creation bool) *types.Transaction {
var tx *types.Transaction
Expand Down

0 comments on commit 487b722

Please sign in to comment.