Skip to content

Commit

Permalink
remove uncessary memory overhead and reuse SyncPool
Browse files Browse the repository at this point in the history
  • Loading branch information
sunny2022da committed Sep 3, 2024
1 parent 4687c11 commit 6a600fe
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 44 deletions.
9 changes: 6 additions & 3 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -1956,8 +1956,10 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)
log.Debug("Disable Parallel Tx execution", "block", block.NumberU64(), "transactions", txsCount, "parallelTxNum", bc.vmConfig.ParallelTxNum)
} else {
bc.UseParallelProcessor()
statedb.CreateParallelDBManager(2 * txsCount)
log.Debug("Enable Parallel Tx execution", "block", block.NumberU64(), "transactions", txsCount, "parallelTxNum", bc.vmConfig.ParallelTxNum)
if bc.processor == bc.parallelProcessor {
statedb.CreateParallelDBManager(2 * txsCount)
log.Debug("Enable Parallel Tx execution", "block", block.NumberU64(), "transactions", txsCount, "parallelTxNum", bc.vmConfig.ParallelTxNum)
}
}
}
// If we have a followup block, run that against the current state to pre-cache
Expand Down Expand Up @@ -2924,7 +2926,8 @@ func (bc *BlockChain) UseParallelProcessor() {
bc.parallelExecution = true
bc.processor = bc.parallelProcessor
} else {
bc.CreateParallelProcessor(bc.vmConfig.ParallelTxNum)
log.Error("bc.ParallelProcessor is nil! fallback to serial processor!")
bc.UseSerialProcessor()
}
}

Expand Down
13 changes: 5 additions & 8 deletions core/parallel_state_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,14 +197,7 @@ func (p *ParallelStateProcessor) resetState(txNum int, statedb *state.StateDB) {

statedb.PrepareForParallel()
p.allTxReqs = make([]*ParallelTxRequest, 0, txNum)
p.slotDBsToRelease = make([]*state.ParallelStateDB, 0, txNum)

stateDBsToRelease := p.slotDBsToRelease
go func() {
for _, slotDB := range stateDBsToRelease {
slotDB.PutSyncPool()
}
}()
for _, slot := range p.slotState {
slot.pendingTxReqList = make([]*ParallelTxRequest, 0)
slot.activatedType = parallelPrimarySlot
Expand Down Expand Up @@ -335,7 +328,6 @@ func (p *ParallelStateProcessor) executeInSlot(slotIndex int, txReq *ParallelTxR
}

slotDB.SetTxContext(txReq.tx.Hash(), txReq.txIndex)

evm, result, err := applyTransactionStageExecution(txReq.msg, gpSlot, slotDB, vmenv, p.delayGasFee)
txResult := ParallelTxResult{
executedIndex: execNum,
Expand Down Expand Up @@ -460,6 +452,8 @@ func (p *ParallelStateProcessor) toConfirmTxIndex(targetTxIndex int, isStage2 bo
p.debugConflictRedoNum++
// interrupt its current routine, and switch to the other routine
p.switchSlot(staticSlotIndex)
// reclaim the result.
targetResult.slotDB.PutSyncPool()
return nil
}
continue
Expand Down Expand Up @@ -794,6 +788,8 @@ func (p *ParallelStateProcessor) confirmTxResults(statedb *state.StateDB, gp *Ga
}
p.txReqExecuteRecord[resultTxIndex]++
}
// after merge, the slotDB will not accessible, reclaim the resource
result.slotDB.PutSyncPool()
return result
}

Expand Down Expand Up @@ -949,6 +945,7 @@ func (p *ParallelStateProcessor) Process(block *types.Block, statedb *state.Stat

// clean up when the block is processed
p.doCleanUp()

if p.error != nil {
return nil, nil, 0, p.error
}
Expand Down
1 change: 1 addition & 0 deletions core/state/parallel_statedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,7 @@ func (s *ParallelStateDB) GetCode(addr common.Address) []byte {
code = object.Code()
}
}

if _, ok := s.parallel.codeReadsInSlot[addr]; !ok {
s.parallel.codeReadsInSlot[addr] = code
}
Expand Down
35 changes: 2 additions & 33 deletions core/state/statedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,8 +187,6 @@ type StateDB struct {
trieParallelLock sync.Mutex // for parallel mode of trie, mostly for get states/objects from trie, lock required to handle trie tracer.
stateObjectDestructLock sync.RWMutex // for parallel mode, used in mainDB for mergeSlot and conflict check.
snapDestructs map[common.Address]struct{}
snapAccounts map[common.Address][]byte
snapStorage map[common.Address]map[string][]byte

// originalRoot is the pre-state root, before any changes were made.
// It will be updated when the Commit is called.
Expand Down Expand Up @@ -1253,14 +1251,6 @@ var addressToUintPool = sync.Pool{
New: func() interface{} { return make(map[common.Address]uint64, defaultNumOfSlots) },
}

var snapStoragePool = sync.Pool{
New: func() interface{} { return make(map[common.Address]map[string][]byte, defaultNumOfSlots) },
}

var snapStorageValuePool = sync.Pool{
New: func() interface{} { return make(map[string][]byte, defaultNumOfSlots) },
}

var logsPool = sync.Pool{
New: func() interface{} { return make(map[common.Hash][]*types.Log, defaultNumOfSlots) },
}
Expand Down Expand Up @@ -1361,20 +1351,6 @@ func (s *StateDB) PutSyncPool() {
delete(s.parallel.createdObjectRecord, key)
}
addressToStructPool.Put(s.parallel.createdObjectRecord)

for key := range s.snapAccounts {
delete(s.snapAccounts, key)
}
addressToBytesPool.Put(s.snapAccounts)

for key, storage := range s.snapStorage {
for key := range storage {
delete(storage, key)
}
snapStorageValuePool.Put(storage)
delete(s.snapStorage, key)
}
snapStoragePool.Put(s.snapStorage)
}

func NewEmptySlotDB() *ParallelStateDB {
Expand Down Expand Up @@ -2654,10 +2630,6 @@ func (s *StateDB) MergeSlotDB(slotDb *ParallelStateDB, slotReceipt *types.Receip
// remove the addr from snapAccounts&snapStorage only when object is deleted.
// "deleted" is not equal to "snapDestructs", since createObject() will add an addr for
// snapDestructs to destroy previous object, while it will keep the addr in snapAccounts & snapAccounts
s.snapParallelLock.Lock()
delete(s.snapAccounts, addr)
delete(s.snapStorage, addr)
s.snapParallelLock.Unlock()
s.AccountMux.Lock()
delete(s.accounts, dirtyObj.addrHash) // Clear out any previously updated account data (may be recreated via a resurrect)
delete(s.accountsOrigin, dirtyObj.address) // Clear out any previously updated account data (may be recreated via a resurrect)
Expand Down Expand Up @@ -2721,10 +2693,6 @@ func (s *StateDB) MergeSlotDB(slotDb *ParallelStateDB, slotReceipt *types.Receip
// remove the addr from snapAccounts&snapStorage only when object is deleted.
// "deleted" is not equal to "snapDestructs", since createObject() will add an addr for
// snapDestructs to destroy previous object, while it will keep the addr in snapAccounts & snapAccounts
s.snapParallelLock.Lock()
delete(s.snapAccounts, addr)
delete(s.snapStorage, addr)
s.snapParallelLock.Unlock()
s.AccountMux.Lock()
delete(s.accounts, dirtyObj.addrHash) // Clear out any previously updated account data (may be recreated via a resurrect)
delete(s.accountsOrigin, dirtyObj.address) // Clear out any previously updated account data (may be recreated via a resurrect)
Expand Down Expand Up @@ -2847,5 +2815,6 @@ func (m *ParallelDBManager) allocate() *ParallelStateDB {

elem := m.pool.Front()
m.pool.Remove(elem)
return elem.Value.(*ParallelStateDB)
ret := elem.Value.(*ParallelStateDB)
return ret
}

0 comments on commit 6a600fe

Please sign in to comment.