Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor trie_prefetcher to be similar to upstream structurally #1395

Merged
merged 24 commits into from
Dec 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions RELEASES.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,15 @@

## Pending Release

## Updates
* Refactored trie_prefetcher.go to be structurally similar to upstream.

## [v0.7.0](https://github.com/ava-labs/subnet-evm/releases/tag/v0.7.0)

### Updates

- Changed default write option from `Sync` to `NoSync` in PebbleDB

## Fixes
### Fixes

- Fixed database close on shutdown

Expand Down
20 changes: 4 additions & 16 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -1349,16 +1349,6 @@ func (bc *BlockChain) insertBlock(block *types.Block, writes bool) error {
blockContentValidationTimer.Inc(time.Since(substart).Milliseconds())

// No validation errors for the block
var activeState *state.StateDB
defer func() {
// The chain importer is starting and stopping trie prefetchers. If a bad
// block or other error is hit however, an early return may not properly
// terminate the background threads. This defer ensures that we clean up
// and dangling prefetcher, without deferring each and holding on live refs.
if activeState != nil {
activeState.StopPrefetcher()
}
}()

// Retrieve the parent block to determine which root to build state on
substart = time.Now()
Expand All @@ -1377,8 +1367,8 @@ func (bc *BlockChain) insertBlock(block *types.Block, writes bool) error {
blockStateInitTimer.Inc(time.Since(substart).Milliseconds())

// Enable prefetching to pull in trie node paths while processing transactions
statedb.StartPrefetcher("chain", bc.cacheConfig.TriePrefetcherParallelism)
activeState = statedb
statedb.StartPrefetcher("chain", state.WithConcurrentWorkers(bc.cacheConfig.TriePrefetcherParallelism))
defer statedb.StopPrefetcher()

// Process block using the parent state as reference point
pstart := time.Now()
Expand Down Expand Up @@ -1736,10 +1726,8 @@ func (bc *BlockChain) reprocessBlock(parent *types.Block, current *types.Block)
}

// Enable prefetching to pull in trie node paths while processing transactions
statedb.StartPrefetcher("chain", bc.cacheConfig.TriePrefetcherParallelism)
defer func() {
statedb.StopPrefetcher()
}()
statedb.StartPrefetcher("chain", state.WithConcurrentWorkers(bc.cacheConfig.TriePrefetcherParallelism))
defer statedb.StopPrefetcher()

// Process previously stored block
receipts, _, usedGas, err := bc.processor.Process(current, parent.Header(), statedb, vm.Config{})
Expand Down
22 changes: 20 additions & 2 deletions core/state/statedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"github.com/ava-labs/subnet-evm/trie"
"github.com/ava-labs/subnet-evm/trie/trienode"
"github.com/ava-labs/subnet-evm/trie/triestate"
"github.com/ava-labs/subnet-evm/utils"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log"
Expand Down Expand Up @@ -200,16 +201,33 @@ func NewWithSnapshot(root common.Hash, db Database, snap snapshot.Snapshot) (*St
return sdb, nil
}

type workerPool struct {
*utils.BoundedWorkers
}

func (wp *workerPool) Done() {
// Done is guaranteed to only be called after all work is already complete,
// so Wait()ing is redundant, but it also releases resources.
wp.BoundedWorkers.Wait()
}

func WithConcurrentWorkers(prefetchers int) PrefetcherOption {
pool := &workerPool{
BoundedWorkers: utils.NewBoundedWorkers(prefetchers),
}
return WithWorkerPools(func() WorkerPool { return pool })
}

// StartPrefetcher initializes a new trie prefetcher to pull in nodes from the
// state trie concurrently while the state is mutated so that when we reach the
// commit phase, most of the needed data is already hot.
func (s *StateDB) StartPrefetcher(namespace string, maxConcurrency int) {
func (s *StateDB) StartPrefetcher(namespace string, opts ...PrefetcherOption) {
if s.prefetcher != nil {
s.prefetcher.close()
s.prefetcher = nil
}
if s.snap != nil {
s.prefetcher = newTriePrefetcher(s.db, s.originalRoot, namespace, maxConcurrency)
s.prefetcher = newTriePrefetcher(s.db, s.originalRoot, namespace, opts...)
}
}

Expand Down
Loading
Loading