Skip to content

Commit

Permalink
Address CR comments and fix a few bugs found when testing in kurtosis…
Browse files Browse the repository at this point in the history
… environment
  • Loading branch information
cffls committed Aug 2, 2024
1 parent eb36a04 commit 0bdc8c0
Show file tree
Hide file tree
Showing 14 changed files with 143 additions and 69 deletions.
3 changes: 2 additions & 1 deletion cmd/integration/commands/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ var (
pruneTBefore, pruneCBefore uint64
experiments []string
chain string // Which chain to use (mainnet, rinkeby, goerli, etc.)
config string

commitmentMode string
commitmentTrie string
Expand All @@ -49,7 +50,7 @@ func must(err error) {
}

// withConfig registers the --config flag (yaml/toml config file location) on cmd.
// NOTE(review): this span is a unified-diff rendering, not real source — the
// Flags().String line below is the pre-commit version (removed by this commit)
// and the Flags().StringVar line is the post-commit version (added). Only the
// StringVar call exists in the actual file; it binds the flag value into the
// package-level `config` variable declared earlier in this diff. Taken as
// literal Go, the two lines together would register "config" twice, which
// pflag treats as a programmer error — confirm against the real file.
func withConfig(cmd *cobra.Command) {
cmd.Flags().String("config", "", "yaml/toml config file location")
cmd.Flags().StringVar(&config, "config", "", "yaml/toml config file location")
}

func withMining(cmd *cobra.Command) {
Expand Down
8 changes: 2 additions & 6 deletions cmd/integration/commands/stage_stages_zkevm.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,8 @@ import (

common2 "github.com/gateway-fm/cdk-erigon-lib/common"
"github.com/gateway-fm/cdk-erigon-lib/kv"
"github.com/ledgerwatch/erigon/core"
"github.com/ledgerwatch/erigon/eth/ethconfig"
"github.com/ledgerwatch/erigon/eth/stagedsync/stages"
smtdb "github.com/ledgerwatch/erigon/smt/pkg/db"
erigoncli "github.com/ledgerwatch/erigon/turbo/cli"
"github.com/ledgerwatch/erigon/zk/hermez_db"
"github.com/ledgerwatch/log/v3"
"github.com/spf13/cobra"
Expand All @@ -28,9 +25,6 @@ state_stages_zkevm --datadir=/datadirs/hermez-mainnet --unwind-batch-no=2 --chai
Example: "go run ./cmd/integration state_stages_zkevm --config=... --verbosity=3 --unwind-batch-no=100",
Run: func(cmd *cobra.Command, args []string) {
ctx, _ := common2.RootContext()
ethConfig := &ethconfig.Defaults
ethConfig.Genesis = core.GenesisBlockByChainName(chain)
erigoncli.ApplyFlagsForEthConfigCobra(cmd.Flags(), ethConfig)
db := openDB(dbCfg(kv.ChainDB, chaindata), true)
defer db.Close()

Expand Down Expand Up @@ -101,6 +95,8 @@ func unwindZk(ctx context.Context, db kv.RwDB) error {
return err
}

stages.SaveStageProgress(tx, stages.HighestSeenBatchNumber, unwindBatchNo)

if err := tx.Commit(); err != nil {
return err
}
Expand Down
40 changes: 39 additions & 1 deletion cmd/integration/commands/stages_zkevm.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@ package commands

import (
"context"
"encoding/json"
"math/big"
"os"
"path"
"path/filepath"
"strings"

"github.com/c2h5oh/datasize"
chain3 "github.com/gateway-fm/cdk-erigon-lib/chain"
Expand All @@ -10,11 +16,14 @@ import (
"github.com/gateway-fm/cdk-erigon-lib/kv/kvcfg"
"github.com/ledgerwatch/erigon/cmd/hack/tool/fromdb"
"github.com/ledgerwatch/erigon/cmd/sentry/sentry"
"github.com/ledgerwatch/erigon/cmd/utils"
"github.com/ledgerwatch/erigon/consensus"
"github.com/ledgerwatch/erigon/core"
"github.com/ledgerwatch/erigon/core/types"
"github.com/ledgerwatch/erigon/core/vm"
"github.com/ledgerwatch/erigon/eth/ethconfig"
"github.com/ledgerwatch/erigon/eth/stagedsync"
"github.com/ledgerwatch/erigon/params"
"github.com/ledgerwatch/erigon/turbo/shards"
stages2 "github.com/ledgerwatch/erigon/turbo/stages"
"github.com/ledgerwatch/erigon/zk/sequencer"
Expand All @@ -26,7 +35,36 @@ func newSyncZk(ctx context.Context, db kv.RwDB) (consensus.Engine, *vm.Config, *

vmConfig := &vm.Config{}

genesis := core.GenesisBlockByChainName(chain)
var genesis *types.Genesis

if strings.HasPrefix(chain, "dynamic") {
if config == "" {
panic("Config file is required for dynamic chain")
}

params.DynamicChainConfigPath = filepath.Dir(config)
genesis = core.GenesisBlockByChainName(chain)
filename := path.Join(params.DynamicChainConfigPath, chain+"-conf.json")

dConf := utils.DynamicConfig{}

if _, err := os.Stat(filename); err == nil {
dConfBytes, err := os.ReadFile(filename)
if err != nil {
panic(err)
}
if err := json.Unmarshal(dConfBytes, &dConf); err != nil {
panic(err)
}
}

genesis.Timestamp = dConf.Timestamp
genesis.GasLimit = dConf.GasLimit
genesis.Difficulty = big.NewInt(dConf.Difficulty)
} else {
genesis = core.GenesisBlockByChainName(chain)
}

chainConfig, genesisBlock, genesisErr := core.CommitGenesisBlock(db, genesis, "")
if _, ok := genesisErr.(*chain3.ConfigCompatError); genesisErr != nil && !ok {
panic(genesisErr)
Expand Down
4 changes: 2 additions & 2 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -481,8 +481,8 @@ var (
Usage: "Halt the sequencer on this batch number",
Value: 0,
}
ResequenceStrictMode = cli.BoolFlag{
Name: "zkevm.resequence-strict",
SequencerResequenceStrict = cli.BoolFlag{
Name: "zkevm.sequencer-resequence-strict",
Usage: "Strictly resequence the rolledback batches",
Value: true,
}
Expand Down
2 changes: 1 addition & 1 deletion eth/ethconfig/config_zkevm.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type Zk struct {
SequencerBatchSealTime time.Duration
SequencerNonEmptyBatchSealTime time.Duration
SequencerHaltOnBatchNumber uint64
ResequenceStrictMode bool
SequencerResequenceStrict bool
ExecutorUrls []string
ExecutorStrictMode bool
ExecutorRequestTimeout time.Duration
Expand Down
2 changes: 1 addition & 1 deletion turbo/cli/default_flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ var DefaultFlags = []cli.Flag{
&utils.SequencerBatchSealTime,
&utils.SequencerNonEmptyBatchSealTime,
&utils.SequencerHaltOnBatchNumber,
&utils.ResequenceStrictMode,
&utils.SequencerResequenceStrict,
&utils.ExecutorUrls,
&utils.ExecutorStrictMode,
&utils.ExecutorRequestTimeout,
Expand Down
2 changes: 1 addition & 1 deletion turbo/cli/flags_zkevm.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func ApplyFlagsForZkConfig(ctx *cli.Context, cfg *ethconfig.Config) {
SequencerBatchSealTime: sequencerBatchSealTime,
SequencerNonEmptyBatchSealTime: sequencerNonEmptyBatchSealTime,
SequencerHaltOnBatchNumber: ctx.Uint64(utils.SequencerHaltOnBatchNumber.Name),
ResequenceStrictMode: ctx.Bool(utils.ResequenceStrictMode.Name),
SequencerResequenceStrict: ctx.Bool(utils.SequencerResequenceStrict.Name),
ExecutorUrls: strings.Split(strings.ReplaceAll(ctx.String(utils.ExecutorUrls.Name), " ", ""), ","),
ExecutorStrictMode: ctx.Bool(utils.ExecutorStrictMode.Name),
ExecutorRequestTimeout: ctx.Duration(utils.ExecutorRequestTimeout.Name),
Expand Down
29 changes: 13 additions & 16 deletions zk/datastream/client/stream_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,25 +370,22 @@ func (c *StreamClient) ReadBatches(start uint64, end uint64) ([][]*types.FullL2B

batches := make([][]*types.FullL2Block, end-start+1)

for i := start; i <= end; i++ {
for {
block, _, batchEnd, _, _, _, err := c.readFullBlockProto()
block, batchStart, batchEnd, _, _, _, err := c.readFullBlockProto()
if err != nil {
return nil, err
}
for {
block, batchStart, batchEnd, _, _, _, err := c.readFullBlockProto()
if err != nil {
return nil, err
}

if batchEnd != nil && batchEnd.Number == end {
break
}
if batchEnd != nil && batchEnd.Number == end {
break
}

if batchStart != nil {
batches[block.BatchNumber] = []*types.FullL2Block{}
}
if batchStart != nil {
batches[batchStart.Number-start] = []*types.FullL2Block{}
}

if block != nil {
batches[block.BatchNumber] = append(batches[block.BatchNumber], block)
}
if block != nil {
batches[block.BatchNumber-start] = append(batches[block.BatchNumber-start], block)
}
}

Expand Down
1 change: 1 addition & 0 deletions zk/stages/stage_batches.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ type HermezDb interface {

type DatastreamClient interface {
ReadAllEntriesToChannel() error
ReadBatches(start uint64, end uint64) ([][]*types.FullL2Block, error)
GetL2BlockChan() chan types.FullL2Block
GetL2TxChan() chan types.L2TransactionProto
GetBatchStartChan() chan types.BatchStart
Expand Down
78 changes: 53 additions & 25 deletions zk/stages/stage_sequence_execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,13 @@ func SpawnSequencingStage(
cfg SequenceBlockCfg,
quiet bool,
) (err error) {
freshTx := tx == nil
if freshTx {
tx, err = cfg.db.BeginRw(ctx)
if err != nil {
return err
}
defer tx.Rollback()
roTx, err := cfg.db.BeginRo(ctx)
if err != nil {
return err
}
defer roTx.Rollback()

lastBatch, err := stages.GetStageProgress(tx, stages.HighestSeenBatchNumber)
lastBatch, err := stages.GetStageProgress(roTx, stages.HighestSeenBatchNumber)
if err != nil {
return err
}
Expand All @@ -62,19 +59,26 @@ func SpawnSequencingStage(
} else {
log.Info(fmt.Sprintf("[%s] Last batch %d is lower than highest batch in datastream %d, resequencing...", s.LogPrefix(), lastBatch, highestBatchInDS))

latestForkId, err := stages.GetStageProgress(tx, stages.ForkId)
latestForkId, err := stages.GetStageProgress(roTx, stages.ForkId)
if err != nil {
return err
}
dsClient := client.NewClient(ctx, fmt.Sprintf("%s:%d", cfg.zk.DataStreamHost, cfg.zk.DataStreamPort), cfg.zk.DatastreamVersion, cfg.zk.L2DataStreamerTimeout, uint16(latestForkId))

dsClient.Start()

batches, err := dsClient.ReadBatches(lastBatch+1, highestBatchInDS)
if err != nil {
return err
}

dsClient.Stop()

err = cfg.datastreamServer.UnwindToBatchStart(lastBatch + 1)
if err != nil {
return err
}

log.Info(fmt.Sprintf("[%s] Resequence from batch %d to %d in data stream", s.LogPrefix(), lastBatch+1, highestBatchInDS))

for _, batch := range batches {
Expand All @@ -85,24 +89,14 @@ func SpawnSequencingStage(
return err
}

if batchJob.StartBlockIndex < len(batchJob.batchToProcess) {
subBatchCount += 1
}
subBatchCount += 1
}

log.Info(fmt.Sprintf("[%s] Resequenced original batch %d with %d batches", s.LogPrefix(), batchJob.batchToProcess[0].BatchNumber, subBatchCount))
if cfg.zk.ResequenceStrictMode && subBatchCount != 1 {
if cfg.zk.SequencerResequenceStrict && subBatchCount != 1 {
return fmt.Errorf("strict mode enabled, but resequenced batch %d has %d sub-batches", batchJob.batchToProcess[0].BatchNumber, subBatchCount)
}
}

cfg.datastreamServer.UnwindToBatchStart(lastBatch)
}

if freshTx {
if err = tx.Commit(); err != nil {
return err
}
}

return nil
Expand All @@ -121,6 +115,15 @@ func sequencingStageStep(
log.Info(fmt.Sprintf("[%s] Starting sequencing stage", logPrefix))
defer log.Info(fmt.Sprintf("[%s] Finished sequencing stage", logPrefix))

freshTx := tx == nil
if freshTx {
tx, err = cfg.db.BeginRw(ctx)
if err != nil {
return err
}
defer tx.Rollback()
}

sdb := newStageDb(tx)

l1Recovery := cfg.zk.L1SyncStartBlock > 0
Expand Down Expand Up @@ -180,6 +183,12 @@ func sequencingStageStep(
return err
}

if freshTx {
if err = tx.Commit(); err != nil {
return err
}
}

return nil
}

Expand Down Expand Up @@ -323,6 +332,11 @@ func sequencingStageStep(
if err = sdb.hermezDb.WriteForkId(thisBatch, forkId); err != nil {
return err
}
if freshTx {
if err = tx.Commit(); err != nil {
return err
}
}
return nil
}
}
Expand Down Expand Up @@ -361,6 +375,7 @@ func sequencingStageStep(
}

forcedHeaderTimestamp = uint64(batchToProcess.CurrentBlock().Timestamp)
decodedBlock.L1InfoTreeIndex = batchToProcess.CurrentBlock().L1InfoTreeIndex
}

l1InfoIndex, err := sdb.hermezDb.GetBlockL1InfoTreeIndex(lastStartedBn)
Expand Down Expand Up @@ -399,7 +414,7 @@ func sequencingStageStep(
break
}

infoTreeIndexProgress, l1TreeUpdate, l1TreeUpdateIndex, l1BlockHash, ger, shouldWriteGerToContract, err := prepareL1AndInfoTreeRelatedStuff(sdb, &decodedBlock, l1Recovery, header.Time)
infoTreeIndexProgress, l1TreeUpdate, l1TreeUpdateIndex, l1BlockHash, ger, shouldWriteGerToContract, err := prepareL1AndInfoTreeRelatedStuff(sdb, &decodedBlock, l1Recovery || (resequence && cfg.zk.SequencerResequenceStrict), header.Time)
if err != nil {
return err
}
Expand Down Expand Up @@ -468,7 +483,7 @@ func sequencingStageStep(
return err
}
cfg.txPool.UnlockFlusher()
} else if !l1Recovery || !resequence {
} else if !l1Recovery && !resequence {
cfg.txPool.LockFlusher()
blockTransactions, err = getNextPoolTransactions(ctx, cfg, executionAt, forkId, yielded)
if err != nil {
Expand Down Expand Up @@ -509,7 +524,7 @@ func sequencingStageStep(
}

if resequence {
if cfg.zk.ResequenceStrictMode {
if cfg.zk.SequencerResequenceStrict {
return fmt.Errorf("strict mode enabled, but resequenced batch %d failed to add transaction %s: %v", thisBatch, txHash, err)
} else {
log.Warn(fmt.Sprintf("[%s] error adding transaction to batch during resequence: %v", logPrefix, err),
Expand Down Expand Up @@ -554,7 +569,7 @@ func sequencingStageStep(
}
}

if resequence && cfg.zk.ResequenceStrictMode {
if resequence && cfg.zk.SequencerResequenceStrict {
return fmt.Errorf("strict mode enabled, but resequenced batch %d overflowed counters on block %d", thisBatch, blockNumber)
}

Expand Down Expand Up @@ -582,7 +597,18 @@ func sequencingStageStep(

if resequence {
if len(blockTransactions) == 0 {
// We need to jump to the next block here if there are no transactions in current block
batchToProcess.UpdateLastProcessedTx(batchToProcess.CurrentBlock().L2Blockhash)
break LOOP_TRANSACTIONS
}

if batchToProcess.AtNewBlockBoundary() {
// We need to jump to the next block here if we are at the end of the current block
break LOOP_TRANSACTIONS
} else {
if cfg.zk.SequencerResequenceStrict {
return fmt.Errorf("strict mode enabled, but resequenced batch %d has transactions that overflowed counters or failed transactions", thisBatch)
}
}
}

Expand Down Expand Up @@ -701,7 +727,9 @@ func sequencingStageStep(
if err = cfg.datastreamServer.WriteBatchEnd(sdb.hermezDb, thisBatch, lastBatch, &blockRoot, &ler); err != nil {
return err
}
}

if freshTx {
if err = tx.Commit(); err != nil {
return err
}
Expand Down
Loading

0 comments on commit 0bdc8c0

Please sign in to comment.