From 7177a0b8b918b64b284a7ed66969af872c12ac63 Mon Sep 17 00:00:00 2001 From: Valentin Staykov <79150443+V-Staykov@users.noreply.github.com> Date: Mon, 7 Oct 2024 11:43:27 +0300 Subject: [PATCH] refactor: extract stage batches logic into batches processor (#1251) * refactor: extract stage batches logic into batches processor * refactor: rename batches_processor to stage_batches_processor * fix(stage_batches): set new db tx upon progress save * refactor: create datastream routine runner * fix: bug with entry channe closing and hash check * fix: typo --- zk/datastream/client/stream_client.go | 40 +- .../test_datastream_compare.go | 2 +- zk/erigon_db/db.go | 4 + zk/hermez_db/db.go | 5 + zk/stages/stage_batches.go | 507 +++--------------- zk/stages/stage_batches_datastream.go | 108 ++++ zk/stages/stage_batches_processor.go | 485 +++++++++++++++++ zk/stages/stage_batches_test.go | 2 +- zk/stages/stages.go | 2 +- zk/stages/test_utils.go | 4 +- 10 files changed, 706 insertions(+), 453 deletions(-) create mode 100644 zk/stages/stage_batches_datastream.go create mode 100644 zk/stages/stage_batches_processor.go diff --git a/zk/datastream/client/stream_client.go b/zk/datastream/client/stream_client.go index 461b3d9371c..ba85dd8c8ff 100644 --- a/zk/datastream/client/stream_client.go +++ b/zk/datastream/client/stream_client.go @@ -27,6 +27,7 @@ type EntityDefinition struct { const ( versionProto = 2 // converted to proto versionAddedBlockEnd = 3 // Added block end + entryChannelSize = 100000 ) var ( @@ -44,9 +45,10 @@ type StreamClient struct { checkTimeout time.Duration // time to wait for data before reporting an error // atomic - lastWrittenTime atomic.Int64 - streaming atomic.Bool - progress atomic.Uint64 + lastWrittenTime atomic.Int64 + streaming atomic.Bool + progress atomic.Uint64 + stopReadingToChannel atomic.Bool // Channels entryChan chan interface{} @@ -88,8 +90,8 @@ func (c *StreamClient) IsVersion3() bool { return c.version >= versionAddedBlockEnd } -func (c *StreamClient) GetEntryChan() chan interface{} { - return c.entryChan +func (c *StreamClient) GetEntryChan() *chan interface{} { + return &c.entryChan } // GetL2BlockByNumber queries the data stream by sending the L2 block start bookmark for the certain block number @@ -227,7 +229,7 @@ func (c *StreamClient) Stop() { c.conn.Close() c.conn = nil - close(c.entryChan) + c.clearEntryCHannel() } // Command header: Get status @@ -323,12 +325,29 @@ func (c *StreamClient) ExecutePerFile(bookmark *types.BookmarkProto, function fu return nil } +func (c *StreamClient) clearEntryCHannel() { + select { + case <-c.entryChan: + close(c.entryChan) + for range c.entryChan { + } + default: + } +} + +// close old entry chan and read all elements before opening a new one +func (c *StreamClient) renewEntryChannel() { + c.clearEntryCHannel() + c.entryChan = make(chan interface{}, entryChannelSize) +} + func (c *StreamClient) EnsureConnected() (bool, error) { if c.conn == nil { if err := c.tryReConnect(); err != nil { return false, fmt.Errorf("failed to reconnect the datastream client: %w", err) } - c.entryChan = make(chan interface{}, 100000) + + c.renewEntryChannel() } return true, nil @@ -368,9 +387,6 @@ func (c *StreamClient) ReadAllEntriesToChannel() error { c.conn = nil } - // reset the channels as there could be data ahead of the bookmark we want to track here. 
- // c.resetChannels() - return err2 } @@ -474,6 +490,10 @@ func (c *StreamClient) tryReConnect() error { return err } +func (c *StreamClient) StopReadingToChannel() { + c.stopReadingToChannel.Store(true) +} + type FileEntryIterator interface { NextFileEntry() (*types.FileEntry, error) } diff --git a/zk/datastream/test/data_stream_compare/test_datastream_compare.go b/zk/datastream/test/data_stream_compare/test_datastream_compare.go index d5093a482c9..9cc63c3d348 100644 --- a/zk/datastream/test/data_stream_compare/test_datastream_compare.go +++ b/zk/datastream/test/data_stream_compare/test_datastream_compare.go @@ -81,7 +81,7 @@ func readFromClient(client *client.StreamClient, total int) ([]interface{}, erro LOOP: for { - entry := <-client.GetEntryChan() + entry := <-*client.GetEntryChan() switch entry.(type) { case types.FullL2Block: diff --git a/zk/erigon_db/db.go b/zk/erigon_db/db.go index a944408e22d..b3df0e88709 100644 --- a/zk/erigon_db/db.go +++ b/zk/erigon_db/db.go @@ -29,6 +29,10 @@ func NewErigonDb(tx kv.RwTx) *ErigonDb { } } +func (db *ErigonDb) SetNewTx(tx kv.RwTx) { + db.tx = tx +} + func (db ErigonDb) WriteHeader( blockNo *big.Int, blockHash common.Hash, diff --git a/zk/hermez_db/db.go b/zk/hermez_db/db.go index 429ba5e72c5..bbdee1a8cef 100644 --- a/zk/hermez_db/db.go +++ b/zk/hermez_db/db.go @@ -112,6 +112,11 @@ func NewHermezDb(tx kv.RwTx) *HermezDb { return db } +func (db *HermezDb) SetNewTx(tx kv.RwTx) { + db.tx = tx + db.HermezDbReader.tx = tx +} + func CreateHermezBuckets(tx kv.RwTx) error { for _, t := range HermezDbTables { if err := tx.CreateBucket(t); err != nil { diff --git a/zk/stages/stage_batches.go b/zk/stages/stage_batches.go index 2409178db41..f7cbffad6b1 100644 --- a/zk/stages/stage_batches.go +++ b/zk/stages/stage_batches.go @@ -20,19 +20,18 @@ import ( "github.com/ledgerwatch/erigon/zk/erigon_db" "github.com/ledgerwatch/erigon/zk/hermez_db" "github.com/ledgerwatch/erigon/zk/sequencer" - txtype "github.com/ledgerwatch/erigon/zk/tx" "github.com/ledgerwatch/erigon/core/rawdb" "github.com/ledgerwatch/erigon/core/state" "github.com/ledgerwatch/erigon/eth/ethconfig" "github.com/ledgerwatch/erigon/zk/datastream/client" - "github.com/ledgerwatch/erigon/zk/utils" "github.com/ledgerwatch/log/v3" ) const ( - HIGHEST_KNOWN_FORK = 12 - STAGE_PROGRESS_SAVE = 3000000 + HIGHEST_KNOWN_FORK = 12 + STAGE_PROGRESS_SAVE = 3000000 + NEW_BLOCKS_ON_DS_LIMIT = 10000 ) var ( @@ -46,40 +45,23 @@ type ErigonDb interface { } type HermezDb interface { - WriteForkId(batchNumber uint64, forkId uint64) error - WriteForkIdBlockOnce(forkId, blockNum uint64) error - WriteBlockBatch(l2BlockNumber uint64, batchNumber uint64) error - WriteEffectiveGasPricePercentage(txHash common.Hash, effectiveGasPricePercentage uint8) error - DeleteEffectiveGasPricePercentages(txHashes *[]common.Hash) error - - WriteStateRoot(l2BlockNumber uint64, rpcRoot common.Hash) error - DeleteForkIds(fromBatchNum, toBatchNum uint64) error DeleteBlockBatches(fromBlockNum, toBlockNum uint64) error - CheckGlobalExitRootWritten(ger common.Hash) (bool, error) - WriteBlockGlobalExitRoot(l2BlockNo uint64, ger common.Hash) error - WriteGlobalExitRoot(ger common.Hash) error DeleteBlockGlobalExitRoots(fromBlockNum, toBlockNum uint64) error DeleteGlobalExitRoots(l1BlockHashes *[]common.Hash) error - WriteReusedL1InfoTreeIndex(l2BlockNo uint64) error DeleteReusedL1InfoTreeIndexes(fromBlockNum, toBlockNum uint64) error - WriteBlockL1BlockHash(l2BlockNo uint64, l1BlockHash common.Hash) error DeleteBlockL1BlockHashes(fromBlockNum, 
toBlockNum uint64) error - WriteBatchGlobalExitRoot(batchNumber uint64, ger *types.GerUpdate) error - WriteIntermediateTxStateRoot(l2BlockNumber uint64, txHash common.Hash, rpcRoot common.Hash) error WriteBlockL1InfoTreeIndex(blockNumber uint64, l1Index uint64) error WriteBlockL1InfoTreeIndexProgress(blockNumber uint64, l1Index uint64) error - WriteLatestUsedGer(blockNo uint64, ger common.Hash) error } type DatastreamClient interface { ReadAllEntriesToChannel() error - GetEntryChan() chan interface{} + GetEntryChan() *chan interface{} GetL2BlockByNumber(blockNum uint64) (*types.FullL2Block, int, error) GetLatestL2Block() (*types.FullL2Block, error) - GetLastWrittenTimeAtomic() *atomic.Int64 GetStreamingAtomic() *atomic.Bool GetProgressAtomic() *atomic.Uint64 EnsureConnected() (bool, error) @@ -87,6 +69,12 @@ type DatastreamClient interface { Stop() } +type DatastreamReadRunner interface { + StartRead() + StopRead() + RestartReadFromBlock(fromBlock uint64) +} + type dsClientCreatorHandler func(context.Context, *ethconfig.Zk, uint64) (DatastreamClient, error) type BatchesCfg struct { @@ -129,7 +117,6 @@ func SpawnStageBatches( ctx context.Context, tx kv.RwTx, cfg BatchesCfg, - quiet bool, ) error { logPrefix := s.LogPrefix() log.Info(fmt.Sprintf("[%s] Starting batches stage", logPrefix)) @@ -170,11 +157,6 @@ func SpawnStageBatches( return fmt.Errorf("get batch no by l2 block error: %v", err) } - highestVerifiedBatch, err := stages.GetStageProgress(tx, stages.L1VerificationsBatchNo) - if err != nil { - return errors.New("could not retrieve l1 verifications batch no progress") - } - startSyncTime := time.Now() latestForkId, err := stages.GetStageProgress(tx, stages.ForkId) @@ -198,86 +180,52 @@ func SpawnStageBatches( stageProgressBlockNo = highestDSL2Block.L2BlockNumber } - log.Debug(fmt.Sprintf("[%s] Highest block in datastream", logPrefix), "block", highestDSL2Block.L2BlockNumber) - log.Debug(fmt.Sprintf("[%s] Highest block in db", logPrefix), "block", stageProgressBlockNo) + log.Debug(fmt.Sprintf("[%s] Highest block in db and datastream", logPrefix), "datastreamBlock", highestDSL2Block.L2BlockNumber, "dbBlock", stageProgressBlockNo) dsClientProgress := cfg.dsClient.GetProgressAtomic() dsClientProgress.Store(stageProgressBlockNo) - // start routine to download blocks and push them in a channel - if !cfg.dsClient.GetStreamingAtomic().Load() { - log.Info(fmt.Sprintf("[%s] Starting stream", logPrefix), "startBlock", stageProgressBlockNo) - // this will download all blocks from datastream and push them in a channel - // if no error, break, else continue trying to get them - // Create bookmark - - connected := false - for i := 0; i < 5; i++ { - connected, err = cfg.dsClient.EnsureConnected() - if err != nil { - log.Error(fmt.Sprintf("[%s] Error connecting to datastream", logPrefix), "error", err) - continue - } - if connected { - break - } - } - - go func() { - log.Info(fmt.Sprintf("[%s] Started downloading L2Blocks routine", logPrefix)) - defer log.Info(fmt.Sprintf("[%s] Finished downloading L2Blocks routine", logPrefix)) - - if connected { - if err := cfg.dsClient.ReadAllEntriesToChannel(); err != nil { - log.Error(fmt.Sprintf("[%s] Error downloading blocks from datastream", logPrefix), "error", err) - } - } - }() - } // start a routine to print blocks written progress progressChan, stopProgressPrinter := zk.ProgressPrinterWithoutTotal(fmt.Sprintf("[%s] Downloaded blocks from datastream progress", logPrefix)) defer stopProgressPrinter() - lastBlockHeight := stageProgressBlockNo - 
highestSeenBatchNo := stageProgressBatchNo - endLoop := false - blocksWritten := uint64(0) - highestHashableL2BlockNo := uint64(0) - _, highestL1InfoTreeIndex, err := hermezDb.GetLatestBlockL1InfoTreeIndexProgress() if err != nil { return fmt.Errorf("failed to get highest used l1 info index, %w", err) } - lastForkId, err := stages.GetStageProgress(tx, stages.ForkId) - if err != nil { - return fmt.Errorf("failed to get last fork id, %w", err) - } - stageExecProgress, err := stages.GetStageProgress(tx, stages.Execution) if err != nil { return fmt.Errorf("failed to get stage exec progress, %w", err) } // just exit the stage early if there is more execution work to do - if stageExecProgress < lastBlockHeight { + if stageExecProgress < stageProgressBlockNo { log.Info(fmt.Sprintf("[%s] Execution behind, skipping stage", logPrefix)) return nil } - lastHash := emptyHash - lastBlockRoot := emptyHash - atLeastOneBlockWritten := false - startTime := time.Now() - log.Info(fmt.Sprintf("[%s] Reading blocks from the datastream.", logPrefix)) + unwindFn := func(unwindBlock uint64) error { + return rollback(logPrefix, eriDb, hermezDb, dsQueryClient, unwindBlock, tx, u) + } + + batchProcessor, err := NewBatchesProcessor(ctx, logPrefix, tx, hermezDb, eriDb, cfg.zkCfg.SyncLimit, cfg.zkCfg.DebugLimit, cfg.zkCfg.DebugStepAfter, cfg.zkCfg.DebugStep, stageProgressBlockNo, stageProgressBatchNo, dsQueryClient, progressChan, unwindFn) + if err != nil { + return err + } + + // start routine to download blocks and push them in a channel + dsClientRunner := NewDatastreamClientRunner(cfg.dsClient, logPrefix) + dsClientRunner.StartRead() + defer dsClientRunner.StartRead() + entryChan := cfg.dsClient.GetEntryChan() - lastWrittenTimeAtomic := cfg.dsClient.GetLastWrittenTimeAtomic() - streamingAtomic := cfg.dsClient.GetStreamingAtomic() - prevAmountBlocksWritten := blocksWritten -LOOP: + prevAmountBlocksWritten, restartDatastreamBlock := uint64(0), uint64(0) + endLoop := false + for { // get batch start and use to update forkid // get block @@ -285,249 +233,48 @@ LOOP: // if download routine finished, should continue to read from channel until it's empty // if both download routine stopped and channel empty - stop loop select { - case entry := <-entryChan: - switch entry := entry.(type) { - case *types.BatchStart: - // check if the batch is invalid so that we can replicate this over in the stream - // when we re-populate it - if entry.BatchType == types.BatchTypeInvalid { - if err = hermezDb.WriteInvalidBatch(entry.Number); err != nil { - return err - } - // we need to write the fork here as well because the batch will never get processed as it is invalid - // but, we need it re-populate our own stream - if err = hermezDb.WriteForkId(entry.Number, entry.ForkId); err != nil { - return err - } - } - case *types.BatchEnd: - if entry.StateRoot != lastBlockRoot { - log.Debug(fmt.Sprintf("[%s] batch end state root mismatches last block's: %x, expected: %x", logPrefix, entry.StateRoot, lastBlockRoot)) - } - // keep a record of the last block processed when we receive the batch end - if err = hermezDb.WriteBatchEnd(lastBlockHeight); err != nil { - return err - } - case *types.FullL2Block: - log.Debug(fmt.Sprintf("[%s] Retrieved %d (%s) block from stream", logPrefix, entry.L2BlockNumber, entry.L2Blockhash.String())) - if cfg.zkCfg.SyncLimit > 0 && entry.L2BlockNumber >= cfg.zkCfg.SyncLimit { - // stop the node going into a crazy loop - time.Sleep(2 * time.Second) - break LOOP - } - - // handle batch boundary changes - we do this 
here instead of reading the batch start channel because - // channels can be read in random orders which then creates problems in detecting fork changes during - // execution - if entry.BatchNumber > highestSeenBatchNo && lastForkId < entry.ForkId { - if entry.ForkId > HIGHEST_KNOWN_FORK { - message := fmt.Sprintf("unsupported fork id %v received from the data stream", entry.ForkId) - panic(message) - } - err = stages.SaveStageProgress(tx, stages.ForkId, entry.ForkId) - if err != nil { - return fmt.Errorf("save stage progress error: %v", err) - } - lastForkId = entry.ForkId - err = hermezDb.WriteForkId(entry.BatchNumber, entry.ForkId) - if err != nil { - return fmt.Errorf("write fork id error: %v", err) - } - // NOTE (RPC): avoided use of 'writeForkIdBlockOnce' by reading instead batch by forkId, and then lowest block number in batch - } - - // ignore genesis or a repeat of the last block - if entry.L2BlockNumber == 0 { - continue - } - // skip but warn on already processed blocks - if entry.L2BlockNumber <= stageProgressBlockNo { - if entry.L2BlockNumber < stageProgressBlockNo { - // only warn if the block is very old, we expect the very latest block to be requested - // when the stage is fired up for the first time - log.Warn(fmt.Sprintf("[%s] Skipping block %d, already processed", logPrefix, entry.L2BlockNumber)) - } - - dbBatchNum, err := hermezDb.GetBatchNoByL2Block(entry.L2BlockNumber) - if err != nil { - return err - } - - if entry.BatchNumber > dbBatchNum { - // if the batch number is higher than the one we know about, it means that we need to trigger an unwinding of blocks - log.Warn(fmt.Sprintf("[%s] Batch number mismatch detected. Triggering unwind...", logPrefix), - "block", entry.L2BlockNumber, "ds batch", entry.BatchNumber, "db batch", dbBatchNum) - if err := rollback(logPrefix, eriDb, hermezDb, dsQueryClient, entry.L2BlockNumber, tx, u); err != nil { - return err - } - cfg.dsClient.Stop() - return nil - } - continue - } - - var dbParentBlockHash common.Hash - if entry.L2BlockNumber > 0 { - dbParentBlockHash, err = eriDb.ReadCanonicalHash(entry.L2BlockNumber - 1) - if err != nil { - return fmt.Errorf("failed to retrieve parent block hash for datastream block %d: %w", - entry.L2BlockNumber, err) - } - } - - dsParentBlockHash := lastHash - if dsParentBlockHash == emptyHash { - parentBlockDS, _, err := dsQueryClient.GetL2BlockByNumber(entry.L2BlockNumber - 1) - if err != nil { - return err - } - - if parentBlockDS != nil { - dsParentBlockHash = parentBlockDS.L2Blockhash - } - } - - if dbParentBlockHash != dsParentBlockHash { - // unwind/rollback blocks until the latest common ancestor block - log.Warn(fmt.Sprintf("[%s] Parent block hashes mismatch on block %d. 
Triggering unwind...", logPrefix, entry.L2BlockNumber), - "db parent block hash", dbParentBlockHash, "ds parent block hash", dsParentBlockHash) - if err := rollback(logPrefix, eriDb, hermezDb, dsQueryClient, entry.L2BlockNumber, tx, u); err != nil { - return err - } - cfg.dsClient.Stop() - return nil - } - - // skip if we already have this block - if entry.L2BlockNumber < lastBlockHeight+1 { - log.Warn(fmt.Sprintf("[%s] Skipping block %d, already processed", logPrefix, entry.L2BlockNumber)) - continue - } - - // check for sequential block numbers - if entry.L2BlockNumber > lastBlockHeight+1 { - log.Warn(fmt.Sprintf("[%s] Stream skipped ahead, unwinding to block %d", logPrefix, entry.L2BlockNumber)) - badBlock, err := eriDb.ReadCanonicalHash(entry.L2BlockNumber) - if err != nil { - return fmt.Errorf("failed to get bad block: %v", err) - } - u.UnwindTo(entry.L2BlockNumber, badBlock) - return nil - } - - // batch boundary - record the highest hashable block number (last block in last full batch) - if entry.BatchNumber > highestSeenBatchNo { - highestHashableL2BlockNo = entry.L2BlockNumber - 1 - } - highestSeenBatchNo = entry.BatchNumber - - /////// DEBUG BISECTION /////// - // exit stage when debug bisection flags set and we're at the limit block - if cfg.zkCfg.DebugLimit > 0 && entry.L2BlockNumber > cfg.zkCfg.DebugLimit { - fmt.Printf("[%s] Debug limit reached, stopping stage\n", logPrefix) - endLoop = true - } - - // if we're above StepAfter, and we're at a step, move the stages on - if cfg.zkCfg.DebugStep > 0 && cfg.zkCfg.DebugStepAfter > 0 && entry.L2BlockNumber > cfg.zkCfg.DebugStepAfter { - if entry.L2BlockNumber%cfg.zkCfg.DebugStep == 0 { - fmt.Printf("[%s] Debug step reached, stopping stage\n", logPrefix) - endLoop = true - } - } - /////// END DEBUG BISECTION /////// - - // store our finalized state if this batch matches the highest verified batch number on the L1 - if entry.BatchNumber == highestVerifiedBatch { - rawdb.WriteForkchoiceFinalized(tx, entry.L2Blockhash) - } - - if lastHash != emptyHash { - entry.ParentHash = lastHash - } else { - // first block in the loop so read the parent hash - previousHash, err := eriDb.ReadCanonicalHash(entry.L2BlockNumber - 1) - if err != nil { - return fmt.Errorf("failed to get genesis header: %v", err) - } - entry.ParentHash = previousHash - } - - if err := writeL2Block(eriDb, hermezDb, entry, highestL1InfoTreeIndex); err != nil { - return fmt.Errorf("writeL2Block error: %v", err) - } - dsClientProgress.Store(entry.L2BlockNumber) - - // make sure to capture the l1 info tree index changes so we can store progress - if uint64(entry.L1InfoTreeIndex) > highestL1InfoTreeIndex { - highestL1InfoTreeIndex = uint64(entry.L1InfoTreeIndex) - } - - lastHash = entry.L2Blockhash - lastBlockRoot = entry.StateRoot - - atLeastOneBlockWritten = true - lastBlockHeight = entry.L2BlockNumber - blocksWritten++ - progressChan <- blocksWritten - - if endLoop && cfg.zkCfg.DebugLimit > 0 { - break LOOP - } - case *types.GerUpdate: - if entry.GlobalExitRoot == emptyHash { - log.Warn(fmt.Sprintf("[%s] Skipping GER update with empty root", logPrefix)) - break - } + case entry := <-*entryChan: + if restartDatastreamBlock, endLoop, err = batchProcessor.ProcessEntry(entry); err != nil { + return err + } + dsClientProgress.Store(batchProcessor.LastBlockHeight()) - // NB: we won't get these post Etrog (fork id 7) - if err := hermezDb.WriteBatchGlobalExitRoot(entry.BatchNumber, entry); err != nil { - return fmt.Errorf("write batch global exit root error: %v", err) - } + if 
restartDatastreamBlock > 0 { + dsClientRunner.RestartReadFromBlock(restartDatastreamBlock) } case <-ctx.Done(): log.Warn(fmt.Sprintf("[%s] Context done", logPrefix)) endLoop = true default: - if atLeastOneBlockWritten { - // first check to see if anything has come in from the stream yet, if it has then wait a little longer - // because there could be more. - // if no blocks available should and time since last block written is > 500ms - // consider that we are at the tip and blocks come in the datastream as they are produced - // stop the current iteration of the stage - lastWrittenTs := lastWrittenTimeAtomic.Load() - timePassedAfterlastBlock := time.Since(time.Unix(0, lastWrittenTs)) - if timePassedAfterlastBlock > cfg.zkCfg.DatastreamNewBlockTimeout { - log.Info(fmt.Sprintf("[%s] No new blocks in %d miliseconds. Ending the stage.", logPrefix, timePassedAfterlastBlock.Milliseconds()), "lastBlockHeight", lastBlockHeight) - endLoop = true - } + time.Sleep(10 * time.Millisecond) + } + + // if ds end reached check again for new blocks in the stream + // if there are too many new blocks get them as well before ending stage + if batchProcessor.LastBlockHeight() >= highestDSL2Block.L2BlockNumber { + newLatestDSL2Block, err := dsQueryClient.GetLatestL2Block() + if err != nil { + return fmt.Errorf("failed to retrieve the latest datastream l2 block: %w", err) + } + if newLatestDSL2Block.L2BlockNumber > highestDSL2Block.L2BlockNumber+NEW_BLOCKS_ON_DS_LIMIT { + highestDSL2Block = newLatestDSL2Block } else { - timePassedAfterlastBlock := time.Since(startTime) - if timePassedAfterlastBlock.Seconds() > 10 { - // if the connection ropped, continue with next stages while it tries to reconnect - // otherwise it will get stuck in "waiting for at least one block to be written" loop - // if !streamingAtomic.Load() { - // endLoop = true - // break - // } - - if !streamingAtomic.Load() { - log.Info(fmt.Sprintf("[%s] Datastream disconnected. Ending the stage.", logPrefix)) - break LOOP - } - - log.Info(fmt.Sprintf("[%s] Waiting for at least one new block.", logPrefix)) - startTime = time.Now() - } + endLoop = true } - time.Sleep(10 * time.Millisecond) } - if blocksWritten != prevAmountBlocksWritten && blocksWritten%STAGE_PROGRESS_SAVE == 0 { - if err = saveStageProgress(tx, logPrefix, highestHashableL2BlockNo, highestSeenBatchNo, lastBlockHeight, lastForkId); err != nil { + if endLoop { + log.Info(fmt.Sprintf("[%s] Total blocks written: %d", logPrefix, batchProcessor.TotalBlocksWritten())) + break + } + + // this can be after the loop break because we save progress at the end of stage anyways. 
no need to do it twice + // commit progress from time to time + if batchProcessor.TotalBlocksWritten() != prevAmountBlocksWritten && batchProcessor.TotalBlocksWritten()%STAGE_PROGRESS_SAVE == 0 { + if err = saveStageProgress(tx, logPrefix, batchProcessor.HighestHashableL2BlockNo(), batchProcessor.HighestSeenBatchNumber(), batchProcessor.LastBlockHeight(), batchProcessor.LastForkId()); err != nil { return err } - if err := hermezDb.WriteBlockL1InfoTreeIndexProgress(lastBlockHeight, highestL1InfoTreeIndex); err != nil { + if err := hermezDb.WriteBlockL1InfoTreeIndexProgress(batchProcessor.LastBlockHeight(), highestL1InfoTreeIndex); err != nil { return err } @@ -536,36 +283,33 @@ LOOP: return fmt.Errorf("failed to commit tx, %w", err) } - tx, err = cfg.db.BeginRw(ctx) - if err != nil { + if tx, err = cfg.db.BeginRw(ctx); err != nil { return fmt.Errorf("failed to open tx, %w", err) } - hermezDb = hermez_db.NewHermezDb(tx) - eriDb = erigon_db.NewErigonDb(tx) + hermezDb.SetNewTx(tx) + eriDb.SetNewTx(tx) + batchProcessor.SetNewTx(tx) } - prevAmountBlocksWritten = blocksWritten + prevAmountBlocksWritten = batchProcessor.TotalBlocksWritten() } - if endLoop { - log.Info(fmt.Sprintf("[%s] Total blocks read: %d", logPrefix, blocksWritten)) - break - } } - if lastBlockHeight == stageProgressBlockNo { + // no new progress, nothing to save + if batchProcessor.LastBlockHeight() == stageProgressBlockNo { return nil } - if err = saveStageProgress(tx, logPrefix, highestHashableL2BlockNo, highestSeenBatchNo, lastBlockHeight, lastForkId); err != nil { + if err = saveStageProgress(tx, logPrefix, batchProcessor.HighestHashableL2BlockNo(), batchProcessor.HighestSeenBatchNumber(), batchProcessor.LastBlockHeight(), batchProcessor.LastForkId()); err != nil { return err } - if err := hermezDb.WriteBlockL1InfoTreeIndexProgress(lastBlockHeight, highestL1InfoTreeIndex); err != nil { + if err := hermezDb.WriteBlockL1InfoTreeIndexProgress(batchProcessor.LastBlockHeight(), highestL1InfoTreeIndex); err != nil { return err } // stop printing blocks written progress routine elapsed := time.Since(startSyncTime) - log.Info(fmt.Sprintf("[%s] Finished writing blocks", logPrefix), "blocksWritten", blocksWritten, "elapsed", elapsed) + log.Info(fmt.Sprintf("[%s] Finished writing blocks", logPrefix), "blocksWritten", batchProcessor.TotalBlocksWritten(), "elapsed", elapsed) if freshTx { if err := tx.Commit(); err != nil { @@ -845,119 +589,6 @@ func PruneBatchesStage(s *stagedsync.PruneState, tx kv.RwTx, cfg BatchesCfg, ctx return nil } -// writeL2Block writes L2Block to ErigonDb and HermezDb -// writes header, body, forkId and blockBatch -func writeL2Block(eriDb ErigonDb, hermezDb HermezDb, l2Block *types.FullL2Block, highestL1InfoTreeIndex uint64) error { - bn := new(big.Int).SetUint64(l2Block.L2BlockNumber) - txs := make([]ethTypes.Transaction, 0, len(l2Block.L2Txs)) - for _, transaction := range l2Block.L2Txs { - ltx, _, err := txtype.DecodeTx(transaction.Encoded, transaction.EffectiveGasPricePercentage, l2Block.ForkId) - if err != nil { - return fmt.Errorf("decode tx error: %v", err) - } - txs = append(txs, ltx) - - if err := hermezDb.WriteEffectiveGasPricePercentage(ltx.Hash(), transaction.EffectiveGasPricePercentage); err != nil { - return fmt.Errorf("write effective gas price percentage error: %v", err) - } - - if err := hermezDb.WriteStateRoot(l2Block.L2BlockNumber, transaction.IntermediateStateRoot); err != nil { - return fmt.Errorf("write rpc root error: %v", err) - } - - if err := 
hermezDb.WriteIntermediateTxStateRoot(l2Block.L2BlockNumber, ltx.Hash(), transaction.IntermediateStateRoot); err != nil { - return fmt.Errorf("write rpc root error: %v", err) - } - } - txCollection := ethTypes.Transactions(txs) - txHash := ethTypes.DeriveSha(txCollection) - - gasLimit := utils.GetBlockGasLimitForFork(l2Block.ForkId) - - _, err := eriDb.WriteHeader(bn, l2Block.L2Blockhash, l2Block.StateRoot, txHash, l2Block.ParentHash, l2Block.Coinbase, uint64(l2Block.Timestamp), gasLimit) - if err != nil { - return fmt.Errorf("write header error: %v", err) - } - - didStoreGer := false - l1InfoTreeIndexReused := false - - if l2Block.GlobalExitRoot != emptyHash { - gerWritten, err := hermezDb.CheckGlobalExitRootWritten(l2Block.GlobalExitRoot) - if err != nil { - return fmt.Errorf("get global exit root error: %v", err) - } - - if !gerWritten { - if err := hermezDb.WriteBlockGlobalExitRoot(l2Block.L2BlockNumber, l2Block.GlobalExitRoot); err != nil { - return fmt.Errorf("write block global exit root error: %v", err) - } - - if err := hermezDb.WriteGlobalExitRoot(l2Block.GlobalExitRoot); err != nil { - return fmt.Errorf("write global exit root error: %v", err) - } - didStoreGer = true - } - } - - if l2Block.L1BlockHash != emptyHash { - if err := hermezDb.WriteBlockL1BlockHash(l2Block.L2BlockNumber, l2Block.L1BlockHash); err != nil { - return fmt.Errorf("write block global exit root error: %v", err) - } - } - - if l2Block.L1InfoTreeIndex != 0 { - if err := hermezDb.WriteBlockL1InfoTreeIndex(l2Block.L2BlockNumber, uint64(l2Block.L1InfoTreeIndex)); err != nil { - return err - } - - // if the info tree index of this block is lower than the highest we've seen - // we need to write the GER and l1 block hash regardless of the logic above. - // this can only happen in post etrog blocks, and we need the GER/L1 block hash - // for the stream and also for the block info root to be correct - if uint64(l2Block.L1InfoTreeIndex) <= highestL1InfoTreeIndex { - l1InfoTreeIndexReused = true - if err := hermezDb.WriteBlockGlobalExitRoot(l2Block.L2BlockNumber, l2Block.GlobalExitRoot); err != nil { - return fmt.Errorf("write block global exit root error: %w", err) - } - if err := hermezDb.WriteBlockL1BlockHash(l2Block.L2BlockNumber, l2Block.L1BlockHash); err != nil { - return fmt.Errorf("write block global exit root error: %w", err) - } - if err := hermezDb.WriteReusedL1InfoTreeIndex(l2Block.L2BlockNumber); err != nil { - return fmt.Errorf("write reused l1 info tree index error: %w", err) - } - } - } - - // if we haven't reused the l1 info tree index, and we have also written the GER - // then we need to write the latest used GER for this batch to the table - // we always want the last written GER in this table as it's at the batch level, so it can and should - // be overwritten - if !l1InfoTreeIndexReused && didStoreGer { - if err := hermezDb.WriteLatestUsedGer(l2Block.L2BlockNumber, l2Block.GlobalExitRoot); err != nil { - return fmt.Errorf("write latest used ger error: %w", err) - } - } - - if err := eriDb.WriteBody(bn, l2Block.L2Blockhash, txs); err != nil { - return fmt.Errorf("write body error: %v", err) - } - - if err := hermezDb.WriteForkId(l2Block.BatchNumber, l2Block.ForkId); err != nil { - return fmt.Errorf("write block batch error: %v", err) - } - - if err := hermezDb.WriteForkIdBlockOnce(l2Block.ForkId, l2Block.L2BlockNumber); err != nil { - return fmt.Errorf("write fork id block error: %v", err) - } - - if err := hermezDb.WriteBlockBatch(l2Block.L2BlockNumber, l2Block.BatchNumber); err != nil { - return 
fmt.Errorf("write block batch error: %v", err) - } - - return nil -} - // rollback performs the unwinding of blocks: // 1. queries the latest common ancestor for datastream and db, // 2. resolves the unwind block (as the latest block in the previous batch, comparing to the found ancestor block) diff --git a/zk/stages/stage_batches_datastream.go b/zk/stages/stage_batches_datastream.go new file mode 100644 index 00000000000..fefd7c17188 --- /dev/null +++ b/zk/stages/stage_batches_datastream.go @@ -0,0 +1,108 @@ +package stages + +import ( + "fmt" + "math/rand" + "sync/atomic" + "time" + + "github.com/ledgerwatch/log/v3" +) + +type DatastreamClientRunner struct { + dsClient DatastreamClient + logPrefix string + stopRunner atomic.Bool + isReading atomic.Bool +} + +func NewDatastreamClientRunner(dsClient DatastreamClient, logPrefix string) *DatastreamClientRunner { + return &DatastreamClientRunner{ + dsClient: dsClient, + logPrefix: logPrefix, + } +} + +func (r *DatastreamClientRunner) StartRead() error { + if r.isReading.Load() { + return fmt.Errorf("tried starting datastream client runner thread while another is running") + } + + go func() { + routineId := rand.Intn(1000000) + + log.Info(fmt.Sprintf("[%s] Started downloading L2Blocks routine ID: %d", r.logPrefix, routineId)) + defer log.Info(fmt.Sprintf("[%s] Ended downloading L2Blocks routine ID: %d", r.logPrefix, routineId)) + + r.isReading.Store(true) + defer r.isReading.Store(false) + + for { + if r.stopRunner.Load() { + log.Info(fmt.Sprintf("[%s] Downloading L2Blocks routine stopped intentionally", r.logPrefix)) + break + } + + // start routine to download blocks and push them in a channel + if !r.dsClient.GetStreamingAtomic().Load() { + log.Info(fmt.Sprintf("[%s] Starting stream", r.logPrefix)) + // this will download all blocks from datastream and push them in a channel + // if no error, break, else continue trying to get them + // Create bookmark + + if err := r.connectDatastream(); err != nil { + log.Error(fmt.Sprintf("[%s] Error connecting to datastream", r.logPrefix), "error", err) + } + + if err := r.dsClient.ReadAllEntriesToChannel(); err != nil { + log.Error(fmt.Sprintf("[%s] Error downloading blocks from datastream", r.logPrefix), "error", err) + } + } + } + }() + + return nil +} + +func (r *DatastreamClientRunner) StopRead() { + r.stopRunner.Store(true) +} + +func (r *DatastreamClientRunner) RestartReadFromBlock(fromBlock uint64) error { + r.StopRead() + + //wait for the old routine to be finished before continuing + counter := 0 + for { + if !r.isReading.Load() { + break + } + counter++ + if counter > 100 { + return fmt.Errorf("failed to stop reader routine correctly") + } + time.Sleep(100 * time.Millisecond) + } + + // set new block + r.dsClient.GetProgressAtomic().Store(fromBlock) + + log.Info(fmt.Sprintf("[%s] Restarting datastream from block %d", r.logPrefix, fromBlock)) + + return r.StartRead() +} + +func (r *DatastreamClientRunner) connectDatastream() (err error) { + var connected bool + for i := 0; i < 5; i++ { + if connected, err = r.dsClient.EnsureConnected(); err != nil { + log.Error(fmt.Sprintf("[%s] Error connecting to datastream", r.logPrefix), "error", err) + continue + } + if connected { + return nil + } + } + + return fmt.Errorf("failed to connect to datastream") +} diff --git a/zk/stages/stage_batches_processor.go b/zk/stages/stage_batches_processor.go new file mode 100644 index 00000000000..9f23270aa78 --- /dev/null +++ b/zk/stages/stage_batches_processor.go @@ -0,0 +1,485 @@ +package stages + +import ( + 
"context" + "errors" + "fmt" + "math/big" + "sync/atomic" + "time" + + "github.com/gateway-fm/cdk-erigon-lib/common" + "github.com/gateway-fm/cdk-erigon-lib/kv" + "github.com/ledgerwatch/erigon/core/rawdb" + ethTypes "github.com/ledgerwatch/erigon/core/types" + "github.com/ledgerwatch/erigon/eth/stagedsync/stages" + "github.com/ledgerwatch/erigon/zk/datastream/types" + txtype "github.com/ledgerwatch/erigon/zk/tx" + "github.com/ledgerwatch/erigon/zk/utils" + "github.com/ledgerwatch/log/v3" +) + +type ProcessorErigonDb interface { + WriteHeader(batchNo *big.Int, blockHash common.Hash, stateRoot, txHash, parentHash common.Hash, coinbase common.Address, ts, gasLimit uint64) (*ethTypes.Header, error) + WriteBody(batchNo *big.Int, headerHash common.Hash, txs []ethTypes.Transaction) error + ReadCanonicalHash(L2BlockNumber uint64) (common.Hash, error) +} + +type ProcessorHermezDb interface { + WriteForkId(batchNumber uint64, forkId uint64) error + WriteForkIdBlockOnce(forkId, blockNum uint64) error + WriteBlockBatch(l2BlockNumber uint64, batchNumber uint64) error + WriteEffectiveGasPricePercentage(txHash common.Hash, effectiveGasPricePercentage uint8) error + + WriteStateRoot(l2BlockNumber uint64, rpcRoot common.Hash) error + + CheckGlobalExitRootWritten(ger common.Hash) (bool, error) + WriteBlockGlobalExitRoot(l2BlockNo uint64, ger common.Hash) error + WriteGlobalExitRoot(ger common.Hash) error + + WriteReusedL1InfoTreeIndex(l2BlockNo uint64) error + WriteBlockL1BlockHash(l2BlockNo uint64, l1BlockHash common.Hash) error + WriteBatchGlobalExitRoot(batchNumber uint64, ger *types.GerUpdate) error + WriteIntermediateTxStateRoot(l2BlockNumber uint64, txHash common.Hash, rpcRoot common.Hash) error + WriteBlockL1InfoTreeIndex(blockNumber uint64, l1Index uint64) error + WriteLatestUsedGer(blockNo uint64, ger common.Hash) error + WriteInvalidBatch(batchNumber uint64) error + WriteBatchEnd(lastBlockHeight uint64) error + GetBatchNoByL2Block(l2BlockNumber uint64) (uint64, error) +} + +type DsQueryClient interface { + GetL2BlockByNumber(blockNum uint64) (*types.FullL2Block, int, error) + GetProgressAtomic() *atomic.Uint64 +} + +type BatchesProcessor struct { + ctx context.Context + logPrefix string + tx kv.RwTx + hermezDb ProcessorHermezDb + eriDb ProcessorErigonDb + syncBlockLimit, + debugBlockLimit, + debugStepAfter, + debugStep, + stageProgressBlockNo, + lastForkId, + highestHashableL2BlockNo, + lastBlockHeight, + highestSeenBatchNo, + blocksWritten, + highestVerifiedBatch uint64 + highestL1InfoTreeIndex uint32 + lastBlockRoot, + lastBlockHash common.Hash + dsQueryClient DsQueryClient + progressChan chan uint64 + unwindFn func(uint64) error +} + +func NewBatchesProcessor( + ctx context.Context, + logPrefix string, + tx kv.RwTx, + hermezDb ProcessorHermezDb, + eriDb ProcessorErigonDb, + syncBlockLimit, debugBlockLimit, debugStepAfter, debugStep, stageProgressBlockNo, stageProgressBatchNo uint64, + dsQueryClient DsQueryClient, + progressChan chan uint64, + unwindFn func(uint64) error, +) (*BatchesProcessor, error) { + highestVerifiedBatch, err := stages.GetStageProgress(tx, stages.L1VerificationsBatchNo) + if err != nil { + return nil, errors.New("could not retrieve l1 verifications batch no progress") + } + + lastForkId, err := stages.GetStageProgress(tx, stages.ForkId) + if err != nil { + return nil, fmt.Errorf("failed to get last fork id, %w", err) + } + + return &BatchesProcessor{ + ctx: ctx, + logPrefix: logPrefix, + tx: tx, + hermezDb: hermezDb, + eriDb: eriDb, + syncBlockLimit: syncBlockLimit, + 
debugBlockLimit: debugBlockLimit, + debugStep: debugStep, + debugStepAfter: debugStepAfter, + stageProgressBlockNo: stageProgressBlockNo, + lastBlockHeight: stageProgressBlockNo, + highestSeenBatchNo: stageProgressBatchNo, + highestVerifiedBatch: highestVerifiedBatch, + dsQueryClient: dsQueryClient, + progressChan: progressChan, + lastBlockHash: emptyHash, + lastBlockRoot: emptyHash, + lastForkId: lastForkId, + unwindFn: unwindFn, + }, nil +} + +func (p *BatchesProcessor) ProcessEntry(entry interface{}) (rollbackBlock uint64, endLoop bool, err error) { + switch entry := entry.(type) { + case *types.BatchStart: + return 0, false, p.processBatchStartEntry(entry) + case *types.BatchEnd: + return 0, false, p.processBatchEndEntry(entry) + case *types.FullL2Block: + return p.processFullBlock(entry) + case *types.GerUpdate: + return 0, false, p.processGerUpdate(entry) + default: + return 0, false, fmt.Errorf("unknown entry type: %T", entry) + } +} + +func (p *BatchesProcessor) processGerUpdate(gerUpdate *types.GerUpdate) error { + if gerUpdate.GlobalExitRoot == emptyHash { + log.Warn(fmt.Sprintf("[%s] Skipping GER update with empty root", p.logPrefix)) + return nil + } + + // NB: we won't get these post Etrog (fork id 7) + if err := p.hermezDb.WriteBatchGlobalExitRoot(gerUpdate.BatchNumber, gerUpdate); err != nil { + return fmt.Errorf("write batch global exit root error: %v", err) + } + + return nil +} + +func (p *BatchesProcessor) processBatchEndEntry(batchEnd *types.BatchEnd) (err error) { + if batchEnd.StateRoot != p.lastBlockRoot { + log.Debug(fmt.Sprintf("[%s] batch end state root mismatches last block's: %x, expected: %x", p.logPrefix, batchEnd.StateRoot, p.lastBlockRoot)) + } + // keep a record of the last block processed when we receive the batch end + if err = p.hermezDb.WriteBatchEnd(p.lastBlockHeight); err != nil { + return err + } + return nil +} + +func (p *BatchesProcessor) processBatchStartEntry(batchStart *types.BatchStart) (err error) { + // check if the batch is invalid so that we can replicate this over in the stream + // when we re-populate it + if batchStart.BatchType == types.BatchTypeInvalid { + if err = p.hermezDb.WriteInvalidBatch(batchStart.Number); err != nil { + return err + } + // we need to write the fork here as well because the batch will never get processed as it is invalid + // but, we need it re-populate our own stream + if err = p.hermezDb.WriteForkId(batchStart.Number, batchStart.ForkId); err != nil { + return err + } + } + + return nil +} + +func (p *BatchesProcessor) processFullBlock(blockEntry *types.FullL2Block) (restartStreamFromBlock uint64, endLoop bool, err error) { + log.Debug(fmt.Sprintf("[%s] Retrieved %d (%s) block from stream", p.logPrefix, blockEntry.L2BlockNumber, blockEntry.L2Blockhash.String())) + if p.syncBlockLimit > 0 && blockEntry.L2BlockNumber >= p.syncBlockLimit { + // stop the node going into a crazy loop + time.Sleep(2 * time.Second) + return 0, true, nil + } + + // handle batch boundary changes - we do this here instead of reading the batch start channel because + // channels can be read in random orders which then creates problems in detecting fork changes during + // execution + if blockEntry.BatchNumber > p.highestSeenBatchNo && p.lastForkId < blockEntry.ForkId { + if blockEntry.ForkId > HIGHEST_KNOWN_FORK { + message := fmt.Sprintf("unsupported fork id %v received from the data stream", blockEntry.ForkId) + panic(message) + } + if err = stages.SaveStageProgress(p.tx, stages.ForkId, blockEntry.ForkId); err != nil { + return 0, 
false, fmt.Errorf("save stage progress error: %v", err) + } + p.lastForkId = blockEntry.ForkId + if err = p.hermezDb.WriteForkId(blockEntry.BatchNumber, blockEntry.ForkId); err != nil { + return 0, false, fmt.Errorf("write fork id error: %v", err) + } + // NOTE (RPC): avoided use of 'writeForkIdBlockOnce' by reading instead batch by forkId, and then lowest block number in batch + } + + // ignore genesis or a repeat of the last block + if blockEntry.L2BlockNumber == 0 { + return 0, false, nil + } + // skip but warn on already processed blocks + if blockEntry.L2BlockNumber <= p.stageProgressBlockNo { + if blockEntry.L2BlockNumber < p.stageProgressBlockNo { + // only warn if the block is very old, we expect the very latest block to be requested + // when the stage is fired up for the first time + log.Warn(fmt.Sprintf("[%s] Skipping block %d, already processed", p.logPrefix, blockEntry.L2BlockNumber)) + } + + dbBatchNum, err := p.hermezDb.GetBatchNoByL2Block(blockEntry.L2BlockNumber) + if err != nil { + return 0, false, err + } + + if blockEntry.BatchNumber > dbBatchNum { + // if the batch number is higher than the one we know about, it means that we need to trigger an unwinding of blocks + log.Warn(fmt.Sprintf("[%s] Batch number mismatch detected. Triggering unwind...", p.logPrefix), + "block", blockEntry.L2BlockNumber, "ds batch", blockEntry.BatchNumber, "db batch", dbBatchNum) + if err := p.unwindFn(blockEntry.L2BlockNumber); err != nil { + return blockEntry.L2BlockNumber, false, err + } + } + return 0, false, nil + } + + var dbParentBlockHash common.Hash + if blockEntry.L2BlockNumber > 1 { + dbParentBlockHash, err = p.eriDb.ReadCanonicalHash(p.lastBlockHeight) + if err != nil { + return 0, false, fmt.Errorf("failed to retrieve parent block hash for datastream block %d: %w", + blockEntry.L2BlockNumber, err) + } + } + + dsParentBlockHash := p.lastBlockHash + dsBlockNumber := p.lastBlockHeight + if dsParentBlockHash == emptyHash { + parentBlockDS, _, err := p.dsQueryClient.GetL2BlockByNumber(blockEntry.L2BlockNumber - 1) + if err != nil { + return 0, false, err + } + + if parentBlockDS != nil { + dsParentBlockHash = parentBlockDS.L2Blockhash + if parentBlockDS.L2BlockNumber > 0 { + dsBlockNumber = parentBlockDS.L2BlockNumber + } + } + } + + if blockEntry.L2BlockNumber > 1 && dbParentBlockHash != dsParentBlockHash { + // unwind/rollback blocks until the latest common ancestor block + log.Warn(fmt.Sprintf("[%s] Parent block hashes mismatch on block %d. 
Triggering unwind...", p.logPrefix, blockEntry.L2BlockNumber), + "db parent block hash", dbParentBlockHash, + "ds parent block number", dsBlockNumber, + "ds parent block hash", dsParentBlockHash, + "ds parent block number", blockEntry.L2BlockNumber-1, + ) + //parent blockhash is wrong, so unwind to it, then restat stream from it to get the correct one + p.unwindFn(blockEntry.L2BlockNumber - 1) + return blockEntry.L2BlockNumber - 1, false, nil + } + + // skip if we already have this block + if blockEntry.L2BlockNumber < p.lastBlockHeight+1 { + log.Warn(fmt.Sprintf("[%s] Skipping block %d, already processed unwinding...", p.logPrefix, blockEntry.L2BlockNumber)) + p.unwindFn(blockEntry.L2BlockNumber) + } + + // check for sequential block numbers + if blockEntry.L2BlockNumber > p.lastBlockHeight+1 { + log.Warn(fmt.Sprintf("[%s] Stream skipped ahead, restarting datastream to block %d", p.logPrefix, blockEntry.L2BlockNumber)) + return p.lastBlockHeight + 1, false, nil + } + + // batch boundary - record the highest hashable block number (last block in last full batch) + if blockEntry.BatchNumber > p.highestSeenBatchNo { + p.highestHashableL2BlockNo = blockEntry.L2BlockNumber - 1 + } + p.highestSeenBatchNo = blockEntry.BatchNumber + + /////// DEBUG BISECTION /////// + // exit stage when debug bisection flags set and we're at the limit block + if p.debugBlockLimit > 0 && blockEntry.L2BlockNumber > p.debugBlockLimit { + log.Info(fmt.Sprintf("[%s] Debug limit reached, stopping stage\n", p.logPrefix)) + endLoop = true + } + + // if we're above StepAfter, and we're at a step, move the stages on + if p.debugStep > 0 && p.debugStepAfter > 0 && blockEntry.L2BlockNumber > p.debugStepAfter { + if blockEntry.L2BlockNumber%p.debugStep == 0 { + log.Info(fmt.Sprintf("[%s] Debug step reached, stopping stage\n", p.logPrefix)) + endLoop = true + } + } + /////// END DEBUG BISECTION /////// + + // store our finalized state if this batch matches the highest verified batch number on the L1 + if blockEntry.BatchNumber == p.highestVerifiedBatch { + rawdb.WriteForkchoiceFinalized(p.tx, blockEntry.L2Blockhash) + } + + if p.lastBlockHash != emptyHash { + blockEntry.ParentHash = p.lastBlockHash + } else { + // first block in the loop so read the parent hash + previousHash, err := p.eriDb.ReadCanonicalHash(blockEntry.L2BlockNumber - 1) + if err != nil { + return 0, false, fmt.Errorf("failed to get genesis header: %v", err) + } + blockEntry.ParentHash = previousHash + } + + if err := p.writeL2Block(blockEntry); err != nil { + return 0, false, fmt.Errorf("writeL2Block error: %v", err) + } + + p.dsQueryClient.GetProgressAtomic().Store(blockEntry.L2BlockNumber) + + // make sure to capture the l1 info tree index changes so we can store progress + if blockEntry.L1InfoTreeIndex > p.highestL1InfoTreeIndex { + p.highestL1InfoTreeIndex = blockEntry.L1InfoTreeIndex + } + + p.lastBlockHash = blockEntry.L2Blockhash + p.lastBlockRoot = blockEntry.StateRoot + + p.lastBlockHeight = blockEntry.L2BlockNumber + p.blocksWritten++ + p.progressChan <- p.blocksWritten + + if p.debugBlockLimit == 0 { + endLoop = false + } + return 0, endLoop, nil +} + +// writeL2Block writes L2Block to ErigonDb and HermezDb +// writes header, body, forkId and blockBatch +func (p *BatchesProcessor) writeL2Block(l2Block *types.FullL2Block) error { + bn := new(big.Int).SetUint64(l2Block.L2BlockNumber) + txs := make([]ethTypes.Transaction, 0, len(l2Block.L2Txs)) + for _, transaction := range l2Block.L2Txs { + ltx, _, err := txtype.DecodeTx(transaction.Encoded, 
transaction.EffectiveGasPricePercentage, l2Block.ForkId) + if err != nil { + return fmt.Errorf("decode tx error: %v", err) + } + txs = append(txs, ltx) + + if err := p.hermezDb.WriteEffectiveGasPricePercentage(ltx.Hash(), transaction.EffectiveGasPricePercentage); err != nil { + return fmt.Errorf("write effective gas price percentage error: %v", err) + } + + if err := p.hermezDb.WriteStateRoot(l2Block.L2BlockNumber, transaction.IntermediateStateRoot); err != nil { + return fmt.Errorf("write rpc root error: %v", err) + } + + if err := p.hermezDb.WriteIntermediateTxStateRoot(l2Block.L2BlockNumber, ltx.Hash(), transaction.IntermediateStateRoot); err != nil { + return fmt.Errorf("write rpc root error: %v", err) + } + } + txCollection := ethTypes.Transactions(txs) + txHash := ethTypes.DeriveSha(txCollection) + + gasLimit := utils.GetBlockGasLimitForFork(l2Block.ForkId) + + if _, err := p.eriDb.WriteHeader(bn, l2Block.L2Blockhash, l2Block.StateRoot, txHash, l2Block.ParentHash, l2Block.Coinbase, uint64(l2Block.Timestamp), gasLimit); err != nil { + return fmt.Errorf("write header error: %v", err) + } + + didStoreGer := false + l1InfoTreeIndexReused := false + + if l2Block.GlobalExitRoot != emptyHash { + gerWritten, err := p.hermezDb.CheckGlobalExitRootWritten(l2Block.GlobalExitRoot) + if err != nil { + return fmt.Errorf("get global exit root error: %v", err) + } + + if !gerWritten { + if err := p.hermezDb.WriteBlockGlobalExitRoot(l2Block.L2BlockNumber, l2Block.GlobalExitRoot); err != nil { + return fmt.Errorf("write block global exit root error: %v", err) + } + + if err := p.hermezDb.WriteGlobalExitRoot(l2Block.GlobalExitRoot); err != nil { + return fmt.Errorf("write global exit root error: %v", err) + } + didStoreGer = true + } + } + + if l2Block.L1BlockHash != emptyHash { + if err := p.hermezDb.WriteBlockL1BlockHash(l2Block.L2BlockNumber, l2Block.L1BlockHash); err != nil { + return fmt.Errorf("write block global exit root error: %v", err) + } + } + + if l2Block.L1InfoTreeIndex != 0 { + if err := p.hermezDb.WriteBlockL1InfoTreeIndex(l2Block.L2BlockNumber, uint64(l2Block.L1InfoTreeIndex)); err != nil { + return err + } + + // if the info tree index of this block is lower than the highest we've seen + // we need to write the GER and l1 block hash regardless of the logic above. 
+ // this can only happen in post etrog blocks, and we need the GER/L1 block hash + // for the stream and also for the block info root to be correct + if l2Block.L1InfoTreeIndex <= p.highestL1InfoTreeIndex { + l1InfoTreeIndexReused = true + if err := p.hermezDb.WriteBlockGlobalExitRoot(l2Block.L2BlockNumber, l2Block.GlobalExitRoot); err != nil { + return fmt.Errorf("write block global exit root error: %w", err) + } + if err := p.hermezDb.WriteBlockL1BlockHash(l2Block.L2BlockNumber, l2Block.L1BlockHash); err != nil { + return fmt.Errorf("write block global exit root error: %w", err) + } + if err := p.hermezDb.WriteReusedL1InfoTreeIndex(l2Block.L2BlockNumber); err != nil { + return fmt.Errorf("write reused l1 info tree index error: %w", err) + } + } + } + + // if we haven't reused the l1 info tree index, and we have also written the GER + // then we need to write the latest used GER for this batch to the table + // we always want the last written GER in this table as it's at the batch level, so it can and should + // be overwritten + if !l1InfoTreeIndexReused && didStoreGer { + if err := p.hermezDb.WriteLatestUsedGer(l2Block.L2BlockNumber, l2Block.GlobalExitRoot); err != nil { + return fmt.Errorf("write latest used ger error: %w", err) + } + } + + if err := p.eriDb.WriteBody(bn, l2Block.L2Blockhash, txs); err != nil { + return fmt.Errorf("write body error: %v", err) + } + + if err := p.hermezDb.WriteForkId(l2Block.BatchNumber, l2Block.ForkId); err != nil { + return fmt.Errorf("write block batch error: %v", err) + } + + if err := p.hermezDb.WriteForkIdBlockOnce(l2Block.ForkId, l2Block.L2BlockNumber); err != nil { + return fmt.Errorf("write fork id block error: %v", err) + } + + if err := p.hermezDb.WriteBlockBatch(l2Block.L2BlockNumber, l2Block.BatchNumber); err != nil { + return fmt.Errorf("write block batch error: %v", err) + } + + return nil +} + +func (p *BatchesProcessor) AtLeastOneBlockWritten() bool { + return p.lastBlockHeight > 0 +} + +func (p *BatchesProcessor) LastBlockHeight() uint64 { + return p.lastBlockHeight +} + +func (p *BatchesProcessor) HighestSeenBatchNumber() uint64 { + return p.highestSeenBatchNo +} +func (p *BatchesProcessor) LastForkId() uint64 { + return p.lastForkId +} + +func (p *BatchesProcessor) TotalBlocksWritten() uint64 { + return p.blocksWritten +} + +func (p *BatchesProcessor) HighestHashableL2BlockNo() uint64 { + return p.highestHashableL2BlockNo +} + +func (p *BatchesProcessor) SetNewTx(tx kv.RwTx) { + p.tx = tx +} diff --git a/zk/stages/stage_batches_test.go b/zk/stages/stage_batches_test.go index 1bc39ca895f..e81fcc051c4 100644 --- a/zk/stages/stage_batches_test.go +++ b/zk/stages/stage_batches_test.go @@ -75,7 +75,7 @@ func TestUnwindBatches(t *testing.T) { ///////// // ACT // ///////// - err = SpawnStageBatches(s, u, ctx, tx, cfg, true) + err = SpawnStageBatches(s, u, ctx, tx, cfg) require.NoError(t, err) tx.Commit() diff --git a/zk/stages/stages.go b/zk/stages/stages.go index 4fc497f0f09..b22c7e7f1b4 100644 --- a/zk/stages/stages.go +++ b/zk/stages/stages.go @@ -292,7 +292,7 @@ func DefaultZkStages( if badBlockUnwind { return nil } - return SpawnStageBatches(s, u, ctx, tx, batchesCfg, test) + return SpawnStageBatches(s, u, ctx, tx, batchesCfg) }, Unwind: func(firstCycle bool, u *stages.UnwindState, s *stages.StageState, tx kv.RwTx) error { return UnwindBatchesStage(u, tx, batchesCfg, ctx) diff --git a/zk/stages/test_utils.go b/zk/stages/test_utils.go index df250ebf717..af9ec190587 100644 --- a/zk/stages/test_utils.go +++ b/zk/stages/test_utils.go @@ 
-46,8 +46,8 @@ func (c *TestDatastreamClient) ReadAllEntriesToChannel() error { return nil } -func (c *TestDatastreamClient) GetEntryChan() chan interface{} { - return c.entriesChan +func (c *TestDatastreamClient) GetEntryChan() *chan interface{} { + return &c.entriesChan } func (c *TestDatastreamClient) GetErrChan() chan error {
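
For orientation, here is a condensed, illustrative sketch (not part of the patch) of how the pieces introduced above are meant to fit together once SpawnStageBatches delegates to them: the DatastreamClientRunner owns the background download routine, the BatchesProcessor owns per-entry handling, and a positive restart block returned by ProcessEntry rewinds the stream. The function name consumeDatastream and its reduced parameter list are invented for the example; everything else uses only types and methods added by this patch, with progress saving, end-of-stream checks and most logging omitted.

package stages

import (
	"context"
	"time"
)

// consumeDatastream is an illustrative condensation of the stage loop:
// the runner pushes entries onto the client's channel in the background
// while the processor handles each entry and reports whether the stream
// has to be replayed from an earlier block.
func consumeDatastream(ctx context.Context, dsClient DatastreamClient, processor *BatchesProcessor, logPrefix string) error {
	runner := NewDatastreamClientRunner(dsClient, logPrefix)
	if err := runner.StartRead(); err != nil {
		return err
	}
	// stop the background reader when the stage loop exits
	defer runner.StopRead()

	entryChan := dsClient.GetEntryChan()
	for {
		select {
		case entry := <-*entryChan:
			restartFrom, endLoop, err := processor.ProcessEntry(entry)
			if err != nil {
				return err
			}
			if restartFrom > 0 {
				// a parent-hash mismatch or a gap was detected: rewind the
				// reader so the datastream is replayed from that block
				if err := runner.RestartReadFromBlock(restartFrom); err != nil {
					return err
				}
			}
			if endLoop {
				return nil
			}
		case <-ctx.Done():
			return ctx.Err()
		default:
			// nothing buffered yet; avoid a busy loop
			time.Sleep(10 * time.Millisecond)
		}
	}
}

In the patch itself this loop lives inside SpawnStageBatches, which additionally checks the datastream tip before ending, saves stage progress every STAGE_PROGRESS_SAVE blocks, and after each interim commit re-points hermezDb, eriDb and the processor at the freshly opened transaction via SetNewTx (the fix referenced by the "set new db tx upon progress save" commit).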