Commit

refactor: more code extraction in execute
V-Staykov committed Dec 5, 2024
1 parent 3776d29 commit f524705
Showing 2 changed files with 303 additions and 257 deletions.
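
The commit extracts the per-block execution lifecycle into a blockExecutor: construct it with NewBlockExecutor, call Innit once for the block range, call ExecuteBlock per block, and re-point it at fresh handles via SetNewTx after a commit. As a rough illustration only, the fragment below sketches how a caller inside the stagedsync package might drive that API; the runBlockRange helper and its loop bounds are hypothetical and not part of this commit (the real caller sits in the second changed file, which is not expanded here).

// Hypothetical driver, assumed to live in package stagedsync next to the
// definitions in this diff. Only the call order is the point here.
func runBlockRange(ctx context.Context, logPrefix string, cfg ExecuteBlockCfg, tx kv.RwTx, batch kv.StatelessRwTx, from, to uint64) error {
	be := NewBlockExecutor(ctx, logPrefix, cfg, tx, batch, false /* initialCycle */, false /* nextStagesExpectData */)

	// Innit resolves the parent root/hash for `from` and decides whether state streaming applies.
	if err := be.Innit(from, to); err != nil {
		return fmt.Errorf("Innit: %w", err)
	}

	for blockNum := from + 1; blockNum <= to; blockNum++ {
		if err := be.ExecuteBlock(blockNum, to); err != nil {
			return fmt.Errorf("ExecuteBlock %d: %w", blockNum, err)
		}
	}

	// After committing and reopening the transaction, the executor would be
	// handed the new handles with be.SetNewTx(newTx, newBatch).
	return nil
}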
298 changes: 266 additions & 32 deletions eth/stagedsync/stage_execute_block_eecutor.go
@@ -2,61 +2,172 @@ package stagedsync

import (
"context"
"errors"
"fmt"

"github.com/ledgerwatch/erigon-lib/chain"
"github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon/consensus"
"github.com/ledgerwatch/erigon-lib/kv/dbutils"
"github.com/ledgerwatch/erigon/consensus/misc"
"github.com/ledgerwatch/erigon/core"
"github.com/ledgerwatch/erigon/core/rawdb"
"github.com/ledgerwatch/erigon/core/state"
"github.com/ledgerwatch/erigon/core/types"
"github.com/ledgerwatch/erigon/core/vm"
"github.com/ledgerwatch/erigon/eth/calltracer"
"github.com/ledgerwatch/erigon/eth/tracers/logger"
"github.com/ledgerwatch/erigon/turbo/services"
"github.com/ledgerwatch/erigon/turbo/shards"
"github.com/ledgerwatch/erigon/zk/hermez_db"
rawdbZk "github.com/ledgerwatch/erigon/zk/rawdb"
"github.com/ledgerwatch/log/v3"
)

var (
ErrExecutionError = fmt.Errorf("execution error")
)

type hermezDb interface {
WriteBlockInfoRoot(blockNum uint64, root common.Hash) error
SetNewTx(tx kv.RwTx)
state.ReadOnlyHermezDb
}

type blockExecutor struct {
// cfg ExecuteBlockCfg
ctx context.Context
logPrefix string
cfg ExecuteBlockCfg
tx kv.RwTx
batch kv.StatelessRwTx
blockReader services.FullBlockReader
accumulator *shards.Accumulator
engine consensus.Engine
vmConfig vm.Config
chainConfig *chain.Config
roHermezDb state.ReadOnlyHermezDb
stateStream bool
initialCycle bool
changeSetHook ChangeSetHook
nextStagesExpectData bool
hostoryPruneTo uint64
receiptsPruneTo uint64
callTracesPruneTo uint64

prevBlockRoot common.Hash
// set internally
hermezDb hermezDb
stateStream bool

// these change on each block
prevBlockRoot common.Hash
prevBlockHash common.Hash
datastreamBlockHash common.Hash
block *types.Block
currentStateGas uint64
}

func (be *blockExecutor) executeBlock(
block *types.Block,
) (execRs *core.EphemeralExecResultZk, err error) {
func NewBlockExecutor(
ctx context.Context,
logPrefix string,
cfg ExecuteBlockCfg,
tx kv.RwTx,
batch kv.StatelessRwTx,
initialCycle bool,
nextStagesExpectData bool,
) *blockExecutor {
return &blockExecutor{
ctx: ctx,
logPrefix: logPrefix,
cfg: cfg,
tx: tx,
batch: batch,
initialCycle: initialCycle,
nextStagesExpectData: nextStagesExpectData,
hermezDb: hermez_db.NewHermezDb(tx),
}
}

func (be *blockExecutor) Innit(from, to uint64) (err error) {
be.stateStream = !be.initialCycle && be.cfg.stateStream && to-from < stateStreamLimit

be.prevBlockRoot, be.prevBlockHash, err = be.getBlockHashValues(from)
if err != nil {
return fmt.Errorf("getBlockHashValues: %w", err)
}

return nil
}

func (be *blockExecutor) SetNewTx(tx kv.RwTx, batch kv.StatelessRwTx) {
be.tx = tx
be.batch = batch

be.hermezDb.SetNewTx(tx)
}

func (be *blockExecutor) ExecuteBlock(blockNum, to uint64) error {
// fetch values pre-execute
datastreamBlockHash, block, senders, err := be.getPreexecuteValues(blockNum, be.prevBlockHash)
if err != nil {
return fmt.Errorf("getPreexecuteValues: %w", err)
}
be.datastreamBlockHash = datastreamBlockHash
be.block = block

execRs, err := be.executeBlock(block, to)
if err != nil {
if !errors.Is(err, context.Canceled) {
log.Warn(fmt.Sprintf("[%s] Execution failed", be.logPrefix), "block", blockNum, "hash", be.datastreamBlockHash.Hex(), "err", err)
if be.cfg.hd != nil {
be.cfg.hd.ReportBadHeaderPoS(be.datastreamBlockHash, block.ParentHash())
}
if be.cfg.badBlockHalt {
return fmt.Errorf("executeBlockZk: %w", err)
}
}
return fmt.Errorf("%w: %w", ErrExecutionError, err)
}

if execRs.BlockInfoTree != nil {
if err = be.hermezDb.WriteBlockInfoRoot(blockNum, *execRs.BlockInfoTree); err != nil {
return fmt.Errorf("WriteBlockInfoRoot: %w", err)
}
}

// exec loop variables
header := block.HeaderNoCopy()
header.GasUsed = uint64(execRs.GasUsed)
header.ReceiptHash = types.DeriveSha(execRs.Receipts)
header.Bloom = execRs.Bloom

be.prevBlockRoot = block.Root()
be.prevBlockHash = header.Hash()

be.currentStateGas = be.currentStateGas + header.GasUsed

// commit values post-execute
if err := be.postExecuteCommitValues(be.datastreamBlockHash, block, senders); err != nil {
return fmt.Errorf("postExecuteCommitValues: %w", err)
}

return nil
}

func (be *blockExecutor) getBlockHashValues(number uint64) (common.Hash, common.Hash, error) {
prevheaderHash, err := rawdb.ReadCanonicalHash(be.tx, number)
if err != nil {
return common.Hash{}, common.Hash{}, err
}
header, err := be.cfg.blockReader.Header(be.ctx, be.tx, prevheaderHash, number)
if err != nil {
return common.Hash{}, common.Hash{}, err
}

return header.Root, prevheaderHash, nil
}

func (be *blockExecutor) executeBlock(block *types.Block, to uint64) (execRs *core.EphemeralExecResultZk, err error) {
blockNum := block.NumberU64()

// Incremental move of next stages depend on fully written ChangeSets, Receipts, CallTraceSet
writeChangeSets := be.nextStagesExpectData || blockNum > be.hostoryPruneTo
writeReceipts := be.nextStagesExpectData || blockNum > be.receiptsPruneTo
writeCallTraces := be.nextStagesExpectData || blockNum > be.callTracesPruneTo
writeChangeSets := be.nextStagesExpectData || blockNum > be.cfg.prune.History.PruneTo(to)
writeReceipts := be.nextStagesExpectData || blockNum > be.cfg.prune.Receipts.PruneTo(to)
writeCallTraces := be.nextStagesExpectData || blockNum > be.cfg.prune.CallTraces.PruneTo(to)

stateReader, stateWriter, err := newStateReaderWriter(be.batch, be.tx, block, writeChangeSets, be.accumulator, be.blockReader, be.stateStream)
stateReader, stateWriter, err := newStateReaderWriter(be.batch, be.tx, block, writeChangeSets, be.cfg.accumulator, be.cfg.blockReader, be.stateStream)
if err != nil {
return nil, fmt.Errorf("newStateReaderWriter: %w", err)
}

// where the magic happens
getHeader := func(hash common.Hash, number uint64) *types.Header {
h, _ := be.blockReader.Header(context.Background(), be.tx, hash, number)
h, _ := be.cfg.blockReader.Header(context.Background(), be.tx, hash, number)
return h
}

@@ -66,11 +177,27 @@ func (be *blockExecutor) executeBlock(
}

callTracer := calltracer.NewCallTracer()
be.vmConfig.Debug = true
be.vmConfig.Tracer = callTracer
be.cfg.vmConfig.Debug = true
be.cfg.vmConfig.Tracer = callTracer

getHashFn := core.GetHashFn(block.Header(), getHeader)
if execRs, err = core.ExecuteBlockEphemerallyZk(be.chainConfig, &be.vmConfig, getHashFn, be.engine, block, stateReader, stateWriter, ChainReaderImpl{config: be.chainConfig, tx: be.tx, blockReader: be.blockReader}, getTracer, be.roHermezDb, &be.prevBlockRoot); err != nil {
if execRs, err = core.ExecuteBlockEphemerallyZk(
be.cfg.chainConfig,
be.cfg.vmConfig,
getHashFn,
be.cfg.engine,
block,
stateReader,
stateWriter,
ChainReaderImpl{
config: be.cfg.chainConfig,
tx: be.tx,
blockReader: be.cfg.blockReader,
},
getTracer,
be.hermezDb,
&be.prevBlockRoot,
); err != nil {
return nil, fmt.Errorf("ExecuteBlockEphemerallyZk: %w", err)
}

@@ -87,17 +214,124 @@
}
}

if be.changeSetHook != nil {
if be.cfg.changeSetHook != nil {
if hasChangeSet, ok := stateWriter.(HasChangeSetWriter); ok {
be.changeSetHook(blockNum, hasChangeSet.ChangeSetWriter())
be.cfg.changeSetHook(blockNum, hasChangeSet.ChangeSetWriter())
}
}
if writeCallTraces {
if err := callTracer.WriteToDb(be.tx, block, be.vmConfig); err != nil {
if err := callTracer.WriteToDb(be.tx, block, *be.cfg.vmConfig); err != nil {
return nil, fmt.Errorf("WriteToDb: %w", err)
}
}

be.prevBlockRoot = block.Root()
return execRs, nil
}

// gets the pre-execute values for a block and sets the previous block hash
func (be *blockExecutor) getPreexecuteValues(blockNum uint64, prevBlockHash common.Hash) (common.Hash, *types.Block, []common.Address, error) {
preExecuteHeaderHash, err := rawdb.ReadCanonicalHash(be.tx, blockNum)
if err != nil {
return common.Hash{}, nil, nil, fmt.Errorf("ReadCanonicalHash: %w", err)
}

block, senders, err := be.cfg.blockReader.BlockWithSenders(be.ctx, be.tx, preExecuteHeaderHash, blockNum)
if err != nil {
return common.Hash{}, nil, nil, fmt.Errorf("BlockWithSenders: %w", err)
}

if block == nil {
return common.Hash{}, nil, nil, fmt.Errorf("empty block blocknum: %d", blockNum)
}

block.HeaderNoCopy().ParentHash = prevBlockHash

if be.cfg.chainConfig.IsLondon(blockNum) {
parentHeader, err := be.cfg.blockReader.Header(be.ctx, be.tx, prevBlockHash, blockNum-1)
if err != nil {
return common.Hash{}, nil, nil, fmt.Errorf("cfg.blockReader.Header: %w", err)
}
block.HeaderNoCopy().BaseFee = misc.CalcBaseFeeZk(be.cfg.chainConfig, parentHeader)
}

return preExecuteHeaderHash, block, senders, nil
}

func (be *blockExecutor) postExecuteCommitValues(
datastreamBlockHash common.Hash,
block *types.Block,
senders []common.Address,
) error {
header := block.Header()
blockHash := header.Hash()
blockNum := block.NumberU64()

// if datastream hash was wrong, remove old data
if blockHash != datastreamBlockHash {
if be.cfg.chainConfig.IsForkId9Elderberry2(blockNum) {
log.Warn(fmt.Sprintf("[%s] Blockhash mismatch", be.logPrefix), "blockNumber", blockNum, "datastreamBlockHash", datastreamBlockHash, "calculatedBlockHash", blockHash)
}
if err := rawdbZk.DeleteSenders(be.tx, datastreamBlockHash, blockNum); err != nil {
return fmt.Errorf("DeleteSenders: %w", err)
}
if err := rawdbZk.DeleteHeader(be.tx, datastreamBlockHash, blockNum); err != nil {
return fmt.Errorf("DeleteHeader: %w", err)
}

bodyForStorage, err := rawdb.ReadBodyForStorageByKey(be.tx, dbutils.BlockBodyKey(blockNum, datastreamBlockHash))
if err != nil {
return fmt.Errorf("ReadBodyForStorageByKey: %w", err)
}

if err := rawdb.DeleteBodyAndTransactions(be.tx, blockNum, datastreamBlockHash); err != nil {
return fmt.Errorf("DeleteBodyAndTransactions: %w", err)
}
if err := rawdb.WriteBodyAndTransactions(be.tx, blockHash, blockNum, block.Transactions(), bodyForStorage); err != nil {
return fmt.Errorf("WriteBodyAndTransactions: %w", err)
}

// [zkevm] senders were saved in stage_senders for headerHashes based on incomplete headers
// in stage execute we complete the headers, so the senders should be moved to the correct headerHash
// we should also delete other data keyed by the old hash, since it is inaccessible now
if err := rawdb.WriteSenders(be.tx, blockHash, blockNum, senders); err != nil {
return fmt.Errorf("failed to write senders: %w", err)
}
}

// TODO: how can we store this data right first time? Or mop up old data as we're currently duping storage
/*
, \ / ,
/ \ )\__/( / \
/ \ (_\ /_) / \
____/_____\__\@ @/___/_____\____
| |\../| |
| \VV/ |
| ZKEVM duping storage |
|_________________________________|
| /\ / \\ \ /\ |
| / V )) V \ |
|/ ` // ' \|
` V '
we need to write the header back to the db at this point: the gas used, receipt hash and bloom
weren't available from the data stream, so we're relying on execution to provide them. We also
need to update the canonical hash, so we can retrieve this newly updated header later.
*/
if err := rawdb.WriteHeader_zkEvm(be.tx, header); err != nil {
return fmt.Errorf("WriteHeader_zkEvm: %w", err)
}
if err := rawdb.WriteHeadHeaderHash(be.tx, blockHash); err != nil {
return fmt.Errorf("WriteHeadHeaderHash: %w", err)
}
if err := rawdb.WriteCanonicalHash(be.tx, blockHash, blockNum); err != nil {
return fmt.Errorf("WriteCanonicalHash: %w", err)
}

// write the new block lookup entries
if err := rawdb.WriteTxLookupEntries_zkEvm(be.tx, block); err != nil {
return fmt.Errorf("WriteTxLookupEntries_zkEvm: %w", err)
}

return nil
}
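
ExecuteBlock wraps failures as fmt.Errorf("%w: %w", ErrExecutionError, err), so callers can match the sentinel with errors.Is while keeping the underlying cause. The snippet below is a minimal, self-contained sketch of that pattern (Go 1.20+ is assumed for the double %w); everything other than the sentinel name is illustrative.

package main

import (
	"errors"
	"fmt"
)

// Local stand-in for the sentinel declared in the diff.
var ErrExecutionError = errors.New("execution error")

// execute simulates a failing block execution and wraps the cause together
// with the sentinel; both remain matchable via errors.Is.
func execute(blockNum uint64) error {
	cause := fmt.Errorf("bad state root at block %d", blockNum)
	return fmt.Errorf("%w: %w", ErrExecutionError, cause)
}

func main() {
	if err := execute(42); errors.Is(err, ErrExecutionError) {
		// A caller can branch on the sentinel without string matching.
		fmt.Println("execution failed:", err)
	}
}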
