From 0fb3f7384649240e9f6009202b70817e9299ddf8 Mon Sep 17 00:00:00 2001 From: welkin22 Date: Tue, 22 Oct 2024 16:17:04 +0800 Subject: [PATCH] If TxDAG is nil, then use the serial processor to handle it. --- core/blockchain.go | 8 ++++++-- core/pevm_processor.go | 12 ++++-------- core/state/pevm_statedb.go | 3 --- core/state_processor.go | 8 -------- 4 files changed, 10 insertions(+), 21 deletions(-) diff --git a/core/blockchain.go b/core/blockchain.go index f1fbe0dac2..fb7df3be5c 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -317,7 +317,6 @@ type BlockChain struct { txDAGWriteCh chan TxDAGOutputItem txDAGReader *TxDAGFileReader serialProcessor Processor - parallelProcessor Processor } // NewBlockChain returns a fully initialised block chain using information @@ -537,6 +536,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis if bc.vmConfig.EnableParallelExec { bc.processor = newPEVMProcessor(chainConfig, bc, engine) + bc.serialProcessor = NewStateProcessor(chainConfig, bc, engine) log.Info("Parallel V2 enabled", "parallelNum", ParallelNum()) } else { bc.processor = NewStateProcessor(chainConfig, bc, engine) @@ -1936,7 +1936,11 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error) // Process block using the parent state as reference point pstart = time.Now() - receipts, logs, usedGas, err = bc.processor.Process(block, statedb, bc.vmConfig) + if bc.vmConfig.TxDAG == nil && bc.vmConfig.EnableParallelUnorderedMerge { + receipts, logs, usedGas, err = bc.serialProcessor.Process(block, statedb, bc.vmConfig) + } else { + receipts, logs, usedGas, err = bc.processor.Process(block, statedb, bc.vmConfig) + } if err != nil { bc.reportBlock(block, receipts, err) followupInterrupt.Store(true) diff --git a/core/pevm_processor.go b/core/pevm_processor.go index 339e4d7179..3acfbcae86 100644 --- a/core/pevm_processor.go +++ b/core/pevm_processor.go @@ -122,7 +122,7 @@ func (p *PEVMProcessor) executeInSlot(maindb *state.StateDB, txReq *PEVMTxReques // if it is in Stage 2 it is a likely result, not 100% sure func (p *PEVMProcessor) toConfirmTxIndexResult(txResult *PEVMTxResult) error { txReq := txResult.txReq - if !p.unorderedMerge || !txReq.useDAG { + if !p.unorderedMerge { // If we do not use a DAG, then we need to check for conflicts to ensure correct execution. // When we perform an unordered merge, we cannot conduct conflict checks // and can only choose to trust that the DAG is correct and that conflicts do not exist. @@ -166,7 +166,6 @@ func (p *PEVMProcessor) confirmTxResult(statedb *state.StateDB, gp *GasPool, res isByzantium := p.config.IsByzantium(header.Number) isEIP158 := p.config.IsEIP158(header.Number) - //result.slotDB.FinaliseForParallel(isByzantium || isEIP158, statedb) if err := result.slotDB.Merge(isByzantium || isEIP158); err != nil { // something very wrong, should not happen log.Error("merge slotDB failed", "err", err) @@ -285,7 +284,7 @@ func (p *PEVMProcessor) Process(block *types.Block, statedb *state.StateDB, cfg }(time.Now()) log.Debug("pevm confirm", "txIndex", pr.txReq.txIndex) return p.confirmTxResult(statedb, gp, pr) - }, p.unorderedMerge && txDAG != nil) + }, p.unorderedMerge) parallelRunDuration := time.Since(start) - buildLevelsDuration if err != nil { tx := allTxs[txIndex] @@ -293,9 +292,6 @@ func (p *PEVMProcessor) Process(block *types.Block, statedb *state.StateDB, cfg return nil, nil, 0, fmt.Errorf("could not apply tx %d [%v]: %w", txIndex, tx.Hash().Hex(), err) } - //fmt.Printf("ProcessParallel tx all done, parallelNum:%d, txNum: %d, conflictNum: %d, executeDuration:%s, confirmDurations:%s, buildLevelsDuration:%s, runDuration:%s\n", - // ParallelNum(), txNum, p.debugConflictRedoNum, time.Duration(executeDurations), time.Duration(confirmDurations), buildLevelsDuration, parallelRunDuration) - // len(commonTxs) could be 0, such as: https://bscscan.com/block/14580486 var redoRate int = 0 if len(p.commonTxs) == 0 { @@ -334,8 +330,8 @@ func (p *PEVMProcessor) Process(block *types.Block, statedb *state.StateDB, cfg var cumulativeGasUsed uint64 for _, receipt := range p.receipts { // reset the log index - for _, log := range receipt.Logs { - log.Index = uint(lindex) + for _, oneLog := range receipt.Logs { + oneLog.Index = uint(lindex) lindex++ } // re-calculate the cumulativeGasUsed diff --git a/core/state/pevm_statedb.go b/core/state/pevm_statedb.go index 38cde4b53f..80bdb712dd 100644 --- a/core/state/pevm_statedb.go +++ b/core/state/pevm_statedb.go @@ -486,9 +486,6 @@ func (pst *UncommittedDB) Merge(deleteEmptyObjects bool) error { // so we don't need to merge anything. return nil } - //if err := pst.conflictsToMaindb(); err != nil { - // return err - //} // 0. set the TxContext pst.maindb.SetTxContext(pst.txHash, pst.txIndex) diff --git a/core/state_processor.go b/core/state_processor.go index df9d788707..d5308af37e 100644 --- a/core/state_processor.go +++ b/core/state_processor.go @@ -54,14 +54,6 @@ func NewStateProcessor(config *params.ChainConfig, bc *BlockChain, engine consen } } -// CreateSerialProcessor create a new StateProcessor -func (bc *BlockChain) CreateSerialProcessor(config *params.ChainConfig, bc2 *BlockChain, engine consensus.Engine) { - if bc.serialProcessor == nil { - bc.serialProcessor = NewStateProcessor(config, bc2, engine) - bc.parallelExecution = false - } -} - // Process processes the state changes according to the Ethereum rules by running // the transaction messages using the statedb and applying any rewards to both // the processor (coinbase) and any included uncles.