Skip to content

Commit

Permalink
fix: finish processing blocks on last entry reached
Browse files Browse the repository at this point in the history
  • Loading branch information
V-Staykov committed Oct 29, 2024
1 parent bab79a0 commit 4e17fbf
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 11 deletions.
21 changes: 19 additions & 2 deletions zk/datastream/client/stream_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -543,7 +543,7 @@ LOOP:
case *types.BatchEnd:
case *types.FullL2Block:
parsedProto.ForkId = c.currentFork
log.Trace("writing block to channel", "blockNumber", parsedProto.L2BlockNumber, "batchNumber", parsedProto.BatchNumber)
log.Trace("[Datastream client] writing block to channel", "blockNumber", parsedProto.L2BlockNumber, "batchNumber", parsedProto.BatchNumber)
default:
return fmt.Errorf("unexpected entry type: %v", parsedProto)
}
Expand All @@ -555,10 +555,26 @@ LOOP:
}

if c.header.TotalEntries == entryNum+1 {
log.Trace("reached the end of the stream", "header_totalEntries", c.header.TotalEntries, "entryNum", entryNum)
log.Trace("[Datastream client] reached the end of the stream", "header_totalEntries", c.header.TotalEntries, "entryNum", entryNum)
if err = c.sendStopCmd(); err != nil {
return fmt.Errorf("send stop command: %w", err)
}

retries := 0
INTERNAL_LOOP:
for {
select {
case c.entryChan <- nil:
break INTERNAL_LOOP
default:
if retries > 5 {
return errors.New("[Datastream client] failed to write final entry to channel after 5 retries")
}
retries++
log.Warn("[Datastream client] Channel is full, waiting to write nil and end stream client read")
time.Sleep(1 * time.Second)
}
}
break LOOP
}
}
Expand Down Expand Up @@ -676,6 +692,7 @@ func ReadParsedProto(iterator FileEntryIterator) (
return
case types.EntryTypeL2BlockEnd:
log.Debug(fmt.Sprintf("retrieved EntryTypeL2BlockEnd: %+v", file))
parsedEntry, err = types.UnmarshalL2BlockEnd(file.Data)
return
case types.EntryTypeL2Tx:
err = errors.New("unexpected L2 tx entry, found outside of block")
Expand Down
7 changes: 0 additions & 7 deletions zk/stages/stage_batches.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,13 +287,6 @@ func SpawnStageBatches(
time.Sleep(1 * time.Second)
}

// if ds end reached check again for new blocks in the stream
// if there are too many new blocks get them as well before ending stage
if batchProcessor.LastBlockHeight() >= highestDSL2Block.L2BlockNumber {
log.Info(fmt.Sprintf("[%s] Reached the end of the datastream", logPrefix), "datastreamBlock", highestDSL2Block.L2BlockNumber, "lastWrittenBlock", batchProcessor.LastBlockHeight())
endLoop = true
}

if endLoop {
log.Info(fmt.Sprintf("[%s] Total blocks written: %d", logPrefix, batchProcessor.TotalBlocksWritten()))
break
Expand Down
4 changes: 2 additions & 2 deletions zk/stages/stage_batches_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,8 @@ func (p *BatchesProcessor) ProcessEntry(entry interface{}) (endLoop bool, err er
return p.processFullBlock(entry)
case *types.GerUpdate:
return false, p.processGerUpdate(entry)
case nil:
return false, nil
case nil: // we use nil to indicate the end of stream read
return true, nil
default:
return false, fmt.Errorf("unknown entry type: %T", entry)
}
Expand Down

0 comments on commit 4e17fbf

Please sign in to comment.