Skip to content

Commit

Permalink
Merge pull request #203 from welkin22/feature/TxDAG-PEVM-patch
Browse files Browse the repository at this point in the history
pevm: If TxDAG is nil, then use the serial processor to handle it.
  • Loading branch information
welkin22 authored Oct 22, 2024
2 parents 743ca54 + 0fb3f73 commit c5f1c9e
Show file tree
Hide file tree
Showing 4 changed files with 10 additions and 21 deletions.
8 changes: 6 additions & 2 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,6 @@ type BlockChain struct {
txDAGWriteCh chan TxDAGOutputItem
txDAGReader *TxDAGFileReader
serialProcessor Processor
parallelProcessor Processor
}

// NewBlockChain returns a fully initialised block chain using information
Expand Down Expand Up @@ -537,6 +536,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis

if bc.vmConfig.EnableParallelExec {
bc.processor = newPEVMProcessor(chainConfig, bc, engine)
bc.serialProcessor = NewStateProcessor(chainConfig, bc, engine)
log.Info("Parallel V2 enabled", "parallelNum", ParallelNum())
} else {
bc.processor = NewStateProcessor(chainConfig, bc, engine)
Expand Down Expand Up @@ -1936,7 +1936,11 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)

// Process block using the parent state as reference point
pstart = time.Now()
receipts, logs, usedGas, err = bc.processor.Process(block, statedb, bc.vmConfig)
if bc.vmConfig.TxDAG == nil && bc.vmConfig.EnableParallelUnorderedMerge {
receipts, logs, usedGas, err = bc.serialProcessor.Process(block, statedb, bc.vmConfig)
} else {
receipts, logs, usedGas, err = bc.processor.Process(block, statedb, bc.vmConfig)
}
if err != nil {
bc.reportBlock(block, receipts, err)
followupInterrupt.Store(true)
Expand Down
12 changes: 4 additions & 8 deletions core/pevm_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func (p *PEVMProcessor) executeInSlot(maindb *state.StateDB, txReq *PEVMTxReques
// if it is in Stage 2 it is a likely result, not 100% sure
func (p *PEVMProcessor) toConfirmTxIndexResult(txResult *PEVMTxResult) error {
txReq := txResult.txReq
if !p.unorderedMerge || !txReq.useDAG {
if !p.unorderedMerge {
// If we do not use a DAG, then we need to check for conflicts to ensure correct execution.
// When we perform an unordered merge, we cannot conduct conflict checks
// and can only choose to trust that the DAG is correct and that conflicts do not exist.
Expand Down Expand Up @@ -166,7 +166,6 @@ func (p *PEVMProcessor) confirmTxResult(statedb *state.StateDB, gp *GasPool, res

isByzantium := p.config.IsByzantium(header.Number)
isEIP158 := p.config.IsEIP158(header.Number)
//result.slotDB.FinaliseForParallel(isByzantium || isEIP158, statedb)
if err := result.slotDB.Merge(isByzantium || isEIP158); err != nil {
// something very wrong, should not happen
log.Error("merge slotDB failed", "err", err)
Expand Down Expand Up @@ -285,17 +284,14 @@ func (p *PEVMProcessor) Process(block *types.Block, statedb *state.StateDB, cfg
}(time.Now())
log.Debug("pevm confirm", "txIndex", pr.txReq.txIndex)
return p.confirmTxResult(statedb, gp, pr)
}, p.unorderedMerge && txDAG != nil)
}, p.unorderedMerge)
parallelRunDuration := time.Since(start) - buildLevelsDuration
if err != nil {
tx := allTxs[txIndex]
log.Error("ProcessParallel tx failed", "txIndex", txIndex, "txHash", tx.Hash().Hex(), "err", err)
return nil, nil, 0, fmt.Errorf("could not apply tx %d [%v]: %w", txIndex, tx.Hash().Hex(), err)
}

//fmt.Printf("ProcessParallel tx all done, parallelNum:%d, txNum: %d, conflictNum: %d, executeDuration:%s, confirmDurations:%s, buildLevelsDuration:%s, runDuration:%s\n",
// ParallelNum(), txNum, p.debugConflictRedoNum, time.Duration(executeDurations), time.Duration(confirmDurations), buildLevelsDuration, parallelRunDuration)

// len(commonTxs) could be 0, such as: https://bscscan.com/block/14580486
var redoRate int = 0
if len(p.commonTxs) == 0 {
Expand Down Expand Up @@ -334,8 +330,8 @@ func (p *PEVMProcessor) Process(block *types.Block, statedb *state.StateDB, cfg
var cumulativeGasUsed uint64
for _, receipt := range p.receipts {
// reset the log index
for _, log := range receipt.Logs {
log.Index = uint(lindex)
for _, oneLog := range receipt.Logs {
oneLog.Index = uint(lindex)
lindex++
}
// re-calculate the cumulativeGasUsed
Expand Down
3 changes: 0 additions & 3 deletions core/state/pevm_statedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -486,9 +486,6 @@ func (pst *UncommittedDB) Merge(deleteEmptyObjects bool) error {
// so we don't need to merge anything.
return nil
}
//if err := pst.conflictsToMaindb(); err != nil {
// return err
//}

// 0. set the TxContext
pst.maindb.SetTxContext(pst.txHash, pst.txIndex)
Expand Down
8 changes: 0 additions & 8 deletions core/state_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,6 @@ func NewStateProcessor(config *params.ChainConfig, bc *BlockChain, engine consen
}
}

// CreateSerialProcessor create a new StateProcessor
func (bc *BlockChain) CreateSerialProcessor(config *params.ChainConfig, bc2 *BlockChain, engine consensus.Engine) {
if bc.serialProcessor == nil {
bc.serialProcessor = NewStateProcessor(config, bc2, engine)
bc.parallelExecution = false
}
}

// Process processes the state changes according to the Ethereum rules by running
// the transaction messages using the statedb and applying any rewards to both
// the processor (coinbase) and any included uncles.
Expand Down

0 comments on commit c5f1c9e

Please sign in to comment.