Skip to content

Commit

Permalink
fix: use slotdb pool
Browse files Browse the repository at this point in the history
  • Loading branch information
sunny2022da committed Aug 16, 2024
1 parent 5cde1ce commit 973a89a
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 7 deletions.
1 change: 1 addition & 0 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -1952,6 +1952,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)
threshold := min(bc.vmConfig.ParallelTxNum/2+2, 4)
if txsCount >= threshold {
bc.UseParallelProcessor()
statedb.CreateParallelDBManager(2 * txsCount)
log.Debug("Enable Parallel Tx execution", "block", block.NumberU64(), "transactions", txsCount, "parallelTxNum", bc.vmConfig.ParallelTxNum)
} else {
bc.UseSerialProcessor()
Expand Down
68 changes: 61 additions & 7 deletions core/state/statedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package state

import (
"container/list"
"errors"
"fmt"
"math/big"
Expand Down Expand Up @@ -272,8 +273,9 @@ type StateDB struct {
AccountDeleted int
StorageDeleted int

isParallel bool
parallel ParallelState // to keep all the parallel execution elements
isParallel bool
parallel ParallelState // to keep all the parallel execution elements
parallelDBManager *ParallelDBManager
// Testing hooks
onCommit func(states *triestate.Set) // Hook invoked when commit is performed
}
Expand Down Expand Up @@ -1373,8 +1375,7 @@ func (s *StateDB) PutSyncPool() {
snapStoragePool.Put(s.snapStorage)
}

// CopyForSlot copy all the basic fields, initialize the memory ones
func (s *StateDB) CopyForSlot() *ParallelStateDB {
func NewEmptySlotDB() *ParallelStateDB {
parallel := ParallelState{
// The stateObjects in Parallel is thread-local.
// The base stateDB's stateObjects is thread-unsafe as it is not guarded by lock.
Expand Down Expand Up @@ -1413,7 +1414,7 @@ func (s *StateDB) CopyForSlot() *ParallelStateDB {
}
state := &ParallelStateDB{
StateDB: StateDB{
db: s.db,
db: nil,
trie: nil, // Parallel StateDB may access the trie, but it takes no effect to the baseDB.
accounts: make(map[common.Hash][]byte),
storages: make(map[common.Hash]map[common.Hash][]byte),
Expand All @@ -1426,15 +1427,23 @@ func (s *StateDB) CopyForSlot() *ParallelStateDB {
refund: 0, // should be 0
logs: logsPool.Get().(map[common.Hash][]*types.Log),
logSize: 0,
preimages: make(map[common.Hash][]byte, len(s.preimages)),
preimages: nil,
journal: journalPool.Get().(*journal),
hasher: crypto.NewKeccakState(),
isParallel: true,
parallel: parallel,
},
}

state.snapDestructs = addressToStructPool.Get().(map[common.Address]struct{})
return state
}

// CopyForSlot copy all the basic fields, initialize the memory ones
func (s *StateDB) CopyForSlot() *ParallelStateDB {
state := s.parallelDBManager.allocate()
state.db = s.db
s.preimages = make(map[common.Hash][]byte, len(s.preimages))

s.snapParallelLock.RLock()
for k, v := range s.snapDestructs {
state.snapDestructs[k] = v
Expand Down Expand Up @@ -2790,3 +2799,48 @@ func (s *StateDB) MergeSlotDB(slotDb *ParallelStateDB, slotReceipt *types.Receip
s.SetTxContext(slotDb.thash, slotDb.txIndex)
return s
}

func (s *StateDB) CreateParallelDBManager(txCount int) {
// if enableDAG, it is high likely no conflict and hence no re-execution
// allocate the txCount of slotDBs to use.
if s.parallelDBManager == nil {
s.parallelDBManager = NewParallelDBManager(txCount, NewEmptySlotDB)
}
}

// ParallelDBManager manages a pool of ParallelDB instances
type ParallelDBManager struct {
pool *list.List
mutex sync.Mutex
newFunc func() *ParallelStateDB // Function to create a new ParallelDB instance
}

// NewParallelDBManager creates a new ParallelDBManager with the specified number of instance
func NewParallelDBManager(initialCount int, newFunc func() *ParallelStateDB) *ParallelDBManager {
manager := &ParallelDBManager{
pool: list.New(),
mutex: sync.Mutex{},
newFunc: newFunc,
}

for i := 0; i < initialCount; i++ {
manager.pool.PushBack(newFunc())
}

return manager
}

// allocate acquires a ParallelStateDB instance from the pool
// if the pool is empty, directly create a new one.
func (m *ParallelDBManager) allocate() *ParallelStateDB {
m.mutex.Lock()
defer m.mutex.Unlock()

if m.pool.Len() == 0 {
return m.newFunc()
}

elem := m.pool.Front()
m.pool.Remove(elem)
return elem.Value.(*ParallelStateDB)
}

0 comments on commit 973a89a

Please sign in to comment.