Skip to content

Commit

Permalink
handle-unwinds-on-seq-restart
Browse files Browse the repository at this point in the history
  • Loading branch information
kstoykov committed Aug 28, 2024
1 parent aa2a2cd commit bd14081
Show file tree
Hide file tree
Showing 10 changed files with 78 additions and 149 deletions.
2 changes: 1 addition & 1 deletion cmd/rpcdaemon/commands/zkevm_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -646,7 +646,7 @@ func (api *ZkEvmAPIImpl) GetBatchByNumber(ctx context.Context, batchNumber rpc.B
}

// local exit root
localExitRoot, err := utils.GetBatchLocalExitRoot(batchNo, hermezDb, tx)
localExitRoot, err := utils.GetBatchLocalExitRootFromSCStorageForLatestBlock(batchNo, hermezDb, tx)
if err != nil {
return nil, err
}
Expand Down
5 changes: 2 additions & 3 deletions zk/datastream/server/data_stream_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (

type DbReader interface {
GetL2BlockNosByBatch(batchNo uint64) ([]uint64, error)
GetLocalExitRootForBatchNo(batchNo uint64) (libcommon.Hash, error)
GetBatchGlobalExitRootsProto(lastBatchNumber, batchNumber uint64) ([]types.GerUpdateProto, error)
GetForkId(batchNumber uint64) (uint64, error)
GetBlockGlobalExitRoot(blockNumber uint64) (libcommon.Hash, error)
Expand Down Expand Up @@ -194,7 +193,7 @@ func createBlockWithBatchCheckStreamEntriesProto(
}
// the genesis we insert fully, so we would have to skip closing it
if !shouldSkipBatchEndEntry {
localExitRoot, err := utils.GetBatchLocalExitRootFromSCStorage(batchNumber, reader, tx)
localExitRoot, err := utils.GetBatchLocalExitRootFromSCStorageForLatestBlock(batchNumber, reader, tx)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -383,7 +382,7 @@ func BuildWholeBatchStreamEntriesProto(
}

// the genesis we insert fully, so we would have to skip closing it
localExitRoot, err := utils.GetBatchLocalExitRootFromSCStorage(batchNumber, reader, tx)
localExitRoot, err := utils.GetBatchLocalExitRootFromSCStorageForLatestBlock(batchNumber, reader, tx)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion zk/datastream/server/data_stream_server_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func createBatchStartEntriesProto(
}

// seal off the last batch
if localExitRoot, err = utils.GetBatchLocalExitRootFromSCStorage(workingBatch, reader, tx); err != nil {
if localExitRoot, err = utils.GetBatchLocalExitRootFromSCStorageForLatestBlock(workingBatch, reader, tx); err != nil {
return nil, err
}
entries.Add(newBatchEndProto(localExitRoot, root, workingBatch))
Expand Down
2 changes: 1 addition & 1 deletion zk/datastream/server/datastream_populate.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ func (srv *DataStreamServer) WriteGenesisToStream(
l2Block := newL2BlockProto(genesis, genesis.Hash().Bytes(), batchNo, ger, 0, 0, common.Hash{}, 0, common.Hash{})
batchStart := newBatchStartProto(batchNo, srv.chainId, GenesisForkId, datastream.BatchType_BATCH_TYPE_REGULAR)

ler, err := utils.GetBatchLocalExitRoot(0, reader, tx)
ler, err := utils.GetBatchLocalExitRootFromSCStorageForLatestBlock(0, reader, tx)
if err != nil {
return err
}
Expand Down
18 changes: 1 addition & 17 deletions zk/hermez_db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ const BLOCK_L1_BLOCK_HASHES = "block_l1_block_hashes" // block
const L1_BLOCK_HASH_GER = "l1_block_hash_ger" // l1 block hash -> GER
const INTERMEDIATE_TX_STATEROOTS = "hermez_intermediate_tx_stateRoots" // l2blockno -> stateRoot
const BATCH_WITNESSES = "hermez_batch_witnesses" // batch number -> witness
const BATCH_COUNTERS = "hermez_batch_counters" // batch number -> counters
const BATCH_COUNTERS = "hermez_batch_counters" // block number -> counters
const L1_BATCH_DATA = "l1_batch_data" // batch number -> l1 batch data from transaction call data
const REUSED_L1_INFO_TREE_INDEX = "reused_l1_info_tree_index" // block number => const 1
const LATEST_USED_GER = "latest_used_ger" // batch number -> GER latest used GER
Expand All @@ -47,8 +47,6 @@ const SMT_DEPTHS = "smt_depths" // block
const L1_INFO_LEAVES = "l1_info_leaves" // l1 info tree index -> l1 info tree leaf
const L1_INFO_ROOTS = "l1_info_roots" // root hash -> l1 info tree index
const INVALID_BATCHES = "invalid_batches" // batch number -> true
const BATCH_PARTIALLY_PROCESSED = "batch_partially_processed" // batch number -> true
const LOCAL_EXIT_ROOTS = "local_exit_roots" // batch number -> local exit root
const ROllUP_TYPES_FORKS = "rollup_types_forks" // rollup type id -> fork id
const FORK_HISTORY = "fork_history" // index -> fork id + last verified batch
const JUST_UNWOUND = "just_unwound" // batch number -> true
Expand Down Expand Up @@ -86,8 +84,6 @@ var HermezDbTables = []string{
L1_INFO_LEAVES,
L1_INFO_ROOTS,
INVALID_BATCHES,
BATCH_PARTIALLY_PROCESSED,
LOCAL_EXIT_ROOTS,
ROllUP_TYPES_FORKS,
FORK_HISTORY,
JUST_UNWOUND,
Expand Down Expand Up @@ -1610,18 +1606,6 @@ func (db *HermezDbReader) GetInvalidBatch(batchNo uint64) (bool, error) {
return len(v) > 0, nil
}

func (db *HermezDb) WriteLocalExitRootForBatchNo(batchNo uint64, root common.Hash) error {
return db.tx.Put(LOCAL_EXIT_ROOTS, Uint64ToBytes(batchNo), root.Bytes())
}

func (db *HermezDbReader) GetLocalExitRootForBatchNo(batchNo uint64) (common.Hash, error) {
v, err := db.tx.GetOne(LOCAL_EXIT_ROOTS, Uint64ToBytes(batchNo))
if err != nil {
return common.Hash{}, err
}
return common.BytesToHash(v), nil
}

func (db *HermezDb) WriteRollupType(rollupType, forkId uint64) error {
return db.tx.Put(ROllUP_TYPES_FORKS, Uint64ToBytes(rollupType), Uint64ToBytes(forkId))
}
Expand Down
12 changes: 0 additions & 12 deletions zk/stages/stage_batches.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ type HermezDb interface {
WriteBlockL1InfoTreeIndex(blockNumber uint64, l1Index uint64) error
WriteBlockL1InfoTreeIndexProgress(blockNumber uint64, l1Index uint64) error
WriteLatestUsedGer(blockNo uint64, ger common.Hash) error
WriteLocalExitRootForBatchNo(batchNo uint64, localExitRoot common.Hash) error
}

type DatastreamClient interface {
Expand Down Expand Up @@ -256,9 +255,6 @@ LOOP:
if entry.StateRoot != lastBlockRoot {
log.Warn(fmt.Sprintf("[%s] batch end state root mismatches last block's: %x, expected: %x", logPrefix, entry.StateRoot, lastBlockRoot))
}
if err := writeBatchEnd(hermezDb, entry); err != nil {
return fmt.Errorf("write batch end error: %v", err)
}
case *types.FullL2Block:
if cfg.zkCfg.SyncLimit > 0 && entry.L2BlockNumber >= cfg.zkCfg.SyncLimit {
// stop the node going into a crazy loop
Expand Down Expand Up @@ -752,14 +748,6 @@ func PruneBatchesStage(s *stagedsync.PruneState, tx kv.RwTx, cfg BatchesCfg, ctx
return nil
}

func writeBatchEnd(hermezDb HermezDb, batchEnd *types.BatchEnd) (err error) {
// utils.CalculateAccInputHash(oldAccInputHash, batchStart., l1InfoRoot common.Hash, timestampLimit uint64, sequencerAddr common.Address, forcedBlockhashL1 common.Hash)
if batchEnd.LocalExitRoot != emptyHash {
err = hermezDb.WriteLocalExitRootForBatchNo(batchEnd.Number, batchEnd.LocalExitRoot)
}
return
}

// writeL2Block writes L2Block to ErigonDb and HermezDb
// writes header, body, forkId and blockBatch
func writeL2Block(eriDb ErigonDb, hermezDb HermezDb, l2Block *types.FullL2Block, highestL1InfoTreeIndex uint64) error {
Expand Down
25 changes: 22 additions & 3 deletions zk/stages/stage_sequence_execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,13 @@ func SpawnSequencingStage(
// if we identify any. During normal operation this function will simply check and move on without performing
// any action.
if !batchState.isAnyRecovery() {
isUnwinding, err := handleBatchEndChecks(batchContext, batchState, executionAt, u)
if err != nil || isUnwinding {
isUnwinding, err := alignExecutionToDatastream(batchContext, batchState, executionAt, u)
if err != nil {
return err
}
if isUnwinding {
return sdb.tx.Commit()
}
}

tryHaltSequencer(batchContext, batchState.batchNumber)
Expand Down Expand Up @@ -375,9 +378,25 @@ func SpawnSequencingStage(
return err
}

if err = runBatchLastSteps(batchContext, batchState.batchNumber, block.NumberU64(), batchCounters); err != nil {
/*
if adding something below that line we must ensure
- it is also handled property in processInjectedInitialBatch
- it is also handled property in alignExecutionToDatastream
- it is also handled property in doCheckForBadBatch
- it is unwound correctly
*/

if err := finalizeLastBatchInDatastream(batchContext, batchState.batchNumber, block.NumberU64()); err != nil {
return err
}

// TODO: It is 99% sure that there is no need to write this in any of processInjectedInitialBatch, alignExecutionToDatastream, doCheckForBadBatch but it is worth double checknig
// the unwind of this value is handed by UnwindExecutionStageDbWrites
if _, err := rawdb.IncrementStateVersionByBlockNumberIfNeeded(batchContext.sdb.tx, block.NumberU64()); err != nil {
return fmt.Errorf("writing plain state version: %w", err)
}

log.Info(fmt.Sprintf("[%s] Finish batch %d...", batchContext.s.LogPrefix(), batchState.batchNumber))

return sdb.tx.Commit()
}
61 changes: 0 additions & 61 deletions zk/stages/stage_sequence_execute_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"github.com/ledgerwatch/erigon/core/vm"
"github.com/ledgerwatch/erigon/eth/stagedsync"
"github.com/ledgerwatch/erigon/eth/stagedsync/stages"
"github.com/ledgerwatch/erigon/zk/utils"
"github.com/ledgerwatch/log/v3"
)

Expand Down Expand Up @@ -112,63 +111,3 @@ func updateStreamAndCheckRollback(

return false, nil
}

func runBatchLastSteps(
batchContext *BatchContext,
thisBatch uint64,
blockNumber uint64,
batchCounters *vm.BatchCounterCollector,
) error {
l1InfoIndex, err := batchContext.sdb.hermezDb.GetBlockL1InfoTreeIndex(blockNumber)
if err != nil {
return err
}

counters, err := batchCounters.CombineCollectors(l1InfoIndex != 0)
if err != nil {
return err
}

log.Info(fmt.Sprintf("[%s] counters consumed", batchContext.s.LogPrefix()), "batch", thisBatch, "counts", counters.UsedAsString())

if err = batchContext.sdb.hermezDb.WriteBatchCounters(blockNumber, counters.UsedAsMap()); err != nil {
return err
}

// Local Exit Root (ler): read s/c storage every batch to store the LER for the highest block in the batch
ler, err := utils.GetBatchLocalExitRootFromSCStorage(thisBatch, batchContext.sdb.hermezDb.HermezDbReader, batchContext.sdb.tx)
if err != nil {
return err
}
// write ler to hermezdb
if err = batchContext.sdb.hermezDb.WriteLocalExitRootForBatchNo(thisBatch, ler); err != nil {
return err
}

// get the last block number written to batch
// we should match it's state root in batch end entry
// if we get the last block in DB errors may occur since we have DB unwinds AFTER we commit batch end to datastream
// the last block written to the datastream before batch end should be the correct one once we are here
// if it is not, we have a bigger problem
lastBlockNumber, err := batchContext.cfg.datastreamServer.GetHighestBlockNumber()
if err != nil {
return err
}
block, err := rawdb.ReadBlockByNumber(batchContext.sdb.tx, lastBlockNumber)
if err != nil {
return err
}
blockRoot := block.Root()
if err = batchContext.cfg.datastreamServer.WriteBatchEnd(batchContext.sdb.hermezDb, thisBatch, &blockRoot, &ler); err != nil {
return err
}

// the unwind of this value is handed by UnwindExecutionStageDbWrites
if _, err = rawdb.IncrementStateVersionByBlockNumberIfNeeded(batchContext.sdb.tx, lastBlockNumber); err != nil {
return fmt.Errorf("writing plain state version: %w", err)
}

log.Info(fmt.Sprintf("[%s] Finish batch %d...", batchContext.s.LogPrefix(), thisBatch))

return nil
}
72 changes: 39 additions & 33 deletions zk/stages/stage_sequence_execute_data_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ import (
"fmt"

"github.com/ledgerwatch/erigon/core/rawdb"
"github.com/ledgerwatch/erigon/core/vm"
"github.com/ledgerwatch/erigon/eth/stagedsync"
"github.com/ledgerwatch/erigon/eth/stagedsync/stages"
"github.com/ledgerwatch/erigon/zk/datastream/server"
verifier "github.com/ledgerwatch/erigon/zk/legacy_executor_verifier"
"github.com/ledgerwatch/erigon/zk/utils"
"github.com/ledgerwatch/log/v3"
)

Expand Down Expand Up @@ -84,55 +84,61 @@ func (sbc *SequencerBatchStreamWriter) writeBlockDetailsToDatastream(verifiedBun
return checkedVerifierBundles, nil
}

func handleBatchEndChecks(batchContext *BatchContext, batchState *BatchState, thisBlock uint64, u stagedsync.Unwinder) (bool, error) {
isLastEntryBatchEnd, err := batchContext.cfg.datastreamServer.IsLastEntryBatchEnd()
func alignExecutionToDatastream(batchContext *BatchContext, batchState *BatchState, lastExecutedBlock uint64, u stagedsync.Unwinder) (bool, error) {
lastExecutedBatch := batchState.batchNumber - 1

lastDatastreamBatch, err := batchContext.cfg.datastreamServer.GetHighestBatchNumber()
if err != nil {
return false, err
}

if isLastEntryBatchEnd {
return false, nil
lastDatastreamBlock, err := batchContext.cfg.datastreamServer.GetHighestBlockNumber()
if err != nil {
return false, err
}

lastBatch := batchState.batchNumber - 1
if lastExecutedBatch == lastDatastreamBatch && lastExecutedBlock == lastDatastreamBlock {
return false, nil
}

log.Warn(fmt.Sprintf("[%s] Last batch %d was not closed properly, closing it now...", batchContext.s.LogPrefix(), lastBatch))
if err := finalizeLastBatchInDatastreamIfNotFinalized(batchContext, lastDatastreamBatch, lastDatastreamBlock); err != nil {
return false, err
}

rawCounters, _, err := batchContext.sdb.hermezDb.GetLatestBatchCounters(lastBatch)
block, err := rawdb.ReadBlockByNumber(batchContext.sdb.tx, lastDatastreamBlock)
if err != nil {
return false, err
return true, err
}

latestCounters := vm.NewCountersFromUsedMap(rawCounters)
log.Warn(fmt.Sprintf("[%s] Unwinding due to a datastream gap", batchContext.s.LogPrefix()), "streamHeight", lastDatastreamBlock, "sequencerHeight", lastExecutedBlock)
u.UnwindTo(lastDatastreamBlock, block.Hash())
return true, nil
}

endBatchCounters, err := prepareBatchCounters(batchContext, batchState, latestCounters)
func finalizeLastBatchInDatastreamIfNotFinalized(batchContext *BatchContext, batchToClose, blockToCloseAt uint64) error {
isLastEntryBatchEnd, err := batchContext.cfg.datastreamServer.IsLastEntryBatchEnd()
if err != nil {
return false, err
return err
}

if err = runBatchLastSteps(batchContext, lastBatch, thisBlock, endBatchCounters); err != nil {
return false, err
if isLastEntryBatchEnd {
return nil
}
log.Warn(fmt.Sprintf("[%s] Last datastream's batch %d was not closed properly, closing it now...", batchContext.s.LogPrefix(), batchToClose))
return finalizeLastBatchInDatastream(batchContext, batchToClose, blockToCloseAt)
}

// now check if there is a gap in the stream vs the state db
streamProgress, err := stages.GetStageProgress(batchContext.sdb.tx, stages.DataStream)
func finalizeLastBatchInDatastream(batchContext *BatchContext, batchToClose, blockToCloseAt uint64) error {
ler, err := utils.GetBatchLocalExitRootFromSCStorageByBlock(blockToCloseAt, batchContext.sdb.hermezDb.HermezDbReader, batchContext.sdb.tx)
if err != nil {
return false, err
return err
}

unwinding := false
if streamProgress > 0 && streamProgress < thisBlock {
block, err := rawdb.ReadBlockByNumber(batchContext.sdb.tx, streamProgress)
if err != nil {
return true, err
}
log.Warn(fmt.Sprintf("[%s] Unwinding due to a datastream gap", batchContext.s.LogPrefix()),
"streamHeight", streamProgress,
"sequencerHeight", thisBlock,
)
u.UnwindTo(streamProgress, block.Hash())
unwinding = true
lastBlock, err := rawdb.ReadBlockByNumber(batchContext.sdb.tx, blockToCloseAt)
if err != nil {
return err
}

return unwinding, nil
root := lastBlock.Root()
if err = batchContext.cfg.datastreamServer.WriteBatchEnd(batchContext.sdb.hermezDb, batchToClose, &root, &ler); err != nil {
return err
}
return nil
}
Loading

0 comments on commit bd14081

Please sign in to comment.