Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: support new api engine_opSealPayload and optimize overheads of block mining #193

Open
wants to merge 12 commits into
base: develop
Choose a base branch
from
12 changes: 12 additions & 0 deletions beacon/engine/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ import (
// building of the payload to commence.
type PayloadVersion byte

const (
GetPayloadStage = "getPayload"
NewPayloadStage = "newPayload"
ForkchoiceUpdatedStage = "forkchoiceUpdated"
)

var (
PayloadV1 PayloadVersion = 0x1
PayloadV2 PayloadVersion = 0x2
Expand Down Expand Up @@ -181,6 +187,12 @@ type ForkchoiceStateV1 struct {
FinalizedBlockHash common.Hash `json:"finalizedBlockHash"`
}

type OpSealPayloadResponse struct {
ErrStage string `json:"errStage"`
PayloadStatus PayloadStatusV1 `json:"payloadStatus"`
Payload *ExecutionPayloadEnvelope `json:"payload"`
}

func encodeTransactions(txs []*types.Transaction) [][]byte {
var enc = make([][]byte, len(txs))
for i, tx := range txs {
Expand Down
12 changes: 9 additions & 3 deletions consensus/beacon/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,11 +399,17 @@ func (beacon *Beacon) FinalizeAndAssemble(chain consensus.ChainHeaderReader, hea
// Finalize and assemble the block.
beacon.Finalize(chain, header, state, txs, uncles, withdrawals)

// Assign the final state root to header.
header.Root = state.IntermediateRoot(true)
rootCh := make(chan common.Hash)
go func() {
rootCh <- state.IntermediateRoot(true)
}()

block := types.NewBlockWithWithdrawals(header, txs, uncles, receipts, withdrawals, trie.NewStackTrie(nil))
headerWithRoot := block.Header()
headerWithRoot.Root = <-rootCh

// Assemble and return the final block.
return types.NewBlockWithWithdrawals(header, txs, uncles, receipts, withdrawals, trie.NewStackTrie(nil)), nil
return block.WithSeal(headerWithRoot), nil
}

// Seal generates a new sealing request for the given input block and pushes
Expand Down
8 changes: 5 additions & 3 deletions core/block_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func (v *BlockValidator) ValidateBody(block *types.Block) error {

// ValidateState validates the various changes that happen after a state transition,
// such as amount of used gas, the receipt roots and the state root itself.
func (v *BlockValidator) ValidateState(block *types.Block, statedb *state.StateDB, receipts types.Receipts, usedGas uint64) error {
func (v *BlockValidator) ValidateState(block *types.Block, statedb *state.StateDB, receipts types.Receipts, usedGas uint64, skipRoot bool) error {
header := block.Header()
if block.GasUsed() != usedGas {
return fmt.Errorf("invalid gas used (remote: %d local: %d)", block.GasUsed(), usedGas)
Expand All @@ -186,14 +186,16 @@ func (v *BlockValidator) ValidateState(block *types.Block, statedb *state.StateD
}
return nil
},
func() error {
}
if !skipRoot {
validateFuns = append(validateFuns, func() error {
// Validate the state root against the received state root and throw
// an error if they don't match.
if root := statedb.IntermediateRoot(v.config.IsEIP158(header.Number)); header.Root != root {
return fmt.Errorf("invalid merkle root (remote: %x local: %x) dberr: %w", header.Root, root, statedb.Error())
}
return nil
},
})
}
validateRes := make(chan error, len(validateFuns))
for _, f := range validateFuns {
Expand Down
73 changes: 66 additions & 7 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -1142,6 +1142,13 @@ func (bc *BlockChain) procFutureBlocks() {
}
}

// CacheBlock cache block in memory
func (bc *BlockChain) CacheBlock(hash common.Hash, block *types.Block) {
bc.hc.numberCache.Add(hash, block.NumberU64())
bc.hc.headerCache.Add(hash, block.Header())
bc.blockCache.Add(hash, block)
}

// CacheMiningReceipts cache receipts in memory
func (bc *BlockChain) CacheMiningReceipts(hash common.Hash, receipts types.Receipts) {
bc.miningReceiptsCache.Add(hash, receipts)
Expand Down Expand Up @@ -1669,6 +1676,15 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)
return 0, nil
}

minerMode := false
if len(chain) == 1 {
block := chain[0]
_, receiptExist := bc.miningReceiptsCache.Get(block.Hash())
_, logExist := bc.miningTxLogsCache.Get(block.Hash())
_, stateExist := bc.miningStateCache.Get(block.Hash())
minerMode = receiptExist && logExist && stateExist
}

// Start a parallel signature recovery (signer will fluke on fork transition, minimal perf loss)
SenderCacher.RecoverFromBlocks(types.MakeSigner(bc.chainConfig, chain[0].Number(), chain[0].Time()), chain)

Expand All @@ -1692,7 +1708,14 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)

// Peek the error for the first block to decide the directing import logic
it := newInsertIterator(chain, results, bc.validator)
block, err := it.next()
var block *types.Block
var err error
if minerMode {
block = chain[0]
it.index = 0
} else {
block, err = it.next()
}

// Left-trim all the known blocks that don't need to build snapshot
if bc.skipBlock(err, it) {
Expand Down Expand Up @@ -1908,11 +1931,28 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)
ptime := time.Since(pstart)

vstart := time.Now()
if err := bc.validator.ValidateState(block, statedb, receipts, usedGas); err != nil {
bc.reportBlock(block, receipts, err)
followupInterrupt.Store(true)
return it.index, err
// Async validate if minerMode
asyncValidateStateCh := make(chan error, 1)
if minerMode {
header := block.Header()
// Can not validate root concurrently
if root := statedb.IntermediateRoot(bc.chainConfig.IsEIP158(header.Number)); header.Root != root {
err := fmt.Errorf("self mined block(hash: %x number %v) verify root err(mined: %x expected: %x) dberr: %w", block.Hash(), block.NumberU64(), header.Root, root, statedb.Error())
bc.reportBlock(block, receipts, err)
followupInterrupt.Store(true)
return it.index, err
}
go func() {
asyncValidateStateCh <- bc.validator.ValidateState(block, statedb, receipts, usedGas, true)
}()
} else {
if err := bc.validator.ValidateState(block, statedb, receipts, usedGas, false); err != nil {
bc.reportBlock(block, receipts, err)
followupInterrupt.Store(true)
return it.index, err
}
}

vtime := time.Since(vstart)
proctime := time.Since(start) // processing + validation

Expand Down Expand Up @@ -1947,6 +1987,13 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)
if err != nil {
return it.index, err
}
if minerMode {
if err := <-asyncValidateStateCh; err != nil {
panic(fmt.Errorf("self mined block(hash: %x number %v) async verify state err: %w", block.Hash(), block.NumberU64(), err))
}
}
bc.CacheBlock(block.Hash(), block)

// Update the metrics touched during block commit
accountCommitTimer.Update(statedb.AccountCommits) // Account commits are complete, we can mark them
storageCommitTimer.Update(statedb.StorageCommits) // Storage commits are complete, we can mark them
Expand All @@ -1968,7 +2015,12 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)
if bc.snaps != nil {
snapDiffItems, snapBufItems = bc.snaps.Size()
}
trieDiffNodes, trieBufNodes, trieImmutableBufNodes, _ := bc.triedb.Size()
var trieDiffNodes common.StorageSize = 0
var trieBufNodes common.StorageSize = 0
var trieImmutableBufNodes common.StorageSize = 0
if !minerMode {
trieDiffNodes, trieBufNodes, trieImmutableBufNodes, _ = bc.triedb.Size()
}
stats.report(chain, it.index, snapDiffItems, snapBufItems, trieDiffNodes, trieBufNodes, trieImmutableBufNodes, setHead)
blockGasUsedGauge.Update(int64(block.GasUsed()) / 1000000)

Expand Down Expand Up @@ -2450,10 +2502,17 @@ func (bc *BlockChain) SetCanonical(head *types.Block) (common.Hash, error) {
return common.Hash{}, err
}
}
bc.writeHeadBlock(head)

wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
bc.writeHeadBlock(head)
}()
// Emit events
logs := bc.collectLogs(head, false)
wg.Wait()

bc.chainFeed.Send(ChainEvent{Block: head, Hash: head.Hash(), Logs: logs})
if len(logs) > 0 {
bc.logsFeed.Send(logs)
Expand Down
2 changes: 1 addition & 1 deletion core/blockchain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ func testBlockChainImport(chain types.Blocks, blockchain *BlockChain) error {
blockchain.reportBlock(block, receipts, err)
return err
}
err = blockchain.validator.ValidateState(block, statedb, receipts, usedGas)
err = blockchain.validator.ValidateState(block, statedb, receipts, usedGas, false)
if err != nil {
blockchain.reportBlock(block, receipts, err)
return err
Expand Down
2 changes: 1 addition & 1 deletion core/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type Validator interface {

// ValidateState validates the given statedb and optionally the receipts and
// gas used.
ValidateState(block *types.Block, state *state.StateDB, receipts types.Receipts, usedGas uint64) error
ValidateState(block *types.Block, state *state.StateDB, receipts types.Receipts, usedGas uint64, skipRoot bool) error
}

// Prefetcher is an interface for pre-caching transaction signatures and state.
Expand Down
75 changes: 71 additions & 4 deletions eth/catalyst/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ var (
forkchoiceUpdateHeadsTimer = metrics.NewRegisteredTimer("api/engine/forkchoiceUpdate/heads", nil)
getPayloadTimer = metrics.NewRegisteredTimer("api/engine/get/payload", nil)
newPayloadTimer = metrics.NewRegisteredTimer("api/engine/new/payload", nil)
sealPayloadTimer = metrics.NewRegisteredTimer("api/engine/seal/payload", nil)
)

// Register adds the engine API to the full node.
Expand Down Expand Up @@ -99,6 +100,8 @@ var caps = []string{
"engine_getPayloadBodiesByHashV1",
"engine_getPayloadBodiesByRangeV1",
"engine_getClientVersionV1",
"engine_opSealPayloadV2",
"engine_opSealPayloadV3",
}

type ConsensusAPI struct {
Expand Down Expand Up @@ -602,11 +605,17 @@ func (api *ConsensusAPI) newPayload(params engine.ExecutableData, versionedHashe
defer api.newPayloadLock.Unlock()

log.Trace("Engine API request received", "method", "NewPayload", "number", params.Number, "hash", params.BlockHash)
block, err := engine.ExecutableDataToBlock(params, versionedHashes, beaconRoot)
if err != nil {
log.Warn("Invalid NewPayload params", "params", params, "error", err)
return api.invalid(err, nil), nil

block := api.localBlocks.getBlockByHash(params.BlockHash)
if block == nil {
var err error
block, err = engine.ExecutableDataToBlock(params, versionedHashes, beaconRoot)
if err != nil {
log.Warn("Invalid NewPayload params", "params", params, "error", err)
return api.invalid(err, nil), nil
}
}

// Stash away the last update to warn the user if the beacon client goes offline
api.lastNewPayloadLock.Lock()
api.lastNewPayloadUpdate = time.Now()
Expand Down Expand Up @@ -691,6 +700,64 @@ func (api *ConsensusAPI) newPayload(params engine.ExecutableData, versionedHashe
return engine.PayloadStatusV1{Status: engine.VALID, LatestValidHash: &hash}, nil
}

// OpSealPayloadV2 is combination API of payload sealing: getPayload, newPayload, forkchoiceUpdated.
func (api *ConsensusAPI) OpSealPayloadV2(payloadID engine.PayloadID, update engine.ForkchoiceStateV1, needPayload bool) (engine.OpSealPayloadResponse, error) {
return api.opSealPayload(payloadID, update, needPayload, "V2")
}

// OpSealPayloadV3 is combination API of payload sealing: getPayload, newPayload, forkchoiceUpdated.
func (api *ConsensusAPI) OpSealPayloadV3(payloadID engine.PayloadID, update engine.ForkchoiceStateV1, needPayload bool) (engine.OpSealPayloadResponse, error) {
return api.opSealPayload(payloadID, update, needPayload, "V3")
}

func (api *ConsensusAPI) opSealPayload(payloadID engine.PayloadID, update engine.ForkchoiceStateV1, needPayload bool, version string) (engine.OpSealPayloadResponse, error) {
start := time.Now()
defer func() {
sealPayloadTimer.UpdateSince(start)
log.Debug("sealPayloadTimer", "duration", common.PrettyDuration(time.Since(start)), "payloadID", payloadID)
}()
var payloadEnvelope *engine.ExecutionPayloadEnvelope
var err error
if version == "V2" {
payloadEnvelope, err = api.GetPayloadV2(payloadID)
} else if version == "V3" {
payloadEnvelope, err = api.GetPayloadV3(payloadID)
} else {
return engine.OpSealPayloadResponse{ErrStage: engine.GetPayloadStage}, engine.UnsupportedFork.With(errors.New("invalid engine api version"))
}
if err != nil {
log.Error("Seal payload error when get payload", "error", err, "payloadID", payloadID)
return engine.OpSealPayloadResponse{ErrStage: engine.GetPayloadStage}, err
}

var payloadStatus engine.PayloadStatusV1
if version == "V2" {
payloadStatus, err = api.NewPayloadV2(*payloadEnvelope.ExecutionPayload)
} else if version == "V3" {
payloadStatus, err = api.NewPayloadV3(*payloadEnvelope.ExecutionPayload, []common.Hash{}, payloadEnvelope.ParentBeaconBlockRoot)
} else {
return engine.OpSealPayloadResponse{ErrStage: engine.NewPayloadStage}, engine.UnsupportedFork.With(errors.New("invalid engine api version"))
}
if err != nil || payloadStatus.Status != engine.VALID {
log.Error("Seal payload error when new payload", "error", err, "payloadStatus", payloadStatus)
return engine.OpSealPayloadResponse{ErrStage: engine.NewPayloadStage, PayloadStatus: payloadStatus}, err
}

update.HeadBlockHash = payloadEnvelope.ExecutionPayload.BlockHash
updateResponse, err := api.ForkchoiceUpdatedV3(update, nil)
if err != nil || updateResponse.PayloadStatus.Status != engine.VALID {
log.Error("Seal payload error when forkchoiceUpdated", "error", err, "payloadStatus", updateResponse.PayloadStatus)
return engine.OpSealPayloadResponse{ErrStage: engine.ForkchoiceUpdatedStage, PayloadStatus: updateResponse.PayloadStatus}, err
}

log.Info("opSealPayload succeed", "hash", payloadEnvelope.ExecutionPayload.BlockHash, "number", payloadEnvelope.ExecutionPayload.Number, "id", payloadID, "payloadStatus", updateResponse.PayloadStatus)
if needPayload {
return engine.OpSealPayloadResponse{PayloadStatus: updateResponse.PayloadStatus, Payload: payloadEnvelope}, nil
} else {
return engine.OpSealPayloadResponse{PayloadStatus: updateResponse.PayloadStatus}, nil
}
}

// delayPayloadImport stashes the given block away for import at a later time,
// either via a forkchoice update or a sync extension. This method is meant to
// be called by the newpayload command when the block seems to be ok, but some
Expand Down
17 changes: 17 additions & 0 deletions eth/catalyst/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,23 @@ func (q *payloadQueue) has(id engine.PayloadID) bool {
return false
}

// getBlock retrieves block from a previously stored payload or nil if it does not exist.
func (q *payloadQueue) getBlockByHash(hash common.Hash) *types.Block {
q.lock.RLock()
defer q.lock.RUnlock()

for _, item := range q.payloads {
if item == nil {
return nil
}
block := item.payload.GetBlock()
if block != nil && block.Hash() == hash {
return block
}
}
return nil
}

// headerQueueItem represents an hash->header tuple to store until it's retrieved
// or evicted.
type headerQueueItem struct {
Expand Down
9 changes: 9 additions & 0 deletions miner/payload_building.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,15 @@ func (payload *Payload) resolve(onlyFull bool) *engine.ExecutionPayloadEnvelope
return nil
}

func (payload *Payload) GetBlock() *types.Block {
if payload.full != nil {
return payload.full
} else if payload.empty != nil {
return payload.empty
}
return nil
}

// interruptBuilding sets an interrupt for a potentially ongoing
// block building process.
// This will prevent it from adding new transactions to the block, and if it is
Expand Down
Loading