Skip to content

Commit

Permalink
Enable a sequencer to re-sequence batches
Browse files Browse the repository at this point in the history
  • Loading branch information
cffls committed Aug 7, 2024
1 parent 057a284 commit 154c504
Show file tree
Hide file tree
Showing 17 changed files with 745 additions and 129 deletions.
3 changes: 2 additions & 1 deletion cmd/integration/commands/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ var (
pruneTBefore, pruneCBefore uint64
experiments []string
chain string // Which chain to use (mainnet, rinkeby, goerli, etc.)
config string

commitmentMode string
commitmentTrie string
Expand All @@ -49,7 +50,7 @@ func must(err error) {
}

func withConfig(cmd *cobra.Command) {
cmd.Flags().String("config", "", "yaml/toml config file location")
cmd.Flags().StringVar(&config, "config", "", "yaml/toml config file location")
}

func withMining(cmd *cobra.Command) {
Expand Down
8 changes: 2 additions & 6 deletions cmd/integration/commands/stage_stages_zkevm.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,8 @@ import (

common2 "github.com/gateway-fm/cdk-erigon-lib/common"
"github.com/gateway-fm/cdk-erigon-lib/kv"
"github.com/ledgerwatch/erigon/core"
"github.com/ledgerwatch/erigon/eth/ethconfig"
"github.com/ledgerwatch/erigon/eth/stagedsync/stages"
smtdb "github.com/ledgerwatch/erigon/smt/pkg/db"
erigoncli "github.com/ledgerwatch/erigon/turbo/cli"
"github.com/ledgerwatch/erigon/zk/hermez_db"
"github.com/ledgerwatch/log/v3"
"github.com/spf13/cobra"
Expand All @@ -28,9 +25,6 @@ state_stages_zkevm --datadir=/datadirs/hermez-mainnet --unwind-batch-no=2 --chai
Example: "go run ./cmd/integration state_stages_zkevm --config=... --verbosity=3 --unwind-batch-no=100",
Run: func(cmd *cobra.Command, args []string) {
ctx, _ := common2.RootContext()
ethConfig := &ethconfig.Defaults
ethConfig.Genesis = core.GenesisBlockByChainName(chain)
erigoncli.ApplyFlagsForEthConfigCobra(cmd.Flags(), ethConfig)
db := openDB(dbCfg(kv.ChainDB, chaindata), true)
defer db.Close()

Expand Down Expand Up @@ -101,6 +95,8 @@ func unwindZk(ctx context.Context, db kv.RwDB) error {
return err
}

stages.SaveStageProgress(tx, stages.HighestSeenBatchNumber, unwindBatchNo)

if err := tx.Commit(); err != nil {
return err
}
Expand Down
40 changes: 39 additions & 1 deletion cmd/integration/commands/stages_zkevm.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@ package commands

import (
"context"
"encoding/json"
"math/big"
"os"
"path"
"path/filepath"
"strings"

"github.com/c2h5oh/datasize"
chain3 "github.com/gateway-fm/cdk-erigon-lib/chain"
Expand All @@ -10,11 +16,14 @@ import (
"github.com/gateway-fm/cdk-erigon-lib/kv/kvcfg"
"github.com/ledgerwatch/erigon/cmd/hack/tool/fromdb"
"github.com/ledgerwatch/erigon/cmd/sentry/sentry"
"github.com/ledgerwatch/erigon/cmd/utils"
"github.com/ledgerwatch/erigon/consensus"
"github.com/ledgerwatch/erigon/core"
"github.com/ledgerwatch/erigon/core/types"
"github.com/ledgerwatch/erigon/core/vm"
"github.com/ledgerwatch/erigon/eth/ethconfig"
"github.com/ledgerwatch/erigon/eth/stagedsync"
"github.com/ledgerwatch/erigon/params"
"github.com/ledgerwatch/erigon/turbo/shards"
stages2 "github.com/ledgerwatch/erigon/turbo/stages"
"github.com/ledgerwatch/erigon/zk/sequencer"
Expand All @@ -26,7 +35,36 @@ func newSyncZk(ctx context.Context, db kv.RwDB) (consensus.Engine, *vm.Config, *

vmConfig := &vm.Config{}

genesis := core.GenesisBlockByChainName(chain)
var genesis *types.Genesis

if strings.HasPrefix(chain, "dynamic") {
if config == "" {
panic("Config file is required for dynamic chain")
}

params.DynamicChainConfigPath = filepath.Dir(config)
genesis = core.GenesisBlockByChainName(chain)
filename := path.Join(params.DynamicChainConfigPath, chain+"-conf.json")

dConf := utils.DynamicConfig{}

if _, err := os.Stat(filename); err == nil {
dConfBytes, err := os.ReadFile(filename)
if err != nil {
panic(err)
}
if err := json.Unmarshal(dConfBytes, &dConf); err != nil {
panic(err)
}
}

genesis.Timestamp = dConf.Timestamp
genesis.GasLimit = dConf.GasLimit
genesis.Difficulty = big.NewInt(dConf.Difficulty)
} else {
genesis = core.GenesisBlockByChainName(chain)
}

chainConfig, genesisBlock, genesisErr := core.CommitGenesisBlock(db, genesis, "")
if _, ok := genesisErr.(*chain3.ConfigCompatError); genesisErr != nil && !ok {
panic(genesisErr)
Expand Down
10 changes: 10 additions & 0 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,16 @@ var (
Usage: "Halt the sequencer on this batch number",
Value: 0,
}
SequencerResequence = cli.BoolFlag{
Name: "zkevm.sequencer-resequence",
Usage: "When enabled, the sequencer will automatically resequence unseen batches stored in data stream",
Value: false,
}
SequencerResequenceStrict = cli.BoolFlag{
Name: "zkevm.sequencer-resequence-strict",
Usage: "Strictly resequence the rolledback batches",
Value: true,
}
ExecutorUrls = cli.StringFlag{
Name: "zkevm.executor-urls",
Usage: "A comma separated list of grpc addresses that host executors",
Expand Down
2 changes: 2 additions & 0 deletions eth/ethconfig/config_zkevm.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ type Zk struct {
SequencerBatchSealTime time.Duration
SequencerBatchVerificationTimeout time.Duration
SequencerHaltOnBatchNumber uint64
SequencerResequence bool
SequencerResequenceStrict bool
ExecutorUrls []string
ExecutorStrictMode bool
ExecutorRequestTimeout time.Duration
Expand Down
2 changes: 2 additions & 0 deletions turbo/cli/default_flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,8 @@ var DefaultFlags = []cli.Flag{
&utils.SequencerBatchSealTime,
&utils.SequencerBatchVerificationTimeout,
&utils.SequencerHaltOnBatchNumber,
&utils.SequencerResequence,
&utils.SequencerResequenceStrict,
&utils.ExecutorUrls,
&utils.ExecutorStrictMode,
&utils.ExecutorRequestTimeout,
Expand Down
2 changes: 2 additions & 0 deletions turbo/cli/flags_zkevm.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,8 @@ func ApplyFlagsForZkConfig(ctx *cli.Context, cfg *ethconfig.Config) {
SequencerBatchSealTime: sequencerBatchSealTime,
SequencerBatchVerificationTimeout: sequencerBatchVerificationTimeout,
SequencerHaltOnBatchNumber: ctx.Uint64(utils.SequencerHaltOnBatchNumber.Name),
SequencerResequence: ctx.Bool(utils.SequencerResequence.Name),
SequencerResequenceStrict: ctx.Bool(utils.SequencerResequenceStrict.Name),
ExecutorUrls: strings.Split(strings.ReplaceAll(ctx.String(utils.ExecutorUrls.Name), " ", ""), ","),
ExecutorStrictMode: ctx.Bool(utils.ExecutorStrictMode.Name),
ExecutorRequestTimeout: ctx.Duration(utils.ExecutorRequestTimeout.Name),
Expand Down
115 changes: 4 additions & 111 deletions zk/datastream/client/stream_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ func (c *StreamClient) ExecutePerFile(bookmark *types.BookmarkProto, function fu
if c.Header.TotalEntries == count {
break
}
file, err := c.readFileEntry()
file, err := c.NextFileEntry()
if err != nil {
return fmt.Errorf("error reading file entry: %v", err)
}
Expand Down Expand Up @@ -321,7 +321,7 @@ LOOP:
c.conn.SetReadDeadline(time.Now().Add(c.checkTimeout))
}

fullBlock, batchStart, batchEnd, gerUpdate, batchBookmark, blockBookmark, localErr := c.readFullBlockProto()
fullBlock, batchStart, batchEnd, gerUpdate, batchBookmark, blockBookmark, localErr := types.FullBlockProto(c)
if localErr != nil {
err = localErr
break
Expand All @@ -344,7 +344,7 @@ LOOP:
}

if batchEnd != nil {
// this check was inside c.readFullBlockProto() but it is better to move it here
// this check was inside types.FullBlockProto(c) but it is better to move it here
c.batchEndChan <- *batchEnd
}

Expand Down Expand Up @@ -378,116 +378,9 @@ func (c *StreamClient) tryReConnect() error {
return err
}

func (c *StreamClient) readFullBlockProto() (
l2Block *types.FullL2Block,
batchStart *types.BatchStart,
batchEnd *types.BatchEnd,
gerUpdate *types.GerUpdate,
batchBookmark *types.BookmarkProto,
blockBookmark *types.BookmarkProto,
err error,
) {
file, err := c.readFileEntry()
if err != nil {
err = fmt.Errorf("read file entry error: %v", err)
return
}

switch file.EntryType {
case types.BookmarkEntryType:
var bookmark *types.BookmarkProto
if bookmark, err = types.UnmarshalBookmark(file.Data); err != nil {
return
}
if bookmark.BookmarkType() == datastream.BookmarkType_BOOKMARK_TYPE_BATCH {
batchBookmark = bookmark
return
} else {
blockBookmark = bookmark
return
}
case types.EntryTypeGerUpdate:
if gerUpdate, err = types.DecodeGerUpdateProto(file.Data); err != nil {
return
}
log.Trace("ger update", "ger", gerUpdate)
return
case types.EntryTypeBatchStart:
if batchStart, err = types.UnmarshalBatchStart(file.Data); err != nil {
return
}
return
case types.EntryTypeBatchEnd:
if batchEnd, err = types.UnmarshalBatchEnd(file.Data); err != nil {
return
}
return
case types.EntryTypeL2Block:
if l2Block, err = types.UnmarshalL2Block(file.Data); err != nil {
return
}

txs := []types.L2TransactionProto{}

var innerFile *types.FileEntry
var l2Tx *types.L2TransactionProto
LOOP:
for {
if innerFile, err = c.readFileEntry(); err != nil {
return
}

if innerFile.IsL2Tx() {
if l2Tx, err = types.UnmarshalTx(innerFile.Data); err != nil {
return
}
txs = append(txs, *l2Tx)
} else if innerFile.IsL2BlockEnd() {
var l2BlockEnd *types.L2BlockEndProto
if l2BlockEnd, err = types.UnmarshalL2BlockEnd(innerFile.Data); err != nil {
return
}
if l2BlockEnd.GetBlockNumber() != l2Block.L2BlockNumber {
err = fmt.Errorf("block end number (%d) not equal to block number (%d)", l2BlockEnd.GetBlockNumber(), l2Block.L2BlockNumber)
return
}
break LOOP
} else if innerFile.IsBookmark() {
var bookmark *types.BookmarkProto
if bookmark, err = types.UnmarshalBookmark(innerFile.Data); err != nil || bookmark == nil {
return
}
if bookmark.BookmarkType() == datastream.BookmarkType_BOOKMARK_TYPE_L2_BLOCK {
break LOOP
} else {
err = fmt.Errorf("unexpected bookmark type inside block: %v", bookmark.Type())
return
}
} else if innerFile.IsBatchEnd() {
if batchEnd, err = types.UnmarshalBatchEnd(file.Data); err != nil {
return
}
break LOOP
} else {
err = fmt.Errorf("unexpected entry type inside a block: %d", innerFile.EntryType)
return
}
}

l2Block.L2Txs = txs
return
case types.EntryTypeL2Tx:
err = fmt.Errorf("unexpected l2Tx out of block")
return
default:
err = fmt.Errorf("unexpected entry type: %d", file.EntryType)
return
}
}

// reads file bytes from socket and tries to parse them
// returns the parsed FileEntry
func (c *StreamClient) readFileEntry() (file *types.FileEntry, err error) {
func (c *StreamClient) NextFileEntry() (file *types.FileEntry, err error) {
// Read packet type
packet, err := readBuffer(c.conn, 1)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion zk/datastream/client/stream_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ func Test_readFileEntry(t *testing.T) {
server.Close()
}()

result, err := c.readFileEntry()
result, err := c.NextFileEntry()
require.Equal(t, testCase.expectedError, err)
assert.DeepEqual(t, testCase.expectedResult, result)
})
Expand Down
73 changes: 73 additions & 0 deletions zk/datastream/server/data_stream_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -568,3 +568,76 @@ func (srv *DataStreamServer) getLastEntryOfType(entryType datastreamer.EntryType

return emtryEntry, false, nil
}

type dataStreamServerIterator struct {
stream *datastreamer.StreamServer
curEntryNum uint64
}

func newDataStreamServerIterator(stream *datastreamer.StreamServer, start uint64) *dataStreamServerIterator {
return &dataStreamServerIterator{
stream: stream,
curEntryNum: start,
}
}

func (it *dataStreamServerIterator) NextFileEntry() (entry *types.FileEntry, err error) {
var fileEntry datastreamer.FileEntry
fileEntry, err = it.stream.GetEntry(it.curEntryNum)
if err != nil {
return nil, err
}

it.curEntryNum += 1

return &types.FileEntry{
PacketType: uint8(fileEntry.Type),
Length: fileEntry.Length,
EntryType: types.EntryType(fileEntry.Type),
EntryNum: fileEntry.Number,
Data: fileEntry.Data,
}, nil
}

func (srv *DataStreamServer) ReadBatches(start uint64, end uint64) ([][]*types.FullL2Block, error) {
bookmark := types.NewBookmarkProto(start, datastream.BookmarkType_BOOKMARK_TYPE_BATCH)
marshalled, err := bookmark.Marshal()
if err != nil {
return nil, err
}

entryNum, err := srv.stream.GetBookmark(marshalled)

if err != nil {
return nil, err
}

iterator := newDataStreamServerIterator(srv.stream, entryNum)

return ReadBatches(iterator, start, end)
}

func ReadBatches(iterator types.FileEntryIterator, start uint64, end uint64) ([][]*types.FullL2Block, error) {
batches := make([][]*types.FullL2Block, end-start+1)

for {
block, batchStart, batchEnd, _, _, _, err := types.FullBlockProto(iterator)
if err != nil {
return nil, err
}

if batchEnd != nil && batchEnd.Number == end {
break
}

if batchStart != nil {
batches[batchStart.Number-start] = []*types.FullL2Block{}
}

if block != nil {
batches[block.BatchNumber-start] = append(batches[block.BatchNumber-start], block)
}
}

return batches, nil
}
Loading

0 comments on commit 154c504

Please sign in to comment.