feat: fill parlia db from snapshots #463

Merged · 1 commit · Jul 30, 2024
2 changes: 1 addition & 1 deletion cmd/integration/commands/stages.go
@@ -1670,7 +1670,7 @@ func newSync(ctx context.Context, db kv.RwDB, miningConfig *params.MiningConfig,
}

stages := stages2.NewDefaultStages(context.Background(), db, snapDb, blobStore, p2p.Config{}, &cfg, sentryControlServer, notifications, nil, blockReader, blockRetire, agg, nil, nil,
heimdallClient, recents, signatures, logger)
engine, heimdallClient, recents, signatures, logger)
sync := stagedsync.New(cfg.Sync, stages, stagedsync.DefaultUnwindOrder, stagedsync.DefaultPruneOrder, logger)

miner := stagedsync.NewMiningState(&cfg.Miner)
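Note: this hunk threads the consensus engine into the default stage list, one argument ahead of heimdallClient. A sketch of the new call shape, with the nil slots annotated by cross-reference to the eth/backend.go call further down (the annotations are inferred, not part of the diff):

```go
// Sketch of the updated call; argument roles inferred from eth/backend.go.
stages := stages2.NewDefaultStages(context.Background(), db, snapDb, blobStore,
	p2p.Config{}, &cfg, sentryControlServer, notifications,
	nil,      // downloader client
	blockReader, blockRetire, agg,
	nil, nil, // silkworm, fork validator
	engine,   // new: consensus engine, used to rebuild parlia snapshots
	heimdallClient, recents, signatures, logger)
```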
132 changes: 5 additions & 127 deletions consensus/parlia/parlia.go
@@ -2,7 +2,6 @@ package parlia

import (
"bytes"
"context"
"encoding/hex"
"errors"
"fmt"
@@ -29,7 +28,6 @@ import (
"github.com/holiman/uint256"
"github.com/ledgerwatch/erigon-lib/common/hexutility"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon/core/rawdb"
"github.com/ledgerwatch/erigon/core/state"
"github.com/ledgerwatch/erigon/core/types"
"github.com/ledgerwatch/log/v3"
@@ -218,7 +216,6 @@ type Parlia struct {
genesisHash libcommon.Hash
db kv.RwDB // Database to store and retrieve snapshot checkpoints
BlobStore services.BlobStorage
chainDb kv.RwDB

recentSnaps *lru.ARCCache[libcommon.Hash, *Snapshot] // Snapshots for recent block to speed up
signatures *lru.ARCCache[libcommon.Hash, libcommon.Address] // Signatures of recent blocks to speed up mining
@@ -248,7 +245,6 @@ func New(
db kv.RwDB,
blobStore services.BlobStorage,
blockReader services.FullBlockReader,
chainDb kv.RwDB,
logger log.Logger,
) *Parlia {
// get parlia config
@@ -289,7 +285,6 @@
config: parliaConfig,
db: db,
BlobStore: blobStore,
chainDb: chainDb,
recentSnaps: recentSnaps,
signatures: signatures,
validatorSetABIBeforeLuban: vABIBeforeLuban,
@@ -657,24 +652,19 @@ func (p *Parlia) verifyCascadingFields(chain consensus.ChainHeaderReader, header
}

// All basic checks passed, verify the seal and return
return p.verifySeal(chain, header, parents)
return p.verifySeal(header, snap)
}

// verifySeal checks whether the signature contained in the header satisfies the
// consensus protocol requirements. The method accepts an optional list of parent
// headers that aren't yet part of the local blockchain to generate the snapshots
// from.
func (p *Parlia) verifySeal(chain consensus.ChainHeaderReader, header *types.Header, parents []*types.Header) error {
func (p *Parlia) verifySeal(header *types.Header, snap *Snapshot) error {
// Verifying the genesis block is not supported
number := header.Number.Uint64()
if number == 0 {
return errUnknownBlock
}
// Retrieve the snapshot needed to verify this header and cache it
snap, err := p.snapshot(chain, number-1, header.ParentHash, parents, true /* verify */)
if err != nil {
return err
}

// Resolve the authorization key and check against validators
signer, err := ecrecover(header, p.signatures, p.chainConfig.ChainID)
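
This hunk inverts the old flow: the snapshot is presumably resolved earlier in verifyCascadingFields for its own checks, so verifySeal now receives it as an argument instead of re-deriving it from the chain (the deleted p.snapshot lookup above). A minimal sketch of the resulting call path, using only names visible in this diff:

```go
// In verifyCascadingFields (sketch): resolve the snapshot once ...
snap, err := p.snapshot(chain, number-1, header.ParentHash, parents, true /* verify */)
if err != nil {
	return err
}
// ... run the cascading-field checks that need it, then reuse it:
return p.verifySeal(header, snap) // no second p.snapshot() lookup inside
```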
@@ -768,15 +758,8 @@ func (p *Parlia) snapshot(chain consensus.ChainHeaderReader, number uint64, hash
var (
headers []*types.Header
snap *Snapshot
doLog bool
)

if s, ok := p.recentSnaps.Get(hash); ok {
snap = s
} else {
doLog = true
}

for snap == nil {
// If an in-memory snapshot was found, use that
if s, ok := p.recentSnaps.Get(hash); ok {
@@ -787,7 +770,7 @@
// If an on-disk checkpoint snapshot can be found, use that
if number%CheckpointInterval == 0 {
if s, err := loadSnapshot(p.config, p.signatures, p.db, number, hash); err == nil {
//log.Trace("Loaded snapshot from disk", "number", number, "hash", hash)
log.Trace("Loaded snapshot from disk", "number", number, "hash", hash)
snap = s
if !verify || snap != nil {
break
@@ -824,10 +807,6 @@
}
parents = parents[:len(parents)-1]
} else {
if doLog && number%100_000 == 0 {
// No explicit parents (or no more left), reach out to the database
p.logger.Info("[parlia] snapshots build, gather headers", "block", number)
}
header = chain.GetHeader(hash, number)
if header == nil {
return nil, fmt.Errorf("header = %v, hash = %v, err = %v", number, hash, consensus.ErrUnknownAncestor)
@@ -847,18 +826,17 @@
headers[i], headers[len(headers)-1-i] = headers[len(headers)-1-i], headers[i]
}

snap, err := snap.apply(headers, chain, parents, p.chainConfig, doLog)
snap, err := snap.apply(headers, chain, parents, p.chainConfig, p.recentSnaps)
if err != nil {
return nil, err
}
p.recentSnaps.Add(snap.Hash, snap)

// If we've generated a new checkpoint snapshot, save to disk
if verify && snap.Number%CheckpointInterval == 0 && len(headers) > 0 {
if err = snap.store(p.db); err != nil {
return nil, err
}
//log.Trace("Stored snapshot to disk", "number", snap.Number, "hash", snap.Hash)
log.Trace("Stored snapshot to disk", "number", snap.Number, "hash", snap.Hash)
}
return snap, err
}
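
Two things change inside snapshot() itself: the per-100k-block progress logging and its doLog plumbing are dropped (apply now logs unconditionally; see snapshot.go below), and snap.apply receives the recentSnaps cache so intermediate snapshots built during a long header replay can be cached too; the previously commented-out trace logs for disk loads and stores are also restored. The resolution ladder is unchanged, sketched here for orientation:

```go
// Orientation sketch of the unchanged lookup order in p.snapshot():
// 1. p.recentSnaps.Get(hash)                     - in-memory ARC cache
// 2. loadSnapshot(...) at CheckpointInterval     - on-disk checkpoint
// 3. otherwise walk back header by header, then replay them forward,
//    now handing the cache to apply so it can populate it as it goes:
snap, err := snap.apply(headers, chain, parents, p.chainConfig, p.recentSnaps)
```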
@@ -1241,82 +1219,6 @@ func (p *Parlia) Authorize(val libcommon.Address, signFn SignFn) {
// Note, the method returns immediately and will send the result async. More
// than one result may also be returned depending on the consensus algorithm.
func (p *Parlia) Seal(chain consensus.ChainHeaderReader, block *types.Block, results chan<- *types.Block, stop <-chan struct{}) error {
header := block.Header()

// Sealing the genesis block is not supported
number := header.Number.Uint64()
if number == 0 {
return errUnknownBlock
}
// For 0-period chains, refuse to seal empty blocks (no reward but would spin sealing)
if p.config.Period == 0 && len(block.Transactions()) == 0 {
p.logger.Info("[parlia] Sealing paused, waiting for transactions")
return nil
}
// Don't hold the val fields for the entire sealing procedure
p.signerLock.RLock()
val, signFn := p.val, p.signFn
p.signerLock.RUnlock()

snap, err := p.snapshot(chain, number-1, header.ParentHash, nil, false /* verify */)
if err != nil {
return err
}

// Bail out if we're unauthorized to sign a block
if _, authorized := snap.Validators[val]; !authorized {
return fmt.Errorf("parlia.Seal: %w", errUnauthorizedValidator)
}

// If we're amongst the recent signers, wait for the next block
for seen, recent := range snap.Recents {
if recent == val {
// Signer is among recent, only wait if the current block doesn't shift it out
if limit := uint64(len(snap.Validators)/2 + 1); number < limit || seen > number-limit {
p.logger.Info("[parlia] Signed recently, must wait for others")
return nil
}
}
}

// Sweet, the protocol permits us to sign the block, wait for our time
delay := p.delayForRamanujanFork(snap, header)

p.logger.Info("Sealing block with", "number", number, "delay", delay, "headerDifficulty", header.Difficulty, "val", val.Hex(), "headerHash", header.Hash().Hex(), "gasUsed", header.GasUsed, "block txn number", block.Transactions().Len(), "State Root", header.Root)

// Sign all the things!
sig, err := signFn(val, crypto.Keccak256(parliaRLP(header, p.chainConfig.ChainID)), p.chainConfig.ChainID)
if err != nil {
return err
}
copy(header.Extra[len(header.Extra)-extraSeal:], sig)

// Wait until sealing is terminated or delay timeout.
//log.Trace("Waiting for slot to sign and propagate", "delay", common.PrettyDuration(delay))
go func() {
select {
case <-stop:
return
case <-time.After(delay):
}
if p.shouldWaitForCurrentBlockProcess(p.chainDb, header, snap) {
p.logger.Info("[parlia] Waiting for received in turn block to process")
select {
case <-stop:
p.logger.Info("[parlia] Received block process finished, abort block seal")
return
case <-time.After(time.Duration(processBackOffTime) * time.Second):
p.logger.Info("[parlia] Process backoff time exhausted, start to seal block")
}
}

select {
case results <- block.WithSeal(header):
default:
p.logger.Warn("[parlia] Sealing result is not read by miner", "sealhash", types.SealHash(header, p.chainConfig.ChainID))
}
}()

return nil
}

@@ -1419,30 +1321,6 @@ func (p *Parlia) IsSystemContract(to *libcommon.Address) bool {
return isToSystemContract(*to)
}

func (p *Parlia) shouldWaitForCurrentBlockProcess(chainDb kv.RwDB, header *types.Header, snap *Snapshot) bool {
if header.Difficulty.Cmp(diffInTurn) == 0 {
return false
}

roTx, err := chainDb.BeginRo(context.Background())
if err != nil {
return false
}
defer roTx.Rollback()
hash := rawdb.ReadHeadHeaderHash(roTx)
number := rawdb.ReadHeaderNumber(roTx, hash)

highestVerifiedHeader := rawdb.ReadHeader(roTx, hash, *number)
if highestVerifiedHeader == nil {
return false
}

if header.ParentHash == highestVerifiedHeader.ParentHash {
return true
}
return false
}

func (p *Parlia) EnoughDistance(chain consensus.ChainReader, header *types.Header) bool {
snap, err := p.snapshot(chain, header.Number.Uint64()-1, header.ParentHash, nil, false /* verify */)
if err != nil {
19 changes: 0 additions & 19 deletions consensus/parlia/ramanujanfork.go
@@ -2,31 +2,12 @@ package parlia

import (
"fmt"
"math/rand"
"time"

"github.com/ledgerwatch/erigon/consensus"
"github.com/ledgerwatch/erigon/core/types"
)

const (
wiggleTimeBeforeFork = 500 * time.Millisecond // Random delay (per signer) to allow concurrent signers
fixedBackOffTimeBeforeFork = 200 * time.Millisecond
)

func (p *Parlia) delayForRamanujanFork(snap *Snapshot, header *types.Header) time.Duration {
delay := time.Until(time.Unix(int64(header.Time), 0)) // nolint: gosimple
if p.chainConfig.IsRamanujan(header.Number.Uint64()) {
return delay
}
if header.Difficulty.Cmp(diffNoTurn) == 0 {
// It's not our turn explicitly to sign, delay it a bit
wiggle := time.Duration(len(snap.Validators)/2+1) * wiggleTimeBeforeFork
delay += fixedBackOffTimeBeforeFork + time.Duration(rand.Int63n(int64(wiggle))) // nolint
}
return delay
}

func (p *Parlia) blockTimeForRamanujanFork(snap *Snapshot, header, parent *types.Header) uint64 {
blockTime := parent.Time + p.config.Period
if p.chainConfig.IsRamanujan(header.Number.Uint64()) {
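With the Seal body gone, the only caller of delayForRamanujanFork is gone too, so the helper and its constants are deleted. For reference, a self-contained worked example of the extra out-of-turn delay the deleted code produced before the Ramanujan fork (a 21-validator set is an assumed example size):

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

func main() {
	// Constants from the deleted block above.
	const (
		wiggleTimeBeforeFork       = 500 * time.Millisecond
		fixedBackOffTimeBeforeFork = 200 * time.Millisecond
	)
	validators := 21 // assumed validator-set size
	// Out-of-turn signers waited a fixed back-off plus a random wiggle:
	wiggle := time.Duration(validators/2+1) * wiggleTimeBeforeFork // 5.5s here
	delay := fixedBackOffTimeBeforeFork + time.Duration(rand.Int63n(int64(wiggle)))
	fmt.Println("extra out-of-turn delay:", delay) // 200ms + [0s, 5.5s)
}
```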
13 changes: 9 additions & 4 deletions consensus/parlia/snapshot.go
@@ -235,7 +235,7 @@ func (s *Snapshot) updateAttestation(header *types.Header, chainConfig *chain.Co
}
}

func (s *Snapshot) apply(headers []*types.Header, chain consensus.ChainHeaderReader, parents []*types.Header, chainConfig *chain.Config, doLog bool) (*Snapshot, error) {
func (s *Snapshot) apply(headers []*types.Header, chain consensus.ChainHeaderReader, parents []*types.Header, chainConfig *chain.Config, recentSnaps *lru.ARCCache[libcommon.Hash, *Snapshot]) (*Snapshot, error) {
// Allow passing in no headers for cleaner code
if len(headers) == 0 {
return s, nil
@@ -260,7 +260,7 @@ func (s *Snapshot) apply(headers []*types.Header, chain consensus.ChainHeaderRea

for _, header := range headers {
number := header.Number.Uint64()
if doLog && number%100_000 == 0 {
if number%100_000 == 0 {
log.Info("[parlia] snapshots build, recover from headers", "block", number)
}
// Delete the oldest validator from the recent list to allow it signing again
@@ -365,9 +365,14 @@ func (s *Snapshot) apply(headers []*types.Header, chain consensus.ChainHeaderRea
delete(snap.RecentForkHashes, number-i)
}
}
snap.Number = number
snap.Hash = header.Hash()
if snap.Number+s.config.Epoch >= headers[len(headers)-1].Number.Uint64() {
historySnap := snap.copy()
recentSnaps.Add(historySnap.Hash, historySnap)
}
}
snap.Number += uint64(len(headers))
snap.Hash = headers[len(headers)-1].Hash()

return snap, nil
}

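The apply() changes are the core of this file: snap.Number and snap.Hash now advance per processed header rather than once at the end, and once the replay is within one epoch of the final header, a copy of each intermediate snapshot is added to recentSnaps, so later lookups near the head hit the cache instead of replaying headers again. A minimal self-contained sketch of that caching pattern with stand-in types (erigon's cache is an ARC LRU keyed by block hash, not a map):

```go
package main

// Stand-in types for illustration only; not erigon's.
type snap struct {
	Number uint64
	Hash   string
}

func (s *snap) copy() *snap { c := *s; return &c }

// applyHeaders replays headers onto s; within one epoch of the last
// header it caches a copy per block, mirroring the new apply() logic.
func applyHeaders(s *snap, nums []uint64, hashes []string, epoch uint64, cache map[string]*snap) *snap {
	last := nums[len(nums)-1]
	for i := range nums {
		// ... validator/recent-signer bookkeeping elided ...
		s.Number, s.Hash = nums[i], hashes[i]
		if s.Number+epoch >= last { // close enough to the head: cache it
			h := s.copy()
			cache[h.Hash] = h
		}
	}
	return s
}

func main() {
	cache := map[string]*snap{}
	_ = applyHeaders(&snap{}, []uint64{1, 2, 3}, []string{"a", "b", "c"}, 1, cache)
	// cache now holds copies for blocks within one epoch of block 3
}
```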
2 changes: 1 addition & 1 deletion core/rawdb/rawdbreset/reset_stages.go
@@ -88,7 +88,7 @@ func ResetBlocks(tx kv.RwTx, db kv.RoDB, agg *state.Aggregator,
}

if br.FreezingCfg().Enabled && br.FrozenBlocks() > 0 {
if err := stagedsync.FillDBFromSnapshots("filling_db_from_snapshots", context.Background(), tx, dirs, br, agg, logger); err != nil {
if err := stagedsync.FillDBFromSnapshots("filling_db_from_snapshots", context.Background(), tx, dirs, br, agg, cc, engine, logger); err != nil {
return err
}
_ = stages.SaveStageProgress(tx, stages.Snapshots, br.FrozenBlocks())
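ResetBlocks now forwards the chain config (cc) and the consensus engine into FillDBFromSnapshots, which is the point of the PR title: while chain data is restored from frozen snapshot files, the parlia snapshot DB can be rebuilt in the same pass rather than left empty. A hypothetical sketch of that shape (FillDBFromSnapshots' real body is not shown in this diff; all names below are illustrative):

```go
package snapshotfill

// Hypothetical sketch, not erigon's implementation.
type header struct{ Number uint64 }

type db interface{ writeHeader(*header) error }
type engine interface{ applyHeader(*header) error } // e.g. parlia snapshot bookkeeping

func fillFromSnapshots(headers []*header, d db, e engine) error {
	for _, h := range headers {
		if err := d.writeHeader(h); err != nil { // restore chain data
			return err
		}
		if err := e.applyHeader(h); err != nil { // rebuild consensus snapshots
			return err
		}
	}
	return nil
}
```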
4 changes: 2 additions & 2 deletions eth/backend.go
@@ -808,7 +808,7 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger
backend.ethBackendRPC, backend.miningRPC, backend.stateChangesClient = ethBackendRPC, miningRPC, stateDiffClient

backend.syncStages = stages2.NewDefaultStages(backend.sentryCtx, backend.chainDB, snapDb, blobStore, p2pConfig, config, backend.sentriesClient, backend.notifications, backend.downloaderClient,
blockReader, blockRetire, backend.agg, backend.silkworm, backend.forkValidator, heimdallClient, recents, signatures, logger)
blockReader, blockRetire, backend.agg, backend.silkworm, backend.forkValidator, backend.engine, heimdallClient, recents, signatures, logger)
backend.syncUnwindOrder = stagedsync.DefaultUnwindOrder
backend.syncPruneOrder = stagedsync.DefaultPruneOrder
backend.stagedSync = stagedsync.New(config.Sync, backend.syncStages, backend.syncUnwindOrder, backend.syncPruneOrder, logger)
@@ -831,7 +831,7 @@
}

checkStateRoot := true
pipelineStages := stages2.NewPipelineStages(ctx, chainKv, blobStore, config, p2pConfig, backend.sentriesClient, backend.notifications, backend.downloaderClient, blockReader, blockRetire, backend.agg, backend.silkworm, backend.forkValidator, logger, checkStateRoot)
pipelineStages := stages2.NewPipelineStages(ctx, chainKv, blobStore, config, p2pConfig, backend.sentriesClient, backend.notifications, backend.downloaderClient, blockReader, blockRetire, backend.agg, backend.silkworm, backend.forkValidator, backend.engine, logger, checkStateRoot)
backend.pipelineStagedSync = stagedsync.New(config.Sync, pipelineStages, stagedsync.PipelineUnwindOrder, stagedsync.PipelinePruneOrder, logger)
backend.eth1ExecutionServer = eth1.NewEthereumExecutionModule(blockReader, chainKv, backend.pipelineStagedSync, backend.forkValidator, chainConfig, assembleBlockPOS, hook, backend.notifications.Accumulator, backend.notifications.StateChangesConsumer, logger, backend.engine, config.HistoryV3, ctx)
executionRpc := direct.NewExecutionClientDirect(backend.eth1ExecutionServer)
7 changes: 4 additions & 3 deletions eth/ethconsensusconfig/config.go
@@ -2,11 +2,12 @@ package ethconsensusconfig

import (
"context"
"github.com/ledgerwatch/erigon/core/blob_storage"
"github.com/spf13/afero"
"math"
"path/filepath"

"github.com/ledgerwatch/erigon/core/blob_storage"
"github.com/spf13/afero"

"github.com/davecgh/go-spew/spew"
"github.com/ledgerwatch/log/v3"

@@ -122,7 +123,7 @@ func CreateConsensusEngine(ctx context.Context, nodeConfig *nodecfg.Config, chai
}
blobStore := blob_storage.NewBlobStore(blobDb, afero.NewBasePathFs(afero.NewOsFs(), nodeConfig.Dirs.DataDir), math.MaxUint64, chainConfig, blockReader)

eng = parlia.New(chainConfig, db, blobStore, blockReader, chainDb[0], logger)
eng = parlia.New(chainConfig, db, blobStore, blockReader, logger)
}
case *borcfg.BorConfig:
// If Matic bor consensus is requested, set it up
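With the Seal body and shouldWaitForCurrentBlockProcess removed, nothing in the parlia engine reads the chain DB head anymore, so the chainDb handle is dropped from the constructor. Usage after this PR is exactly the call in the hunk above:

```go
// Constructor after this PR: the engine keeps only its own snapshot DB,
// blob store, and block reader; no chain DB handle.
eng = parlia.New(chainConfig, db, blobStore, blockReader, logger)
```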