Skip to content

Commit

Permalink
fix: unwind on ds block unwind
Browse files Browse the repository at this point in the history
  • Loading branch information
V-Staykov committed Oct 30, 2024
1 parent 48c769a commit d5f3b6e
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 23 deletions.
4 changes: 1 addition & 3 deletions zk/datastream/client/stream_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -428,9 +428,6 @@ func (c *StreamClient) ReadAllEntriesToChannel() (err error) {
return fmt.Errorf("[Datastream client] Context done - stopping")
default:
}
if count > 5 {
return errors.New("failed to read all entries within 5 attempts")
}
if connected {
if err, socketErr = c.readAllEntriesToChannel(); err != nil {
return err
Expand All @@ -457,6 +454,7 @@ func (c *StreamClient) handleSocketError(socketErr error) bool {
return false
}

c.streaming.Store(false)
c.RenewEntryChannel()

return true
Expand Down
14 changes: 7 additions & 7 deletions zk/stages/stage_batches.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,12 +212,16 @@ func SpawnStageBatches(
time.Sleep(1 * time.Second)
}

log.Debug(fmt.Sprintf("[%s] Highest block in db and datastream", logPrefix), "datastreamBlock", highestDSL2Block.L2BlockNumber, "dbBlock", stageProgressBlockNo)
unwindFn := func(unwindBlock uint64) (uint64, error) {
return rollback(logPrefix, eriDb, hermezDb, dsQueryClient, unwindBlock, tx, u)
}
if highestDSL2Block.L2BlockNumber < stageProgressBlockNo {
stageProgressBlockNo = highestDSL2Block.L2BlockNumber
log.Info(fmt.Sprintf("[%s] Datastream behind, unwinding", logPrefix))
unwindFn(highestDSL2Block.L2BlockNumber)
return nil
}

log.Debug(fmt.Sprintf("[%s] Highest block in db and datastream", logPrefix), "datastreamBlock", highestDSL2Block.L2BlockNumber, "dbBlock", stageProgressBlockNo)

dsClientProgress := dsQueryClient.GetProgressAtomic()
dsClientProgress.Swap(stageProgressBlockNo)

Expand All @@ -243,10 +247,6 @@ func SpawnStageBatches(

log.Info(fmt.Sprintf("[%s] Reading blocks from the datastream.", logPrefix))

unwindFn := func(unwindBlock uint64) (uint64, error) {
return rollback(logPrefix, eriDb, hermezDb, dsQueryClient, unwindBlock, tx, u)
}

lastProcessedBlockHash, err := eriDb.ReadCanonicalHash(stageProgressBlockNo)
if err != nil {
return fmt.Errorf("failed to read canonical hash for block %d: %w", stageProgressBlockNo, err)
Expand Down
25 changes: 12 additions & 13 deletions zk/stages/stage_batches_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,26 +234,25 @@ func (p *BatchesProcessor) processFullBlock(blockEntry *types.FullL2Block) (endL
}
// skip but warn on already processed blocks
if blockEntry.L2BlockNumber <= p.stageProgressBlockNo {
if blockEntry.L2BlockNumber < p.stageProgressBlockNo {
dbBatchNum, err := p.hermezDb.GetBatchNoByL2Block(blockEntry.L2BlockNumber)
if err != nil {
return false, err
}

if blockEntry.L2BlockNumber == p.stageProgressBlockNo && dbBatchNum == blockEntry.BatchNumber {
// only warn if the block is very old, we expect the very latest block to be requested
// when the stage is fired up for the first time
log.Warn(fmt.Sprintf("[%s] Skipping block %d, already processed", p.logPrefix, blockEntry.L2BlockNumber))
return false, nil
}

dbBatchNum, err := p.hermezDb.GetBatchNoByL2Block(blockEntry.L2BlockNumber)
if err != nil {
// if the block is older or the batch number is different, we need to unwind because the block has definately changed
log.Warn(fmt.Sprintf("[%s] Block already processed. Triggering unwind...", p.logPrefix),
"block", blockEntry.L2BlockNumber, "ds batch", blockEntry.BatchNumber, "db batch", dbBatchNum)
if _, err := p.unwind(blockEntry.L2BlockNumber); err != nil {
return false, err
}

if blockEntry.BatchNumber > dbBatchNum {
// if the batch number is higher than the one we know about, it means that we need to trigger an unwinding of blocks
log.Warn(fmt.Sprintf("[%s] Batch number mismatch detected. Triggering unwind...", p.logPrefix),
"block", blockEntry.L2BlockNumber, "ds batch", blockEntry.BatchNumber, "db batch", dbBatchNum)
if _, err := p.unwind(blockEntry.L2BlockNumber); err != nil {
return false, err
}
}
return false, nil
return false, ErrorTriggeredUnwind
}

var dbParentBlockHash common.Hash
Expand Down

0 comments on commit d5f3b6e

Please sign in to comment.