Skip to content

Commit

Permalink
make use of stateDBsToRelease
Browse files Browse the repository at this point in the history
  • Loading branch information
sunny2022da committed Sep 2, 2024
1 parent 4687c11 commit a41cfbe
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 10 deletions.
9 changes: 6 additions & 3 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -1956,8 +1956,10 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)
log.Debug("Disable Parallel Tx execution", "block", block.NumberU64(), "transactions", txsCount, "parallelTxNum", bc.vmConfig.ParallelTxNum)
} else {
bc.UseParallelProcessor()
statedb.CreateParallelDBManager(2 * txsCount)
log.Debug("Enable Parallel Tx execution", "block", block.NumberU64(), "transactions", txsCount, "parallelTxNum", bc.vmConfig.ParallelTxNum)
if bc.processor == bc.parallelProcessor {
statedb.CreateParallelDBManager(2 * txsCount)
log.Debug("Enable Parallel Tx execution", "block", block.NumberU64(), "transactions", txsCount, "parallelTxNum", bc.vmConfig.ParallelTxNum)
}
}
}
// If we have a followup block, run that against the current state to pre-cache
Expand Down Expand Up @@ -2924,7 +2926,8 @@ func (bc *BlockChain) UseParallelProcessor() {
bc.parallelExecution = true
bc.processor = bc.parallelProcessor
} else {
bc.CreateParallelProcessor(bc.vmConfig.ParallelTxNum)
log.Error("bc.ParallelProcessor is nil! fallback to serial processor!")
bc.UseSerialProcessor()
}
}

Expand Down
20 changes: 13 additions & 7 deletions core/parallel_state_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ func (p *ParallelStateProcessor) init() {
log.Info("Parallel execution mode is enabled", "Parallel Num", p.parallelNum,
"CPUNum", runtime.NumCPU())
p.txResultChan = make(chan *ParallelTxResult, 20000)
p.slotDBsToRelease = make([]*state.ParallelStateDB, 200, 20000)
p.stopSlotChan = make(chan struct{}, 1)
p.stopConfirmChan = make(chan struct{}, 1)
p.stopConfirmStage2Chan = make(chan struct{}, 1)
Expand Down Expand Up @@ -197,14 +198,7 @@ func (p *ParallelStateProcessor) resetState(txNum int, statedb *state.StateDB) {

statedb.PrepareForParallel()
p.allTxReqs = make([]*ParallelTxRequest, 0, txNum)
p.slotDBsToRelease = make([]*state.ParallelStateDB, 0, txNum)

stateDBsToRelease := p.slotDBsToRelease
go func() {
for _, slotDB := range stateDBsToRelease {
slotDB.PutSyncPool()
}
}()
for _, slot := range p.slotState {
slot.pendingTxReqList = make([]*ParallelTxRequest, 0)
slot.activatedType = parallelPrimarySlot
Expand Down Expand Up @@ -420,6 +414,8 @@ func (p *ParallelStateProcessor) toConfirmTxIndex(targetTxIndex int, isStage2 bo
}

valid := p.toConfirmTxIndexResult(targetResult, isStage2)
// append result as it is going to be processed or merged.
p.slotDBsToRelease = append(p.slotDBsToRelease, targetResult.slotDB)
if !valid {
staticSlotIndex := targetResult.txReq.staticSlotIndex
conflictBase := targetResult.slotDB.BaseTxIndex()
Expand Down Expand Up @@ -949,6 +945,16 @@ func (p *ParallelStateProcessor) Process(block *types.Block, statedb *state.Stat

// clean up when the block is processed
p.doCleanUp()

stateDBsToRelease := p.slotDBsToRelease
go func() {
for _, slotDB := range stateDBsToRelease {
if slotDB != nil {
slotDB.PutSyncPool()
}
}
}()

if p.error != nil {
return nil, nil, 0, p.error
}
Expand Down

0 comments on commit a41cfbe

Please sign in to comment.