Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: sequencer auto recover when meet an unexpected shutdown #166

Merged
merged 14 commits into from
Nov 13, 2024
Merged
84 changes: 74 additions & 10 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,16 +242,17 @@ type BlockChain struct {
chainConfig *params.ChainConfig // Chain & network configuration
cacheConfig *CacheConfig // Cache configuration for pruning

db ethdb.Database // Low level persistent database to store final content in
snaps *snapshot.Tree // Snapshot tree for fast trie leaf access
triegc *prque.Prque[int64, common.Hash] // Priority queue mapping block numbers to tries to gc
gcproc time.Duration // Accumulates canonical block processing for trie dumping
lastWrite uint64 // Last block when the state was flushed
flushInterval atomic.Int64 // Time interval (processing time) after which to flush a state
triedb *triedb.Database // The database handler for maintaining trie nodes.
stateCache state.Database // State database to reuse between imports (contains state cache)
proofKeeper *ProofKeeper // Store/Query op-proposal proof to ensure consistent.
txIndexer *txIndexer // Transaction indexer, might be nil if not enabled
db ethdb.Database // Low level persistent database to store final content in
snaps *snapshot.Tree // Snapshot tree for fast trie leaf access
triegc *prque.Prque[int64, common.Hash] // Priority queue mapping block numbers to tries to gc
gcproc time.Duration // Accumulates canonical block processing for trie dumping
lastWrite uint64 // Last block when the state was flushed
flushInterval atomic.Int64 // Time interval (processing time) after which to flush a state
triedb *triedb.Database // The database handler for maintaining trie nodes.
stateCache state.Database // State database to reuse between imports (contains state cache)
proofKeeper *ProofKeeper // Store/Query op-proposal proof to ensure consistent.
txIndexer *txIndexer // Transaction indexer, might be nil if not enabled
stateRecoveringStatus atomic.Bool

hc *HeaderChain
rmLogsFeed event.Feed
Expand Down Expand Up @@ -2223,11 +2224,25 @@ func (bc *BlockChain) insertSideChain(block *types.Block, it *insertIterator) (i
return 0, nil
}

func (bc *BlockChain) RecoverStateAndSetHead(block *types.Block) (common.Hash, error) {
return bc.recoverStateAndSetHead(block)
}

// recoverAncestors finds the closest ancestor with available state and re-execute
// all the ancestor blocks since that.
// recoverAncestors is only used post-merge.
// We return the hash of the latest block that we could correctly validate.
func (bc *BlockChain) recoverAncestors(block *types.Block) (common.Hash, error) {
if bc.stateRecoveringStatus.Load() {
log.Warn("recover is already in progress, skipping", "block", block.Hash())
return common.Hash{}, errors.New("state recover in progress")
}

bc.stateRecoveringStatus.Store(true)
defer func() {
bc.stateRecoveringStatus.Store(false)
}()

// Gather all the sidechain hashes (full blocks may be memory heavy)
var (
hashes []common.Hash
Expand Down Expand Up @@ -2644,6 +2659,55 @@ func (bc *BlockChain) InsertHeaderChain(chain []*types.Header) (int, error) {
return 0, err
}

// recoverStateAndSetHead attempts to recover the state of the blockchain by re-importing
// missing blocks and advancing the chain head. It ensures the state is available
// for the given block and its ancestors before updating the head.
func (bc *BlockChain) recoverStateAndSetHead(block *types.Block) (common.Hash, error) {
var (
hashes []common.Hash
numbers []uint64
parent = block
)
for parent != nil && !bc.HasState(parent.Root()) {
if bc.stateRecoverable(parent.Root()) {
if err := bc.triedb.Recover(parent.Root()); err != nil {
return common.Hash{}, err
}
break
}
hashes = append(hashes, parent.Hash())
numbers = append(numbers, parent.NumberU64())
parent = bc.GetBlock(parent.ParentHash(), parent.NumberU64()-1)

// If the chain is terminating, stop iteration
if bc.insertStopped() {
log.Debug("Abort during blocks iteration")
return common.Hash{}, errInsertionInterrupted
}
}
if parent == nil {
return common.Hash{}, errors.New("missing parent")
}
// Import all the pruned blocks to make the state available
for i := len(hashes) - 1; i >= 0; i-- {
// If the chain is terminating, stop processing blocks
if bc.insertStopped() {
log.Debug("Abort during blocks processing")
return common.Hash{}, errInsertionInterrupted
}
var b *types.Block
if i == 0 {
b = block
} else {
b = bc.GetBlock(hashes[i], numbers[i])
}
if _, err := bc.insertChain(types.Blocks{b}, true); err != nil {
return b.ParentHash(), err
}
}
return block.Hash(), nil
}

// SetBlockValidatorAndProcessorForTesting sets the current validator and processor.
// This method can be used to force an invalid blockchain to be verified for tests.
// This method is unsafe and should only be used before block import starts.
Expand Down
2 changes: 1 addition & 1 deletion eth/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func (cs *chainSyncer) modeAndLocalHead() (downloader.SyncMode, *big.Int) {
if !cs.handler.chain.NoTries() && !cs.handler.chain.HasState(head.Root) {
block := cs.handler.chain.CurrentSnapBlock()
td := cs.handler.chain.GetTd(block.Hash(), block.Number.Uint64())
log.Info("Reenabled snap sync as chain is stateless")
log.Info("Reenabled snap sync as chain is stateless", "lost block", block.Number.Uint64())
return downloader.SnapSync, td
}
// Nope, we're really full syncing
Expand Down
56 changes: 56 additions & 0 deletions miner/fix_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package miner

import (
"fmt"
"sync"

"github.com/ethereum/go-ethereum/beacon/engine"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
)

// StateFixManager manages the fix operation state and notification mechanism.
type StateFixManager struct {
mutex sync.Mutex // Protects access to fix state
}

// NewFixManager initializes a FixManager with required dependencies
func NewFixManager() *StateFixManager {
return &StateFixManager{}
}

// StartFix launches a goroutine to manage the fix process and tracks the fix state.
func (fm *StateFixManager) StartFix(worker *worker, id engine.PayloadID, parentHash common.Hash) error {
fm.mutex.Lock()
defer fm.mutex.Unlock()

log.Info("Fix is in progress for the block", "id", id)

err := worker.fix(parentHash)
if err != nil {
log.Error("Fix process failed", "error", err)
return err
}

log.Info("Fix process completed successfully", "id", id)
return nil
}

// RecoverFromLocal attempts to recover the block and MPT data from the local chain.
//
// blockHash: The latest header(unsafe block) hash of the block to recover.
func (fm *StateFixManager) RecoverFromLocal(w *worker, blockHash common.Hash) error {
block := w.chain.GetBlockByHash(blockHash)
if block == nil {
return fmt.Errorf("block not found in local chain")
}

log.Info("Fixing data for block", "block number", block.NumberU64())
latestValid, err := w.chain.RecoverStateAndSetHead(block)
if err != nil {
return fmt.Errorf("failed to recover state: %v", err)
}

log.Info("Recovered states up to block", "latestValid", latestValid)
return nil
}
5 changes: 3 additions & 2 deletions miner/miner.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,13 @@ import (
"context"
"errors"
"fmt"
"github.com/ethereum/go-ethereum/consensus/misc/eip1559"
"github.com/ethereum/go-ethereum/consensus/misc/eip4844"
"math/big"
"sync"
"time"

"github.com/ethereum/go-ethereum/consensus/misc/eip1559"
"github.com/ethereum/go-ethereum/consensus/misc/eip4844"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/consensus"
Expand Down
74 changes: 74 additions & 0 deletions miner/payload_building.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,22 @@
package miner

import (
"context"
"crypto/sha256"
"encoding/binary"
"errors"
"fmt"
"math/big"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/ethereum/go-ethereum/beacon/engine"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/eth/tracers"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rlp"
Expand Down Expand Up @@ -263,6 +268,23 @@ func (payload *Payload) stopBuilding() {
})
}

// fix attempts to recover and repair the block and its associated data (such as MPT)
// from the local blockchain
// blockHash: The hash of the latest block that needs to be recovered and fixed.
func (w *worker) fix(blockHash common.Hash) error {
log.Info("Fix operation started")

// Try to recover from local data
err := w.stateFixManager.RecoverFromLocal(w, blockHash)
if err != nil {
log.Error("Failed to recover from local data", "err", err)
return err
}

log.Info("Fix operation completed successfully")
return nil
}

// buildPayload builds the payload according to the provided parameters.
func (w *worker) buildPayload(args *BuildPayloadArgs) (*Payload, error) {
if args.NoTxPool { // don't start the background payload updating job if there is no tx pool to pull from
Expand Down Expand Up @@ -318,6 +340,18 @@ func (w *worker) buildPayload(args *BuildPayloadArgs) (*Payload, error) {
return nil, err
}

//check state of parent block
_, err = w.retrieveParentState(fullParams)
if err != nil && strings.Contains(err.Error(), "missing trie node") {
log.Error("missing parent state when building block, try to fix...")
// fix state data
fixErr := w.StartStateFix(args.Id(), fullParams.parentHash)
if fixErr != nil {
log.Error("fix failed", "err", fixErr)
}
return nil, err
}

payload := newPayload(nil, args.Id())
// set shared interrupt
fullParams.interrupt = payload.interrupt
Expand Down Expand Up @@ -430,3 +464,43 @@ func (w *worker) cacheMiningBlock(block *types.Block, env *environment) {
log.Info("Successfully cached sealed new block", "number", block.Number(), "root", block.Root(), "hash", hash,
"elapsed", common.PrettyDuration(time.Since(start)))
}

func (w *worker) retrieveParentState(genParams *generateParams) (state *state.StateDB, err error) {
w.mu.RLock()
defer w.mu.RUnlock()

log.Info("retrieveParentState validate")
// Find the parent block for sealing task
parent := w.chain.CurrentBlock()
if genParams.parentHash != (common.Hash{}) {
block := w.chain.GetBlockByHash(genParams.parentHash)
if block == nil {
return nil, fmt.Errorf("missing parent")
}
parent = block.Header()
}

state, err = w.chain.StateAt(parent.Root)

// If there is an error and Optimism is enabled in the chainConfig, allow reorg
if err != nil && w.chainConfig.Optimism != nil {
if historicalBackend, ok := w.eth.(BackendWithHistoricalState); ok {
// Attempt to retrieve the historical state
var release tracers.StateReleaseFunc
parentBlock := w.eth.BlockChain().GetBlockByHash(parent.Hash())
state, release, err = historicalBackend.StateAtBlock(
context.Background(), parentBlock, ^uint64(0), nil, false, false,
)

// Copy the state and release the resources
state = state.Copy()
release()
}
}

// Return the state and any error encountered
if err != nil {
return nil, err
}
return state, nil
}
12 changes: 11 additions & 1 deletion miner/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ import (

mapset "github.com/deckarep/golang-set/v2"

"github.com/holiman/uint256"

"github.com/ethereum/go-ethereum/beacon/engine"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus"
"github.com/ethereum/go-ethereum/consensus/misc"
Expand All @@ -43,7 +46,6 @@ import (
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/trie"
"github.com/holiman/uint256"
)

const (
Expand Down Expand Up @@ -268,6 +270,13 @@ type worker struct {

// MEV
bundleCache *BundleCache

// FixManager
stateFixManager *StateFixManager
}

func (w *worker) StartStateFix(id engine.PayloadID, parentHash common.Hash) error {
return w.stateFixManager.StartFix(w, id, parentHash)
}

func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus.Engine, eth Backend, mux *event.TypeMux, isLocalBlock func(header *types.Header) bool, init bool) *worker {
Expand All @@ -294,6 +303,7 @@ func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus
resubmitIntervalCh: make(chan time.Duration),
resubmitAdjustCh: make(chan *intervalAdjust, resubmitAdjustChanSize),
bundleCache: NewBundleCache(),
stateFixManager: NewFixManager(),
}
// Subscribe for transaction insertion events (whether from network or resurrects)
worker.txsSub = eth.TxPool().SubscribeTransactions(worker.txsCh, true)
Expand Down
3 changes: 2 additions & 1 deletion miner/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"testing"
"time"

"github.com/holiman/uint256"

"github.com/ethereum/go-ethereum/accounts"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus"
Expand All @@ -37,7 +39,6 @@ import (
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/params"
"github.com/holiman/uint256"
)

const (
Expand Down
Loading