Skip to content

Commit

Permalink
refactor: add finished block check in GetLatestFinishedBlockNumber, c…
Browse files Browse the repository at this point in the history
…lean up code
  • Loading branch information
MorettiGeorgiev committed Oct 10, 2024
1 parent 9aca2ce commit 0aa6cfb
Show file tree
Hide file tree
Showing 14 changed files with 55 additions and 89 deletions.
2 changes: 0 additions & 2 deletions cmd/rpcdaemon/commands/erigon_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package commands

import (
"context"
"fmt"

"github.com/gateway-fm/cdk-erigon-lib/common"

Expand Down Expand Up @@ -67,7 +66,6 @@ func (api *ErigonImpl) BlockNumber(ctx context.Context, rpcBlockNumPtr *rpc.Bloc
return 0, err
}
default:
fmt.Println("== BlockNumber default ==")
blockNum, err = rpchelper.GetLatestFinishedBlockNumber(tx)
if err != nil {
return 0, err
Expand Down
3 changes: 0 additions & 3 deletions cmd/rpcdaemon/commands/eth_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,6 @@ func (api *BaseAPI) blockByNumberWithSenders(tx kv.Tx, number uint64) (*types.Bl
if hashErr != nil {
return nil, hashErr
}

return api.blockWithSenders(tx, hash, number)
}

Expand Down Expand Up @@ -265,12 +264,10 @@ func (api *BaseAPI) pendingBlock() *types.Block {
}

func (api *BaseAPI) blockByRPCNumber(number rpc.BlockNumber, tx kv.Tx) (*types.Block, error) {
fmt.Println("== blockByRPCNumber ==")
n, _, _, err := rpchelper.GetBlockNumber(rpc.BlockNumberOrHashWithNumber(number), tx, api.filters)
if err != nil {
return nil, err
}
fmt.Println("== blockByRPCNumber n ==", n)

block, err := api.blockByNumberWithSenders(tx, n)
return block, err
Expand Down
8 changes: 8 additions & 0 deletions cmd/rpcdaemon/commands/eth_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,14 @@ func (api *APIImpl) GetBlockTransactionCountByNumber(ctx context.Context, blockN
if err != nil {
return nil, err
}
latestBlockNumber, err := rpchelper.GetLatestFinishedBlockNumber(tx)
if err != nil {
return nil, err
}
if blockNum > latestBlockNumber {
// (Compatibility) Every other node just returns `null` for when the block does not exist.
return nil, nil
}

_, txAmount, err := api._blockReader.Body(ctx, tx, blockHash, blockNum)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion cmd/rpcdaemon/commands/eth_call.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,7 @@ var maxGetProofRewindBlockCount uint64 = 1_000
// GetProof is partially implemented; no Storage proofs, and proofs must be for
// blocks within maxGetProofRewindBlockCount blocks of the head.
func (api *APIImpl) GetProof(ctx context.Context, address libcommon.Address, storageKeys []libcommon.Hash, blockNrOrHash rpc.BlockNumberOrHash) (*accounts.AccProofResult, error) {

tx, err := api.db.BeginRo(ctx)
if err != nil {
return nil, err
Expand All @@ -339,7 +340,6 @@ func (api *APIImpl) GetProof(ctx context.Context, address libcommon.Address, sto
return nil, err
}

// TODO: Getting the latest finish block - using the finish stage
latestBlock, err := rpchelper.GetLatestFinishedBlockNumber(tx)
if err != nil {
return nil, err
Expand Down
7 changes: 3 additions & 4 deletions cmd/rpcdaemon/commands/eth_receipts.go
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,7 @@ func (api *APIImpl) getLogsV3(ctx context.Context, tx kv.TemporalTx, begin, end
exec.changeBlock(header)
}

// fmt.Printf("txNum=%d, blockNum=%d, txIndex=%d, maxTxNumInBlock=%d,mixTxNumInBlock=%d\n", txNum, blockNum, txIndex, maxTxNumInBlock, minTxNumInBlock)
//fmt.Printf("txNum=%d, blockNum=%d, txIndex=%d, maxTxNumInBlock=%d,mixTxNumInBlock=%d\n", txNum, blockNum, txIndex, maxTxNumInBlock, minTxNumInBlock)
txn, err := api._txnReader.TxnByIdxInBlock(ctx, tx, blockNum, txIndex)
if err != nil {
return nil, err
Expand Down Expand Up @@ -459,8 +459,8 @@ func (api *APIImpl) getLogsV3(ctx context.Context, tx kv.TemporalTx, begin, end
logs = append(logs, filtered...)
}

// stats := api._agg.GetAndResetStats()
// log.Info("Finished", "duration", time.Since(start), "history queries", stats.HistoryQueries, "ef search duration", stats.EfSearchTime)
//stats := api._agg.GetAndResetStats()
//log.Info("Finished", "duration", time.Since(start), "history queries", stats.HistoryQueries, "ef search duration", stats.EfSearchTime)
return logs, nil
}

Expand Down Expand Up @@ -804,7 +804,6 @@ type MapTxNum2BlockNumIter struct {
func MapTxNum2BlockNum(tx kv.Tx, it iter.U64) *MapTxNum2BlockNumIter {
return &MapTxNum2BlockNumIter{tx: tx, it: it, orderAscend: true}
}

func MapDescendTxNum2BlockNum(tx kv.Tx, it iter.U64) *MapTxNum2BlockNumIter {
return &MapTxNum2BlockNumIter{tx: tx, it: it, orderAscend: false}
}
Expand Down
4 changes: 0 additions & 4 deletions cmd/rpcdaemon/commands/eth_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,19 +217,15 @@ func (b *GasPriceOracleBackend) HeaderByNumber(ctx context.Context, number rpc.B
}
return header, nil
}

func (b *GasPriceOracleBackend) BlockByNumber(ctx context.Context, number rpc.BlockNumber) (*types.Block, error) {
return b.baseApi.blockByRPCNumber(number, b.tx)
}

func (b *GasPriceOracleBackend) ChainConfig() *chain.Config {
return b.cc
}

func (b *GasPriceOracleBackend) GetReceipts(ctx context.Context, hash libcommon.Hash) (types.Receipts, error) {
return rawdb.ReadReceiptsByHash(b.tx, hash)
}

func (b *GasPriceOracleBackend) PendingBlockAndReceipts() (*types.Block, types.Receipts) {
return nil, nil
}
10 changes: 5 additions & 5 deletions cmd/rpcdaemon/commands/trace_filtering_zkevm.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (api *TraceAPIImpl) filterV3(ctx context.Context, dbtx kv.TemporalTx, fromB
}
engine := api.engine()

json := jsoniter.ConfigCompatibleWithStandardLibrary
var json = jsoniter.ConfigCompatibleWithStandardLibrary
stream.WriteArrayStart()
first := true
// Execute all transactions in picked blocks
Expand Down Expand Up @@ -151,7 +151,7 @@ func (api *TraceAPIImpl) filterV3(ctx context.Context, dbtx kv.TemporalTx, fromB
if _, ok := toAddresses[lastHeader.Coinbase]; ok || includeAll {
nSeen++
var tr ParityTrace
rewardAction := &RewardTraceAction{}
var rewardAction = &RewardTraceAction{}
rewardAction.Author = lastHeader.Coinbase
rewardAction.RewardType = "block" // nolint: goconst
rewardAction.Value.ToInt().Set(minerReward.ToBig())
Expand Down Expand Up @@ -226,11 +226,11 @@ func (api *TraceAPIImpl) filterV3(ctx context.Context, dbtx kv.TemporalTx, fromB
}
continue
}
if txIndex == -1 { // is system tx
if txIndex == -1 { //is system tx
continue
}
txIndexU64 := uint64(txIndex)
// fmt.Printf("txNum=%d, blockNum=%d, txIndex=%d\n", txNum, blockNum, txIndex)
//fmt.Printf("txNum=%d, blockNum=%d, txIndex=%d\n", txNum, blockNum, txIndex)
txn, err := api._txnReader.TxnByIdxInBlock(ctx, dbtx, blockNum, txIndex)
if err != nil {
if first {
Expand All @@ -244,7 +244,7 @@ func (api *TraceAPIImpl) filterV3(ctx context.Context, dbtx kv.TemporalTx, fromB
continue
}
if txn == nil {
continue // guess block doesn't have transactions
continue //guess block doesn't have transactions
}
txHash := txn.Hash()
msg, err := txn.AsMessage(*lastSigner, lastHeader.BaseFee, lastRules)
Expand Down
2 changes: 1 addition & 1 deletion cmd/rpcdaemon/commands/tracing.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (

// TraceBlockByNumber implements debug_traceBlockByNumber. Returns Geth style block traces.
func (api *PrivateDebugAPIImpl) TraceBlockByNumber(ctx context.Context, blockNum rpc.BlockNumber, config *tracers.TraceConfig_ZkEvm, stream *jsoniter.Stream) error {
fmt.Println("=== TraceBlockByNumber ===")
return api.traceBlock(ctx, rpc.BlockNumberOrHashWithNumber(blockNum), config, stream)
}

Expand Down Expand Up @@ -502,6 +501,7 @@ func (api *PrivateDebugAPIImpl) TraceCallMany_deprecated(ctx context.Context, bu
ibs := evm.IntraBlockState().(*state.IntraBlockState)
ibs.Prepare(common.Hash{}, parent.Hash(), txn_index)
err = transactions.TraceTx(ctx, msg, blockCtx, txCtx, evm.IntraBlockState(), config, chainConfig, stream, api.evmCallTimeout)

if err != nil {
stream.WriteNil()
return err
Expand Down
4 changes: 2 additions & 2 deletions cmd/rpcdaemon/commands/tracing_zkevm.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
)

func (api *PrivateDebugAPIImpl) traceBlock(ctx context.Context, blockNrOrHash rpc.BlockNumberOrHash, config *tracers.TraceConfig_ZkEvm, stream *jsoniter.Stream) error {
fmt.Println("=== traceBlock zkevm ===")
tx, err := api.db.BeginRo(ctx)
if err != nil {
stream.WriteNil()
Expand Down Expand Up @@ -297,7 +296,7 @@ func (api *PrivateDebugAPIImpl) TraceCallMany(ctx context.Context, bundles []Bun
// and apply the message.
gp := new(core.GasPool).AddGas(math.MaxUint64)
for idx, txn := range replayTransactions {
// evm = vm.NewEVM(blockCtx, txCtx, evm.IntraBlockState(), chainConfig, vm.Config{Debug: false})
//evm = vm.NewEVM(blockCtx, txCtx, evm.IntraBlockState(), chainConfig, vm.Config{Debug: false})
txHash := txn.Hash()
evm, effectiveGasPricePercentage, err := core.PrepareForTxExecution(chainConfig, &vm.Config{}, &blockCtx, hermezReader, evm.IntraBlockState().(*state.IntraBlockState), block, &txHash, idx)
if err != nil {
Expand Down Expand Up @@ -338,6 +337,7 @@ func (api *PrivateDebugAPIImpl) TraceCallMany(ctx context.Context, bundles []Bun
ibs := evm.IntraBlockState().(*state.IntraBlockState)
ibs.Prepare(common.Hash{}, parent.Hash(), txn_index)
err = transactions.TraceTx(ctx, msg, blockCtx, txCtx, evm.IntraBlockState(), config, chainConfig, stream, api.evmCallTimeout)

if err != nil {
stream.WriteNil()
return err
Expand Down
15 changes: 9 additions & 6 deletions cmd/rpcdaemon/commands/zkevm_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ func NewZkEvmAPI(
l1Syncer *syncer.L1Syncer,
l2SequencerUrl string,
) *ZkEvmAPIImpl {

a := &ZkEvmAPIImpl{
ethApi: base,
db: db,
Expand Down Expand Up @@ -354,7 +355,7 @@ func (api *ZkEvmAPIImpl) getOrCalcBatchData(tx kv.Tx, dbReader state.ReadOnlyHer
return nil, err
}

// found in db, do not calculate
//found in db, do not calculate
if len(batchData) != 0 {
return batchData, nil
}
Expand Down Expand Up @@ -907,6 +908,7 @@ func (api *ZkEvmAPIImpl) GetBlockRangeWitness(ctx context.Context, startBlockNrO
}

func (api *ZkEvmAPIImpl) getBatchWitness(ctx context.Context, tx kv.Tx, batchNum uint64, debug bool, mode WitnessMode) (hexutility.Bytes, error) {

// limit in-flight requests by name
semaphore := api.semaphores[getBatchWitness]
if semaphore != nil {
Expand All @@ -928,6 +930,7 @@ func (api *ZkEvmAPIImpl) getBatchWitness(ctx context.Context, tx kv.Tx, batchNum
}

return generator.GetWitnessByBatch(tx, ctx, batchNum, debug, fullWitness)

}

func (api *ZkEvmAPIImpl) buildGenerator(tx kv.Tx, witnessMode WitnessMode) (*witness.Generator, bool, error) {
Expand Down Expand Up @@ -973,6 +976,7 @@ func (api *ZkEvmAPIImpl) getBlockRangeWitness(ctx context.Context, db kv.RoDB, s
}

endBlockNr, _, _, err := rpchelper.GetCanonicalBlockNumber(endBlockNrOrHash, tx, api.ethApi.filters) // DoCall cannot be executed on non-canonical blocks

if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1026,8 +1030,9 @@ func (api *ZkEvmAPIImpl) GetBatchWitness(ctx context.Context, batchNumber uint64
}

isWitnessModeNone := checkedMode == WitnessModeNone
rpcModeMatchesNodeMode := checkedMode == WitnessModeFull && api.config.WitnessFull ||
checkedMode == WitnessModeTrimmed && !api.config.WitnessFull
rpcModeMatchesNodeMode :=
checkedMode == WitnessModeFull && api.config.WitnessFull ||
checkedMode == WitnessModeTrimmed && !api.config.WitnessFull
// we only want to check the cache if no special run mode has been supplied.
// or if requested mode matches the node mode
// otherwise regenerate it
Expand Down Expand Up @@ -1306,8 +1311,7 @@ func convertTransactionsReceipts(
txs []eritypes.Transaction,
receipts eritypes.Receipts,
hermezReader hermez_db.HermezDbReader,
block eritypes.Block,
) ([]types.Transaction, error) {
block eritypes.Block) ([]types.Transaction, error) {
if len(txs) != len(receipts) {
return nil, errors.New("transactions and receipts length mismatch")
}
Expand Down Expand Up @@ -1565,7 +1569,6 @@ func (zkapi *ZkEvmAPIImpl) GetProof(ctx context.Context, address common.Address,
return nil, fmt.Errorf("not supported by Erigon3")
}

// TODO: Logic here for getting proof from "finish" stage
blockNr, _, _, err := rpchelper.GetBlockNumber(blockNrOrHash, tx, api.filters)
if err != nil {
return nil, err
Expand Down
28 changes: 15 additions & 13 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,7 @@ import (
"io/fs"
"math/big"
"net"
"net/url"
"os"
"path"
"path/filepath"
"strconv"
"strings"
Expand Down Expand Up @@ -66,6 +64,9 @@ import (

"github.com/ledgerwatch/erigon/chain"

"net/url"
"path"

"github.com/0xPolygonHermez/zkevm-data-streamer/datastreamer"
log2 "github.com/0xPolygonHermez/zkevm-data-streamer/log"
"github.com/ledgerwatch/erigon/cl/clparams"
Expand Down Expand Up @@ -354,7 +355,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
sentries = append(sentries, sentryClient)
}
} else {
readNodeInfo := func() *eth.NodeInfo {
var readNodeInfo = func() *eth.NodeInfo {
var res *eth.NodeInfo
_ = backend.chainDB.View(context.Background(), func(tx kv.Tx) error {
res = eth.ReadNodeInfo(tx, backend.chainConfig, backend.genesisHash, backend.networkID)
Expand Down Expand Up @@ -438,8 +439,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
}

inMemoryExecution := func(batch kv.RwTx, header *types.Header, body *types.RawBody, unwindPoint uint64, headersChain []*types.Header, bodiesChain []*types.RawBody,
notifications *shards.Notifications,
) error {
notifications *shards.Notifications) error {
// Needs its own notifications to not update RPC daemon and txpool about pending blocks
stateSync, err := stages2.NewInMemoryExecution(backend.sentryCtx, backend.chainDB, config, backend.sentriesClient, dirs, notifications, allSnapshots, backend.agg)
if err != nil {
Expand Down Expand Up @@ -502,11 +502,11 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
if config.DeprecatedTxPool.Disable {
backend.txPool2GrpcServer = &txpool2.GrpcDisabled{}
} else {
// cacheConfig := kvcache.DefaultCoherentCacheConfig
// cacheConfig.MetricsLabel = "txpool"
//cacheConfig := kvcache.DefaultCoherentCacheConfig
//cacheConfig.MetricsLabel = "txpool"

backend.newTxs2 = make(chan types2.Announcements, 1024)
// defer close(newTxs)
//defer close(newTxs)
backend.txPool2DB, backend.txPool2, backend.txPool2Fetch, backend.txPool2Send, backend.txPool2GrpcServer, err = txpooluitl.AllComponents(
ctx, config.TxPool, config, kvcache.NewDummy(), backend.newTxs2, backend.chainDB, backend.sentriesClient.Sentries(), stateDiffClient,
)
Expand Down Expand Up @@ -611,6 +611,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
}
bs, err := clcore.RetrieveBeaconState(ctx, beaconCfg, genesisCfg,
clparams.GetCheckpointSyncEndpoint(clparams.NetworkType(config.NetworkID)))

if err != nil {
return nil, err
}
Expand All @@ -637,6 +638,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
}
backend.notifications.Accumulator.StartChange(currentBlock.NumberU64(), currentBlock.Hash(), nil, false)
backend.notifications.Accumulator.SendAndReset(ctx, backend.notifications.StateChangesConsumer, baseFee, currentBlock.GasLimit())

}()

tx, err := backend.chainDB.BeginRw(ctx)
Expand Down Expand Up @@ -682,9 +684,9 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
}
backend.sentriesClient.Bd.AddToPrefetch(b.Header(), b.RawBody())

// p2p
// backend.sentriesClient.BroadcastNewBlock(context.Background(), b, b.Difficulty())
// rpcdaemon
//p2p
//backend.sentriesClient.BroadcastNewBlock(context.Background(), b, b.Difficulty())
//rpcdaemon
if err := miningRPC.(*privateapi.MiningServer).BroadcastMinedBlock(b); err != nil {
log.Error("txpool rpc mined block broadcast", "err", err)
}
Expand Down Expand Up @@ -1071,12 +1073,12 @@ func (backend *Ethereum) Init(stack *node.Node, config *ethconfig.Config) error
}
}

// eth.APIBackend = &EthAPIBackend{stack.Config().ExtRPCEnabled(), stack.Config().AllowUnprotectedTxs, eth, nil}
//eth.APIBackend = &EthAPIBackend{stack.Config().ExtRPCEnabled(), stack.Config().AllowUnprotectedTxs, eth, nil}
gpoParams := config.GPO
if gpoParams.Default == nil {
gpoParams.Default = config.Miner.GasPrice
}
// eth.APIBackend.gpo = gasprice.NewOracle(eth.APIBackend, gpoParams)
//eth.APIBackend.gpo = gasprice.NewOracle(eth.APIBackend, gpoParams)
if config.Ethstats != "" {
var headCh chan [][]byte
headCh, backend.unsubscribeEthstat = backend.notifications.Events.AddHeaderSubscription()
Expand Down
Loading

0 comments on commit 0aa6cfb

Please sign in to comment.