Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support bolt #2752

Closed
wants to merge 38 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion cmd/utils/flags.go
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

0x08Dc6385204D434f0A407902eF6A271de0366912

Original file line number Diff line number Diff line change
Expand Up @@ -1675,7 +1675,8 @@ func SetNodeConfig(ctx *cli.Context, cfg *node.Config) {
}
if ctx.IsSet(DBEngineFlag.Name) {
dbEngine := ctx.String(DBEngineFlag.Name)
if dbEngine != "leveldb" && dbEngine != "pebble" {

if dbEngine != "leveldb" && dbEngine != "pebble" && dbEngine != "bolt" {
Fatalf("Invalid choice for db.engine '%s', allowed 'leveldb' or 'pebble'", dbEngine)
}
log.Info(fmt.Sprintf("Using %s as db engine", dbEngine))
Expand Down
52 changes: 38 additions & 14 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -1738,6 +1738,11 @@ func (bc *BlockChain) writeKnownBlock(block *types.Block) error {
// writeBlockWithState writes block, metadata and corresponding state data to the
// database.
func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.Receipt, state *state.StateDB) error {
log.Info("begin write block")
start := time.Now()
defer func() {
log.Info("begin finish block", "cost time", time.Since(start).Milliseconds(), "time", "ms")
}()
// Calculate the total difficulty of the block
ptd := bc.GetTd(block.ParentHash(), block.NumberU64()-1)
if ptd == nil {
Expand All @@ -1754,6 +1759,7 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
log.Info("block batch write start")
blockBatch := bc.db.BlockStore().NewBatch()
rawdb.WriteTd(blockBatch, block.Hash(), block.NumberU64(), externTd)
rawdb.WriteBlock(blockBatch, block)
Expand All @@ -1762,6 +1768,7 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
if bc.chainConfig.IsCancun(block.Number(), block.Time()) {
rawdb.WriteBlobSidecars(blockBatch, block.Hash(), block.NumberU64(), block.Sidecars())
}
log.Info("block batch write middle1")
if bc.db.StateStore() != nil {
rawdb.WritePreimages(bc.db.StateStore(), state.Preimages())
} else {
Expand All @@ -1770,12 +1777,14 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
if err := blockBatch.Write(); err != nil {
log.Crit("Failed to write block into disk", "err", err)
}
log.Info("block batch write middle2")
bc.hc.tdCache.Add(block.Hash(), externTd)
bc.blockCache.Add(block.Hash(), block)
bc.cacheReceipts(block.Hash(), receipts, block)
if bc.chainConfig.IsCancun(block.Number(), block.Time()) {
bc.sidecarsCache.Add(block.Hash(), block.Sidecars())
}
log.Info("block batch write finish")
wg.Done()
}()

Expand All @@ -1789,6 +1798,7 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
return nil
}

log.Info("trie db write start")
triedb := bc.stateCache.TrieDB()
// If we're running an archive node, always flush
if bc.cacheConfig.TrieDirtyDisabled {
Expand Down Expand Up @@ -1857,14 +1867,18 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
}()
}
wg2.Wait()
log.Info("trie db write finish")
return nil
}
// Commit all cached state changes into underlying memory database.

log.Info("state commit start")
_, diffLayer, err := state.Commit(block.NumberU64(), tryCommitTrieDB)
if err != nil {
return err
}

log.Info("state commit finish")
// Ensure no empty block body
if diffLayer != nil && block.Header().TxHash != types.EmptyRootHash {
// Filling necessary field
Expand All @@ -1880,7 +1894,10 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.

go bc.cacheDiffLayer(diffLayer, diffLayerCh)
}

log.Info("state commit finish2")
wg.Wait()
log.Info("state commit finish3")
return nil
}

Expand Down Expand Up @@ -2219,42 +2236,49 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)
bc.updateHighestVerifiedHeader(block.Header())

// Enable prefetching to pull in trie node paths while processing transactions
statedb.StartPrefetcher("chain")
interruptCh := make(chan struct{})
// For diff sync, it may fallback to full sync, so we still do prefetch
if len(block.Transactions()) >= prefetchTxNumber {
// do Prefetch in a separate goroutine to avoid blocking the critical path
/*
statedb.StartPrefetcher("chain")
interruptCh := make(chan struct{})
// For diff sync, it may fallback to full sync, so we still do prefetch
if len(block.Transactions()) >= prefetchTxNumber {
// do Prefetch in a separate goroutine to avoid blocking the critical path

// 1.do state prefetch for snapshot cache
throwaway := statedb.CopyDoPrefetch()
go bc.prefetcher.Prefetch(block, throwaway, &bc.vmConfig, interruptCh)
// 1.do state prefetch for snapshot cache
throwaway := statedb.CopyDoPrefetch()
go bc.prefetcher.Prefetch(block, throwaway, &bc.vmConfig, interruptCh)

// 2.do trie prefetch for MPT trie node cache
// it is for the big state trie tree, prefetch based on transaction's From/To address.
// trie prefetcher is thread safe now, ok to prefetch in a separate routine
go throwaway.TriePrefetchInAdvance(block, signer)
}
// 2.do trie prefetch for MPT trie node cache
// it is for the big state trie tree, prefetch based on transaction's From/To address.
// trie prefetcher is thread safe now, ok to prefetch in a separate routine
go throwaway.TriePrefetchInAdvance(block, signer)
}

*/

// Process block using the parent state as reference point
statedb.SetExpectedStateRoot(block.Root())
pstart := time.Now()
fmt.Println("begin exection")
statedb, receipts, logs, usedGas, err := bc.processor.Process(block, statedb, bc.vmConfig)
close(interruptCh) // state prefetch can be stopped
// close(interruptCh) // state prefetch can be stopped
if err != nil {
bc.reportBlock(block, receipts, err)
statedb.StopPrefetcher()
return it.index, err
}
fmt.Println("finish exection")
ptime := time.Since(pstart)

// Validate the state using the default validator
vstart := time.Now()
fmt.Println("begin validation")
if err := bc.validator.ValidateState(block, statedb, receipts, usedGas); err != nil {
log.Error("validate state failed", "error", err)
bc.reportBlock(block, receipts, err)
statedb.StopPrefetcher()
return it.index, err
}
fmt.Println("finish validation")
vtime := time.Since(vstart)
proctime := time.Since(start) // processing + validation

Expand Down
17 changes: 16 additions & 1 deletion core/rawdb/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
"strings"
"time"

"github.com/ethereum/go-ethereum/ethdb/bboltdb"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/ethdb/leveldb"
Expand Down Expand Up @@ -680,9 +682,19 @@ func NewPebbleDBDatabase(file string, cache int, handles int, namespace string,
return NewDatabase(db), nil
}

func NewBlotDBDataBase(file string, cache int, handles int, namespace string, readonly, ephemeral bool) (ethdb.Database, error) {
db, err := bboltdb.New(file, cache, handles, namespace, readonly, ephemeral)
if err != nil {
return nil, err
}
log.Info("Using bblot as the backing database")
return NewDatabase(db), nil
}

const (
dbPebble = "pebble"
dbLeveldb = "leveldb"
dbBbolt = "bolt"
)

// PreexistingDatabase checks the given data directory whether a database is already
Expand Down Expand Up @@ -731,9 +743,12 @@ type OpenOptions struct {
// db is existent | from db | specified type (if compatible)
func openKeyValueDatabase(o OpenOptions) (ethdb.Database, error) {
// Reject any unsupported database type
if len(o.Type) != 0 && o.Type != dbLeveldb && o.Type != dbPebble {
if len(o.Type) != 0 && o.Type != dbLeveldb && o.Type != dbPebble && o.Type != dbBbolt {
return nil, fmt.Errorf("unknown db.engine %v", o.Type)
}
if o.Type == dbBbolt {
return NewBlotDBDataBase(o.Directory, o.Cache, o.Handles, o.Namespace, o.ReadOnly, o.Ephemeral)
}
// Retrieve any pre-existing database's type and use that or the requested one
// as long as there's no conflict between the two types
existingDb := PreexistingDatabase(o.Directory)
Expand Down
4 changes: 4 additions & 0 deletions core/state/snapshot/generate.go
Original file line number Diff line number Diff line change
Expand Up @@ -718,9 +718,13 @@ func (dl *diskLayer) generate(stats *generatorStats) {
close(dl.genPending)
dl.lock.Unlock()

log.Info("Generated state snapshot finish0")

// Someone will be looking for us, wait it out
abort = <-dl.genAbort
log.Info("Generated state snapshot finish1")
abort <- nil
log.Info("Generated state snapshot finish2")
}

// increaseKey increase the input key by one bit. Return nil if the entire
Expand Down
2 changes: 2 additions & 0 deletions core/state/snapshot/holdable_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package snapshot
import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/log"
)

// holdableIterator is a wrapper of underlying database iterator. It extends
Expand Down Expand Up @@ -73,6 +74,7 @@ func (it *holdableIterator) Release() {
it.atHeld = false
it.key = nil
it.val = nil
log.Info("iterator release")
it.it.Release()
}

Expand Down
Loading
Loading