From 6b588da3e7b77f70a199523014eccbec85b85639 Mon Sep 17 00:00:00 2001 From: bnoieh <135800952+bnoieh@users.noreply.github.com> Date: Mon, 30 Sep 2024 17:49:22 +0800 Subject: [PATCH 01/10] feat: add combined engine api and perf opt --- beacon/engine/types.go | 12 ++++++++++ consensus/beacon/consensus.go | 12 +++++++--- core/blockchain.go | 45 ++++++++++++++++++++++++++++++----- eth/catalyst/api.go | 45 +++++++++++++++++++++++++++++++---- eth/catalyst/queue.go | 17 +++++++++++++ miner/payload_building.go | 9 +++++++ 6 files changed, 127 insertions(+), 13 deletions(-) diff --git a/beacon/engine/types.go b/beacon/engine/types.go index 9a3ea8d077..e5843c4cc1 100644 --- a/beacon/engine/types.go +++ b/beacon/engine/types.go @@ -30,6 +30,12 @@ import ( // building of the payload to commence. type PayloadVersion byte +const ( + GetPayloadStage = "getPayload" + NewPayloadStage = "newPayload" + ForkchoiceUpdate = "forkchoiceUpdate" +) + var ( PayloadV1 PayloadVersion = 0x1 PayloadV2 PayloadVersion = 0x2 @@ -181,6 +187,12 @@ type ForkchoiceStateV1 struct { FinalizedBlockHash common.Hash `json:"finalizedBlockHash"` } +type OpSealPayloadResponse struct { + Stage string `json:"stage"` + PayloadStatus PayloadStatusV1 `json:"payloadStatus"` + Payload *ExecutionPayloadEnvelope `json:"payload"` +} + func encodeTransactions(txs []*types.Transaction) [][]byte { var enc = make([][]byte, len(txs)) for i, tx := range txs { diff --git a/consensus/beacon/consensus.go b/consensus/beacon/consensus.go index ba4980b8d4..64eccaf31b 100644 --- a/consensus/beacon/consensus.go +++ b/consensus/beacon/consensus.go @@ -399,11 +399,17 @@ func (beacon *Beacon) FinalizeAndAssemble(chain consensus.ChainHeaderReader, hea // Finalize and assemble the block. beacon.Finalize(chain, header, state, txs, uncles, withdrawals) - // Assign the final state root to header. - header.Root = state.IntermediateRoot(true) + rootCh := make(chan common.Hash) + go func() { + rootCh <- state.IntermediateRoot(true) + }() + + block := types.NewBlockWithWithdrawals(header, txs, uncles, receipts, withdrawals, trie.NewStackTrie(nil)) + headerWithRoot := block.Header() + headerWithRoot.Root = <-rootCh // Assemble and return the final block. - return types.NewBlockWithWithdrawals(header, txs, uncles, receipts, withdrawals, trie.NewStackTrie(nil)), nil + return block.WithSeal(headerWithRoot), nil } // Seal generates a new sealing request for the given input block and pushes diff --git a/core/blockchain.go b/core/blockchain.go index 7e4b81b153..bb9c685441 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -1141,6 +1141,13 @@ func (bc *BlockChain) procFutureBlocks() { } } +// CacheBlock cache block in memory +func (bc *BlockChain) CacheBlock(hash common.Hash, block *types.Block) { + bc.hc.numberCache.Add(hash, block.NumberU64()) + bc.hc.headerCache.Add(hash, block.Header()) + bc.blockCache.Add(hash, block) +} + // CacheMiningReceipts cache receipts in memory func (bc *BlockChain) CacheMiningReceipts(hash common.Hash, receipts types.Receipts) { bc.miningReceiptsCache.Add(hash, receipts) @@ -1668,6 +1675,15 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error) return 0, nil } + minerMode := false + if len(chain) == 1 { + block := chain[0] + _, receiptExist := bc.miningReceiptsCache.Get(block.Hash()) + _, logExist := bc.miningTxLogsCache.Get(block.Hash()) + _, stateExist := bc.miningStateCache.Get(block.Hash()) + minerMode = receiptExist && logExist && stateExist + } + // Start a parallel signature recovery (signer will fluke on fork transition, minimal perf loss) SenderCacher.RecoverFromBlocks(types.MakeSigner(bc.chainConfig, chain[0].Number(), chain[0].Time()), chain) @@ -1691,7 +1707,12 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error) // Peek the error for the first block to decide the directing import logic it := newInsertIterator(chain, results, bc.validator) - block, err := it.next() + + block := chain[0] + var err error + if !minerMode { + block, err = it.next() + } // Left-trim all the known blocks that don't need to build snapshot if bc.skipBlock(err, it) { @@ -1907,11 +1928,14 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error) ptime := time.Since(pstart) vstart := time.Now() - if err := bc.validator.ValidateState(block, statedb, receipts, usedGas); err != nil { - bc.reportBlock(block, receipts, err) - followupInterrupt.Store(true) - return it.index, err + if !minerMode { + if err := bc.validator.ValidateState(block, statedb, receipts, usedGas); err != nil { + bc.reportBlock(block, receipts, err) + followupInterrupt.Store(true) + return it.index, err + } } + vtime := time.Since(vstart) proctime := time.Since(start) // processing + validation @@ -1946,6 +1970,8 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error) if err != nil { return it.index, err } + bc.CacheBlock(block.Hash(), block) + // Update the metrics touched during block commit accountCommitTimer.Update(statedb.AccountCommits) // Account commits are complete, we can mark them storageCommitTimer.Update(statedb.StorageCommits) // Storage commits are complete, we can mark them @@ -2449,10 +2475,17 @@ func (bc *BlockChain) SetCanonical(head *types.Block) (common.Hash, error) { return common.Hash{}, err } } - bc.writeHeadBlock(head) + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + bc.writeHeadBlock(head) + }() // Emit events logs := bc.collectLogs(head, false) + wg.Wait() + bc.chainFeed.Send(ChainEvent{Block: head, Hash: head.Hash(), Logs: logs}) if len(logs) > 0 { bc.logsFeed.Send(logs) diff --git a/eth/catalyst/api.go b/eth/catalyst/api.go index e8703f7532..e628745f18 100644 --- a/eth/catalyst/api.go +++ b/eth/catalyst/api.go @@ -99,6 +99,7 @@ var caps = []string{ "engine_getPayloadBodiesByHashV1", "engine_getPayloadBodiesByRangeV1", "engine_getClientVersionV1", + "engine_opSealPayload", } type ConsensusAPI struct { @@ -602,11 +603,17 @@ func (api *ConsensusAPI) newPayload(params engine.ExecutableData, versionedHashe defer api.newPayloadLock.Unlock() log.Trace("Engine API request received", "method", "NewPayload", "number", params.Number, "hash", params.BlockHash) - block, err := engine.ExecutableDataToBlock(params, versionedHashes, beaconRoot) - if err != nil { - log.Warn("Invalid NewPayload params", "params", params, "error", err) - return api.invalid(err, nil), nil + + block := api.localBlocks.getBlockByHash(params.BlockHash) + if block == nil { + var err error + block, err = engine.ExecutableDataToBlock(params, versionedHashes, beaconRoot) + if err != nil { + log.Warn("Invalid NewPayload params", "params", params, "error", err) + return api.invalid(err, nil), nil + } } + // Stash away the last update to warn the user if the beacon client goes offline api.lastNewPayloadLock.Lock() api.lastNewPayloadUpdate = time.Now() @@ -691,6 +698,36 @@ func (api *ConsensusAPI) newPayload(params engine.ExecutableData, versionedHashe return engine.PayloadStatusV1{Status: engine.VALID, LatestValidHash: &hash}, nil } +// OpSealPayload is combination API of payload sealing: getPayload, newPayload, forkchoiceUpdated. +// TODO add API version +func (api *ConsensusAPI) OpSealPayload(payloadID engine.PayloadID, update engine.ForkchoiceStateV1) (engine.OpSealPayloadResponse, error) { + return api.opSealPayload(payloadID, update) +} + +func (api *ConsensusAPI) opSealPayload(payloadID engine.PayloadID, update engine.ForkchoiceStateV1) (engine.OpSealPayloadResponse, error) { + payloadEnvelope, err := api.getPayload(payloadID, false) + if err != nil { + log.Error("Seal payload error when get payload", "error", err, "payloadID", payloadID) + return engine.OpSealPayloadResponse{Stage: engine.GetPayloadStage}, err + } + + payloadStatus, err := api.newPayload(*payloadEnvelope.ExecutionPayload, []common.Hash{}, payloadEnvelope.ParentBeaconBlockRoot) + if err != nil || payloadStatus.Status != engine.VALID { + log.Error("Seal payload error when new payload", "error", err, "payloadStatus", payloadStatus) + return engine.OpSealPayloadResponse{Stage: engine.NewPayloadStage, PayloadStatus: payloadStatus}, err + } + + // TODO check update input + updateResponse, err := api.forkchoiceUpdated(update, nil, engine.PayloadV3, false) + if err != nil || updateResponse.PayloadStatus.Status != engine.VALID { + log.Error("Seal payload error when forkchoiceUpdated", "error", err, "payloadStatus", updateResponse.PayloadStatus) + return engine.OpSealPayloadResponse{Stage: engine.ForkchoiceUpdate, PayloadStatus: updateResponse.PayloadStatus}, err + } + + log.Info("Seal payload succeed", "payloadStatus", updateResponse.PayloadStatus) + return engine.OpSealPayloadResponse{PayloadStatus: updateResponse.PayloadStatus, Payload: payloadEnvelope}, nil +} + // delayPayloadImport stashes the given block away for import at a later time, // either via a forkchoice update or a sync extension. This method is meant to // be called by the newpayload command when the block seems to be ok, but some diff --git a/eth/catalyst/queue.go b/eth/catalyst/queue.go index d42904843b..b9c02f2283 100644 --- a/eth/catalyst/queue.go +++ b/eth/catalyst/queue.go @@ -126,6 +126,23 @@ func (q *payloadQueue) has(id engine.PayloadID) bool { return false } +// getBlock retrieves block from a previously stored payload or nil if it does not exist. +func (q *payloadQueue) getBlockByHash(hash common.Hash) *types.Block { + q.lock.RLock() + defer q.lock.RUnlock() + + for _, item := range q.payloads { + if item == nil { + return nil + } + block := item.payload.GetBlock() + if block.Hash() == hash { + return block + } + } + return nil +} + // headerQueueItem represents an hash->header tuple to store until it's retrieved // or evicted. type headerQueueItem struct { diff --git a/miner/payload_building.go b/miner/payload_building.go index f65b0f0fe4..6a17a03a3d 100644 --- a/miner/payload_building.go +++ b/miner/payload_building.go @@ -233,6 +233,15 @@ func (payload *Payload) resolve(onlyFull bool) *engine.ExecutionPayloadEnvelope return nil } +func (payload *Payload) GetBlock() *types.Block { + if payload.full != nil { + return payload.full + } else if payload.empty != nil { + return payload.empty + } + return nil +} + // interruptBuilding sets an interrupt for a potentially ongoing // block building process. // This will prevent it from adding new transactions to the block, and if it is From 6720428696cf127d10c4e333f00d66b1e5de88a5 Mon Sep 17 00:00:00 2001 From: bnoieh <135800952+bnoieh@users.noreply.github.com> Date: Wed, 9 Oct 2024 15:39:01 +0800 Subject: [PATCH 02/10] fix: should update HeadBlockHash when forkchoiceUpdated in OpSealPayload API --- eth/catalyst/api.go | 1 + 1 file changed, 1 insertion(+) diff --git a/eth/catalyst/api.go b/eth/catalyst/api.go index e628745f18..d9f7a035b1 100644 --- a/eth/catalyst/api.go +++ b/eth/catalyst/api.go @@ -718,6 +718,7 @@ func (api *ConsensusAPI) opSealPayload(payloadID engine.PayloadID, update engine } // TODO check update input + update.HeadBlockHash = payloadEnvelope.ExecutionPayload.BlockHash updateResponse, err := api.forkchoiceUpdated(update, nil, engine.PayloadV3, false) if err != nil || updateResponse.PayloadStatus.Status != engine.VALID { log.Error("Seal payload error when forkchoiceUpdated", "error", err, "payloadStatus", updateResponse.PayloadStatus) From fb6c67f54dd64ce0f3ce7a05ae88c01e904438f8 Mon Sep 17 00:00:00 2001 From: bnoieh <135800952+bnoieh@users.noreply.github.com> Date: Fri, 11 Oct 2024 17:24:46 +0800 Subject: [PATCH 03/10] feat: add needPayload option to engine_opSealPayload API --- eth/catalyst/api.go | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/eth/catalyst/api.go b/eth/catalyst/api.go index d9f7a035b1..b6560aa8bc 100644 --- a/eth/catalyst/api.go +++ b/eth/catalyst/api.go @@ -700,11 +700,12 @@ func (api *ConsensusAPI) newPayload(params engine.ExecutableData, versionedHashe // OpSealPayload is combination API of payload sealing: getPayload, newPayload, forkchoiceUpdated. // TODO add API version -func (api *ConsensusAPI) OpSealPayload(payloadID engine.PayloadID, update engine.ForkchoiceStateV1) (engine.OpSealPayloadResponse, error) { - return api.opSealPayload(payloadID, update) +func (api *ConsensusAPI) OpSealPayload(payloadID engine.PayloadID, update engine.ForkchoiceStateV1, needPayload bool) (engine.OpSealPayloadResponse, error) { + return api.opSealPayload(payloadID, update, needPayload) } -func (api *ConsensusAPI) opSealPayload(payloadID engine.PayloadID, update engine.ForkchoiceStateV1) (engine.OpSealPayloadResponse, error) { +func (api *ConsensusAPI) opSealPayload(payloadID engine.PayloadID, update engine.ForkchoiceStateV1, needPayload bool) (engine.OpSealPayloadResponse, error) { + start := time.Now() payloadEnvelope, err := api.getPayload(payloadID, false) if err != nil { log.Error("Seal payload error when get payload", "error", err, "payloadID", payloadID) @@ -726,7 +727,12 @@ func (api *ConsensusAPI) opSealPayload(payloadID engine.PayloadID, update engine } log.Info("Seal payload succeed", "payloadStatus", updateResponse.PayloadStatus) - return engine.OpSealPayloadResponse{PayloadStatus: updateResponse.PayloadStatus, Payload: payloadEnvelope}, nil + log.Info("perf-trace opSealPayload", "duration", common.PrettyDuration(time.Since(start)), "hash", payloadEnvelope.ExecutionPayload.BlockHash, "number", payloadEnvelope.ExecutionPayload.Number, "id", payloadID) + if needPayload { + return engine.OpSealPayloadResponse{PayloadStatus: updateResponse.PayloadStatus, Payload: payloadEnvelope}, nil + } else { + return engine.OpSealPayloadResponse{PayloadStatus: updateResponse.PayloadStatus}, nil + } } // delayPayloadImport stashes the given block away for import at a later time, From 6700b99342a8f840e39ccfc6f6a7a960a9f80537 Mon Sep 17 00:00:00 2001 From: bnoieh <135800952+bnoieh@users.noreply.github.com> Date: Fri, 18 Oct 2024 16:15:48 +0800 Subject: [PATCH 04/10] perf: add new api engine_opSealPayload --- beacon/engine/types.go | 4 ++-- eth/catalyst/api.go | 48 +++++++++++++++++++++++++++++------------- 2 files changed, 35 insertions(+), 17 deletions(-) diff --git a/beacon/engine/types.go b/beacon/engine/types.go index e5843c4cc1..960c68d5e9 100644 --- a/beacon/engine/types.go +++ b/beacon/engine/types.go @@ -33,7 +33,7 @@ type PayloadVersion byte const ( GetPayloadStage = "getPayload" NewPayloadStage = "newPayload" - ForkchoiceUpdate = "forkchoiceUpdate" + ForkchoiceUpdatedStage = "forkchoiceUpdated" ) var ( @@ -188,7 +188,7 @@ type ForkchoiceStateV1 struct { } type OpSealPayloadResponse struct { - Stage string `json:"stage"` + ErrStage string `json:"errStage"` PayloadStatus PayloadStatusV1 `json:"payloadStatus"` Payload *ExecutionPayloadEnvelope `json:"payload"` } diff --git a/eth/catalyst/api.go b/eth/catalyst/api.go index b6560aa8bc..b0b3d2e3d9 100644 --- a/eth/catalyst/api.go +++ b/eth/catalyst/api.go @@ -99,7 +99,8 @@ var caps = []string{ "engine_getPayloadBodiesByHashV1", "engine_getPayloadBodiesByRangeV1", "engine_getClientVersionV1", - "engine_opSealPayload", + "engine_opSealPayloadV2", + "engine_opSealPayloadV3", } type ConsensusAPI struct { @@ -698,36 +699,53 @@ func (api *ConsensusAPI) newPayload(params engine.ExecutableData, versionedHashe return engine.PayloadStatusV1{Status: engine.VALID, LatestValidHash: &hash}, nil } -// OpSealPayload is combination API of payload sealing: getPayload, newPayload, forkchoiceUpdated. -// TODO add API version -func (api *ConsensusAPI) OpSealPayload(payloadID engine.PayloadID, update engine.ForkchoiceStateV1, needPayload bool) (engine.OpSealPayloadResponse, error) { - return api.opSealPayload(payloadID, update, needPayload) +// OpSealPayloadV2 is combination API of payload sealing: getPayload, newPayload, forkchoiceUpdated. +func (api *ConsensusAPI) OpSealPayloadV2(payloadID engine.PayloadID, update engine.ForkchoiceStateV1, needPayload bool) (engine.OpSealPayloadResponse, error) { + return api.opSealPayload(payloadID, update, needPayload, "V2") } -func (api *ConsensusAPI) opSealPayload(payloadID engine.PayloadID, update engine.ForkchoiceStateV1, needPayload bool) (engine.OpSealPayloadResponse, error) { +// OpSealPayloadV3 is combination API of payload sealing: getPayload, newPayload, forkchoiceUpdated. +func (api *ConsensusAPI) OpSealPayloadV3(payloadID engine.PayloadID, update engine.ForkchoiceStateV1, needPayload bool) (engine.OpSealPayloadResponse, error) { + return api.opSealPayload(payloadID, update, needPayload, "V3") +} + +func (api *ConsensusAPI) opSealPayload(payloadID engine.PayloadID, update engine.ForkchoiceStateV1, needPayload bool, version string) (engine.OpSealPayloadResponse, error) { start := time.Now() - payloadEnvelope, err := api.getPayload(payloadID, false) + var payloadEnvelope *engine.ExecutionPayloadEnvelope + var err error + if version == "V2" { + payloadEnvelope, err = api.GetPayloadV2(payloadID) + } else if version == "V3" { + payloadEnvelope, err = api.GetPayloadV3(payloadID) + } else { + return engine.OpSealPayloadResponse{ErrStage: engine.GetPayloadStage}, engine.UnsupportedFork.With(errors.New("invalid engine api version")) + } if err != nil { log.Error("Seal payload error when get payload", "error", err, "payloadID", payloadID) - return engine.OpSealPayloadResponse{Stage: engine.GetPayloadStage}, err + return engine.OpSealPayloadResponse{ErrStage: engine.GetPayloadStage}, err } - payloadStatus, err := api.newPayload(*payloadEnvelope.ExecutionPayload, []common.Hash{}, payloadEnvelope.ParentBeaconBlockRoot) + var payloadStatus engine.PayloadStatusV1 + if version == "V2" { + payloadStatus, err = api.NewPayloadV2(*payloadEnvelope.ExecutionPayload) + } else if version == "V3" { + payloadStatus, err = api.NewPayloadV3(*payloadEnvelope.ExecutionPayload, []common.Hash{}, payloadEnvelope.ParentBeaconBlockRoot) + } else { + return engine.OpSealPayloadResponse{ErrStage: engine.NewPayloadStage}, engine.UnsupportedFork.With(errors.New("invalid engine api version")) + } if err != nil || payloadStatus.Status != engine.VALID { log.Error("Seal payload error when new payload", "error", err, "payloadStatus", payloadStatus) - return engine.OpSealPayloadResponse{Stage: engine.NewPayloadStage, PayloadStatus: payloadStatus}, err + return engine.OpSealPayloadResponse{ErrStage: engine.NewPayloadStage, PayloadStatus: payloadStatus}, err } - // TODO check update input update.HeadBlockHash = payloadEnvelope.ExecutionPayload.BlockHash - updateResponse, err := api.forkchoiceUpdated(update, nil, engine.PayloadV3, false) + updateResponse, err := api.ForkchoiceUpdatedV3(update, nil) if err != nil || updateResponse.PayloadStatus.Status != engine.VALID { log.Error("Seal payload error when forkchoiceUpdated", "error", err, "payloadStatus", updateResponse.PayloadStatus) - return engine.OpSealPayloadResponse{Stage: engine.ForkchoiceUpdate, PayloadStatus: updateResponse.PayloadStatus}, err + return engine.OpSealPayloadResponse{ErrStage: engine.ForkchoiceUpdatedStage, PayloadStatus: updateResponse.PayloadStatus}, err } - log.Info("Seal payload succeed", "payloadStatus", updateResponse.PayloadStatus) - log.Info("perf-trace opSealPayload", "duration", common.PrettyDuration(time.Since(start)), "hash", payloadEnvelope.ExecutionPayload.BlockHash, "number", payloadEnvelope.ExecutionPayload.Number, "id", payloadID) + log.Info("perf-trace opSealPayload succeed", "duration", common.PrettyDuration(time.Since(start)), "hash", payloadEnvelope.ExecutionPayload.BlockHash, "number", payloadEnvelope.ExecutionPayload.Number, "id", payloadID, "payloadStatus", updateResponse.PayloadStatus) if needPayload { return engine.OpSealPayloadResponse{PayloadStatus: updateResponse.PayloadStatus, Payload: payloadEnvelope}, nil } else { From 4386b9adaa735635da9e934d0c8c1f2a3c650dd7 Mon Sep 17 00:00:00 2001 From: bnoieh <135800952+bnoieh@users.noreply.github.com> Date: Fri, 18 Oct 2024 18:53:04 +0800 Subject: [PATCH 05/10] perf: optimize mining overheads --- core/block_validator.go | 8 +++++--- core/blockchain.go | 38 ++++++++++++++++++++++++++++++++++---- core/blockchain_test.go | 2 +- core/types.go | 2 +- eth/catalyst/queue.go | 2 +- 5 files changed, 42 insertions(+), 10 deletions(-) diff --git a/core/block_validator.go b/core/block_validator.go index 79839d7176..765a202cbe 100644 --- a/core/block_validator.go +++ b/core/block_validator.go @@ -162,7 +162,7 @@ func (v *BlockValidator) ValidateBody(block *types.Block) error { // ValidateState validates the various changes that happen after a state transition, // such as amount of used gas, the receipt roots and the state root itself. -func (v *BlockValidator) ValidateState(block *types.Block, statedb *state.StateDB, receipts types.Receipts, usedGas uint64) error { +func (v *BlockValidator) ValidateState(block *types.Block, statedb *state.StateDB, receipts types.Receipts, usedGas uint64, skipRoot bool) error { header := block.Header() if block.GasUsed() != usedGas { return fmt.Errorf("invalid gas used (remote: %d local: %d)", block.GasUsed(), usedGas) @@ -186,14 +186,16 @@ func (v *BlockValidator) ValidateState(block *types.Block, statedb *state.StateD } return nil }, - func() error { + } + if !skipRoot { + validateFuns = append(validateFuns, func() error { // Validate the state root against the received state root and throw // an error if they don't match. if root := statedb.IntermediateRoot(v.config.IsEIP158(header.Number)); header.Root != root { return fmt.Errorf("invalid merkle root (remote: %x local: %x) dberr: %w", header.Root, root, statedb.Error()) } return nil - }, + }) } validateRes := make(chan error, len(validateFuns)) for _, f := range validateFuns { diff --git a/core/blockchain.go b/core/blockchain.go index bb9c685441..d311bfe82c 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -1708,9 +1708,11 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error) // Peek the error for the first block to decide the directing import logic it := newInsertIterator(chain, results, bc.validator) - block := chain[0] + var block *types.Block var err error - if !minerMode { + if minerMode { + block = chain[0] + } else { block, err = it.next() } @@ -1868,6 +1870,15 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error) continue } + // Async verify header if minerMode + asyncItNextCh := make(chan error) + if minerMode { + go func() { + _, err := it.next() + asyncItNextCh <- err + }() + } + var ( receipts, receiptExist = bc.miningReceiptsCache.Get(block.Hash()) logs, logExist = bc.miningTxLogsCache.Get(block.Hash()) @@ -1928,8 +1939,19 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error) ptime := time.Since(pstart) vstart := time.Now() - if !minerMode { - if err := bc.validator.ValidateState(block, statedb, receipts, usedGas); err != nil { + // Async validate if minerMode + asyncValidateStateCh := make(chan error) + if minerMode { + header := block.Header() + // Can not validate root concurrently + if root := statedb.IntermediateRoot(bc.chainConfig.IsEIP158(header.Number)); header.Root != root { + panic(fmt.Errorf("self mined block(hash: %x number %v) verify root err(mined: %x expected: %x) dberr: %w", block.Hash(), block.NumberU64(), header.Root, root, statedb.Error())) + } + go func() { + asyncValidateStateCh <- bc.validator.ValidateState(block, statedb, receipts, usedGas, true) + }() + } else { + if err := bc.validator.ValidateState(block, statedb, receipts, usedGas, false); err != nil { bc.reportBlock(block, receipts, err) followupInterrupt.Store(true) return it.index, err @@ -1970,6 +1992,14 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error) if err != nil { return it.index, err } + if minerMode { + if err := <-asyncItNextCh; err != nil { + panic(fmt.Errorf("self mined block(hash: %x number %v) async verify header err: %w", block.Hash(), block.NumberU64(), err)) + } + if err := <-asyncValidateStateCh; err != nil { + panic(fmt.Errorf("self mined block(hash: %x number %v) async verify state err: %w", block.Hash(), block.NumberU64(), err)) + } + } bc.CacheBlock(block.Hash(), block) // Update the metrics touched during block commit diff --git a/core/blockchain_test.go b/core/blockchain_test.go index 22db20a23e..092dc15e89 100644 --- a/core/blockchain_test.go +++ b/core/blockchain_test.go @@ -169,7 +169,7 @@ func testBlockChainImport(chain types.Blocks, blockchain *BlockChain) error { blockchain.reportBlock(block, receipts, err) return err } - err = blockchain.validator.ValidateState(block, statedb, receipts, usedGas) + err = blockchain.validator.ValidateState(block, statedb, receipts, usedGas, false) if err != nil { blockchain.reportBlock(block, receipts, err) return err diff --git a/core/types.go b/core/types.go index 36eb0d1ded..c236058634 100644 --- a/core/types.go +++ b/core/types.go @@ -33,7 +33,7 @@ type Validator interface { // ValidateState validates the given statedb and optionally the receipts and // gas used. - ValidateState(block *types.Block, state *state.StateDB, receipts types.Receipts, usedGas uint64) error + ValidateState(block *types.Block, state *state.StateDB, receipts types.Receipts, usedGas uint64, skipRoot bool) error } // Prefetcher is an interface for pre-caching transaction signatures and state. diff --git a/eth/catalyst/queue.go b/eth/catalyst/queue.go index b9c02f2283..f2a8165ddb 100644 --- a/eth/catalyst/queue.go +++ b/eth/catalyst/queue.go @@ -136,7 +136,7 @@ func (q *payloadQueue) getBlockByHash(hash common.Hash) *types.Block { return nil } block := item.payload.GetBlock() - if block.Hash() == hash { + if block != nil && block.Hash() == hash { return block } } From f8f020016c2c6b4ec1f6e602939aeb0f2deb84fe Mon Sep 17 00:00:00 2001 From: bnoieh <135800952+bnoieh@users.noreply.github.com> Date: Mon, 21 Oct 2024 16:30:24 +0800 Subject: [PATCH 06/10] chore: optimize code --- core/blockchain.go | 11 ++++++++--- eth/catalyst/api.go | 7 ++++++- 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/core/blockchain.go b/core/blockchain.go index d311bfe82c..fd9d2beba5 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -1871,7 +1871,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error) } // Async verify header if minerMode - asyncItNextCh := make(chan error) + asyncItNextCh := make(chan error, 1) if minerMode { go func() { _, err := it.next() @@ -1940,12 +1940,14 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error) vstart := time.Now() // Async validate if minerMode - asyncValidateStateCh := make(chan error) + asyncValidateStateCh := make(chan error, 1) if minerMode { header := block.Header() // Can not validate root concurrently if root := statedb.IntermediateRoot(bc.chainConfig.IsEIP158(header.Number)); header.Root != root { - panic(fmt.Errorf("self mined block(hash: %x number %v) verify root err(mined: %x expected: %x) dberr: %w", block.Hash(), block.NumberU64(), header.Root, root, statedb.Error())) + bc.reportBlock(block, receipts, fmt.Errorf("self mined block(hash: %x number %v) verify root err(mined: %x expected: %x) dberr: %w", block.Hash(), block.NumberU64(), header.Root, root, statedb.Error())) + followupInterrupt.Store(true) + return 0, err } go func() { asyncValidateStateCh <- bc.validator.ValidateState(block, statedb, receipts, usedGas, true) @@ -1990,6 +1992,9 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error) } followupInterrupt.Store(true) if err != nil { + if minerMode { + return 0, err + } return it.index, err } if minerMode { diff --git a/eth/catalyst/api.go b/eth/catalyst/api.go index b0b3d2e3d9..7a6f9fdf59 100644 --- a/eth/catalyst/api.go +++ b/eth/catalyst/api.go @@ -45,6 +45,7 @@ var ( forkchoiceUpdateHeadsTimer = metrics.NewRegisteredTimer("api/engine/forkchoiceUpdate/heads", nil) getPayloadTimer = metrics.NewRegisteredTimer("api/engine/get/payload", nil) newPayloadTimer = metrics.NewRegisteredTimer("api/engine/new/payload", nil) + sealPayloadTimer = metrics.NewRegisteredTimer("api/engine/seal/payload", nil) ) // Register adds the engine API to the full node. @@ -711,6 +712,10 @@ func (api *ConsensusAPI) OpSealPayloadV3(payloadID engine.PayloadID, update engi func (api *ConsensusAPI) opSealPayload(payloadID engine.PayloadID, update engine.ForkchoiceStateV1, needPayload bool, version string) (engine.OpSealPayloadResponse, error) { start := time.Now() + defer func() { + sealPayloadTimer.UpdateSince(start) + log.Debug("sealPayloadTimer", "duration", common.PrettyDuration(time.Since(start)), "payloadID", payloadID) + }() var payloadEnvelope *engine.ExecutionPayloadEnvelope var err error if version == "V2" { @@ -745,7 +750,7 @@ func (api *ConsensusAPI) opSealPayload(payloadID engine.PayloadID, update engine return engine.OpSealPayloadResponse{ErrStage: engine.ForkchoiceUpdatedStage, PayloadStatus: updateResponse.PayloadStatus}, err } - log.Info("perf-trace opSealPayload succeed", "duration", common.PrettyDuration(time.Since(start)), "hash", payloadEnvelope.ExecutionPayload.BlockHash, "number", payloadEnvelope.ExecutionPayload.Number, "id", payloadID, "payloadStatus", updateResponse.PayloadStatus) + log.Info("opSealPayload succeed", "hash", payloadEnvelope.ExecutionPayload.BlockHash, "number", payloadEnvelope.ExecutionPayload.Number, "id", payloadID, "payloadStatus", updateResponse.PayloadStatus) if needPayload { return engine.OpSealPayloadResponse{PayloadStatus: updateResponse.PayloadStatus, Payload: payloadEnvelope}, nil } else { From 1858ca7384f8672bcd8632d0dac72d709a498d0d Mon Sep 17 00:00:00 2001 From: bnoieh <135800952+bnoieh@users.noreply.github.com> Date: Mon, 21 Oct 2024 19:57:52 +0800 Subject: [PATCH 07/10] chore: fix ci --- core/blockchain.go | 21 +-------------------- 1 file changed, 1 insertion(+), 20 deletions(-) diff --git a/core/blockchain.go b/core/blockchain.go index fd9d2beba5..1133228c28 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -1707,14 +1707,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error) // Peek the error for the first block to decide the directing import logic it := newInsertIterator(chain, results, bc.validator) - - var block *types.Block - var err error - if minerMode { - block = chain[0] - } else { - block, err = it.next() - } + block, err := it.next() // Left-trim all the known blocks that don't need to build snapshot if bc.skipBlock(err, it) { @@ -1870,15 +1863,6 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error) continue } - // Async verify header if minerMode - asyncItNextCh := make(chan error, 1) - if minerMode { - go func() { - _, err := it.next() - asyncItNextCh <- err - }() - } - var ( receipts, receiptExist = bc.miningReceiptsCache.Get(block.Hash()) logs, logExist = bc.miningTxLogsCache.Get(block.Hash()) @@ -1998,9 +1982,6 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error) return it.index, err } if minerMode { - if err := <-asyncItNextCh; err != nil { - panic(fmt.Errorf("self mined block(hash: %x number %v) async verify header err: %w", block.Hash(), block.NumberU64(), err)) - } if err := <-asyncValidateStateCh; err != nil { panic(fmt.Errorf("self mined block(hash: %x number %v) async verify state err: %w", block.Hash(), block.NumberU64(), err)) } From 6b3454049389d99072b41cdc3ed40effa2e2c323 Mon Sep 17 00:00:00 2001 From: bnoieh <135800952+bnoieh@users.noreply.github.com> Date: Mon, 21 Oct 2024 21:19:28 +0800 Subject: [PATCH 08/10] chore: optimize code --- core/blockchain.go | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/core/blockchain.go b/core/blockchain.go index c3d5c7f5e0..53a9f72039 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -1708,7 +1708,14 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error) // Peek the error for the first block to decide the directing import logic it := newInsertIterator(chain, results, bc.validator) - block, err := it.next() + var block *types.Block + var err error + if minerMode { + block = chain[0] + it.index = 0 + } else { + block, err = it.next() + } // Left-trim all the known blocks that don't need to build snapshot if bc.skipBlock(err, it) { @@ -1932,7 +1939,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error) if root := statedb.IntermediateRoot(bc.chainConfig.IsEIP158(header.Number)); header.Root != root { bc.reportBlock(block, receipts, fmt.Errorf("self mined block(hash: %x number %v) verify root err(mined: %x expected: %x) dberr: %w", block.Hash(), block.NumberU64(), header.Root, root, statedb.Error())) followupInterrupt.Store(true) - return 0, err + return it.index, err } go func() { asyncValidateStateCh <- bc.validator.ValidateState(block, statedb, receipts, usedGas, true) @@ -1977,9 +1984,6 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error) } followupInterrupt.Store(true) if err != nil { - if minerMode { - return 0, err - } return it.index, err } if minerMode { From 075c4d52d137db21c05794cc6643078007dadc7f Mon Sep 17 00:00:00 2001 From: bnoieh <135800952+bnoieh@users.noreply.github.com> Date: Tue, 22 Oct 2024 16:42:18 +0800 Subject: [PATCH 09/10] fix: insertChain validateState return error --- core/blockchain.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/blockchain.go b/core/blockchain.go index 53a9f72039..bd4ec4203f 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -1937,7 +1937,8 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error) header := block.Header() // Can not validate root concurrently if root := statedb.IntermediateRoot(bc.chainConfig.IsEIP158(header.Number)); header.Root != root { - bc.reportBlock(block, receipts, fmt.Errorf("self mined block(hash: %x number %v) verify root err(mined: %x expected: %x) dberr: %w", block.Hash(), block.NumberU64(), header.Root, root, statedb.Error())) + err := fmt.Errorf("self mined block(hash: %x number %v) verify root err(mined: %x expected: %x) dberr: %w", block.Hash(), block.NumberU64(), header.Root, root, statedb.Error()) + bc.reportBlock(block, receipts, err) followupInterrupt.Store(true) return it.index, err } From 133f855a14baebbea16a8348b84c7ccb9bd4799e Mon Sep 17 00:00:00 2001 From: bnoieh <135800952+bnoieh@users.noreply.github.com> Date: Tue, 5 Nov 2024 18:12:02 +0800 Subject: [PATCH 10/10] perf: insertChain skip report triedb size in miner mode --- core/blockchain.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/core/blockchain.go b/core/blockchain.go index bd4ec4203f..3307ef374b 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -2015,7 +2015,12 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error) if bc.snaps != nil { snapDiffItems, snapBufItems = bc.snaps.Size() } - trieDiffNodes, trieBufNodes, trieImmutableBufNodes, _ := bc.triedb.Size() + var trieDiffNodes common.StorageSize = 0 + var trieBufNodes common.StorageSize = 0 + var trieImmutableBufNodes common.StorageSize = 0 + if !minerMode { + trieDiffNodes, trieBufNodes, trieImmutableBufNodes, _ = bc.triedb.Size() + } stats.report(chain, it.index, snapDiffItems, snapBufItems, trieDiffNodes, trieBufNodes, trieImmutableBufNodes, setHead) blockGasUsedGauge.Update(int64(block.GasUsed()) / 1000000)