Skip to content

Commit

Permalink
datastream repopulation logic changes and removal of batch partially …
Browse files Browse the repository at this point in the history
…processed

partially processed removed in favour of simply sealing the WIP batch on a restart
  • Loading branch information
hexoscott committed Aug 20, 2024
1 parent 7bf6281 commit 5f015f6
Show file tree
Hide file tree
Showing 9 changed files with 95 additions and 103 deletions.
3 changes: 0 additions & 3 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -753,9 +753,6 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
latestHeader := backend.dataStream.GetHeader()
if latestHeader.TotalEntries == 0 {
log.Info("[dataStream] setting the stream progress to 0")
if err := stages.SaveStageProgress(tx, stages.DataStream, 0); err != nil {
return nil, err
}
backend.preStartTasks.WarmUpDataStream = true
}
}
Expand Down
25 changes: 0 additions & 25 deletions zk/hermez_db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -1610,31 +1610,6 @@ func (db *HermezDbReader) GetInvalidBatch(batchNo uint64) (bool, error) {
return len(v) > 0, nil
}

func (db *HermezDb) WriteIsBatchPartiallyProcessed(batchNo uint64) error {
return db.tx.Put(BATCH_PARTIALLY_PROCESSED, Uint64ToBytes(batchNo), []byte{1})
}

func (db *HermezDb) DeleteIsBatchPartiallyProcessed(batchNo uint64) error {
return db.tx.Delete(BATCH_PARTIALLY_PROCESSED, Uint64ToBytes(batchNo))
}

func (db *HermezDbReader) GetIsBatchPartiallyProcessed(batchNo uint64) (bool, error) {
v, err := db.tx.GetOne(BATCH_PARTIALLY_PROCESSED, Uint64ToBytes(batchNo))
if err != nil {
return false, err
}
return len(v) > 0, nil
}

func (db *HermezDb) TruncateIsBatchPartiallyProcessed(fromBatch, toBatch uint64) error {
for batch := fromBatch; batch <= toBatch; batch++ {
if err := db.DeleteIsBatchPartiallyProcessed(batch); err != nil {
return err
}
}
return nil
}

func (db *HermezDb) WriteLocalExitRootForBatchNo(batchNo uint64, root common.Hash) error {
return db.tx.Put(LOCAL_EXIT_ROOTS, Uint64ToBytes(batchNo), root.Bytes())
}
Expand Down
21 changes: 17 additions & 4 deletions zk/stages/stage_dataStreamCatchup.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/ledgerwatch/erigon/zk/datastream/server"
"github.com/ledgerwatch/erigon/zk/hermez_db"
"github.com/ledgerwatch/log/v3"
"github.com/ledgerwatch/erigon/zk/sequencer"
)

type DataStreamCatchupCfg struct {
Expand Down Expand Up @@ -80,12 +81,24 @@ func CatchupDatastream(ctx context.Context, logPrefix string, tx kv.RwTx, stream
srv := server.NewDataStreamServer(stream, chainId)
reader := hermez_db.NewHermezDbReader(tx)

finalBlockNumber, err := stages.GetStageProgress(tx, stages.Execution)
if err != nil {
return 0, err
var (
err error
finalBlockNumber uint64
)

if sequencer.IsSequencer() {
finalBlockNumber, err = stages.GetStageProgress(tx, stages.DataStream)
if err != nil {
return 0, err
}
} else {
finalBlockNumber, err = stages.GetStageProgress(tx, stages.Execution)
if err != nil {
return 0, err
}
}

previousProgress, err := stages.GetStageProgress(tx, stages.DataStream)
previousProgress, err := srv.GetHighestBlockNumber()
if err != nil {
return 0, err
}
Expand Down
70 changes: 53 additions & 17 deletions zk/stages/stage_sequence_execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/ledgerwatch/erigon/eth/stagedsync/stages"
"github.com/ledgerwatch/erigon/zk"
"github.com/ledgerwatch/erigon/zk/utils"
"github.com/ledgerwatch/erigon/core/vm"
)

func SpawnSequencingStage(
Expand Down Expand Up @@ -46,11 +47,6 @@ func SpawnSequencingStage(
return err
}

isLastBatchPariallyProcessed, err := sdb.hermezDb.GetIsBatchPartiallyProcessed(lastBatch)
if err != nil {
return err
}

forkId, err := prepareForkId(lastBatch, executionAt, sdb.hermezDb)
if err != nil {
return err
Expand All @@ -66,7 +62,7 @@ func SpawnSequencingStage(
var block *types.Block
runLoopBlocks := true
batchContext := newBatchContext(ctx, &cfg, &historyCfg, s, sdb)
batchState := newBatchState(forkId, prepareBatchNumber(lastBatch, isLastBatchPariallyProcessed), !isLastBatchPariallyProcessed && cfg.zk.HasExecutors(), cfg.zk.L1SyncStartBlock > 0, cfg.txPool)
batchState := newBatchState(forkId, lastBatch+1, cfg.zk.HasExecutors(), cfg.zk.L1SyncStartBlock > 0, cfg.txPool)
blockDataSizeChecker := newBlockDataChecker()
streamWriter := newSequencerBatchStreamWriter(batchContext, batchState, lastBatch) // using lastBatch (rather than batchState.batchNumber) is not mistake

Expand All @@ -79,29 +75,69 @@ func SpawnSequencingStage(
if err = cfg.datastreamServer.WriteWholeBatchToStream(logPrefix, sdb.tx, sdb.hermezDb.HermezDbReader, lastBatch, injectedBatchBatchNumber); err != nil {
return err
}
if err = stages.SaveStageProgress(sdb.tx, stages.DataStream, 1); err != nil {
return err
}

return sdb.tx.Commit()
}

tryHaltSequencer(batchContext, batchState.batchNumber)

if err := utils.UpdateZkEVMBlockCfg(cfg.chainConfig, sdb.hermezDb, logPrefix); err != nil {
return err
if batchState.hasExecutorForThisBatch {
// identify a stream gap i.e. a sequencer restart without an ack back from an executor.
// in this case we need to unwind the state so that we match the datastream height
streamProgress, err := stages.GetStageProgress(sdb.tx, stages.DataStream)
if err != nil {
return err
}
if streamProgress > 0 && streamProgress < executionAt {
block, err := rawdb.ReadBlockByNumber(sdb.tx, streamProgress)
if err != nil {
return err
}
log.Warn(fmt.Sprintf("[%s] Unwinding due to a datastream gap", logPrefix),
"streamHeight", streamProgress,
"sequencerHeight", executionAt,
)
u.UnwindTo(streamProgress, block.Hash())
return nil
}
}

batchCounters, err := prepareBatchCounters(batchContext, batchState, isLastBatchPariallyProcessed)
lastBatchSealed, err := checkIfLastBatchIsSealed(batchContext)
if err != nil {
return err
}

if !isLastBatchPariallyProcessed {
// handle case where batch wasn't closed properly
// close it before starting a new one
// this occurs when sequencer was switched from syncer or sequencer datastream files were deleted
// and datastream was regenerated
if err = finalizeLastBatchInDatastreamIfNotFinalized(batchContext, batchState, executionAt); err != nil {
if !lastBatchSealed {
log.Warn(fmt.Sprintf("[%s] Closing batch early due to partial processing", logPrefix), "batch", lastBatch)

// we are in a state where the sequencer was perhaps restarted or unwound and the last batch
// that was partially processed needed to be closed, and we will have at least one block in it (because the last
// entry wasn't a batch end)
rawCounters, _, err := sdb.hermezDb.GetLatestBatchCounters(lastBatch)
if err != nil {
return err
}
latestCounters := vm.NewCountersFromUsedMap(rawCounters)

endBatchCounters, err := prepareBatchCounters(batchContext, batchState, latestCounters)

if err = runBatchLastSteps(batchContext, lastBatch, executionAt, endBatchCounters); err != nil {
return err
}

return sdb.tx.Commit()
}

tryHaltSequencer(batchContext, batchState.batchNumber)

if err := utils.UpdateZkEVMBlockCfg(cfg.chainConfig, sdb.hermezDb, logPrefix); err != nil {
return err
}

batchCounters, err := prepareBatchCounters(batchContext, batchState, nil)
if err != nil {
return err
}

if batchState.isL1Recovery() {
Expand Down
29 changes: 1 addition & 28 deletions zk/stages/stage_sequence_execute_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,28 +12,7 @@ import (
"github.com/ledgerwatch/log/v3"
)

func prepareBatchNumber(lastBatch uint64, isLastBatchPariallyProcessed bool) uint64 {
if isLastBatchPariallyProcessed {
return lastBatch
}

return lastBatch + 1
}

func prepareBatchCounters(batchContext *BatchContext, batchState *BatchState, isLastBatchPariallyProcessed bool) (*vm.BatchCounterCollector, error) {
var intermediateUsedCounters *vm.Counters
if isLastBatchPariallyProcessed {
intermediateCountersMap, found, err := batchContext.sdb.hermezDb.GetLatestBatchCounters(batchState.batchNumber)
if err != nil {
return nil, err
}
if found {
intermediateUsedCounters = vm.NewCountersFromUsedMap(intermediateCountersMap)
} else {
log.Warn("intermediate counters not found for batch, initialising with empty counters", "batch", batchState.batchNumber)
}
}

func prepareBatchCounters(batchContext *BatchContext, batchState *BatchState, intermediateUsedCounters *vm.Counters) (*vm.BatchCounterCollector, error) {
return vm.NewBatchCounterCollector(batchContext.sdb.smt.GetDepth(), uint16(batchState.forkId), batchContext.cfg.zk.VirtualCountersSmtReduction, batchContext.cfg.zk.ShouldCountersBeUnlimited(batchState.isL1Recovery()), intermediateUsedCounters), nil
}

Expand Down Expand Up @@ -66,9 +45,6 @@ func doCheckForBadBatch(batchContext *BatchContext, batchState *BatchState, this
if err = batchContext.sdb.hermezDb.WriteBatchCounters(currentBlock.NumberU64(), map[string]int{}); err != nil {
return false, err
}
if err = batchContext.sdb.hermezDb.DeleteIsBatchPartiallyProcessed(batchState.batchNumber); err != nil {
return false, err
}
if err = stages.SaveStageProgress(batchContext.sdb.tx, stages.HighestSeenBatchNumber, batchState.batchNumber); err != nil {
return false, err
}
Expand Down Expand Up @@ -158,9 +134,6 @@ func runBatchLastSteps(
if err = batchContext.sdb.hermezDb.WriteBatchCounters(blockNumber, counters.UsedAsMap()); err != nil {
return err
}
if err := batchContext.sdb.hermezDb.DeleteIsBatchPartiallyProcessed(thisBatch); err != nil {
return err
}

// Local Exit Root (ler): read s/c storage every batch to store the LER for the highest block in the batch
ler, err := utils.GetBatchLocalExitRootFromSCStorage(thisBatch, batchContext.sdb.hermezDb.HermezDbReader, batchContext.sdb.tx)
Expand Down
6 changes: 0 additions & 6 deletions zk/stages/stage_sequence_execute_blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,12 +247,6 @@ func finaliseBlock(
return nil, err
}

// write partially processed
err = batchContext.sdb.hermezDb.WriteIsBatchPartiallyProcessed(batchState.batchNumber)
if err != nil {
return nil, err
}

// this is actually account + storage indices stages
quitCh := batchContext.ctx.Done()
from := newNum.Uint64()
Expand Down
32 changes: 23 additions & 9 deletions zk/stages/stage_sequence_execute_data_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
verifier "github.com/ledgerwatch/erigon/zk/legacy_executor_verifier"
"github.com/ledgerwatch/erigon/zk/utils"
"github.com/ledgerwatch/log/v3"
"github.com/ledgerwatch/erigon/eth/stagedsync/stages"
)

type SequencerBatchStreamWriter struct {
Expand Down Expand Up @@ -62,6 +63,10 @@ func (sbc *SequencerBatchStreamWriter) writeBlockDetailsToDatastream(verifiedBun
return checkedVerifierBundles, err
}

if err = stages.SaveStageProgress(sbc.sdb.tx, stages.DataStream, block.NumberU64()); err != nil {
return checkedVerifierBundles, err
}

// once we have handled the very first block we can update the last batch to be the current batch safely so that
// we don't keep adding batch bookmarks in between blocks
sbc.lastBatch = request.BatchNumber
Expand All @@ -78,29 +83,38 @@ func (sbc *SequencerBatchStreamWriter) writeBlockDetailsToDatastream(verifiedBun
return checkedVerifierBundles, nil
}

func finalizeLastBatchInDatastreamIfNotFinalized(batchContext *BatchContext, batchState *BatchState, thisBlock uint64) error {
func checkIfLastBatchIsSealed(batchContext *BatchContext) (bool, error) {
isLastEntryBatchEnd, err := batchContext.cfg.datastreamServer.IsLastEntryBatchEnd()
if err != nil {
return false, err
}

return isLastEntryBatchEnd, nil
}

func finalizeLastBatchInDatastreamIfNotFinalized(batchContext *BatchContext, batchState *BatchState, thisBlock uint64) (bool, error) {
isLastEntryBatchEnd, err := batchContext.cfg.datastreamServer.IsLastEntryBatchEnd()
if err != nil {
return err
return false, err
}

if isLastEntryBatchEnd {
return nil
return false, nil
}

log.Warn(fmt.Sprintf("[%s] Last batch %d was not closed properly, closing it now...", batchContext.s.LogPrefix(), batchState.batchNumber))
ler, err := utils.GetBatchLocalExitRootFromSCStorage(batchState.batchNumber, batchContext.sdb.hermezDb.HermezDbReader, batchContext.sdb.tx)
log.Warn(fmt.Sprintf("[%s] Last batch %d was not closed properly, closing it now...", batchContext.s.LogPrefix(), batchState.batchNumber-1))
ler, err := utils.GetBatchLocalExitRootFromSCStorage(batchState.batchNumber-1, batchContext.sdb.hermezDb.HermezDbReader, batchContext.sdb.tx)
if err != nil {
return err
return true, err
}

lastBlock, err := rawdb.ReadBlockByNumber(batchContext.sdb.tx, thisBlock)
if err != nil {
return err
return true, err
}
root := lastBlock.Root()
if err = batchContext.cfg.datastreamServer.WriteBatchEnd(batchContext.sdb.hermezDb, batchState.batchNumber-1, &root, &ler); err != nil {
return err
return true, err
}
return nil
return true, nil
}
3 changes: 1 addition & 2 deletions zk/stages/stage_sequence_execute_injected_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,7 @@ func processInjectedInitialBatch(
return err
}

// deleting the partially processed flag
return batchContext.sdb.hermezDb.DeleteIsBatchPartiallyProcessed(injectedBatchBatchNumber)
return err
}

func handleInjectedBatch(
Expand Down
9 changes: 0 additions & 9 deletions zk/stages/stage_sequence_execute_unwind.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,15 +137,6 @@ func UnwindSequenceExecutionStageDbWrites(ctx context.Context, u *stagedsync.Unw
if err = hermezDb.DeleteBatchCounters(u.UnwindPoint+1, s.BlockNumber); err != nil {
return fmt.Errorf("truncate block batches error: %v", err)
}
// only seq
if err = hermezDb.TruncateIsBatchPartiallyProcessed(fromBatch, toBatch); err != nil {
return fmt.Errorf("truncate fork id error: %v", err)
}
if lastBatchToKeepBeforeFrom == fromBatch {
if err = hermezDb.WriteIsBatchPartiallyProcessed(lastBatchToKeepBeforeFrom); err != nil {
return fmt.Errorf("truncate fork id error: %v", err)
}
}

return nil
}

0 comments on commit 5f015f6

Please sign in to comment.