From 4e17fbf6a289a7956cfbfba81ae0ec4cdaac3cc3 Mon Sep 17 00:00:00 2001 From: Valentin Staykov Date: Tue, 29 Oct 2024 12:39:09 +0000 Subject: [PATCH] fix: finish processing blocks on last entry reached --- zk/datastream/client/stream_client.go | 21 +++++++++++++++++++-- zk/stages/stage_batches.go | 7 ------- zk/stages/stage_batches_processor.go | 4 ++-- 3 files changed, 21 insertions(+), 11 deletions(-) diff --git a/zk/datastream/client/stream_client.go b/zk/datastream/client/stream_client.go index 080c004c58c..692e5c2ef94 100644 --- a/zk/datastream/client/stream_client.go +++ b/zk/datastream/client/stream_client.go @@ -543,7 +543,7 @@ LOOP: case *types.BatchEnd: case *types.FullL2Block: parsedProto.ForkId = c.currentFork - log.Trace("writing block to channel", "blockNumber", parsedProto.L2BlockNumber, "batchNumber", parsedProto.BatchNumber) + log.Trace("[Datastream client] writing block to channel", "blockNumber", parsedProto.L2BlockNumber, "batchNumber", parsedProto.BatchNumber) default: return fmt.Errorf("unexpected entry type: %v", parsedProto) } @@ -555,10 +555,26 @@ LOOP: } if c.header.TotalEntries == entryNum+1 { - log.Trace("reached the end of the stream", "header_totalEntries", c.header.TotalEntries, "entryNum", entryNum) + log.Trace("[Datastream client] reached the end of the stream", "header_totalEntries", c.header.TotalEntries, "entryNum", entryNum) if err = c.sendStopCmd(); err != nil { return fmt.Errorf("send stop command: %w", err) } + + retries := 0 + INTERNAL_LOOP: + for { + select { + case c.entryChan <- nil: + break INTERNAL_LOOP + default: + if retries > 5 { + return errors.New("[Datastream client] failed to write final entry to channel after 5 retries") + } + retries++ + log.Warn("[Datastream client] Channel is full, waiting to write nil and end stream client read") + time.Sleep(1 * time.Second) + } + } break LOOP } } @@ -676,6 +692,7 @@ func ReadParsedProto(iterator FileEntryIterator) ( return case types.EntryTypeL2BlockEnd: log.Debug(fmt.Sprintf("retrieved EntryTypeL2BlockEnd: %+v", file)) + parsedEntry, err = types.UnmarshalL2BlockEnd(file.Data) return case types.EntryTypeL2Tx: err = errors.New("unexpected L2 tx entry, found outside of block") diff --git a/zk/stages/stage_batches.go b/zk/stages/stage_batches.go index e52e0996055..478dc6f3a9b 100644 --- a/zk/stages/stage_batches.go +++ b/zk/stages/stage_batches.go @@ -287,13 +287,6 @@ func SpawnStageBatches( time.Sleep(1 * time.Second) } - // if ds end reached check again for new blocks in the stream - // if there are too many new blocks get them as well before ending stage - if batchProcessor.LastBlockHeight() >= highestDSL2Block.L2BlockNumber { - log.Info(fmt.Sprintf("[%s] Reached the end of the datastream", logPrefix), "datastreamBlock", highestDSL2Block.L2BlockNumber, "lastWrittenBlock", batchProcessor.LastBlockHeight()) - endLoop = true - } - if endLoop { log.Info(fmt.Sprintf("[%s] Total blocks written: %d", logPrefix, batchProcessor.TotalBlocksWritten())) break diff --git a/zk/stages/stage_batches_processor.go b/zk/stages/stage_batches_processor.go index 4fadc012a63..5f5558a1be5 100644 --- a/zk/stages/stage_batches_processor.go +++ b/zk/stages/stage_batches_processor.go @@ -147,8 +147,8 @@ func (p *BatchesProcessor) ProcessEntry(entry interface{}) (endLoop bool, err er return p.processFullBlock(entry) case *types.GerUpdate: return false, p.processGerUpdate(entry) - case nil: - return false, nil + case nil: // we use nil to indicate the end of stream read + return true, nil default: return false, fmt.Errorf("unknown entry type: %T", entry) }