Skip to content

Commit

Permalink
feat(sequencer): add-checked-regarding-ds-execution (#1249)
Browse files Browse the repository at this point in the history
  • Loading branch information
kstoykov authored Oct 1, 2024
1 parent 3d0109a commit c3ad9bd
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 34 deletions.
40 changes: 6 additions & 34 deletions zk/stages/stage_sequence_execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,40 +47,7 @@ func SpawnSequencingStage(
}

if lastBatch < highestBatchInDs {
if !cfg.zk.SequencerResequence {
panic(fmt.Sprintf("[%s] The node need re-sequencing but this option is disabled.", s.LogPrefix()))
}

log.Info(fmt.Sprintf("[%s] Last batch %d is lower than highest batch in datastream %d, resequencing...", s.LogPrefix(), lastBatch, highestBatchInDs))

batches, err := cfg.datastreamServer.ReadBatches(lastBatch+1, highestBatchInDs)
if err != nil {
return err
}

if err = cfg.datastreamServer.UnwindToBatchStart(lastBatch + 1); err != nil {
return err
}

log.Info(fmt.Sprintf("[%s] Resequence from batch %d to %d in data stream", s.LogPrefix(), lastBatch+1, highestBatchInDs))
for _, batch := range batches {
batchJob := NewResequenceBatchJob(batch)
subBatchCount := 0
for batchJob.HasMoreBlockToProcess() {
if err = sequencingBatchStep(s, u, ctx, cfg, historyCfg, batchJob); err != nil {
return err
}

subBatchCount += 1
}

log.Info(fmt.Sprintf("[%s] Resequenced original batch %d with %d batches", s.LogPrefix(), batchJob.batchToProcess[0].BatchNumber, subBatchCount))
if cfg.zk.SequencerResequenceStrict && subBatchCount != 1 {
return fmt.Errorf("strict mode enabled, but resequenced batch %d has %d sub-batches", batchJob.batchToProcess[0].BatchNumber, subBatchCount)
}
}

return nil
return resequence(s, u, ctx, cfg, historyCfg, lastBatch, highestBatchInDs)
}

if cfg.zk.SequencerResequence {
Expand All @@ -104,6 +71,11 @@ func sequencingBatchStep(
log.Info(fmt.Sprintf("[%s] Starting sequencing stage", logPrefix))
defer log.Info(fmt.Sprintf("[%s] Finished sequencing stage", logPrefix))

// at this point of time the datastream could not be ahead of the executor
if err = validateIfDatastreamIsAheadOfExecution(s, ctx, cfg); err != nil {
return err
}

sdb, err := newStageDb(ctx, cfg.db)
if err != nil {
return err
Expand Down
54 changes: 54 additions & 0 deletions zk/stages/stage_sequence_execute_resequence.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package stages

import (
"context"
"fmt"

"github.com/ledgerwatch/log/v3"

"github.com/ledgerwatch/erigon/eth/stagedsync"
)

func resequence(
s *stagedsync.StageState,
u stagedsync.Unwinder,
ctx context.Context,
cfg SequenceBlockCfg,
historyCfg stagedsync.HistoryCfg,
lastBatch, highestBatchInDs uint64,
) (err error) {
if !cfg.zk.SequencerResequence {
panic(fmt.Sprintf("[%s] The node need re-sequencing but this option is disabled.", s.LogPrefix()))
}

log.Info(fmt.Sprintf("[%s] Last batch %d is lower than highest batch in datastream %d, resequencing...", s.LogPrefix(), lastBatch, highestBatchInDs))

batches, err := cfg.datastreamServer.ReadBatches(lastBatch+1, highestBatchInDs)
if err != nil {
return err
}

if err = cfg.datastreamServer.UnwindToBatchStart(lastBatch + 1); err != nil {
return err
}

log.Info(fmt.Sprintf("[%s] Resequence from batch %d to %d in data stream", s.LogPrefix(), lastBatch+1, highestBatchInDs))
for _, batch := range batches {
batchJob := NewResequenceBatchJob(batch)
subBatchCount := 0
for batchJob.HasMoreBlockToProcess() {
if err = sequencingBatchStep(s, u, ctx, cfg, historyCfg, batchJob); err != nil {
return err
}

subBatchCount += 1
}

log.Info(fmt.Sprintf("[%s] Resequenced original batch %d with %d batches", s.LogPrefix(), batchJob.batchToProcess[0].BatchNumber, subBatchCount))
if cfg.zk.SequencerResequenceStrict && subBatchCount != 1 {
return fmt.Errorf("strict mode enabled, but resequenced batch %d has %d sub-batches", batchJob.batchToProcess[0].BatchNumber, subBatchCount)
}
}

return nil
}
31 changes: 31 additions & 0 deletions zk/stages/stage_sequence_execute_utils.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package stages

import (
"context"
"time"

"github.com/c2h5oh/datasize"
Expand Down Expand Up @@ -159,6 +160,36 @@ func (sCfg *SequenceBlockCfg) toErigonExecuteBlockCfg() stagedsync.ExecuteBlockC
)
}

func validateIfDatastreamIsAheadOfExecution(
s *stagedsync.StageState,
// u stagedsync.Unwinder,
ctx context.Context,
cfg SequenceBlockCfg,
// historyCfg stagedsync.HistoryCfg,
) error {
roTx, err := cfg.db.BeginRo(ctx)
if err != nil {
return err
}
defer roTx.Rollback()

executionAt, err := s.ExecutionAt(roTx)
if err != nil {
return err
}

lastDatastreamBlock, err := cfg.datastreamServer.GetHighestBlockNumber()
if err != nil {
return err
}

if executionAt < lastDatastreamBlock {
panic(fmt.Errorf("[%s] Last block in the datastream (%d) is higher than last executed block (%d)", s.LogPrefix(), lastDatastreamBlock, executionAt))
}

return nil
}

type forkDb interface {
GetAllForkHistory() ([]uint64, []uint64, error)
GetLatestForkHistory() (uint64, uint64, error)
Expand Down

0 comments on commit c3ad9bd

Please sign in to comment.