diff --git a/zk/datastream/client/stream_client.go b/zk/datastream/client/stream_client.go index 3072ff5841e..4305ccb2b36 100644 --- a/zk/datastream/client/stream_client.go +++ b/zk/datastream/client/stream_client.go @@ -428,9 +428,6 @@ func (c *StreamClient) ReadAllEntriesToChannel() (err error) { return fmt.Errorf("[Datastream client] Context done - stopping") default: } - if count > 5 { - return errors.New("failed to read all entries within 5 attempts") - } if connected { if err, socketErr = c.readAllEntriesToChannel(); err != nil { return err @@ -457,6 +454,7 @@ func (c *StreamClient) handleSocketError(socketErr error) bool { return false } + c.streaming.Store(false) c.RenewEntryChannel() return true diff --git a/zk/stages/stage_batches.go b/zk/stages/stage_batches.go index 030803d32f3..73aa3ae93dd 100644 --- a/zk/stages/stage_batches.go +++ b/zk/stages/stage_batches.go @@ -212,12 +212,16 @@ func SpawnStageBatches( time.Sleep(1 * time.Second) } + log.Debug(fmt.Sprintf("[%s] Highest block in db and datastream", logPrefix), "datastreamBlock", highestDSL2Block.L2BlockNumber, "dbBlock", stageProgressBlockNo) + unwindFn := func(unwindBlock uint64) (uint64, error) { + return rollback(logPrefix, eriDb, hermezDb, dsQueryClient, unwindBlock, tx, u) + } if highestDSL2Block.L2BlockNumber < stageProgressBlockNo { - stageProgressBlockNo = highestDSL2Block.L2BlockNumber + log.Info(fmt.Sprintf("[%s] Datastream behind, unwinding", logPrefix)) + unwindFn(highestDSL2Block.L2BlockNumber) + return nil } - log.Debug(fmt.Sprintf("[%s] Highest block in db and datastream", logPrefix), "datastreamBlock", highestDSL2Block.L2BlockNumber, "dbBlock", stageProgressBlockNo) - dsClientProgress := dsQueryClient.GetProgressAtomic() dsClientProgress.Swap(stageProgressBlockNo) @@ -243,10 +247,6 @@ func SpawnStageBatches( log.Info(fmt.Sprintf("[%s] Reading blocks from the datastream.", logPrefix)) - unwindFn := func(unwindBlock uint64) (uint64, error) { - return rollback(logPrefix, eriDb, hermezDb, dsQueryClient, unwindBlock, tx, u) - } - lastProcessedBlockHash, err := eriDb.ReadCanonicalHash(stageProgressBlockNo) if err != nil { return fmt.Errorf("failed to read canonical hash for block %d: %w", stageProgressBlockNo, err) diff --git a/zk/stages/stage_batches_processor.go b/zk/stages/stage_batches_processor.go index 58b363df3de..f2028d7c1f1 100644 --- a/zk/stages/stage_batches_processor.go +++ b/zk/stages/stage_batches_processor.go @@ -234,26 +234,25 @@ func (p *BatchesProcessor) processFullBlock(blockEntry *types.FullL2Block) (endL } // skip but warn on already processed blocks if blockEntry.L2BlockNumber <= p.stageProgressBlockNo { - if blockEntry.L2BlockNumber < p.stageProgressBlockNo { + dbBatchNum, err := p.hermezDb.GetBatchNoByL2Block(blockEntry.L2BlockNumber) + if err != nil { + return false, err + } + + if blockEntry.L2BlockNumber == p.stageProgressBlockNo && dbBatchNum == blockEntry.BatchNumber { // only warn if the block is very old, we expect the very latest block to be requested // when the stage is fired up for the first time log.Warn(fmt.Sprintf("[%s] Skipping block %d, already processed", p.logPrefix, blockEntry.L2BlockNumber)) + return false, nil } - dbBatchNum, err := p.hermezDb.GetBatchNoByL2Block(blockEntry.L2BlockNumber) - if err != nil { + // if the block is older or the batch number is different, we need to unwind because the block has definately changed + log.Warn(fmt.Sprintf("[%s] Block already processed. Triggering unwind...", p.logPrefix), + "block", blockEntry.L2BlockNumber, "ds batch", blockEntry.BatchNumber, "db batch", dbBatchNum) + if _, err := p.unwind(blockEntry.L2BlockNumber); err != nil { return false, err } - - if blockEntry.BatchNumber > dbBatchNum { - // if the batch number is higher than the one we know about, it means that we need to trigger an unwinding of blocks - log.Warn(fmt.Sprintf("[%s] Batch number mismatch detected. Triggering unwind...", p.logPrefix), - "block", blockEntry.L2BlockNumber, "ds batch", blockEntry.BatchNumber, "db batch", dbBatchNum) - if _, err := p.unwind(blockEntry.L2BlockNumber); err != nil { - return false, err - } - } - return false, nil + return false, ErrorTriggeredUnwind } var dbParentBlockHash common.Hash