From 8bcaeecce06677f51ffb23e7934f22c585631fc7 Mon Sep 17 00:00:00 2001 From: galaio Date: Tue, 27 Aug 2024 10:48:26 +0800 Subject: [PATCH 1/2] pevm: fix some bad check & support to fallback to serial processor; --- core/blockchain.go | 4 ++++ core/parallel_state_processor.go | 14 ++++++++------ 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/core/blockchain.go b/core/blockchain.go index ac5c824fdf..31fab2b676 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -1982,6 +1982,10 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error) // Process block using the parent state as reference point pstart = time.Now() receipts, logs, usedGas, err = bc.processor.Process(block, statedb, bc.vmConfig) + if err == FallbackToSerialProcessorErr { + bc.UseSerialProcessor() + receipts, logs, usedGas, err = bc.processor.Process(block, statedb, bc.vmConfig) + } if err != nil { bc.reportBlock(block, receipts, err) followupInterrupt.Store(true) diff --git a/core/parallel_state_processor.go b/core/parallel_state_processor.go index 8bedca8b6a..62c00b5aa8 100644 --- a/core/parallel_state_processor.go +++ b/core/parallel_state_processor.go @@ -26,6 +26,10 @@ const ( stage2AheadNum = 3 // enter ConfirmStage2 in advance to avoid waiting for Fat Tx ) +var ( + FallbackToSerialProcessorErr = errors.New("fallback to serial processor") +) + type ParallelStateProcessor struct { StateProcessor parallelNum int // leave a CPU to dispatcher @@ -571,11 +575,6 @@ func (p *ParallelStateProcessor) runQuickMergeSlotLoop(slotIndex int, slotType i if txReq.txIndex <= int(p.mergedTxIndex.Load()) { continue } - - if txReq.txIndex != next { - log.Warn("query next txReq wrong", "slot", slotIndex, "next", next, "actual", txReq.txIndex) - break - } if !atomic.CompareAndSwapInt32(&txReq.runnable, 1, 0) { continue } @@ -782,7 +781,6 @@ func (p *ParallelStateProcessor) Process(block *types.Block, statedb *state.Stat if p.bc.txDAGReader != nil { // load cache txDAG from file first txDAG = p.bc.txDAGReader.TxDAG(block.NumberU64()) - } else { // load TxDAG from block txDAG, err = types.GetTxDAG(block) @@ -797,6 +795,10 @@ func (p *ParallelStateProcessor) Process(block *types.Block, statedb *state.Stat } } + if txDAG != nil && txDAG.Type() == types.EmptyTxDAGType { + return nil, nil, 0, FallbackToSerialProcessorErr + } + txNum := len(allTxs) latestExcludedTx := -1 // Iterate over and process the individual transactions From 5f3e1facc1d73079dd9840dfee65c3f51c64c252 Mon Sep 17 00:00:00 2001 From: galaio Date: Tue, 27 Aug 2024 12:23:56 +0800 Subject: [PATCH 2/2] pevm: fix some bad check & support to fallback to serial processor; --- core/blockchain.go | 42 +++++++++++++++++++++++++++++--- core/parallel_state_processor.go | 27 +------------------- core/vm/interpreter.go | 2 ++ 3 files changed, 41 insertions(+), 30 deletions(-) diff --git a/core/blockchain.go b/core/blockchain.go index 31fab2b676..98ae94d906 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -1948,15 +1948,16 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error) activeState = statedb if bc.vmConfig.EnableParallelExec { + bc.parseTxDAG(block) txsCount := block.Transactions().Len() threshold := min(bc.vmConfig.ParallelTxNum/2+2, 4) - if txsCount >= threshold { + if txsCount < threshold || bc.isEmptyTxDAG() { + bc.UseSerialProcessor() + log.Debug("Disable Parallel Tx execution", "block", block.NumberU64(), "transactions", txsCount, "parallelTxNum", bc.vmConfig.ParallelTxNum) + } else { bc.UseParallelProcessor() statedb.CreateParallelDBManager(2 * txsCount) log.Debug("Enable Parallel Tx execution", "block", block.NumberU64(), "transactions", txsCount, "parallelTxNum", bc.vmConfig.ParallelTxNum) - } else { - bc.UseSerialProcessor() - log.Debug("Disable Parallel Tx execution", "block", block.NumberU64(), "transactions", txsCount, "parallelTxNum", bc.vmConfig.ParallelTxNum) } } // If we have a followup block, run that against the current state to pre-cache @@ -2136,6 +2137,39 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error) return it.index, err } +func (bc *BlockChain) parseTxDAG(block *types.Block) { + if !bc.enableTxDAG { + return + } + var ( + txDAG types.TxDAG + err error + ) + if bc.txDAGReader != nil { + // load cache txDAG from file first + txDAG = bc.txDAGReader.TxDAG(block.NumberU64()) + } else { + // load TxDAG from block + txDAG, err = types.GetTxDAG(block) + if err != nil { + log.Warn("pevm decode txdag failed", "block", block.NumberU64(), "err", err) + } + } + if err := types.ValidateTxDAG(txDAG, len(block.Transactions())); err != nil { + log.Warn("pevm cannot apply wrong txdag", + "block", block.NumberU64(), "txs", len(block.Transactions()), "err", err) + txDAG = nil + } + bc.vmConfig.TxDAG = txDAG +} + +func (bc *BlockChain) isEmptyTxDAG() bool { + if bc.vmConfig.TxDAG != nil && bc.vmConfig.TxDAG.Type() == types.EmptyTxDAGType { + return true + } + return false +} + // insertSideChain is called when an import batch hits upon a pruned ancestor // error, which happens when a sidechain with a sufficiently old fork-block is // found. diff --git a/core/parallel_state_processor.go b/core/parallel_state_processor.go index 62c00b5aa8..c73948d32c 100644 --- a/core/parallel_state_processor.go +++ b/core/parallel_state_processor.go @@ -772,32 +772,7 @@ func (p *ParallelStateProcessor) Process(block *types.Block, statedb *state.Stat ProcessBeaconBlockRoot(*beaconRoot, vmenv, statedb) } statedb.MarkFullProcessed() - - var ( - txDAG types.TxDAG - ) - if p.bc.enableTxDAG { - var err error - if p.bc.txDAGReader != nil { - // load cache txDAG from file first - txDAG = p.bc.txDAGReader.TxDAG(block.NumberU64()) - } else { - // load TxDAG from block - txDAG, err = types.GetTxDAG(block) - if err != nil { - log.Debug("pevm decode txdag failed", "block", block.NumberU64(), "err", err) - } - } - if err := types.ValidateTxDAG(txDAG, len(block.Transactions())); err != nil { - log.Warn("pevm cannot apply wrong txdag", - "block", block.NumberU64(), "txs", len(block.Transactions()), "err", err) - txDAG = nil - } - } - - if txDAG != nil && txDAG.Type() == types.EmptyTxDAGType { - return nil, nil, 0, FallbackToSerialProcessorErr - } + txDAG := cfg.TxDAG txNum := len(allTxs) latestExcludedTx := -1 diff --git a/core/vm/interpreter.go b/core/vm/interpreter.go index b18fef42da..40267fca16 100644 --- a/core/vm/interpreter.go +++ b/core/vm/interpreter.go @@ -19,6 +19,7 @@ package vm import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/math" + "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/params" @@ -37,6 +38,7 @@ type Config struct { ParallelTxNum int // Number of slot for transaction execution OptimismPrecompileOverrides PrecompileOverrides // Precompile overrides for Optimism EnableOpcodeOptimizations bool // Enable opcode optimization + TxDAG types.TxDAG } // ScopeContext contains the things that are per-call, such as stack and memory,