Skip to content

Commit

Permalink
Fill consensus db from snapshots and remove chainDb in parlia (#460)
Browse files Browse the repository at this point in the history
  • Loading branch information
blxdyx committed Jul 30, 2024
1 parent f3bd21a commit 7fca645
Show file tree
Hide file tree
Showing 10 changed files with 62 additions and 199 deletions.
13 changes: 6 additions & 7 deletions cmd/integration/commands/stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -634,7 +634,7 @@ func stageSnapshots(db kv.RwDB, ctx context.Context, logger log.Logger) error {
defer agg.Close()

br, bw := blocksIO(db, logger)
_, _, _, _, _ = newSync(ctx, db, nil /* miningConfig */, logger)
engine, _, _, _, _ := newSync(ctx, db, nil /* miningConfig */, logger)
chainConfig, _ := fromdb.ChainConfig(db), fromdb.PruneMode(db)

return db.Update(ctx, func(tx kv.RwTx) error {
Expand All @@ -644,7 +644,7 @@ func stageSnapshots(db kv.RwDB, ctx context.Context, logger log.Logger) error {
}
}
dirs := datadir.New(datadirCli)
if err := reset2.ResetBlocks(tx, db, agg, br, bw, dirs, *chainConfig, logger); err != nil {
if err := reset2.ResetBlocks(tx, db, agg, br, bw, dirs, *chainConfig, engine, logger); err != nil {
return fmt.Errorf("resetting blocks: %w", err)
}
ac := agg.BeginFilesRo()
Expand Down Expand Up @@ -689,7 +689,7 @@ func stageHeaders(db kv.RwDB, ctx context.Context, logger log.Logger) error {
defer borSn.Close()
defer agg.Close()
br, bw := blocksIO(db, logger)
_, _, _, _, _ = newSync(ctx, db, nil /* miningConfig */, logger)
engine, _, _, _, _ := newSync(ctx, db, nil /* miningConfig */, logger)
chainConfig, _ := fromdb.ChainConfig(db), fromdb.PruneMode(db)

if integritySlow {
Expand All @@ -716,7 +716,7 @@ func stageHeaders(db kv.RwDB, ctx context.Context, logger log.Logger) error {
}
}

if err := reset2.ResetBlocks(tx, db, agg, br, bw, dirs, *chainConfig, logger); err != nil {
if err := reset2.ResetBlocks(tx, db, agg, br, bw, dirs, *chainConfig, engine, logger); err != nil {
return err
}
return nil
Expand Down Expand Up @@ -1486,8 +1486,7 @@ func newSync(ctx context.Context, db kv.RwDB, miningConfig *params.MiningConfig,
blobStore = parlia.BlobStore
}

stages := stages2.NewDefaultStages(context.Background(), db, snapDb, blobStore, p2p.Config{}, &cfg, sentryControlServer, notifications, nil, blockReader, blockRetire, agg, nil, nil,
heimdallClient, recents, signatures, logger)
stages := stages2.NewDefaultStages(context.Background(), db, snapDb, blobStore, p2p.Config{}, &cfg, sentryControlServer, notifications, nil, blockReader, blockRetire, agg, nil, nil, engine, heimdallClient, recents, signatures, logger)
sync := stagedsync.New(cfg.Sync, stages, stagedsync.DefaultUnwindOrder, stagedsync.DefaultPruneOrder, logger)

miner := stagedsync.NewMiningState(&cfg.Miner)
Expand Down Expand Up @@ -1587,5 +1586,5 @@ func initConsensusEngine(ctx context.Context, cc *chain2.Config, dir string, db
consensusConfig = &config.Ethash
}
return ethconsensusconfig.CreateConsensusEngine(ctx, &nodecfg.Config{Dirs: datadir.New(dir)}, cc, consensusConfig, config.Miner.Notify, config.Miner.Noverify,
heimdallClient, config.WithoutHeimdall, blockReader, db.ReadOnly(), logger, db, nil), heimdallClient
heimdallClient, config.WithoutHeimdall, blockReader, db.ReadOnly(), logger, nil), heimdallClient
}
141 changes: 14 additions & 127 deletions consensus/parlia/parlia.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package parlia

import (
"bytes"
"context"
"encoding/hex"
"errors"
"fmt"
Expand Down Expand Up @@ -37,7 +36,6 @@ import (
"github.com/erigontech/erigon/consensus/parlia/finality"
"github.com/erigontech/erigon/core"
"github.com/erigontech/erigon/core/forkid"
"github.com/erigontech/erigon/core/rawdb"
"github.com/erigontech/erigon/core/state"
"github.com/erigontech/erigon/core/systemcontracts"
"github.com/erigontech/erigon/core/types"
Expand Down Expand Up @@ -220,7 +218,6 @@ type Parlia struct {
genesisHash libcommon.Hash
db kv.RwDB // Database to store and retrieve snapshot checkpoints
BlobStore services.BlobStorage
chainDb kv.RwDB

recentSnaps *lru.ARCCache[libcommon.Hash, *Snapshot] // Snapshots for recent block to speed up
signatures *lru.ARCCache[libcommon.Hash, libcommon.Address] // Signatures of recent blocks to speed up mining
Expand Down Expand Up @@ -250,7 +247,6 @@ func New(
db kv.RwDB,
blobStore services.BlobStorage,
blockReader services.FullBlockReader,
chainDb kv.RwDB,
logger log.Logger,
) *Parlia {
// get parlia config
Expand Down Expand Up @@ -291,7 +287,6 @@ func New(
config: parliaConfig,
db: db,
BlobStore: blobStore,
chainDb: chainDb,
recentSnaps: recentSnaps,
signatures: signatures,
validatorSetABIBeforeLuban: vABIBeforeLuban,
Expand Down Expand Up @@ -683,24 +678,19 @@ func (p *Parlia) verifyCascadingFields(chain consensus.ChainHeaderReader, header
}

// All basic checks passed, verify the seal and return
return p.verifySeal(chain, header, parents)
return p.verifySeal(header, snap)
}

// verifySeal checks whether the signature contained in the header satisfies the
// consensus protocol requirements. The method accepts an optional list of parent
// headers that aren't yet part of the local blockchain to generate the snapshots
// from.
func (p *Parlia) verifySeal(chain consensus.ChainHeaderReader, header *types.Header, parents []*types.Header) error {
func (p *Parlia) verifySeal(header *types.Header, snap *Snapshot) error {
// Verifying the genesis block is not supported
number := header.Number.Uint64()
if number == 0 {
return errUnknownBlock
}
// Retrieve the snapshot needed to verify this header and cache it
snap, err := p.snapshot(chain, number-1, header.ParentHash, parents, true /* verify */)
if err != nil {
return err
}

// Resolve the authorization key and check against validators
signer, err := ecrecover(header, p.signatures, p.chainConfig.ChainID)
Expand Down Expand Up @@ -743,15 +733,8 @@ func (p *Parlia) snapshot(chain consensus.ChainHeaderReader, number uint64, hash
var (
headers []*types.Header
snap *Snapshot
doLog bool
)

if s, ok := p.recentSnaps.Get(hash); ok {
snap = s
} else {
doLog = true
}

for snap == nil {
// If an in-memory snapshot was found, use that
if s, ok := p.recentSnaps.Get(hash); ok {
Expand All @@ -762,7 +745,7 @@ func (p *Parlia) snapshot(chain consensus.ChainHeaderReader, number uint64, hash
// If an on-disk checkpoint snapshot can be found, use that
if number%CheckpointInterval == 0 {
if s, err := loadSnapshot(p.config, p.signatures, p.db, number, hash); err == nil {
//log.Trace("Loaded snapshot from disk", "number", number, "hash", hash)
p.logger.Trace("Loaded snapshot from disk", "number", number, "hash", hash)
snap = s
if !verify || snap != nil {
break
Expand Down Expand Up @@ -799,10 +782,6 @@ func (p *Parlia) snapshot(chain consensus.ChainHeaderReader, number uint64, hash
}
parents = parents[:len(parents)-1]
} else {
if doLog && number%100_000 == 0 {
// No explicit parents (or no more left), reach out to the database
p.logger.Info("[parlia] snapshots build, gather headers", "block", number)
}
header = chain.GetHeader(hash, number)
if header == nil {
return nil, consensus.ErrUnknownAncestor
Expand All @@ -822,18 +801,17 @@ func (p *Parlia) snapshot(chain consensus.ChainHeaderReader, number uint64, hash
headers[i], headers[len(headers)-1-i] = headers[len(headers)-1-i], headers[i]
}

snap, err := snap.apply(headers, chain, parents, p.chainConfig, doLog)
snap, err := snap.apply(headers, chain, parents, p.chainConfig, p.recentSnaps)
if err != nil {
return nil, err
}
p.recentSnaps.Add(snap.Hash, snap)

// If we've generated a new checkpoint snapshot, save to disk
if verify && snap.Number%CheckpointInterval == 0 && len(headers) > 0 {
if err = snap.store(p.db); err != nil {
return nil, err
}
//log.Trace("Stored snapshot to disk", "number", snap.Number, "hash", snap.Hash)
p.logger.Trace("Stored snapshot to disk", "number", snap.Number, "hash", snap.Hash)
}
return snap, err
}
Expand Down Expand Up @@ -1200,82 +1178,6 @@ func (p *Parlia) Authorize(val libcommon.Address, signFn SignFn) {
// Note, the method returns immediately and will send the result async. More
// than one result may also be returned depending on the consensus algorithm.
func (p *Parlia) Seal(chain consensus.ChainHeaderReader, block *types.Block, results chan<- *types.Block, stop <-chan struct{}) error {
header := block.Header()

// Sealing the genesis block is not supported
number := header.Number.Uint64()
if number == 0 {
return errUnknownBlock
}
// For 0-period chains, refuse to seal empty blocks (no reward but would spin sealing)
if p.config.Period == 0 && len(block.Transactions()) == 0 {
p.logger.Info("[parlia] Sealing paused, waiting for transactions")
return nil
}
// Don't hold the val fields for the entire sealing procedure
p.signerLock.RLock()
val, signFn := p.val, p.signFn
p.signerLock.RUnlock()

snap, err := p.snapshot(chain, number-1, header.ParentHash, nil, false /* verify */)
if err != nil {
return err
}

// Bail out if we're unauthorized to sign a block
if _, authorized := snap.Validators[val]; !authorized {
return fmt.Errorf("parlia.Seal: %w", errUnauthorizedValidator)
}

// If we're amongst the recent signers, wait for the next block
for seen, recent := range snap.Recents {
if recent == val {
// Signer is among recent, only wait if the current block doesn't shift it out
if limit := uint64(len(snap.Validators)/2 + 1); number < limit || seen > number-limit {
p.logger.Info("[parlia] Signed recently, must wait for others")
return nil
}
}
}

// Sweet, the protocol permits us to sign the block, wait for our time
delay := p.delayForRamanujanFork(snap, header)

p.logger.Info("Sealing block with", "number", number, "delay", delay, "headerDifficulty", header.Difficulty, "val", val.Hex(), "headerHash", header.Hash().Hex(), "gasUsed", header.GasUsed, "block txn number", block.Transactions().Len(), "State Root", header.Root)

// Sign all the things!
sig, err := signFn(val, crypto.Keccak256(parliaRLP(header, p.chainConfig.ChainID)), p.chainConfig.ChainID)
if err != nil {
return err
}
copy(header.Extra[len(header.Extra)-extraSeal:], sig)

// Wait until sealing is terminated or delay timeout.
//log.Trace("Waiting for slot to sign and propagate", "delay", common.PrettyDuration(delay))
go func() {
select {
case <-stop:
return
case <-time.After(delay):
}
if p.shouldWaitForCurrentBlockProcess(p.chainDb, header, snap) {
p.logger.Info("[parlia] Waiting for received in turn block to process")
select {
case <-stop:
p.logger.Info("[parlia] Received block process finished, abort block seal")
return
case <-time.After(time.Duration(processBackOffTime) * time.Second):
p.logger.Info("[parlia] Process backoff time exhausted, start to seal block")
}
}

select {
case results <- block.WithSeal(header):
default:
p.logger.Warn("[parlia] Sealing result is not read by miner", "sealhash", types.SealHash(header, p.chainConfig.ChainID))
}
}()

return nil
}

Expand Down Expand Up @@ -1378,30 +1280,6 @@ func (p *Parlia) IsSystemContract(to *libcommon.Address) bool {
return isToSystemContract(*to)
}

func (p *Parlia) shouldWaitForCurrentBlockProcess(chainDb kv.RwDB, header *types.Header, snap *Snapshot) bool {
if header.Difficulty.Cmp(diffInTurn) == 0 {
return false
}

roTx, err := chainDb.BeginRo(context.Background())
if err != nil {
return false
}
defer roTx.Rollback()
hash := rawdb.ReadHeadHeaderHash(roTx)
number := rawdb.ReadHeaderNumber(roTx, hash)

highestVerifiedHeader := rawdb.ReadHeader(roTx, hash, *number)
if highestVerifiedHeader == nil {
return false
}

if header.ParentHash == highestVerifiedHeader.ParentHash {
return true
}
return false
}

func (p *Parlia) EnoughDistance(chain consensus.ChainReader, header *types.Header) bool {
snap, err := p.snapshot(chain, header.Number.Uint64()-1, header.ParentHash, nil, false /* verify */)
if err != nil {
Expand Down Expand Up @@ -1730,3 +1608,12 @@ func (c *Parlia) GetTransferFunc() evmtypes.TransferFunc {
func (c *Parlia) GetPostApplyMessageFunc() evmtypes.PostApplyMessageFunc {
return nil
}

func (p *Parlia) blockTimeVerifyForRamanujanFork(snap *Snapshot, header, parent *types.Header) error {
if p.chainConfig.IsRamanujan(header.Number.Uint64()) {
if header.Time < parent.Time+p.config.Period+backOffTime(snap, header, header.Coinbase, p.chainConfig) {
return fmt.Errorf("header %d, time %d, now %d, period: %d, backof: %d, %w", header.Number.Uint64(), header.Time, time.Now().Unix(), p.config.Period, backOffTime(snap, header, header.Coinbase, p.chainConfig), consensus.ErrFutureBlock)
}
}
return nil
}
45 changes: 0 additions & 45 deletions consensus/parlia/ramanujanfork.go

This file was deleted.

12 changes: 8 additions & 4 deletions consensus/parlia/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ func (s *Snapshot) updateAttestation(header *types.Header, chainConfig *chain.Co
}
}

func (s *Snapshot) apply(headers []*types.Header, chain consensus.ChainHeaderReader, parents []*types.Header, chainConfig *chain.Config, doLog bool) (*Snapshot, error) {
func (s *Snapshot) apply(headers []*types.Header, chain consensus.ChainHeaderReader, parents []*types.Header, chainConfig *chain.Config, recentSnaps *lru.ARCCache[libcommon.Hash, *Snapshot]) (*Snapshot, error) {
// Allow passing in no headers for cleaner code
if len(headers) == 0 {
return s, nil
Expand All @@ -260,7 +260,7 @@ func (s *Snapshot) apply(headers []*types.Header, chain consensus.ChainHeaderRea

for _, header := range headers {
number := header.Number.Uint64()
if doLog && number%100_000 == 0 {
if number%100_000 == 0 {
log.Info("[parlia] snapshots build, recover from headers", "block", number)
}
// Delete the oldest validator from the recent list to allow it signing again
Expand Down Expand Up @@ -365,9 +365,13 @@ func (s *Snapshot) apply(headers []*types.Header, chain consensus.ChainHeaderRea
delete(snap.RecentForkHashes, number-i)
}
}
snap.Number = number
snap.Hash = header.Hash()
if snap.Number+s.config.Epoch >= headers[len(headers)-1].Number.Uint64() {
historySnap := snap.copy()
recentSnaps.Add(historySnap.Hash, historySnap)
}
}
snap.Number += uint64(len(headers))
snap.Hash = headers[len(headers)-1].Hash()
return snap, nil
}

Expand Down
Loading

0 comments on commit 7fca645

Please sign in to comment.