From 33a6d4c66b35c1c2533353a44cb0babd22b9b45d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Toni=20Ram=C3=ADrez?= <58293609+ToniRamirezM@users.noreply.github.com> Date: Mon, 28 Oct 2024 10:12:34 +0100 Subject: [PATCH] feat: remove DS from aggregator (#138) * feat: remove DS from aggregator * feat: unit tests * fix: rust * fix: seq-sender tests * fix: local_config script * fix: remove unused file * fix: default config * fix: test config * fix: nil l1inforoot * feat: improve coverage * fix: comments * feat: remove DS lib --- aggregator/aggregator.go | 654 +++--------------- aggregator/aggregator_test.go | 531 ++++++-------- aggregator/config.go | 20 +- aggregator/db/migrations/0004.sql | 23 + aggregator/interfaces.go | 38 +- aggregator/mocks/mock_StreamClient.go | 247 ------- aggregator/mocks/mock_rpc.go | 87 +++ aggregator/mocks/mock_state.go | 84 --- config/default.go | 10 +- crates/cdk-config/src/aggregator.rs | 30 +- crates/cdk/src/config_render.rs | 2 - go.mod | 1 - go.sum | 4 - sequencesender/rpc.go => rpc/batch.go | 78 ++- rpc/batch_test.go | 265 +++++++ .../rpcbatch => rpc/types}/rpcbatch.go | 53 +- scripts/local_config | 2 +- sequencesender.json | 1 - sequencesender/config.go | 8 - sequencesender/mocks/mock_etherman.go | 2 +- sequencesender/mocks/mock_ethtxmanager.go | 28 +- sequencesender/mocks/mock_rpc.go | 88 +++ sequencesender/rpc_test.go | 115 --- sequencesender/sequencesender.go | 21 +- sequencesender/sequencesender_test.go | 13 +- state/datastream.go | 12 - state/interfaces.go | 4 - state/pgstatestorage/batch.go | 66 -- test/Makefile | 7 +- .../kurtosis-cdk-node-config.toml.template | 3 +- test/config/test.config.toml | 9 +- 31 files changed, 933 insertions(+), 1573 deletions(-) create mode 100644 aggregator/db/migrations/0004.sql delete mode 100644 aggregator/mocks/mock_StreamClient.go create mode 100644 aggregator/mocks/mock_rpc.go rename sequencesender/rpc.go => rpc/batch.go (52%) create mode 100644 rpc/batch_test.go rename {sequencesender/seqsendertypes/rpcbatch => rpc/types}/rpcbatch.go (70%) delete mode 100644 sequencesender.json create mode 100644 sequencesender/mocks/mock_rpc.go delete mode 100644 sequencesender/rpc_test.go delete mode 100644 state/datastream.go delete mode 100644 state/pgstatestorage/batch.go diff --git a/aggregator/aggregator.go b/aggregator/aggregator.go index 7106b615..1998e842 100644 --- a/aggregator/aggregator.go +++ b/aggregator/aggregator.go @@ -14,7 +14,6 @@ import ( "time" "unicode" - "github.com/0xPolygon/cdk-rpc/rpc" cdkTypes "github.com/0xPolygon/cdk-rpc/types" "github.com/0xPolygon/cdk/aggregator/agglayer" ethmanTypes "github.com/0xPolygon/cdk/aggregator/ethmantypes" @@ -23,13 +22,11 @@ import ( "github.com/0xPolygon/cdk/config/types" "github.com/0xPolygon/cdk/l1infotree" "github.com/0xPolygon/cdk/log" + "github.com/0xPolygon/cdk/rpc" "github.com/0xPolygon/cdk/state" - "github.com/0xPolygon/cdk/state/datastream" "github.com/0xPolygon/zkevm-ethtx-manager/ethtxmanager" ethtxlog "github.com/0xPolygon/zkevm-ethtx-manager/log" ethtxtypes "github.com/0xPolygon/zkevm-ethtx-manager/types" - "github.com/0xPolygonHermez/zkevm-data-streamer/datastreamer" - streamlog "github.com/0xPolygonHermez/zkevm-data-streamer/log" synclog "github.com/0xPolygonHermez/zkevm-synchronizer-l1/log" "github.com/0xPolygonHermez/zkevm-synchronizer-l1/state/entities" "github.com/0xPolygonHermez/zkevm-synchronizer-l1/synchronizer" @@ -39,20 +36,14 @@ import ( "google.golang.org/grpc" grpchealth "google.golang.org/grpc/health/grpc_health_v1" "google.golang.org/grpc/peer" - "google.golang.org/protobuf/proto" ) const ( - dataStreamType = 1 mockedStateRoot = "0x090bcaf734c4f06c93954a827b45a6e8c67b8e0fd1e0a35a1c5982d6961828f9" mockedLocalExitRoot = "0x17c04c3760510b48c6012742c540a81aba4bca2f78b9d14bfd2f123e2e53ea3e" maxDBBigIntValue = 9223372036854775807 ) -var ( - errBusy = errors.New("witness server is busy") -) - type finalProofMsg struct { proverName string proverID string @@ -70,24 +61,15 @@ type Aggregator struct { state StateInterface etherman Etherman ethTxManager EthTxManagerClient - streamClient StreamClient l1Syncr synchronizer.Synchronizer halted atomic.Bool - streamClientMutex *sync.Mutex - profitabilityChecker aggregatorTxProfitabilityChecker timeSendFinalProof time.Time timeCleanupLockedProofs types.Duration stateDBMutex *sync.Mutex timeSendFinalProofMutex *sync.RWMutex - // Data stream handling variables - currentBatchStreamData []byte - currentStreamBatch state.Batch - currentStreamBatchRaw state.BatchRawV2 - currentStreamL2Block state.L2BlockRaw - finalProof chan finalProofMsg verifyingProof bool @@ -99,6 +81,8 @@ type Aggregator struct { sequencerPrivateKey *ecdsa.PrivateKey aggLayerClient agglayer.AgglayerClientInterface + + rpcClient RPCInterface } // New creates a new aggregator. @@ -132,24 +116,6 @@ func New( logger.Fatalf("error creating ethtxmanager client: %v", err) } - var streamClient *datastreamer.StreamClient - - if !cfg.SyncModeOnlyEnabled { - // Data stream client logs - streamLogConfig := streamlog.Config{ - Environment: streamlog.LogEnvironment(cfg.Log.Environment), - Level: cfg.Log.Level, - Outputs: cfg.Log.Outputs, - } - - logger.Info("Creating data stream client....") - streamClient, err = datastreamer.NewClientWithLogsConfig(cfg.StreamClient.Server, dataStreamType, streamLogConfig) - if err != nil { - logger.Fatalf("failed to create stream client, error: %v", err) - } - logger.Info("Data stream client created.") - } - // Synchonizer logs syncLogConfig := synclog.Config{ Environment: synclog.LogEnvironment(cfg.Log.Environment), @@ -188,18 +154,16 @@ func New( state: stateInterface, etherman: etherman, ethTxManager: ethTxManager, - streamClient: streamClient, - streamClientMutex: &sync.Mutex{}, l1Syncr: l1Syncr, profitabilityChecker: profitabilityChecker, stateDBMutex: &sync.Mutex{}, timeSendFinalProofMutex: &sync.RWMutex{}, timeCleanupLockedProofs: cfg.CleanupLockedProofsInterval, finalProof: make(chan finalProofMsg), - currentBatchStreamData: []byte{}, aggLayerClient: aggLayerClient, sequencerPrivateKey: sequencerPrivateKey, witnessRetrievalChan: make(chan state.DBBatch), + rpcClient: rpc.NewBatchEndpoints(cfg.RPCURL), } if a.ctx == nil { @@ -208,7 +172,6 @@ func New( // Set function to handle the batches from the data stream if !cfg.SyncModeOnlyEnabled { - a.streamClient.SetProcessEntryFunc(a.handleReceivedDataStream) a.l1Syncr.SetCallbackOnReorgDone(a.handleReorg) a.l1Syncr.SetCallbackOnRollbackBatches(a.handleRollbackBatches) } @@ -216,51 +179,6 @@ func New( return a, nil } -func (a *Aggregator) resetCurrentBatchData() { - a.currentBatchStreamData = []byte{} - a.currentStreamBatchRaw = state.BatchRawV2{ - Blocks: make([]state.L2BlockRaw, 0), - } - a.currentStreamL2Block = state.L2BlockRaw{} -} - -func (a *Aggregator) retrieveWitness() { - var success bool - for { - dbBatch := <-a.witnessRetrievalChan - inner: - for !success { - var err error - // Get Witness - dbBatch.Witness, err = a.getWitness(dbBatch.Batch.BatchNumber, a.cfg.WitnessURL, a.cfg.UseFullWitness) - if err != nil { - if errors.Is(err, errBusy) { - a.logger.Debugf( - "Witness server is busy, retrying get witness for batch %d in %v", - dbBatch.Batch.BatchNumber, a.cfg.RetryTime.Duration, - ) - } else { - a.logger.Errorf("Failed to get witness for batch %d, err: %v", dbBatch.Batch.BatchNumber, err) - } - time.Sleep(a.cfg.RetryTime.Duration) - - continue inner - } - - err = a.state.AddBatch(a.ctx, &dbBatch, nil) - if err != nil { - a.logger.Errorf("Error adding batch: %v", err) - time.Sleep(a.cfg.RetryTime.Duration) - - continue inner - } - success = true - } - - success = false - } -} - func (a *Aggregator) handleReorg(reorgData synchronizer.ReorgExecutionResult) { a.logger.Warnf("Reorg detected, reorgData: %+v", reorgData) @@ -269,9 +187,20 @@ func (a *Aggregator) handleReorg(reorgData synchronizer.ReorgExecutionResult) { if err != nil { a.logger.Errorf("Error getting last virtual batch number: %v", err) } else { - err = a.state.DeleteBatchesNewerThanBatchNumber(a.ctx, lastVBatchNumber, nil) + // Delete wip proofs + err = a.state.DeleteUngeneratedProofs(a.ctx, nil) if err != nil { - a.logger.Errorf("Error deleting batches newer than batch number %d: %v", lastVBatchNumber, err) + a.logger.Errorf("Error deleting ungenerated proofs: %v", err) + } else { + a.logger.Info("Deleted ungenerated proofs") + } + + // Delete any proof for the batches that have been rolled back + err = a.state.DeleteGeneratedProofs(a.ctx, lastVBatchNumber+1, maxDBBigIntValue, nil) + if err != nil { + a.logger.Errorf("Error deleting generated proofs: %v", err) + } else { + a.logger.Infof("Deleted generated proofs for batches newer than %d", lastVBatchNumber) } } @@ -289,33 +218,12 @@ func (a *Aggregator) handleReorg(reorgData synchronizer.ReorgExecutionResult) { func (a *Aggregator) handleRollbackBatches(rollbackData synchronizer.RollbackBatchesData) { a.logger.Warnf("Rollback batches event, rollbackBatchesData: %+v", rollbackData) - a.streamClientMutex.Lock() - defer a.streamClientMutex.Unlock() - - dsClientWasRunning := a.streamClient.IsStarted() - var err error - if dsClientWasRunning { - // Disable the process entry function to avoid processing the data stream - a.streamClient.ResetProcessEntryFunc() - - // Stop Reading the data stream - err = a.streamClient.ExecCommandStop() - if err != nil { - a.logger.Errorf("failed to stop data stream: %v.", err) - } else { - a.logger.Info("Data stream client stopped") - } - } - // Get new last verified batch number from L1 - var lastVerifiedBatchNumber uint64 - if err == nil { - lastVerifiedBatchNumber, err = a.etherman.GetLatestVerifiedBatchNum() - if err != nil { - a.logger.Errorf("Error getting latest verified batch number: %v", err) - } + lastVerifiedBatchNumber, err := a.etherman.GetLatestVerifiedBatchNum() + if err != nil { + a.logger.Errorf("Error getting latest verified batch number: %v", err) } // Check lastVerifiedBatchNumber makes sense @@ -326,26 +234,6 @@ func (a *Aggregator) handleRollbackBatches(rollbackData synchronizer.RollbackBat ) } - // Delete invalidated batches - if err == nil { - err = a.state.DeleteBatchesNewerThanBatchNumber(a.ctx, rollbackData.LastBatchNumber, nil) - if err != nil { - a.logger.Errorf("Error deleting batches newer than batch number %d: %v", rollbackData.LastBatchNumber, err) - } else { - a.logger.Infof("Deleted batches newer than batch number %d", rollbackData.LastBatchNumber) - } - } - - // Older batches data can also be deleted - if err == nil { - err = a.state.DeleteBatchesOlderThanBatchNumber(a.ctx, rollbackData.LastBatchNumber, nil) - if err != nil { - a.logger.Errorf("Error deleting batches older than batch number %d: %v", rollbackData.LastBatchNumber, err) - } else { - a.logger.Infof("Deleted batches older than batch number %d", rollbackData.LastBatchNumber) - } - } - // Delete wip proofs if err == nil { err = a.state.DeleteUngeneratedProofs(a.ctx, nil) @@ -366,42 +254,6 @@ func (a *Aggregator) handleRollbackBatches(rollbackData synchronizer.RollbackBat } } - if err == nil { - // Reset current batch data previously read from the data stream - a.resetCurrentBatchData() - a.currentStreamBatch = state.Batch{} - a.logger.Info("Current batch data reset") - - var marshalledBookMark []byte - // Reset the data stream reading point - bookMark := &datastream.BookMark{ - Type: datastream.BookmarkType_BOOKMARK_TYPE_BATCH, - Value: rollbackData.LastBatchNumber + 1, - } - - marshalledBookMark, err = proto.Marshal(bookMark) - //nolint:gocritic - if err != nil { - a.logger.Error("failed to marshal bookmark: %v", err) - } else { - // Restart the stream client if needed - if dsClientWasRunning { - a.streamClient.SetProcessEntryFunc(a.handleReceivedDataStream) - err = a.streamClient.Start() - if err != nil { - a.logger.Errorf("failed to start stream client, error: %v", err) - } else { - // Resume data stream reading - err = a.streamClient.ExecCommandStartBookmark(marshalledBookMark) - if err != nil { - a.logger.Errorf("failed to connect to data stream: %v", err) - } - a.logger.Info("Data stream client resumed") - } - } - } - } - if err == nil { a.logger.Info("Handling rollback batches event finished successfully") } else { @@ -414,255 +266,6 @@ func (a *Aggregator) handleRollbackBatches(rollbackData synchronizer.RollbackBat } } -func (a *Aggregator) handleReceivedDataStream( - entry *datastreamer.FileEntry, client *datastreamer.StreamClient, server *datastreamer.StreamServer, -) error { - forcedBlockhashL1 := common.Hash{} - - if !a.halted.Load() { - if entry.Type != datastreamer.EntryType(datastreamer.EtBookmark) { - a.currentBatchStreamData = append(a.currentBatchStreamData, entry.Encode()...) - - switch entry.Type { - case datastreamer.EntryType(datastream.EntryType_ENTRY_TYPE_BATCH_START): - // Check currentStreamBatchRaw is empty as sanity check - if len(a.currentStreamBatchRaw.Blocks) > 0 { - a.logger.Errorf("currentStreamBatchRaw should be empty, "+ - "but it contains %v blocks", len(a.currentStreamBatchRaw.Blocks)) - a.resetCurrentBatchData() - } - batch := &datastream.BatchStart{} - err := proto.Unmarshal(entry.Data, batch) - if err != nil { - a.logger.Errorf("Error unmarshalling batch: %v", err) - - return err - } - - a.currentStreamBatch.BatchNumber = batch.Number - a.currentStreamBatch.ChainID = batch.ChainId - a.currentStreamBatch.ForkID = batch.ForkId - a.currentStreamBatch.Type = batch.Type - case datastreamer.EntryType(datastream.EntryType_ENTRY_TYPE_BATCH_END): - batch := &datastream.BatchEnd{} - err := proto.Unmarshal(entry.Data, batch) - if err != nil { - a.logger.Errorf("Error unmarshalling batch: %v", err) - - return err - } - - a.currentStreamBatch.LocalExitRoot = common.BytesToHash(batch.LocalExitRoot) - a.currentStreamBatch.StateRoot = common.BytesToHash(batch.StateRoot) - - // Add last block (if any) to the current batch - if a.currentStreamL2Block.BlockNumber != 0 { - a.currentStreamBatchRaw.Blocks = append(a.currentStreamBatchRaw.Blocks, a.currentStreamL2Block) - } - - // Save Current Batch - if a.currentStreamBatch.BatchNumber != 0 { - var batchl2Data []byte - - // Get batchl2Data from L1 - virtualBatch, err := a.l1Syncr.GetVirtualBatchByBatchNumber(a.ctx, a.currentStreamBatch.BatchNumber) - if err != nil && !errors.Is(err, entities.ErrNotFound) { - a.logger.Errorf("Error getting virtual batch: %v", err) - - return err - } - - for errors.Is(err, entities.ErrNotFound) { - a.logger.Debug("Waiting for virtual batch to be available") - time.Sleep(a.cfg.RetryTime.Duration) - virtualBatch, err = a.l1Syncr.GetVirtualBatchByBatchNumber(a.ctx, a.currentStreamBatch.BatchNumber) - - if err != nil && !errors.Is(err, entities.ErrNotFound) { - a.logger.Errorf("Error getting virtual batch: %v", err) - - return err - } - } - - // Encode batch - if a.currentStreamBatch.Type != datastream.BatchType_BATCH_TYPE_INVALID && - a.currentStreamBatch.Type != datastream.BatchType_BATCH_TYPE_INJECTED { - batchl2Data, err = state.EncodeBatchV2(&a.currentStreamBatchRaw) - if err != nil { - a.logger.Errorf("Error encoding batch: %v", err) - - return err - } - } - - // If the batch is marked as Invalid in the DS we enforce retrieve the data from L1 - if a.cfg.UseL1BatchData || - a.currentStreamBatch.Type == datastream.BatchType_BATCH_TYPE_INVALID || - a.currentStreamBatch.Type == datastream.BatchType_BATCH_TYPE_INJECTED { - a.currentStreamBatch.BatchL2Data = virtualBatch.BatchL2Data - } else { - a.currentStreamBatch.BatchL2Data = batchl2Data - } - - // Compare BatchL2Data from L1 and DataStream - if common.Bytes2Hex(batchl2Data) != common.Bytes2Hex(virtualBatch.BatchL2Data) && - a.currentStreamBatch.Type != datastream.BatchType_BATCH_TYPE_INJECTED { - a.logger.Warnf("BatchL2Data from L1 and data stream are different for batch %d", a.currentStreamBatch.BatchNumber) - - if a.currentStreamBatch.Type == datastream.BatchType_BATCH_TYPE_INVALID { - a.logger.Warnf("Batch is marked as invalid in data stream") - } else { - a.logger.Warnf("DataStream BatchL2Data:%v", common.Bytes2Hex(batchl2Data)) - } - a.logger.Warnf("L1 BatchL2Data:%v", common.Bytes2Hex(virtualBatch.BatchL2Data)) - } - - // Get L1InfoRoot - sequence, err := a.l1Syncr.GetSequenceByBatchNumber(a.ctx, a.currentStreamBatch.BatchNumber) - if err != nil { - a.logger.Errorf("Error getting sequence: %v", err) - - return err - } - - for sequence == nil { - a.logger.Debug("Waiting for sequence to be available") - time.Sleep(a.cfg.RetryTime.Duration) - sequence, err = a.l1Syncr.GetSequenceByBatchNumber(a.ctx, a.currentStreamBatch.BatchNumber) - if err != nil { - a.logger.Errorf("Error getting sequence: %v", err) - - return err - } - } - - a.currentStreamBatch.L1InfoRoot = sequence.L1InfoRoot - a.currentStreamBatch.Timestamp = sequence.Timestamp - - // Calculate Acc Input Hash - oldDBBatch, err := a.state.GetBatch(a.ctx, a.currentStreamBatch.BatchNumber-1, nil) - if err != nil { - a.logger.Errorf("Error getting batch %d: %v", a.currentStreamBatch.BatchNumber-1, err) - - return err - } - - // Injected Batch - if a.currentStreamBatch.BatchNumber == 1 { - l1Block, err := a.l1Syncr.GetL1BlockByNumber(a.ctx, virtualBatch.BlockNumber) - if err != nil { - a.logger.Errorf("Error getting L1 block: %v", err) - - return err - } - - forcedBlockhashL1 = l1Block.ParentHash - a.currentStreamBatch.L1InfoRoot = a.currentStreamBatch.GlobalExitRoot - } - - accInputHash := cdkcommon.CalculateAccInputHash( - a.logger, - oldDBBatch.Batch.AccInputHash, - a.currentStreamBatch.BatchL2Data, - a.currentStreamBatch.L1InfoRoot, - uint64(a.currentStreamBatch.Timestamp.Unix()), - a.currentStreamBatch.Coinbase, - forcedBlockhashL1, - ) - a.currentStreamBatch.AccInputHash = accInputHash - - dbBatch := state.DBBatch{ - Batch: a.currentStreamBatch, - Datastream: a.currentBatchStreamData, - Witness: nil, - } - - // Check if the batch is already in the DB to keep its witness - wDBBatch, err := a.state.GetBatch(a.ctx, a.currentStreamBatch.BatchNumber, nil) - if err != nil { - if !errors.Is(err, state.ErrNotFound) { - a.logger.Errorf("Error getting batch %d: %v", a.currentStreamBatch.BatchNumber, err) - - return err - } - } - - if wDBBatch != nil && wDBBatch.Witness != nil && len(wDBBatch.Witness) > 0 { - dbBatch.Witness = wDBBatch.Witness - } - - // Store batch in the DB - err = a.state.AddBatch(a.ctx, &dbBatch, nil) - if err != nil { - a.logger.Errorf("Error adding batch: %v", err) - - return err - } - - // Retrieve the witness - if len(dbBatch.Witness) == 0 { - a.witnessRetrievalChan <- dbBatch - } - } - - // Reset current batch data - a.resetCurrentBatchData() - - case datastreamer.EntryType(datastream.EntryType_ENTRY_TYPE_L2_BLOCK): - // Add previous block (if any) to the current batch - if a.currentStreamL2Block.BlockNumber != 0 { - a.currentStreamBatchRaw.Blocks = append(a.currentStreamBatchRaw.Blocks, a.currentStreamL2Block) - } - // "Open" the new block - l2Block := &datastream.L2Block{} - err := proto.Unmarshal(entry.Data, l2Block) - if err != nil { - a.logger.Errorf("Error unmarshalling L2Block: %v", err) - - return err - } - - header := state.ChangeL2BlockHeader{ - DeltaTimestamp: l2Block.DeltaTimestamp, - IndexL1InfoTree: l2Block.L1InfotreeIndex, - } - - a.currentStreamL2Block.ChangeL2BlockHeader = header - a.currentStreamL2Block.Transactions = make([]state.L2TxRaw, 0) - a.currentStreamL2Block.BlockNumber = l2Block.Number - a.currentStreamBatch.L1InfoTreeIndex = l2Block.L1InfotreeIndex - a.currentStreamBatch.Coinbase = common.BytesToAddress(l2Block.Coinbase) - a.currentStreamBatch.GlobalExitRoot = common.BytesToHash(l2Block.GlobalExitRoot) - - case datastreamer.EntryType(datastream.EntryType_ENTRY_TYPE_TRANSACTION): - l2Tx := &datastream.Transaction{} - err := proto.Unmarshal(entry.Data, l2Tx) - if err != nil { - a.logger.Errorf("Error unmarshalling L2Tx: %v", err) - - return err - } - // New Tx raw - tx, err := state.DecodeTx(common.Bytes2Hex(l2Tx.Encoded)) - if err != nil { - a.logger.Errorf("Error decoding tx: %v", err) - - return err - } - - l2TxRaw := state.L2TxRaw{ - EfficiencyPercentage: uint8(l2Tx.EffectiveGasPricePercentage), - TxAlreadyEncoded: false, - Tx: tx, - } - a.currentStreamL2Block.Transactions = append(a.currentStreamL2Block.Transactions, l2TxRaw) - } - } - } - - return nil -} - // Start starts the aggregator func (a *Aggregator) Start() error { // Initial L1 Sync blocking @@ -700,39 +303,13 @@ func (a *Aggregator) Start() error { return err } - // Cleanup data base - err = a.state.DeleteBatchesOlderThanBatchNumber(a.ctx, lastVerifiedBatchNumber, nil) - if err != nil { - return err - } - // Delete ungenerated recursive proofs err = a.state.DeleteUngeneratedProofs(a.ctx, nil) if err != nil { return fmt.Errorf("failed to initialize proofs cache %w", err) } - accInputHash, err := a.getVerifiedBatchAccInputHash(a.ctx, lastVerifiedBatchNumber) - if err != nil { - return err - } - a.logger.Infof("Last Verified Batch Number:%v", lastVerifiedBatchNumber) - a.logger.Infof("Starting AccInputHash:%v", accInputHash.String()) - - // Store Acc Input Hash of the latest verified batch - dummyDBBatch := state.DBBatch{ - Batch: state.Batch{ - BatchNumber: lastVerifiedBatchNumber, - AccInputHash: *accInputHash, - }, - Datastream: []byte{0}, - Witness: []byte{0}, - } - err = a.state.AddBatch(a.ctx, &dummyDBBatch, nil) - if err != nil { - return err - } a.resetVerifyProofTime() @@ -740,35 +317,6 @@ func (a *Aggregator) Start() error { go a.sendFinalProof() go a.ethTxManager.Start() - // Witness retrieval workers - for i := 0; i < a.cfg.MaxWitnessRetrievalWorkers; i++ { - go a.retrieveWitness() - } - - // Start stream client - a.streamClientMutex.Lock() - defer a.streamClientMutex.Unlock() - - err = a.streamClient.Start() - if err != nil { - a.logger.Fatalf("failed to start stream client, error: %v", err) - } - - bookMark := &datastream.BookMark{ - Type: datastream.BookmarkType_BOOKMARK_TYPE_BATCH, - Value: lastVerifiedBatchNumber + 1, - } - - marshalledBookMark, err := proto.Marshal(bookMark) - if err != nil { - a.logger.Fatalf("failed to marshal bookmark: %v", err) - } - - err = a.streamClient.ExecCommandStartBookmark(marshalledBookMark) - if err != nil { - a.logger.Fatalf("failed to connect to data stream: %v", err) - } - // A this point everything is ready, so start serving go func() { a.logger.Infof("Server listening on port %d", a.cfg.Port) @@ -891,18 +439,18 @@ func (a *Aggregator) sendFinalProof() { a.startProofVerification() - finalDBBatch, err := a.state.GetBatch(ctx, proof.BatchNumberFinal, nil) + // Get Batch from RPC + rpcFinalBatch, err := a.rpcClient.GetBatch(proof.BatchNumberFinal) if err != nil { - tmpLogger.Errorf("Failed to retrieve batch with number [%d]: %v", proof.BatchNumberFinal, err) + a.logger.Errorf("error getting batch %d from RPC: %v.", proof.BatchNumberFinal, err) a.endProofVerification() - continue } inputs := ethmanTypes.FinalProofInputs{ FinalProof: msg.finalProof, - NewLocalExitRoot: finalDBBatch.Batch.LocalExitRoot.Bytes(), - NewStateRoot: finalDBBatch.Batch.StateRoot.Bytes(), + NewLocalExitRoot: rpcFinalBatch.LocalExitRoot().Bytes(), + NewStateRoot: rpcFinalBatch.StateRoot().Bytes(), } switch a.cfg.SettlementBackend { @@ -1052,15 +600,16 @@ func (a *Aggregator) buildFinalProof( string(finalProof.Public.NewLocalExitRoot) == mockedLocalExitRoot { // This local exit root and state root come from the mock // prover, use the one captured by the executor instead - finalDBBatch, err := a.state.GetBatch(ctx, proof.BatchNumberFinal, nil) + rpcFinalBatch, err := a.rpcClient.GetBatch(proof.BatchNumberFinal) if err != nil { - return nil, fmt.Errorf("failed to retrieve batch with number [%d]", proof.BatchNumberFinal) + return nil, fmt.Errorf("error getting batch %d from RPC: %w", proof.BatchNumberFinal, err) } + tmpLogger.Warnf( "NewLocalExitRoot and NewStateRoot look like a mock values, using values from executor instead: LER: %v, SR: %v", - finalDBBatch.Batch.LocalExitRoot.TerminalString(), finalDBBatch.Batch.StateRoot.TerminalString()) - finalProof.Public.NewStateRoot = finalDBBatch.Batch.StateRoot.Bytes() - finalProof.Public.NewLocalExitRoot = finalDBBatch.Batch.LocalExitRoot.Bytes() + rpcFinalBatch.LocalExitRoot().TerminalString(), rpcFinalBatch.StateRoot().TerminalString()) + finalProof.Public.NewStateRoot = rpcFinalBatch.StateRoot().Bytes() + finalProof.Public.NewLocalExitRoot = rpcFinalBatch.LocalExitRoot().Bytes() } return finalProof, nil @@ -1459,15 +1008,6 @@ func (a *Aggregator) tryAggregateProofs(ctx context.Context, prover ProverInterf return true, nil } -func (a *Aggregator) getVerifiedBatchAccInputHash(ctx context.Context, batchNumber uint64) (*common.Hash, error) { - accInputHash, err := a.etherman.GetBatchAccInputHash(ctx, batchNumber) - if err != nil { - return nil, err - } - - return &accInputHash, nil -} - func (a *Aggregator) getAndLockBatchToProve( ctx context.Context, prover ProverInterface, ) (*state.Batch, []byte, *state.Proof, error) { @@ -1511,7 +1051,8 @@ func (a *Aggregator) getAndLockBatchToProve( // Not found, so it it not possible to verify the batch yet if sequence == nil || errors.Is(err, entities.ErrNotFound) { - tmpLogger.Infof("No sequence found for batch %d", batchNumberToVerify) + tmpLogger.Infof("Sequencing event for batch %d has not been synced yet, "+ + "so it is not possible to verify it yet. Waiting...", batchNumberToVerify) return nil, nil, nil, state.ErrNotFound } @@ -1521,23 +1062,73 @@ func (a *Aggregator) getAndLockBatchToProve( ToBatchNumber: sequence.ToBatchNumber, } - // Check if the batch is already in the DB - dbBatch, err := a.state.GetBatch(ctx, batchNumberToVerify, nil) - if err != nil { - if errors.Is(err, state.ErrNotFound) { - tmpLogger.Infof("Batch (%d) is not yet in DB", batchNumberToVerify) - } - + // Get Batch from L1 Syncer + virtualBatch, err := a.l1Syncr.GetVirtualBatchByBatchNumber(a.ctx, batchNumberToVerify) + if err != nil && !errors.Is(err, entities.ErrNotFound) { + a.logger.Errorf("Error getting virtual batch: %v", err) return nil, nil, nil, err + } else if errors.Is(err, entities.ErrNotFound) { + a.logger.Infof("Virtual batch %d has not been synced yet, "+ + "so it is not possible to verify it yet. Waiting...", batchNumberToVerify) + return nil, nil, nil, state.ErrNotFound } - // Check if the witness is already in the DB - if len(dbBatch.Witness) == 0 { - tmpLogger.Infof("Witness for batch %d is not yet in DB", batchNumberToVerify) + // Get Batch from RPC + rpcBatch, err := a.rpcClient.GetBatch(batchNumberToVerify) + if err != nil { + a.logger.Errorf("error getting batch %d from RPC: %v.", batchNumberToVerify, err) + return nil, nil, nil, err + } - return nil, nil, nil, state.ErrNotFound + // Compare BatchL2Data from virtual batch and rpcBatch (skipping injected batch (1)) + if batchNumberToVerify != 1 && (common.Bytes2Hex(virtualBatch.BatchL2Data) != common.Bytes2Hex(rpcBatch.L2Data())) { + a.logger.Warnf("BatchL2Data from virtual batch %d does not match the one from RPC", batchNumberToVerify) + a.logger.Warnf("VirtualBatch BatchL2Data:%v", common.Bytes2Hex(virtualBatch.BatchL2Data)) + a.logger.Warnf("RPC BatchL2Data:%v", common.Bytes2Hex(rpcBatch.L2Data())) + } + + l1InfoRoot := common.Hash{} + + if virtualBatch.L1InfoRoot == nil { + log.Debugf("L1InfoRoot is nil for batch %d", batchNumberToVerify) + virtualBatch.L1InfoRoot = &l1InfoRoot + } + + // Create state batch + stateBatch := &state.Batch{ + BatchNumber: rpcBatch.BatchNumber(), + Coinbase: rpcBatch.LastCoinbase(), + // Use L1 batch data + BatchL2Data: virtualBatch.BatchL2Data, + StateRoot: rpcBatch.StateRoot(), + LocalExitRoot: rpcBatch.LocalExitRoot(), + AccInputHash: rpcBatch.AccInputHash(), + L1InfoTreeIndex: rpcBatch.L1InfoTreeIndex(), + L1InfoRoot: *virtualBatch.L1InfoRoot, + Timestamp: time.Unix(int64(rpcBatch.LastL2BLockTimestamp()), 0), + GlobalExitRoot: rpcBatch.GlobalExitRoot(), + ChainID: a.cfg.ChainID, + ForkID: a.cfg.ForkId, + } + + // Request the witness from the server, if it is busy just keep looping until it is available + start := time.Now() + witness, err := a.rpcClient.GetWitness(batchNumberToVerify, a.cfg.UseFullWitness) + for err != nil { + if errors.Is(err, rpc.ErrBusy) { + a.logger.Debugf( + "Witness server is busy, retrying get witness for batch %d in %v", + batchNumberToVerify, a.cfg.RetryTime.Duration, + ) + } else { + a.logger.Errorf("Failed to get witness for batch %d, err: %v", batchNumberToVerify, err) + } + time.Sleep(a.cfg.RetryTime.Duration) } + end := time.Now() + a.logger.Debugf("Time to get witness for batch %d: %v", batchNumberToVerify, end.Sub(start)) + // Store the sequence in aggregator DB err = a.state.AddSequence(ctx, stateSequence, nil) if err != nil { tmpLogger.Infof("Error storing sequence for batch %d", batchNumberToVerify) @@ -1546,8 +1137,8 @@ func (a *Aggregator) getAndLockBatchToProve( } // All the data required to generate a proof is ready - tmpLogger.Infof("Found virtual batch %d pending to generate proof", dbBatch.Batch.BatchNumber) - tmpLogger = tmpLogger.WithFields("batch", dbBatch.Batch.BatchNumber) + tmpLogger.Infof("Found virtual batch %d pending to generate proof", virtualBatch.BatchNumber) + tmpLogger = tmpLogger.WithFields("batch", virtualBatch.BatchNumber) tmpLogger.Info("Checking profitability to aggregate batch") @@ -1567,8 +1158,8 @@ func (a *Aggregator) getAndLockBatchToProve( now := time.Now().Round(time.Microsecond) proof := &state.Proof{ - BatchNumber: dbBatch.Batch.BatchNumber, - BatchNumberFinal: dbBatch.Batch.BatchNumber, + BatchNumber: virtualBatch.BatchNumber, + BatchNumberFinal: virtualBatch.BatchNumber, Prover: &proverName, ProverID: &proverID, GeneratingSince: &now, @@ -1582,7 +1173,7 @@ func (a *Aggregator) getAndLockBatchToProve( return nil, nil, nil, err } - return &dbBatch.Batch, dbBatch.Witness, proof, nil + return stateBatch, witness, proof, nil } func (a *Aggregator) tryGenerateBatchProof(ctx context.Context, prover ProverInterface) (bool, error) { @@ -1820,16 +1411,11 @@ func (a *Aggregator) buildInputProver( forcedBlockhashL1 = l1Block.ParentHash l1InfoRoot = batchToVerify.GlobalExitRoot.Bytes() - } /*else { - forcedBlockhashL1, err = a.state.GetForcedBatchParentHash(ctx, *batchToVerify.ForcedBatchNum, nil) - if err != nil { - return nil, err - } - }*/ + } } // Get Old Acc Input Hash - oldDBBatch, err := a.state.GetBatch(ctx, batchToVerify.BatchNumber-1, nil) + rpcOldBatch, err := a.rpcClient.GetBatch(batchToVerify.BatchNumber - 1) if err != nil { return nil, err } @@ -1837,7 +1423,7 @@ func (a *Aggregator) buildInputProver( inputProver := &prover.StatelessInputProver{ PublicInputs: &prover.StatelessPublicInputs{ Witness: witness, - OldAccInputHash: oldDBBatch.Batch.AccInputHash.Bytes(), + OldAccInputHash: rpcOldBatch.AccInputHash().Bytes(), OldBatchNum: batchToVerify.BatchNumber - 1, ChainId: batchToVerify.ChainID, ForkId: batchToVerify.ForkID, @@ -1855,42 +1441,6 @@ func (a *Aggregator) buildInputProver( return inputProver, nil } -func (a *Aggregator) getWitness(batchNumber uint64, url string, fullWitness bool) ([]byte, error) { - var ( - witness string - response rpc.Response - err error - ) - - witnessType := "trimmed" - if fullWitness { - witnessType = "full" - } - - a.logger.Infof("Requesting witness for batch %d of type %s", batchNumber, witnessType) - - response, err = rpc.JSONRPCCall(url, "zkevm_getBatchWitness", batchNumber, witnessType) - if err != nil { - return nil, err - } - - // Check if the response is an error - if response.Error != nil { - if response.Error.Message == "busy" { - return nil, errBusy - } - - return nil, fmt.Errorf("error from witness for batch %d: %v", batchNumber, response.Error) - } - - err = json.Unmarshal(response.Result, &witness) - if err != nil { - return nil, err - } - - return common.FromHex(witness), nil -} - func printInputProver(logger *log.Logger, inputProver *prover.StatelessInputProver) { if !logger.IsEnabledLogLevel(zapcore.DebugLevel) { return diff --git a/aggregator/aggregator_test.go b/aggregator/aggregator_test.go index 657a34cf..f6e27b0f 100644 --- a/aggregator/aggregator_test.go +++ b/aggregator/aggregator_test.go @@ -20,16 +20,14 @@ import ( "github.com/0xPolygon/cdk/aggregator/prover" "github.com/0xPolygon/cdk/config/types" "github.com/0xPolygon/cdk/log" + rpctypes "github.com/0xPolygon/cdk/rpc/types" "github.com/0xPolygon/cdk/state" - "github.com/0xPolygon/cdk/state/datastream" - "github.com/0xPolygonHermez/zkevm-data-streamer/datastreamer" "github.com/0xPolygonHermez/zkevm-synchronizer-l1/synchronizer" "github.com/ethereum/go-ethereum/common" ethTypes "github.com/ethereum/go-ethereum/core/types" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" - "google.golang.org/protobuf/proto" ) var ( @@ -57,6 +55,7 @@ type mox struct { proverMock *mocks.ProverInterfaceMock aggLayerClientMock *mocks.AgglayerClientInterfaceMock synchronizerMock *mocks.SynchronizerInterfaceMock + rpcMock *mocks.RPCInterfaceMock } func WaitUntil(t *testing.T, wg *sync.WaitGroup, timeout time.Duration) { @@ -75,28 +74,39 @@ func WaitUntil(t *testing.T, wg *sync.WaitGroup, timeout time.Duration) { } } -func Test_resetCurrentBatchData(t *testing.T) { - t.Parallel() +func Test_Start(t *testing.T) { + mockState := new(mocks.StateInterfaceMock) + mockL1Syncr := new(mocks.SynchronizerInterfaceMock) + mockEtherman := new(mocks.EthermanMock) + mockEthTxManager := new(mocks.EthTxManagerClientMock) - a := Aggregator{ - currentBatchStreamData: []byte("test"), - currentStreamBatchRaw: state.BatchRawV2{ - Blocks: []state.L2BlockRaw{ - { - BlockNumber: 1, - ChangeL2BlockHeader: state.ChangeL2BlockHeader{}, - Transactions: []state.L2TxRaw{}, - }, - }, - }, - currentStreamL2Block: state.L2BlockRaw{}, - } + mockL1Syncr.On("Sync", mock.Anything).Return(nil) + mockEtherman.On("GetLatestVerifiedBatchNum").Return(uint64(90), nil).Once() + mockState.On("DeleteUngeneratedProofs", mock.Anything, nil).Return(nil).Once() + mockState.On("CleanupLockedProofs", mock.Anything, "", nil).Return(int64(0), nil) - a.resetCurrentBatchData() + mockEthTxManager.On("Start").Return(nil) - assert.Equal(t, []byte{}, a.currentBatchStreamData) - assert.Equal(t, state.BatchRawV2{Blocks: make([]state.L2BlockRaw, 0)}, a.currentStreamBatchRaw) - assert.Equal(t, state.L2BlockRaw{}, a.currentStreamL2Block) + ctx := context.Background() + a := &Aggregator{ + state: mockState, + logger: log.GetDefaultLogger(), + halted: atomic.Bool{}, + l1Syncr: mockL1Syncr, + etherman: mockEtherman, + ethTxManager: mockEthTxManager, + ctx: ctx, + stateDBMutex: &sync.Mutex{}, + timeSendFinalProofMutex: &sync.RWMutex{}, + timeCleanupLockedProofs: types.Duration{Duration: 5 * time.Second}, + } + go func() { + err := a.Start() + require.NoError(t, err) + }() + time.Sleep(time.Second) + a.ctx.Done() + time.Sleep(time.Second) } func Test_handleReorg(t *testing.T) { @@ -115,7 +125,8 @@ func Test_handleReorg(t *testing.T) { } mockL1Syncr.On("GetLastestVirtualBatchNumber", mock.Anything).Return(uint64(100), nil).Once() - mockState.On("DeleteBatchesNewerThanBatchNumber", mock.Anything, uint64(100), mock.Anything).Return(nil).Once() + mockState.On("DeleteGeneratedProofs", mock.Anything, mock.Anything, mock.Anything, nil).Return(nil).Once() + mockState.On("DeleteUngeneratedProofs", mock.Anything, nil).Return(nil).Once() go a.handleReorg(reorgData) time.Sleep(3 * time.Second) @@ -128,7 +139,6 @@ func Test_handleReorg(t *testing.T) { func Test_handleRollbackBatches(t *testing.T) { t.Parallel() - mockStreamClient := new(mocks.StreamClientMock) mockEtherman := new(mocks.EthermanMock) mockState := new(mocks.StateInterfaceMock) @@ -137,241 +147,84 @@ func Test_handleRollbackBatches(t *testing.T) { LastBatchNumber: 100, } - mockStreamClient.On("IsStarted").Return(true).Once() - mockStreamClient.On("ResetProcessEntryFunc").Return().Once() - mockStreamClient.On("SetProcessEntryFunc", mock.Anything).Return().Once() - mockStreamClient.On("ExecCommandStop").Return(nil).Once() - mockStreamClient.On("Start").Return(nil).Once() - mockStreamClient.On("ExecCommandStartBookmark", mock.Anything).Return(nil).Once() mockEtherman.On("GetLatestVerifiedBatchNum").Return(uint64(90), nil).Once() - mockState.On("DeleteBatchesNewerThanBatchNumber", mock.Anything, rollbackData.LastBatchNumber, nil).Return(nil).Once() - mockState.On("DeleteBatchesOlderThanBatchNumber", mock.Anything, rollbackData.LastBatchNumber, nil).Return(nil).Once() - mockState.On("DeleteUngeneratedProofs", mock.Anything, nil).Return(nil).Once() - mockState.On("DeleteGeneratedProofs", mock.Anything, rollbackData.LastBatchNumber+1, mock.AnythingOfType("uint64"), nil).Return(nil).Once() + mockState.On("DeleteUngeneratedProofs", mock.Anything, mock.Anything).Return(nil).Once() + mockState.On("DeleteGeneratedProofs", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil).Once() a := Aggregator{ - ctx: context.Background(), - streamClient: mockStreamClient, - etherman: mockEtherman, - state: mockState, - logger: log.GetDefaultLogger(), - halted: atomic.Bool{}, - streamClientMutex: &sync.Mutex{}, - currentBatchStreamData: []byte{}, - currentStreamBatchRaw: state.BatchRawV2{}, - currentStreamL2Block: state.L2BlockRaw{}, + ctx: context.Background(), + etherman: mockEtherman, + state: mockState, + logger: log.GetDefaultLogger(), + halted: atomic.Bool{}, } a.halted.Store(false) a.handleRollbackBatches(rollbackData) assert.False(t, a.halted.Load()) - mockStreamClient.AssertExpectations(t) mockEtherman.AssertExpectations(t) mockState.AssertExpectations(t) } -func Test_handleReceivedDataStream_BatchStart(t *testing.T) { +func Test_handleRollbackBatchesHalt(t *testing.T) { t.Parallel() + mockEtherman := new(mocks.EthermanMock) mockState := new(mocks.StateInterfaceMock) - mockL1Syncr := new(mocks.SynchronizerInterfaceMock) - agg := Aggregator{ - state: mockState, - l1Syncr: mockL1Syncr, - logger: log.GetDefaultLogger(), - halted: atomic.Bool{}, - currentStreamBatch: state.Batch{}, - } - - // Prepare a FileEntry for Batch Start - batchStartData, err := proto.Marshal(&datastream.BatchStart{ - Number: 1, - ChainId: 2, - ForkId: 3, - Type: datastream.BatchType_BATCH_TYPE_REGULAR, - }) - assert.NoError(t, err) - - batchStartEntry := &datastreamer.FileEntry{ - Type: datastreamer.EntryType(datastream.EntryType_ENTRY_TYPE_BATCH_START), - Data: batchStartData, - } - - // Test the handleReceivedDataStream for Batch Start - err = agg.handleReceivedDataStream(batchStartEntry, nil, nil) - assert.NoError(t, err) - - assert.Equal(t, agg.currentStreamBatch.BatchNumber, uint64(1)) - assert.Equal(t, agg.currentStreamBatch.ChainID, uint64(2)) - assert.Equal(t, agg.currentStreamBatch.ForkID, uint64(3)) - assert.Equal(t, agg.currentStreamBatch.Type, datastream.BatchType_BATCH_TYPE_REGULAR) -} -func Test_handleReceivedDataStream_BatchEnd(t *testing.T) { - t.Parallel() + mockEtherman.On("GetLatestVerifiedBatchNum").Return(uint64(110), nil).Once() + mockState.On("DeleteUngeneratedProofs", mock.Anything, mock.Anything).Return(nil).Once() + mockState.On("DeleteGeneratedProofs", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil).Once() - mockState := new(mocks.StateInterfaceMock) - mockL1Syncr := new(mocks.SynchronizerInterfaceMock) - a := Aggregator{ - state: mockState, - l1Syncr: mockL1Syncr, - logger: log.GetDefaultLogger(), - halted: atomic.Bool{}, - currentStreamBatch: state.Batch{ - BatchNumber: uint64(2), - Type: datastream.BatchType_BATCH_TYPE_REGULAR, - Coinbase: common.Address{}, - }, - currentStreamL2Block: state.L2BlockRaw{ - BlockNumber: uint64(10), - }, - currentStreamBatchRaw: state.BatchRawV2{ - Blocks: []state.L2BlockRaw{ - { - BlockNumber: uint64(9), - ChangeL2BlockHeader: state.ChangeL2BlockHeader{}, - Transactions: []state.L2TxRaw{}, - }, - }, - }, - cfg: Config{ - UseL1BatchData: false, - }, + // Test data + rollbackData := synchronizer.RollbackBatchesData{ + LastBatchNumber: 100, } - batchEndData, err := proto.Marshal(&datastream.BatchEnd{ - Number: 1, - LocalExitRoot: []byte{1, 2, 3}, - StateRoot: []byte{4, 5, 6}, - Debug: nil, - }) - assert.NoError(t, err) - - batchEndEntry := &datastreamer.FileEntry{ - Type: datastreamer.EntryType(datastream.EntryType_ENTRY_TYPE_BATCH_END), - Data: batchEndData, + a := Aggregator{ + ctx: context.Background(), + etherman: mockEtherman, + state: mockState, + logger: log.GetDefaultLogger(), + halted: atomic.Bool{}, } - mockState.On("GetBatch", mock.Anything, a.currentStreamBatch.BatchNumber-1, nil). - Return(&state.DBBatch{ - Batch: state.Batch{ - AccInputHash: common.Hash{}, - }, - }, nil).Once() - mockState.On("GetBatch", mock.Anything, a.currentStreamBatch.BatchNumber, nil). - Return(&state.DBBatch{ - Witness: []byte("test_witness"), - }, nil).Once() - mockState.On("AddBatch", mock.Anything, mock.Anything, nil).Return(nil).Once() - mockL1Syncr.On("GetVirtualBatchByBatchNumber", mock.Anything, a.currentStreamBatch.BatchNumber). - Return(&synchronizer.VirtualBatch{BatchL2Data: []byte{1, 2, 3}}, nil).Once() - mockL1Syncr.On("GetSequenceByBatchNumber", mock.Anything, a.currentStreamBatch.BatchNumber). - Return(&synchronizer.SequencedBatches{ - L1InfoRoot: common.Hash{}, - Timestamp: time.Now(), - }, nil).Once() - - err = a.handleReceivedDataStream(batchEndEntry, nil, nil) - assert.NoError(t, err) - - assert.Equal(t, a.currentBatchStreamData, []byte{}) - assert.Equal(t, a.currentStreamBatchRaw, state.BatchRawV2{Blocks: make([]state.L2BlockRaw, 0)}) - assert.Equal(t, a.currentStreamL2Block, state.L2BlockRaw{}) + a.halted.Store(false) + go a.handleRollbackBatches(rollbackData) + time.Sleep(3 * time.Second) - mockState.AssertExpectations(t) - mockL1Syncr.AssertExpectations(t) + assert.True(t, a.halted.Load()) + mockEtherman.AssertExpectations(t) } -func Test_handleReceivedDataStream_L2Block(t *testing.T) { +func Test_handleRollbackBatchesError(t *testing.T) { t.Parallel() - a := Aggregator{ - currentStreamL2Block: state.L2BlockRaw{ - BlockNumber: uint64(9), - }, - currentStreamBatchRaw: state.BatchRawV2{ - Blocks: []state.L2BlockRaw{}, - }, - currentStreamBatch: state.Batch{}, - } - - // Mock data for L2Block - l2Block := &datastream.L2Block{ - Number: uint64(10), - DeltaTimestamp: uint32(5), - L1InfotreeIndex: uint32(1), - Coinbase: []byte{0x01}, - GlobalExitRoot: []byte{0x02}, - } + mockEtherman := new(mocks.EthermanMock) + mockState := new(mocks.StateInterfaceMock) - l2BlockData, err := proto.Marshal(l2Block) - assert.NoError(t, err) + mockEtherman.On("GetLatestVerifiedBatchNum").Return(uint64(110), fmt.Errorf("error")).Once() - l2BlockEntry := &datastreamer.FileEntry{ - Type: datastreamer.EntryType(datastream.EntryType_ENTRY_TYPE_L2_BLOCK), - Data: l2BlockData, + // Test data + rollbackData := synchronizer.RollbackBatchesData{ + LastBatchNumber: 100, } - err = a.handleReceivedDataStream(l2BlockEntry, nil, nil) - assert.NoError(t, err) - - assert.Equal(t, uint64(10), a.currentStreamL2Block.BlockNumber) - assert.Equal(t, uint32(5), a.currentStreamL2Block.ChangeL2BlockHeader.DeltaTimestamp) - assert.Equal(t, uint32(1), a.currentStreamL2Block.ChangeL2BlockHeader.IndexL1InfoTree) - assert.Equal(t, 0, len(a.currentStreamL2Block.Transactions)) - assert.Equal(t, uint32(1), a.currentStreamBatch.L1InfoTreeIndex) - assert.Equal(t, common.BytesToAddress([]byte{0x01}), a.currentStreamBatch.Coinbase) - assert.Equal(t, common.BytesToHash([]byte{0x02}), a.currentStreamBatch.GlobalExitRoot) -} - -func Test_handleReceivedDataStream_Transaction(t *testing.T) { - t.Parallel() - a := Aggregator{ - currentStreamL2Block: state.L2BlockRaw{ - Transactions: []state.L2TxRaw{}, - }, - logger: log.GetDefaultLogger(), + ctx: context.Background(), + etherman: mockEtherman, + state: mockState, + logger: log.GetDefaultLogger(), + halted: atomic.Bool{}, } - tx := ethTypes.NewTransaction( - 0, - common.HexToAddress("0x01"), - big.NewInt(1000000000000000000), - uint64(21000), - big.NewInt(20000000000), - nil, - ) - - // Encode transaction into RLP format - var buf bytes.Buffer - err := tx.EncodeRLP(&buf) - require.NoError(t, err, "Failed to encode transaction") - - transaction := &datastream.Transaction{ - L2BlockNumber: uint64(10), - Index: uint64(0), - IsValid: true, - Encoded: buf.Bytes(), - EffectiveGasPricePercentage: uint32(90), - } - - transactionData, err := proto.Marshal(transaction) - assert.NoError(t, err) - - transactionEntry := &datastreamer.FileEntry{ - Type: datastreamer.EntryType(datastream.EntryType_ENTRY_TYPE_TRANSACTION), - Data: transactionData, - } - - err = a.handleReceivedDataStream(transactionEntry, nil, nil) - assert.NoError(t, err) + a.halted.Store(false) + go a.handleRollbackBatches(rollbackData) + time.Sleep(3 * time.Second) - assert.Len(t, a.currentStreamL2Block.Transactions, 1) - assert.Equal(t, uint8(90), a.currentStreamL2Block.Transactions[0].EfficiencyPercentage) - assert.False(t, a.currentStreamL2Block.Transactions[0].TxAlreadyEncoded) - assert.NotNil(t, a.currentStreamL2Block.Transactions[0].Tx) + assert.True(t, a.halted.Load()) + mockEtherman.AssertExpectations(t) } func Test_sendFinalProofSuccess(t *testing.T) { @@ -403,18 +256,13 @@ func Test_sendFinalProofSuccess(t *testing.T) { } a.cfg = cfg - m.stateMock.On("GetBatch", mock.Anything, batchNumFinal, nil).Run(func(args mock.Arguments) { - }).Return(&state.DBBatch{ - Batch: state.Batch{ - LocalExitRoot: common.Hash{}, - StateRoot: common.Hash{}, - }, - }, nil).Once() + batch := rpctypes.NewRPCBatch(batchNumFinal, common.Hash{}, []string{}, []byte{}, common.Hash{}, common.Hash{}, common.Hash{}, common.Address{}, false) + m.rpcMock.On("GetBatch", batchNumFinal).Return(batch, nil) m.etherman.On("GetRollupId").Return(uint32(1)).Once() testHash := common.BytesToHash([]byte("test hash")) - m.aggLayerClientMock.On("SendTx", mock.Anything).Return(testHash, nil).Once() - m.aggLayerClientMock.On("WaitTxToBeMined", testHash, mock.Anything).Return(nil).Once() + m.aggLayerClientMock.On("SendTx", mock.Anything).Return(testHash, nil) + m.aggLayerClientMock.On("WaitTxToBeMined", testHash, mock.Anything).Return(nil) }, asserts: func(a *Aggregator) { assert.False(a.verifyingProof) @@ -433,13 +281,8 @@ func Test_sendFinalProofSuccess(t *testing.T) { } a.cfg = cfg - m.stateMock.On("GetBatch", mock.Anything, batchNumFinal, nil).Run(func(args mock.Arguments) { - }).Return(&state.DBBatch{ - Batch: state.Batch{ - LocalExitRoot: common.Hash{}, - StateRoot: common.Hash{}, - }, - }, nil).Once() + batch := rpctypes.NewRPCBatch(batchNumFinal, common.Hash{}, []string{}, []byte{}, common.Hash{}, common.Hash{}, common.Hash{}, common.Address{}, false) + m.rpcMock.On("GetBatch", batchNumFinal).Return(batch, nil) m.etherman.On("BuildTrustedVerifyBatchesTxData", batchNum-1, batchNumFinal, mock.Anything, common.HexToAddress(senderAddr)).Return(&toAddr, data, nil).Once() m.ethTxManager.On("Add", mock.Anything, &toAddr, big.NewInt(0), data, a.cfg.GasOffset, (*ethTypes.BlobTxSidecar)(nil)).Return(nil, nil).Once() @@ -458,6 +301,7 @@ func Test_sendFinalProofSuccess(t *testing.T) { ethTxManager := mocks.NewEthTxManagerClientMock(t) etherman := mocks.NewEthermanMock(t) aggLayerClient := mocks.NewAgglayerClientInterfaceMock(t) + rpcMock := mocks.NewRPCInterfaceMock(t) curve := elliptic.P256() privateKey, err := ecdsa.GenerateKey(curve, rand.Reader) @@ -474,6 +318,7 @@ func Test_sendFinalProofSuccess(t *testing.T) { stateDBMutex: &sync.Mutex{}, timeSendFinalProofMutex: &sync.RWMutex{}, sequencerPrivateKey: privateKey, + rpcClient: rpcMock, } a.ctx, a.exit = context.WithCancel(context.Background()) @@ -482,6 +327,7 @@ func Test_sendFinalProofSuccess(t *testing.T) { ethTxManager: ethTxManager, etherman: etherman, aggLayerClientMock: aggLayerClient, + rpcMock: rpcMock, } if tc.setup != nil { tc.setup(m, &a) @@ -532,7 +378,7 @@ func Test_sendFinalProofError(t *testing.T) { { name: "Failed to settle on Agglayer: GetBatch error", setup: func(m mox, a *Aggregator) { - m.stateMock.On("GetBatch", mock.Anything, batchNumFinal, nil).Run(func(args mock.Arguments) { + m.rpcMock.On("GetBatch", batchNumFinal).Run(func(args mock.Arguments) { // test is done, stop the sendFinalProof method fmt.Println("Stopping sendFinalProof") a.exit() @@ -550,13 +396,8 @@ func Test_sendFinalProofError(t *testing.T) { } a.cfg = cfg - m.stateMock.On("GetBatch", mock.Anything, batchNumFinal, nil).Run(func(args mock.Arguments) { - }).Return(&state.DBBatch{ - Batch: state.Batch{ - LocalExitRoot: common.Hash{}, - StateRoot: common.Hash{}, - }, - }, nil).Once() + batch := rpctypes.NewRPCBatch(batchNumFinal, common.Hash{}, []string{}, []byte{}, common.Hash{}, common.Hash{}, common.Hash{}, common.Address{}, false) + m.rpcMock.On("GetBatch", batchNumFinal).Return(batch, nil) m.etherman.On("GetRollupId").Return(uint32(1)).Once() m.aggLayerClientMock.On("SendTx", mock.Anything).Run(func(args mock.Arguments) { @@ -579,13 +420,8 @@ func Test_sendFinalProofError(t *testing.T) { } a.cfg = cfg - m.stateMock.On("GetBatch", mock.Anything, batchNumFinal, nil).Run(func(args mock.Arguments) { - }).Return(&state.DBBatch{ - Batch: state.Batch{ - LocalExitRoot: common.Hash{}, - StateRoot: common.Hash{}, - }, - }, nil).Once() + batch := rpctypes.NewRPCBatch(batchNumFinal, common.Hash{}, []string{}, []byte{}, common.Hash{}, common.Hash{}, common.Hash{}, common.Address{}, false) + m.rpcMock.On("GetBatch", batchNumFinal).Return(batch, nil) m.etherman.On("GetRollupId").Return(uint32(1)).Once() m.aggLayerClientMock.On("SendTx", mock.Anything).Return(common.Hash{}, nil).Once() @@ -608,13 +444,8 @@ func Test_sendFinalProofError(t *testing.T) { } a.cfg = cfg - m.stateMock.On("GetBatch", mock.Anything, batchNumFinal, nil).Run(func(args mock.Arguments) { - }).Return(&state.DBBatch{ - Batch: state.Batch{ - LocalExitRoot: common.Hash{}, - StateRoot: common.Hash{}, - }, - }, nil).Once() + batch := rpctypes.NewRPCBatch(batchNumFinal, common.Hash{}, []string{}, []byte{}, common.Hash{}, common.Hash{}, common.Hash{}, common.Address{}, false) + m.rpcMock.On("GetBatch", batchNumFinal).Return(batch, nil) m.etherman.On("BuildTrustedVerifyBatchesTxData", batchNum-1, batchNumFinal, mock.Anything, sender).Run(func(args mock.Arguments) { fmt.Println("Stopping sendFinalProof") @@ -636,13 +467,8 @@ func Test_sendFinalProofError(t *testing.T) { } a.cfg = cfg - m.stateMock.On("GetBatch", mock.Anything, batchNumFinal, nil).Run(func(args mock.Arguments) { - }).Return(&state.DBBatch{ - Batch: state.Batch{ - LocalExitRoot: common.Hash{}, - StateRoot: common.Hash{}, - }, - }, nil).Once() + batch := rpctypes.NewRPCBatch(batchNumFinal, common.Hash{}, []string{}, []byte{}, common.Hash{}, common.Hash{}, common.Hash{}, common.Address{}, false) + m.rpcMock.On("GetBatch", batchNumFinal).Return(batch, nil) m.etherman.On("BuildTrustedVerifyBatchesTxData", batchNum-1, batchNumFinal, mock.Anything, sender).Return(nil, nil, nil).Once() m.ethTxManager.On("Add", mock.Anything, mock.Anything, big.NewInt(0), mock.Anything, a.cfg.GasOffset, (*ethTypes.BlobTxSidecar)(nil)).Run(func(args mock.Arguments) { @@ -664,6 +490,7 @@ func Test_sendFinalProofError(t *testing.T) { ethTxManager := mocks.NewEthTxManagerClientMock(t) etherman := mocks.NewEthermanMock(t) aggLayerClient := mocks.NewAgglayerClientInterfaceMock(t) + rpcMock := mocks.NewRPCInterfaceMock(t) curve := elliptic.P256() privateKey, err := ecdsa.GenerateKey(curve, rand.Reader) @@ -680,6 +507,7 @@ func Test_sendFinalProofError(t *testing.T) { stateDBMutex: &sync.Mutex{}, timeSendFinalProofMutex: &sync.RWMutex{}, sequencerPrivateKey: privateKey, + rpcClient: rpcMock, } a.ctx, a.exit = context.WithCancel(context.Background()) @@ -688,6 +516,7 @@ func Test_sendFinalProofError(t *testing.T) { ethTxManager: ethTxManager, etherman: etherman, aggLayerClientMock: aggLayerClient, + rpcMock: rpcMock, } if tc.setup != nil { tc.setup(m, &a) @@ -760,19 +589,13 @@ func Test_buildFinalProof(t *testing.T) { }, } - finalDBBatch := &state.DBBatch{ - Batch: state.Batch{ - StateRoot: common.BytesToHash([]byte("mock StateRoot")), - LocalExitRoot: common.BytesToHash([]byte("mock LocalExitRoot")), - }, - } - m.proverMock.On("Name").Return("name").Once() m.proverMock.On("ID").Return("id").Once() m.proverMock.On("Addr").Return("addr").Once() m.proverMock.On("FinalProof", recursiveProof.Proof, a.cfg.SenderAddress).Return(&finalProofID, nil).Once() m.proverMock.On("WaitFinalProof", mock.Anything, finalProofID).Return(&finalProof, nil).Once() - m.stateMock.On("GetBatch", mock.Anything, batchNumFinal, nil).Return(finalDBBatch, nil).Once() + finalBatch := rpctypes.NewRPCBatch(batchNumFinal, common.Hash{}, []string{}, []byte{}, common.Hash{}, common.BytesToHash([]byte("mock LocalExitRoot")), common.BytesToHash([]byte("mock StateRoot")), common.Address{}, false) + m.rpcMock.On("GetBatch", batchNumFinal).Return(finalBatch, nil).Once() }, asserts: func(err error, fProof *prover.FinalProof) { assert.NoError(err) @@ -789,9 +612,11 @@ func Test_buildFinalProof(t *testing.T) { t.Run(tc.name, func(t *testing.T) { proverMock := mocks.NewProverInterfaceMock(t) stateMock := mocks.NewStateInterfaceMock(t) + rpcMock := mocks.NewRPCInterfaceMock(t) m := mox{ proverMock: proverMock, stateMock: stateMock, + rpcMock: rpcMock, } a := Aggregator{ state: stateMock, @@ -799,6 +624,7 @@ func Test_buildFinalProof(t *testing.T) { cfg: Config{ SenderAddress: common.BytesToAddress([]byte("from")).Hex(), }, + rpcClient: rpcMock, } tc.setup(m, &a) @@ -1594,13 +1420,19 @@ func Test_tryGenerateBatchProof(t *testing.T) { TxProfitabilityCheckerType: ProfitabilityAcceptAll, SenderAddress: from.Hex(), IntervalAfterWhichBatchConsolidateAnyway: types.Duration{Duration: time.Second * 1}, + ChainID: uint64(1), + ForkId: uint64(12), } lastVerifiedBatchNum := uint64(22) + batchNum := uint64(23) + batchToProve := state.Batch{ BatchNumber: batchNum, } + proofID := "proofId" + proverName := "proverName" proverID := "proverID" recursiveProof := "recursiveProof" @@ -1609,6 +1441,7 @@ func Test_tryGenerateBatchProof(t *testing.T) { matchProverCtxFn := func(ctx context.Context) bool { return ctx.Value("owner") == ownerProver } matchAggregatorCtxFn := func(ctx context.Context) bool { return ctx.Value("owner") == ownerAggregator } fixedTimestamp := time.Date(2023, 10, 13, 15, 0, 0, 0, time.UTC) + l1InfoTreeLeaf := []synchronizer.L1InfoTreeLeaf{ { GlobalExitRoot: common.Hash{}, @@ -1682,11 +1515,13 @@ func Test_tryGenerateBatchProof(t *testing.T) { ChainID: uint64(1), ForkID: uint64(12), } - dbBatch := state.DBBatch{ - Witness: []byte("witness"), - Batch: batch, + virtualBatch := synchronizer.VirtualBatch{ + BatchNumber: lastVerifiedBatchNum + 1, + BatchL2Data: batchL2Data, + L1InfoRoot: &l1InfoRoot, } + m.synchronizerMock.On("GetVirtualBatchByBatchNumber", mock.Anything, lastVerifiedBatchNum+1).Return(&virtualBatch, nil).Once() m.etherman.On("GetLatestVerifiedBatchNum").Return(lastVerifiedBatchNum, nil).Once() m.stateMock.On("CheckProofExistsForBatch", mock.MatchedBy(matchProverCtxFn), mock.AnythingOfType("uint64"), nil).Return(false, nil).Once() sequence := synchronizer.SequencedBatches{ @@ -1694,7 +1529,12 @@ func Test_tryGenerateBatchProof(t *testing.T) { ToBatchNumber: uint64(20), } m.synchronizerMock.On("GetSequenceByBatchNumber", mock.MatchedBy(matchProverCtxFn), lastVerifiedBatchNum+1).Return(&sequence, nil).Once() - m.stateMock.On("GetBatch", mock.MatchedBy(matchProverCtxFn), lastVerifiedBatchNum+1, nil).Return(&dbBatch, nil).Once() + + rpcBatch := rpctypes.NewRPCBatch(lastVerifiedBatchNum+1, common.Hash{}, []string{}, batchL2Data, common.Hash{}, common.BytesToHash([]byte("mock LocalExitRoot")), common.BytesToHash([]byte("mock StateRoot")), common.Address{}, false) + rpcBatch.SetLastL2BLockTimestamp(uint64(time.Now().Unix())) + m.rpcMock.On("GetBatch", lastVerifiedBatchNum+1).Return(rpcBatch, nil) + m.rpcMock.On("GetWitness", lastVerifiedBatchNum+1, false).Return([]byte("witness"), nil) + m.stateMock.On("AddSequence", mock.MatchedBy(matchProverCtxFn), mock.Anything, nil).Return(nil).Once() m.stateMock.On("AddGeneratedProof", mock.MatchedBy(matchProverCtxFn), mock.Anything, nil).Run( func(args mock.Arguments) { @@ -1716,13 +1556,8 @@ func Test_tryGenerateBatchProof(t *testing.T) { }, }, nil).Twice() - oldDBBatch := state.DBBatch{ - Batch: state.Batch{ - AccInputHash: common.Hash{}, - }, - } - m.stateMock.On("GetBatch", mock.Anything, lastVerifiedBatchNum, nil).Return(&oldDBBatch, nil).Twice() - expectedInputProver, err := a.buildInputProver(context.Background(), &batch, dbBatch.Witness) + m.rpcMock.On("GetBatch", lastVerifiedBatchNum).Return(rpcBatch, nil) + expectedInputProver, err := a.buildInputProver(context.Background(), &batch, []byte("witness")) require.NoError(err) m.proverMock.On("BatchProof", expectedInputProver).Return(nil, errTest).Once() @@ -1733,7 +1568,6 @@ func Test_tryGenerateBatchProof(t *testing.T) { assert.ErrorIs(err, errTest) }, }, - //nolint:dupl { name: "WaitRecursiveProof prover error", setup: func(m mox, a *Aggregator) { @@ -1753,11 +1587,15 @@ func Test_tryGenerateBatchProof(t *testing.T) { ChainID: uint64(1), ForkID: uint64(12), } - dbBatch := state.DBBatch{ - Witness: []byte("witness"), - Batch: batch, + + virtualBatch := synchronizer.VirtualBatch{ + BatchNumber: lastVerifiedBatchNum + 1, + BatchL2Data: batchL2Data, + L1InfoRoot: &l1InfoRoot, } + m.synchronizerMock.On("GetVirtualBatchByBatchNumber", mock.Anything, lastVerifiedBatchNum+1).Return(&virtualBatch, nil).Once() + m.etherman.On("GetLatestVerifiedBatchNum").Return(lastVerifiedBatchNum, nil).Once() m.stateMock.On("CheckProofExistsForBatch", mock.MatchedBy(matchProverCtxFn), mock.AnythingOfType("uint64"), nil).Return(false, nil).Once() sequence := synchronizer.SequencedBatches{ @@ -1765,7 +1603,10 @@ func Test_tryGenerateBatchProof(t *testing.T) { ToBatchNumber: uint64(20), } m.synchronizerMock.On("GetSequenceByBatchNumber", mock.MatchedBy(matchProverCtxFn), lastVerifiedBatchNum+1).Return(&sequence, nil).Once() - m.stateMock.On("GetBatch", mock.MatchedBy(matchProverCtxFn), lastVerifiedBatchNum+1, nil).Return(&dbBatch, nil).Once() + rpcBatch := rpctypes.NewRPCBatch(lastVerifiedBatchNum+1, common.Hash{}, []string{}, batchL2Data, common.Hash{}, common.BytesToHash([]byte("mock LocalExitRoot")), common.BytesToHash([]byte("mock StateRoot")), common.Address{}, false) + rpcBatch.SetLastL2BLockTimestamp(uint64(time.Now().Unix())) + m.rpcMock.On("GetBatch", lastVerifiedBatchNum+1).Return(rpcBatch, nil) + m.rpcMock.On("GetWitness", lastVerifiedBatchNum+1, false).Return([]byte("witness"), nil) m.stateMock.On("AddSequence", mock.MatchedBy(matchProverCtxFn), mock.Anything, nil).Return(nil).Once() m.stateMock.On("AddGeneratedProof", mock.MatchedBy(matchProverCtxFn), mock.Anything, nil).Run( func(args mock.Arguments) { @@ -1788,13 +1629,8 @@ func Test_tryGenerateBatchProof(t *testing.T) { }, }, nil).Twice() - oldDBBatch := state.DBBatch{ - Batch: state.Batch{ - AccInputHash: common.Hash{}, - }, - } - m.stateMock.On("GetBatch", mock.Anything, lastVerifiedBatchNum, nil).Return(&oldDBBatch, nil).Twice() - expectedInputProver, err := a.buildInputProver(context.Background(), &batch, dbBatch.Witness) + m.rpcMock.On("GetBatch", lastVerifiedBatchNum).Return(rpcBatch, nil).Twice() + expectedInputProver, err := a.buildInputProver(context.Background(), &batch, []byte("witness")) require.NoError(err) m.proverMock.On("BatchProof", expectedInputProver).Return(&proofID, nil).Once() @@ -1805,8 +1641,7 @@ func Test_tryGenerateBatchProof(t *testing.T) { assert.False(result) assert.ErrorIs(err, errTest) }, - }, //nolint:dupl - //nolint:dupl + }, { name: "DeleteBatchProofs error after WaitRecursiveProof prover error", setup: func(m mox, a *Aggregator) { @@ -1826,10 +1661,6 @@ func Test_tryGenerateBatchProof(t *testing.T) { ChainID: uint64(1), ForkID: uint64(12), } - dbBatch := state.DBBatch{ - Witness: []byte("witness"), - Batch: batch, - } m.etherman.On("GetLatestVerifiedBatchNum").Return(lastVerifiedBatchNum, nil).Once() m.stateMock.On("CheckProofExistsForBatch", mock.MatchedBy(matchProverCtxFn), mock.AnythingOfType("uint64"), nil).Return(false, nil).Once() @@ -1838,7 +1669,9 @@ func Test_tryGenerateBatchProof(t *testing.T) { ToBatchNumber: uint64(20), } m.synchronizerMock.On("GetSequenceByBatchNumber", mock.MatchedBy(matchProverCtxFn), lastVerifiedBatchNum+1).Return(&sequence, nil).Once() - m.stateMock.On("GetBatch", mock.MatchedBy(matchProverCtxFn), lastVerifiedBatchNum+1, nil).Return(&dbBatch, nil).Once() + rpcBatch := rpctypes.NewRPCBatch(lastVerifiedBatchNum+1, common.Hash{}, []string{}, batchL2Data, common.Hash{}, common.BytesToHash([]byte("mock LocalExitRoot")), common.BytesToHash([]byte("mock StateRoot")), common.Address{}, false) + rpcBatch.SetLastL2BLockTimestamp(uint64(time.Now().Unix())) + m.rpcMock.On("GetBatch", lastVerifiedBatchNum+1).Return(rpcBatch, nil).Once() m.stateMock.On("AddSequence", mock.MatchedBy(matchProverCtxFn), mock.Anything, nil).Return(nil).Once() m.stateMock.On("AddGeneratedProof", mock.MatchedBy(matchProverCtxFn), mock.Anything, nil).Run( func(args mock.Arguments) { @@ -1861,15 +1694,20 @@ func Test_tryGenerateBatchProof(t *testing.T) { }, }, nil).Twice() - oldDBBatch := state.DBBatch{ - Batch: state.Batch{ - AccInputHash: common.Hash{}, - }, - } - m.stateMock.On("GetBatch", mock.Anything, lastVerifiedBatchNum, nil).Return(&oldDBBatch, nil).Twice() - expectedInputProver, err := a.buildInputProver(context.Background(), &batch, dbBatch.Witness) + m.rpcMock.On("GetBatch", lastVerifiedBatchNum).Return(rpcBatch, nil).Twice() + expectedInputProver, err := a.buildInputProver(context.Background(), &batch, []byte("witness")) require.NoError(err) + m.rpcMock.On("GetWitness", lastVerifiedBatchNum+1, false).Return([]byte("witness"), nil) + + virtualBatch := synchronizer.VirtualBatch{ + BatchNumber: lastVerifiedBatchNum + 1, + BatchL2Data: batchL2Data, + L1InfoRoot: &l1InfoRoot, + } + + m.synchronizerMock.On("GetVirtualBatchByBatchNumber", mock.Anything, lastVerifiedBatchNum+1).Return(&virtualBatch, nil).Once() + m.proverMock.On("BatchProof", expectedInputProver).Return(&proofID, nil).Once() m.proverMock.On("WaitRecursiveProof", mock.MatchedBy(matchProverCtxFn), proofID).Return("", common.Hash{}, errTest).Once() m.stateMock.On("DeleteGeneratedProofs", mock.MatchedBy(matchAggregatorCtxFn), batchToProve.BatchNumber, batchToProve.BatchNumber, nil).Return(errTest).Once() @@ -1878,7 +1716,7 @@ func Test_tryGenerateBatchProof(t *testing.T) { assert.False(result) assert.ErrorIs(err, errTest) }, - }, //nolint:dupl + }, { name: "not time to send final ok", setup: func(m mox, a *Aggregator) { @@ -1899,10 +1737,6 @@ func Test_tryGenerateBatchProof(t *testing.T) { ChainID: uint64(1), ForkID: uint64(12), } - dbBatch := state.DBBatch{ - Witness: []byte("witness"), - Batch: batch, - } m.etherman.On("GetLatestVerifiedBatchNum").Return(lastVerifiedBatchNum, nil).Once() m.stateMock.On("CheckProofExistsForBatch", mock.MatchedBy(matchProverCtxFn), mock.AnythingOfType("uint64"), nil).Return(false, nil).Once() @@ -1911,7 +1745,7 @@ func Test_tryGenerateBatchProof(t *testing.T) { ToBatchNumber: uint64(20), } m.synchronizerMock.On("GetSequenceByBatchNumber", mock.MatchedBy(matchProverCtxFn), lastVerifiedBatchNum+1).Return(&sequence, nil).Once() - m.stateMock.On("GetBatch", mock.MatchedBy(matchProverCtxFn), lastVerifiedBatchNum+1, nil).Return(&dbBatch, nil).Once() + m.stateMock.On("AddSequence", mock.MatchedBy(matchProverCtxFn), mock.Anything, nil).Return(nil).Once() m.stateMock.On("AddGeneratedProof", mock.MatchedBy(matchProverCtxFn), mock.Anything, nil).Run( func(args mock.Arguments) { @@ -1934,13 +1768,23 @@ func Test_tryGenerateBatchProof(t *testing.T) { }, }, nil).Twice() - oldDBBatch := state.DBBatch{ - Batch: state.Batch{ - AccInputHash: common.Hash{}, - }, + rpcBatch := rpctypes.NewRPCBatch(lastVerifiedBatchNum, common.Hash{}, []string{}, batchL2Data, common.Hash{}, common.BytesToHash([]byte("mock LocalExitRoot")), common.BytesToHash([]byte("mock StateRoot")), common.Address{}, false) + rpcBatch.SetLastL2BLockTimestamp(uint64(time.Now().Unix())) + rpcBatch2 := rpctypes.NewRPCBatch(lastVerifiedBatchNum+1, common.Hash{}, []string{}, batchL2Data, common.Hash{}, common.BytesToHash([]byte("mock LocalExitRoot")), common.BytesToHash([]byte("mock StateRoot")), common.Address{}, false) + rpcBatch2.SetLastL2BLockTimestamp(uint64(time.Now().Unix())) + m.rpcMock.On("GetBatch", lastVerifiedBatchNum).Return(rpcBatch, nil) + m.rpcMock.On("GetBatch", lastVerifiedBatchNum+1).Return(rpcBatch2, nil) + m.rpcMock.On("GetWitness", lastVerifiedBatchNum+1, false).Return([]byte("witness"), nil) + + virtualBatch := synchronizer.VirtualBatch{ + BatchNumber: lastVerifiedBatchNum + 1, + BatchL2Data: batchL2Data, + L1InfoRoot: &l1InfoRoot, } - m.stateMock.On("GetBatch", mock.Anything, lastVerifiedBatchNum, nil).Return(&oldDBBatch, nil).Twice() - expectedInputProver, err := a.buildInputProver(context.Background(), &batch, dbBatch.Witness) + + m.synchronizerMock.On("GetVirtualBatchByBatchNumber", mock.Anything, lastVerifiedBatchNum+1).Return(&virtualBatch, nil).Once() + + expectedInputProver, err := a.buildInputProver(context.Background(), &batch, []byte("witness")) require.NoError(err) m.proverMock.On("BatchProof", expectedInputProver).Return(&proofID, nil).Once() @@ -1987,10 +1831,6 @@ func Test_tryGenerateBatchProof(t *testing.T) { ChainID: uint64(1), ForkID: uint64(12), } - dbBatch := state.DBBatch{ - Witness: []byte("witness"), - Batch: batch, - } m.etherman.On("GetLatestVerifiedBatchNum").Return(lastVerifiedBatchNum, nil).Once() m.stateMock.On("CheckProofExistsForBatch", mock.MatchedBy(matchProverCtxFn), mock.AnythingOfType("uint64"), nil).Return(false, nil).Once() @@ -1999,7 +1839,25 @@ func Test_tryGenerateBatchProof(t *testing.T) { ToBatchNumber: uint64(20), } m.synchronizerMock.On("GetSequenceByBatchNumber", mock.MatchedBy(matchProverCtxFn), lastVerifiedBatchNum+1).Return(&sequence, nil).Once() - m.stateMock.On("GetBatch", mock.MatchedBy(matchProverCtxFn), lastVerifiedBatchNum+1, nil).Return(&dbBatch, nil).Once() + + rpcBatch := rpctypes.NewRPCBatch(lastVerifiedBatchNum, common.Hash{}, []string{}, batchL2Data, common.Hash{}, common.BytesToHash([]byte("mock LocalExitRoot")), common.BytesToHash([]byte("mock StateRoot")), common.Address{}, false) + rpcBatch.SetLastL2BLockTimestamp(uint64(time.Now().Unix())) + rpcBatch2 := rpctypes.NewRPCBatch(lastVerifiedBatchNum+1, common.Hash{}, []string{}, batchL2Data, common.Hash{}, common.BytesToHash([]byte("mock LocalExitRoot")), common.BytesToHash([]byte("mock StateRoot")), common.Address{}, false) + rpcBatch2.SetLastL2BLockTimestamp(uint64(time.Now().Unix())) + m.rpcMock.On("GetBatch", lastVerifiedBatchNum).Return(rpcBatch, nil) + m.rpcMock.On("GetBatch", lastVerifiedBatchNum+1).Return(rpcBatch2, nil) + m.rpcMock.On("GetWitness", lastVerifiedBatchNum+1, false).Return([]byte("witness"), nil) + + virtualBatch := synchronizer.VirtualBatch{ + BatchNumber: lastVerifiedBatchNum + 1, + BatchL2Data: batchL2Data, + L1InfoRoot: &l1InfoRoot, + } + + m.synchronizerMock.On("GetVirtualBatchByBatchNumber", mock.Anything, lastVerifiedBatchNum+1).Return(&virtualBatch, nil).Once() + + m.rpcMock.On("GetWitness", lastVerifiedBatchNum+1, false).Return([]byte("witness"), nil) + m.stateMock.On("AddSequence", mock.MatchedBy(matchProverCtxFn), mock.Anything, nil).Return(nil).Once() m.stateMock.On("AddGeneratedProof", mock.MatchedBy(matchProverCtxFn), mock.Anything, nil).Run( func(args mock.Arguments) { @@ -2022,13 +1880,7 @@ func Test_tryGenerateBatchProof(t *testing.T) { }, }, nil).Twice() - oldDBBatch := state.DBBatch{ - Batch: state.Batch{ - AccInputHash: common.Hash{}, - }, - } - m.stateMock.On("GetBatch", mock.Anything, lastVerifiedBatchNum, nil).Return(&oldDBBatch, nil).Twice() - expectedInputProver, err := a.buildInputProver(context.Background(), &batch, dbBatch.Witness) + expectedInputProver, err := a.buildInputProver(context.Background(), &batch, []byte("witness")) require.NoError(err) m.proverMock.On("BatchProof", expectedInputProver).Return(&proofID, nil).Once() @@ -2064,6 +1916,7 @@ func Test_tryGenerateBatchProof(t *testing.T) { etherman := mocks.NewEthermanMock(t) proverMock := mocks.NewProverInterfaceMock(t) synchronizerMock := mocks.NewSynchronizerInterfaceMock(t) + mockRPC := mocks.NewRPCInterfaceMock(t) a := Aggregator{ cfg: cfg, @@ -2077,6 +1930,7 @@ func Test_tryGenerateBatchProof(t *testing.T) { finalProof: make(chan finalProofMsg), profitabilityChecker: NewTxProfitabilityCheckerAcceptAll(stateMock, cfg.IntervalAfterWhichBatchConsolidateAnyway.Duration), l1Syncr: synchronizerMock, + rpcClient: mockRPC, } aggregatorCtx := context.WithValue(context.Background(), "owner", ownerAggregator) //nolint:staticcheck a.ctx, a.exit = context.WithCancel(aggregatorCtx) @@ -2087,6 +1941,7 @@ func Test_tryGenerateBatchProof(t *testing.T) { etherman: etherman, proverMock: proverMock, synchronizerMock: synchronizerMock, + rpcMock: mockRPC, } if tc.setup != nil { tc.setup(m, &a) diff --git a/aggregator/config.go b/aggregator/config.go index fbbc9c9b..cdef80fd 100644 --- a/aggregator/config.go +++ b/aggregator/config.go @@ -112,21 +112,18 @@ type Config struct { // final gas: 1100 GasOffset uint64 `mapstructure:"GasOffset"` + // RPCURL is the URL of the RPC server + RPCURL string `mapstructure:"RPCURL"` + // WitnessURL is the URL of the witness server WitnessURL string `mapstructure:"WitnessURL"` - // UseL1BatchData is a flag to enable the use of L1 batch data in the aggregator - UseL1BatchData bool `mapstructure:"UseL1BatchData"` - // UseFullWitness is a flag to enable the use of full witness in the aggregator UseFullWitness bool `mapstructure:"UseFullWitness"` // DB is the database configuration DB db.Config `mapstructure:"DB"` - // StreamClient is the config for the stream client - StreamClient StreamClientCfg `mapstructure:"StreamClient"` - // EthTxManager is the config for the ethtxmanager EthTxManager ethtxmanager.Config `mapstructure:"EthTxManager"` @@ -149,22 +146,11 @@ type Config struct { // AggLayerURL url of the agglayer service AggLayerURL string `mapstructure:"AggLayerURL"` - // MaxWitnessRetrievalWorkers is the maximum number of workers that will be used to retrieve the witness - MaxWitnessRetrievalWorkers int `mapstructure:"MaxWitnessRetrievalWorkers"` - // SyncModeOnlyEnabled is a flag that activates sync mode exclusively. // When enabled, the aggregator will sync data only from L1 and will not generate or read the data stream. SyncModeOnlyEnabled bool `mapstructure:"SyncModeOnlyEnabled"` } -// StreamClientCfg contains the data streamer's configuration properties -type StreamClientCfg struct { - // Datastream server to connect - Server string `mapstructure:"Server"` - // Log is the log configuration - Log log.Config `mapstructure:"Log"` -} - // newKeyFromKeystore creates a private key from a keystore file func newKeyFromKeystore(cfg types.KeystoreFileConfig) (*ecdsa.PrivateKey, error) { if cfg.Path == "" && cfg.Password == "" { diff --git a/aggregator/db/migrations/0004.sql b/aggregator/db/migrations/0004.sql new file mode 100644 index 00000000..cb186fc0 --- /dev/null +++ b/aggregator/db/migrations/0004.sql @@ -0,0 +1,23 @@ +-- +migrate Down +CREATE TABLE IF NOT EXISTS aggregator.batch ( + batch_num BIGINT NOT NULL, + batch jsonb NOT NULL, + datastream varchar NOT NULL, + PRIMARY KEY (batch_num) +); + +ALTER TABLE aggregator.proof + ADD CONSTRAINT IF NOT EXISTS proof_batch_num_fkey FOREIGN KEY (batch_num) REFERENCES aggregator.batch (batch_num) ON DELETE CASCADE; + +ALTER TABLE aggregator.sequence + ADD CONSTRAINT IF NOT EXISTS sequence_from_batch_num_fkey FOREIGN KEY (from_batch_num) REFERENCES aggregator.batch (batch_num) ON DELETE CASCADE; + + +-- +migrate Up +ALTER TABLE aggregator.proof + DROP CONSTRAINT IF EXISTS proof_batch_num_fkey; + +ALTER TABLE aggregator.sequence + DROP CONSTRAINT IF EXISTS sequence_from_batch_num_fkey; + +DROP TABLE IF EXISTS aggregator.batch; diff --git a/aggregator/interfaces.go b/aggregator/interfaces.go index ee70d07c..81f63d94 100644 --- a/aggregator/interfaces.go +++ b/aggregator/interfaces.go @@ -6,17 +6,21 @@ import ( ethmanTypes "github.com/0xPolygon/cdk/aggregator/ethmantypes" "github.com/0xPolygon/cdk/aggregator/prover" + "github.com/0xPolygon/cdk/rpc/types" "github.com/0xPolygon/cdk/state" "github.com/0xPolygon/zkevm-ethtx-manager/ethtxmanager" ethtxtypes "github.com/0xPolygon/zkevm-ethtx-manager/types" - "github.com/0xPolygonHermez/zkevm-data-streamer/datastreamer" "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/core/types" + ethtypes "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto/kzg4844" "github.com/jackc/pgx/v4" ) // Consumer interfaces required by the package. +type RPCInterface interface { + GetBatch(batchNumber uint64) (*types.RPCBatch, error) + GetWitness(batchNumber uint64, fullWitness bool) ([]byte, error) +} type ProverInterface interface { Name() string @@ -37,9 +41,9 @@ type Etherman interface { BuildTrustedVerifyBatchesTxData( lastVerifiedBatch, newVerifiedBatch uint64, inputs *ethmanTypes.FinalProofInputs, beneficiary common.Address, ) (to *common.Address, data []byte, err error) - GetLatestBlockHeader(ctx context.Context) (*types.Header, error) + GetLatestBlockHeader(ctx context.Context) (*ethtypes.Header, error) GetBatchAccInputHash(ctx context.Context, batchNumber uint64) (common.Hash, error) - HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) + HeaderByNumber(ctx context.Context, number *big.Int) (*ethtypes.Header, error) } // aggregatorTxProfitabilityChecker interface for different profitability @@ -62,26 +66,6 @@ type StateInterface interface { CleanupLockedProofs(ctx context.Context, duration string, dbTx pgx.Tx) (int64, error) CheckProofExistsForBatch(ctx context.Context, batchNumber uint64, dbTx pgx.Tx) (bool, error) AddSequence(ctx context.Context, sequence state.Sequence, dbTx pgx.Tx) error - AddBatch(ctx context.Context, dbBatch *state.DBBatch, dbTx pgx.Tx) error - GetBatch(ctx context.Context, batchNumber uint64, dbTx pgx.Tx) (*state.DBBatch, error) - DeleteBatchesOlderThanBatchNumber(ctx context.Context, batchNumber uint64, dbTx pgx.Tx) error - DeleteBatchesNewerThanBatchNumber(ctx context.Context, batchNumber uint64, dbTx pgx.Tx) error -} - -// StreamClient represents the stream client behaviour -type StreamClient interface { - Start() error - ExecCommandStart(fromEntry uint64) error - ExecCommandStartBookmark(fromBookmark []byte) error - ExecCommandStop() error - ExecCommandGetHeader() (datastreamer.HeaderEntry, error) - ExecCommandGetEntry(fromEntry uint64) (datastreamer.FileEntry, error) - ExecCommandGetBookmark(fromBookmark []byte) (datastreamer.FileEntry, error) - GetFromStream() uint64 - GetTotalEntries() uint64 - SetProcessEntryFunc(f datastreamer.ProcessEntryFunc) - ResetProcessEntryFunc() - IsStarted() bool } // EthTxManagerClient represents the eth tx manager interface @@ -92,7 +76,7 @@ type EthTxManagerClient interface { value *big.Int, data []byte, gasOffset uint64, - sidecar *types.BlobTxSidecar, + sidecar *ethtypes.BlobTxSidecar, ) (common.Hash, error) AddWithGas( ctx context.Context, @@ -100,11 +84,11 @@ type EthTxManagerClient interface { value *big.Int, data []byte, gasOffset uint64, - sidecar *types.BlobTxSidecar, + sidecar *ethtypes.BlobTxSidecar, gas uint64, ) (common.Hash, error) EncodeBlobData(data []byte) (kzg4844.Blob, error) - MakeBlobSidecar(blobs []kzg4844.Blob) *types.BlobTxSidecar + MakeBlobSidecar(blobs []kzg4844.Blob) *ethtypes.BlobTxSidecar ProcessPendingMonitoredTxs(ctx context.Context, resultHandler ethtxmanager.ResultHandler) Remove(ctx context.Context, id common.Hash) error RemoveAll(ctx context.Context) error diff --git a/aggregator/mocks/mock_StreamClient.go b/aggregator/mocks/mock_StreamClient.go deleted file mode 100644 index 7962d31e..00000000 --- a/aggregator/mocks/mock_StreamClient.go +++ /dev/null @@ -1,247 +0,0 @@ -// Code generated by mockery v2.39.0. DO NOT EDIT. - -package mocks - -import ( - datastreamer "github.com/0xPolygonHermez/zkevm-data-streamer/datastreamer" - mock "github.com/stretchr/testify/mock" -) - -// StreamClientMock is an autogenerated mock type for the StreamClient type -type StreamClientMock struct { - mock.Mock -} - -// ExecCommandGetBookmark provides a mock function with given fields: fromBookmark -func (_m *StreamClientMock) ExecCommandGetBookmark(fromBookmark []byte) (datastreamer.FileEntry, error) { - ret := _m.Called(fromBookmark) - - if len(ret) == 0 { - panic("no return value specified for ExecCommandGetBookmark") - } - - var r0 datastreamer.FileEntry - var r1 error - if rf, ok := ret.Get(0).(func([]byte) (datastreamer.FileEntry, error)); ok { - return rf(fromBookmark) - } - if rf, ok := ret.Get(0).(func([]byte) datastreamer.FileEntry); ok { - r0 = rf(fromBookmark) - } else { - r0 = ret.Get(0).(datastreamer.FileEntry) - } - - if rf, ok := ret.Get(1).(func([]byte) error); ok { - r1 = rf(fromBookmark) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -// ExecCommandGetEntry provides a mock function with given fields: fromEntry -func (_m *StreamClientMock) ExecCommandGetEntry(fromEntry uint64) (datastreamer.FileEntry, error) { - ret := _m.Called(fromEntry) - - if len(ret) == 0 { - panic("no return value specified for ExecCommandGetEntry") - } - - var r0 datastreamer.FileEntry - var r1 error - if rf, ok := ret.Get(0).(func(uint64) (datastreamer.FileEntry, error)); ok { - return rf(fromEntry) - } - if rf, ok := ret.Get(0).(func(uint64) datastreamer.FileEntry); ok { - r0 = rf(fromEntry) - } else { - r0 = ret.Get(0).(datastreamer.FileEntry) - } - - if rf, ok := ret.Get(1).(func(uint64) error); ok { - r1 = rf(fromEntry) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -// ExecCommandGetHeader provides a mock function with given fields: -func (_m *StreamClientMock) ExecCommandGetHeader() (datastreamer.HeaderEntry, error) { - ret := _m.Called() - - if len(ret) == 0 { - panic("no return value specified for ExecCommandGetHeader") - } - - var r0 datastreamer.HeaderEntry - var r1 error - if rf, ok := ret.Get(0).(func() (datastreamer.HeaderEntry, error)); ok { - return rf() - } - if rf, ok := ret.Get(0).(func() datastreamer.HeaderEntry); ok { - r0 = rf() - } else { - r0 = ret.Get(0).(datastreamer.HeaderEntry) - } - - if rf, ok := ret.Get(1).(func() error); ok { - r1 = rf() - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -// ExecCommandStart provides a mock function with given fields: fromEntry -func (_m *StreamClientMock) ExecCommandStart(fromEntry uint64) error { - ret := _m.Called(fromEntry) - - if len(ret) == 0 { - panic("no return value specified for ExecCommandStart") - } - - var r0 error - if rf, ok := ret.Get(0).(func(uint64) error); ok { - r0 = rf(fromEntry) - } else { - r0 = ret.Error(0) - } - - return r0 -} - -// ExecCommandStartBookmark provides a mock function with given fields: fromBookmark -func (_m *StreamClientMock) ExecCommandStartBookmark(fromBookmark []byte) error { - ret := _m.Called(fromBookmark) - - if len(ret) == 0 { - panic("no return value specified for ExecCommandStartBookmark") - } - - var r0 error - if rf, ok := ret.Get(0).(func([]byte) error); ok { - r0 = rf(fromBookmark) - } else { - r0 = ret.Error(0) - } - - return r0 -} - -// ExecCommandStop provides a mock function with given fields: -func (_m *StreamClientMock) ExecCommandStop() error { - ret := _m.Called() - - if len(ret) == 0 { - panic("no return value specified for ExecCommandStop") - } - - var r0 error - if rf, ok := ret.Get(0).(func() error); ok { - r0 = rf() - } else { - r0 = ret.Error(0) - } - - return r0 -} - -// GetFromStream provides a mock function with given fields: -func (_m *StreamClientMock) GetFromStream() uint64 { - ret := _m.Called() - - if len(ret) == 0 { - panic("no return value specified for GetFromStream") - } - - var r0 uint64 - if rf, ok := ret.Get(0).(func() uint64); ok { - r0 = rf() - } else { - r0 = ret.Get(0).(uint64) - } - - return r0 -} - -// GetTotalEntries provides a mock function with given fields: -func (_m *StreamClientMock) GetTotalEntries() uint64 { - ret := _m.Called() - - if len(ret) == 0 { - panic("no return value specified for GetTotalEntries") - } - - var r0 uint64 - if rf, ok := ret.Get(0).(func() uint64); ok { - r0 = rf() - } else { - r0 = ret.Get(0).(uint64) - } - - return r0 -} - -// IsStarted provides a mock function with given fields: -func (_m *StreamClientMock) IsStarted() bool { - ret := _m.Called() - - if len(ret) == 0 { - panic("no return value specified for IsStarted") - } - - var r0 bool - if rf, ok := ret.Get(0).(func() bool); ok { - r0 = rf() - } else { - r0 = ret.Get(0).(bool) - } - - return r0 -} - -// ResetProcessEntryFunc provides a mock function with given fields: -func (_m *StreamClientMock) ResetProcessEntryFunc() { - _m.Called() -} - -// SetProcessEntryFunc provides a mock function with given fields: f -func (_m *StreamClientMock) SetProcessEntryFunc(f datastreamer.ProcessEntryFunc) { - _m.Called(f) -} - -// Start provides a mock function with given fields: -func (_m *StreamClientMock) Start() error { - ret := _m.Called() - - if len(ret) == 0 { - panic("no return value specified for Start") - } - - var r0 error - if rf, ok := ret.Get(0).(func() error); ok { - r0 = rf() - } else { - r0 = ret.Error(0) - } - - return r0 -} - -// NewStreamClientMock creates a new instance of StreamClientMock. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -// The first argument is typically a *testing.T value. -func NewStreamClientMock(t interface { - mock.TestingT - Cleanup(func()) -}) *StreamClientMock { - mock := &StreamClientMock{} - mock.Mock.Test(t) - - t.Cleanup(func() { mock.AssertExpectations(t) }) - - return mock -} diff --git a/aggregator/mocks/mock_rpc.go b/aggregator/mocks/mock_rpc.go new file mode 100644 index 00000000..2f3c07e4 --- /dev/null +++ b/aggregator/mocks/mock_rpc.go @@ -0,0 +1,87 @@ +// Code generated by mockery v2.39.0. DO NOT EDIT. + +package mocks + +import ( + types "github.com/0xPolygon/cdk/rpc/types" + mock "github.com/stretchr/testify/mock" +) + +// RPCInterfaceMock is an autogenerated mock type for the RPCInterface type +type RPCInterfaceMock struct { + mock.Mock +} + +// GetBatch provides a mock function with given fields: batchNumber +func (_m *RPCInterfaceMock) GetBatch(batchNumber uint64) (*types.RPCBatch, error) { + ret := _m.Called(batchNumber) + + if len(ret) == 0 { + panic("no return value specified for GetBatch") + } + + var r0 *types.RPCBatch + var r1 error + if rf, ok := ret.Get(0).(func(uint64) (*types.RPCBatch, error)); ok { + return rf(batchNumber) + } + if rf, ok := ret.Get(0).(func(uint64) *types.RPCBatch); ok { + r0 = rf(batchNumber) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*types.RPCBatch) + } + } + + if rf, ok := ret.Get(1).(func(uint64) error); ok { + r1 = rf(batchNumber) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// GetWitness provides a mock function with given fields: batchNumber, fullWitness +func (_m *RPCInterfaceMock) GetWitness(batchNumber uint64, fullWitness bool) ([]byte, error) { + ret := _m.Called(batchNumber, fullWitness) + + if len(ret) == 0 { + panic("no return value specified for GetWitness") + } + + var r0 []byte + var r1 error + if rf, ok := ret.Get(0).(func(uint64, bool) ([]byte, error)); ok { + return rf(batchNumber, fullWitness) + } + if rf, ok := ret.Get(0).(func(uint64, bool) []byte); ok { + r0 = rf(batchNumber, fullWitness) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]byte) + } + } + + if rf, ok := ret.Get(1).(func(uint64, bool) error); ok { + r1 = rf(batchNumber, fullWitness) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// NewRPCInterfaceMock creates a new instance of RPCInterfaceMock. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewRPCInterfaceMock(t interface { + mock.TestingT + Cleanup(func()) +}) *RPCInterfaceMock { + mock := &RPCInterfaceMock{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/aggregator/mocks/mock_state.go b/aggregator/mocks/mock_state.go index 8879dd05..74c9021b 100644 --- a/aggregator/mocks/mock_state.go +++ b/aggregator/mocks/mock_state.go @@ -16,24 +16,6 @@ type StateInterfaceMock struct { mock.Mock } -// AddBatch provides a mock function with given fields: ctx, dbBatch, dbTx -func (_m *StateInterfaceMock) AddBatch(ctx context.Context, dbBatch *state.DBBatch, dbTx pgx.Tx) error { - ret := _m.Called(ctx, dbBatch, dbTx) - - if len(ret) == 0 { - panic("no return value specified for AddBatch") - } - - var r0 error - if rf, ok := ret.Get(0).(func(context.Context, *state.DBBatch, pgx.Tx) error); ok { - r0 = rf(ctx, dbBatch, dbTx) - } else { - r0 = ret.Error(0) - } - - return r0 -} - // AddGeneratedProof provides a mock function with given fields: ctx, proof, dbTx func (_m *StateInterfaceMock) AddGeneratedProof(ctx context.Context, proof *state.Proof, dbTx pgx.Tx) error { ret := _m.Called(ctx, proof, dbTx) @@ -202,42 +184,6 @@ func (_m *StateInterfaceMock) CleanupLockedProofs(ctx context.Context, duration return r0, r1 } -// DeleteBatchesNewerThanBatchNumber provides a mock function with given fields: ctx, batchNumber, dbTx -func (_m *StateInterfaceMock) DeleteBatchesNewerThanBatchNumber(ctx context.Context, batchNumber uint64, dbTx pgx.Tx) error { - ret := _m.Called(ctx, batchNumber, dbTx) - - if len(ret) == 0 { - panic("no return value specified for DeleteBatchesNewerThanBatchNumber") - } - - var r0 error - if rf, ok := ret.Get(0).(func(context.Context, uint64, pgx.Tx) error); ok { - r0 = rf(ctx, batchNumber, dbTx) - } else { - r0 = ret.Error(0) - } - - return r0 -} - -// DeleteBatchesOlderThanBatchNumber provides a mock function with given fields: ctx, batchNumber, dbTx -func (_m *StateInterfaceMock) DeleteBatchesOlderThanBatchNumber(ctx context.Context, batchNumber uint64, dbTx pgx.Tx) error { - ret := _m.Called(ctx, batchNumber, dbTx) - - if len(ret) == 0 { - panic("no return value specified for DeleteBatchesOlderThanBatchNumber") - } - - var r0 error - if rf, ok := ret.Get(0).(func(context.Context, uint64, pgx.Tx) error); ok { - r0 = rf(ctx, batchNumber, dbTx) - } else { - r0 = ret.Error(0) - } - - return r0 -} - // DeleteGeneratedProofs provides a mock function with given fields: ctx, batchNumber, batchNumberFinal, dbTx func (_m *StateInterfaceMock) DeleteGeneratedProofs(ctx context.Context, batchNumber uint64, batchNumberFinal uint64, dbTx pgx.Tx) error { ret := _m.Called(ctx, batchNumber, batchNumberFinal, dbTx) @@ -274,36 +220,6 @@ func (_m *StateInterfaceMock) DeleteUngeneratedProofs(ctx context.Context, dbTx return r0 } -// GetBatch provides a mock function with given fields: ctx, batchNumber, dbTx -func (_m *StateInterfaceMock) GetBatch(ctx context.Context, batchNumber uint64, dbTx pgx.Tx) (*state.DBBatch, error) { - ret := _m.Called(ctx, batchNumber, dbTx) - - if len(ret) == 0 { - panic("no return value specified for GetBatch") - } - - var r0 *state.DBBatch - var r1 error - if rf, ok := ret.Get(0).(func(context.Context, uint64, pgx.Tx) (*state.DBBatch, error)); ok { - return rf(ctx, batchNumber, dbTx) - } - if rf, ok := ret.Get(0).(func(context.Context, uint64, pgx.Tx) *state.DBBatch); ok { - r0 = rf(ctx, batchNumber, dbTx) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).(*state.DBBatch) - } - } - - if rf, ok := ret.Get(1).(func(context.Context, uint64, pgx.Tx) error); ok { - r1 = rf(ctx, batchNumber, dbTx) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - // GetProofReadyToVerify provides a mock function with given fields: ctx, lastVerfiedBatchNumber, dbTx func (_m *StateInterfaceMock) GetProofReadyToVerify(ctx context.Context, lastVerfiedBatchNumber uint64, dbTx pgx.Tx) (*state.Proof, error) { ret := _m.Called(ctx, lastVerfiedBatchNumber, dbTx) diff --git a/config/default.go b/config/default.go index 442a44e0..5e5fafcb 100644 --- a/config/default.go +++ b/config/default.go @@ -4,7 +4,7 @@ package config // environment / deployment const DefaultMandatoryVars = ` L1URL = "http://localhost:8545" -L2URL = "localhost:8123" +L2URL = "http://localhost:8123" L1AggOracleURL = "http://test-aggoracle-l1:8545" L2AggOracleURL = "http://test-aggoracle-l2:8545" @@ -16,9 +16,8 @@ IsValidiumMode = false L2Coinbase = "0xfa3b44587990f97ba8b6ba7e230a5f0e95d14b3d" SequencerPrivateKeyPath = "/app/sequencer.keystore" SequencerPrivateKeyPassword = "test" -WitnessURL = "localhost:8123" +WitnessURL = "http://localhost:8123" AggLayerURL = "https://agglayer-dev.polygon.technology" -StreamServer = "localhost:6900" AggregatorPrivateKeyPath = "/app/keystore/aggregator.keystore" AggregatorPrivateKeyPassword = "testonly" @@ -133,13 +132,12 @@ SenderAddress = "{{SenderProofToL1Addr}}" CleanupLockedProofsInterval = "2m" GeneratingProofCleanupThreshold = "10m" GasOffset = 0 +RPCURL = "{{L2URL}}" WitnessURL = "{{WitnessURL}}" -UseL1BatchData = true UseFullWitness = false SettlementBackend = "l1" AggLayerTxTimeout = "5m" AggLayerURL = "{{AggLayerURL}}" -MaxWitnessRetrievalWorkers = 2 SyncModeOnlyEnabled = false [Aggregator.SequencerPrivateKey] Path = "{{SequencerPrivateKeyPath}}" @@ -156,8 +154,6 @@ SyncModeOnlyEnabled = false Environment ="{{Log.Environment}}" # "production" or "development" Level = "{{Log.Level}}" Outputs = ["stderr"] - [Aggregator.StreamClient] - Server = "{{StreamServer}}" [Aggregator.EthTxManager] FrequencyToMonitorTxs = "1s" WaitTxToBeMined = "2m" diff --git a/crates/cdk-config/src/aggregator.rs b/crates/cdk-config/src/aggregator.rs index 2e059a2f..8f37a9af 100644 --- a/crates/cdk-config/src/aggregator.rs +++ b/crates/cdk-config/src/aggregator.rs @@ -2,21 +2,6 @@ use ethers::types::Address; use serde::Deserialize; use url::Url; -/// The StreamClient configuration. -#[derive(Deserialize, Debug, Clone)] -pub struct StreamClient { - #[serde(rename = "Server", default)] - pub server: String, -} - -impl Default for StreamClient { - fn default() -> Self { - Self { - server: "localhost:9092".to_string(), - } - } -} - #[derive(Deserialize, Debug, Clone)] pub struct EthTxManager { #[serde(rename = "Etherman")] @@ -74,6 +59,8 @@ pub struct Aggregator { pub generating_proof_cleanup_threshold: String, #[serde(rename = "GasOffset", default)] pub gas_offset: u64, + #[serde(rename = "RPCURL", default = "default_url")] + pub rpc_url: Url, #[serde(rename = "WitnessURL", default = "default_url")] pub witness_url: Url, #[serde(rename = "SenderAddress", default = "default_address")] @@ -84,18 +71,11 @@ pub struct Aggregator { pub agg_layer_tx_timeout: String, #[serde(rename = "AggLayerURL", default = "default_url")] pub agg_layer_url: Url, - #[serde(rename = "UseL1BatchData", default)] - pub use_l1_batch_data: bool, #[serde(rename = "UseFullWitness", default)] pub use_full_witness: bool, - #[serde(rename = "MaxWitnessRetrievalWorkers", default)] - pub max_witness_retrieval_workers: u32, #[serde(rename = "SyncModeOnlyEnabled", default)] pub sync_mode_only_enabled: bool, - #[serde(rename = "StreamClient", default)] - pub stream_client: StreamClient, - #[serde(rename = "EthTxManager", default)] pub eth_tx_manager: EthTxManager, } @@ -127,18 +107,14 @@ impl Default for Aggregator { cleanup_locked_proofs_interval: "1h".to_string(), generating_proof_cleanup_threshold: "10m".to_string(), gas_offset: 0, + rpc_url: default_url(), witness_url: default_url(), sender_address: default_address(), settlement_backend: "default".to_string(), agg_layer_tx_timeout: "30s".to_string(), agg_layer_url: Url::parse("http://localhost:8547").unwrap(), - use_l1_batch_data: true, use_full_witness: false, - max_witness_retrieval_workers: 4, sync_mode_only_enabled: false, - stream_client: StreamClient { - server: "localhost:9092".to_string(), - }, eth_tx_manager: EthTxManager { etherman: Etherman { url: "http://localhost:9093".to_string(), diff --git a/crates/cdk/src/config_render.rs b/crates/cdk/src/config_render.rs index 2c230c52..ab3d05c3 100644 --- a/crates/cdk/src/config_render.rs +++ b/crates/cdk/src/config_render.rs @@ -93,7 +93,6 @@ fn render_yaml(config: &Config, res: Rendered) -> String { chain: dynamic-{chain_id} zkevm.l2-chain-id: {chain_id} zkevm.l2-sequencer-rpc-url: {l2_sequencer_rpc_url} -zkevm.l2-datastreamer-url: {datastreamer_host} zkevm.l1-chain-id: {l1_chain_id} zkevm.l1-rpc-url: {l1_rpc_url} zkevm.address-sequencer: {sequencer_address} @@ -117,7 +116,6 @@ ws: true "#, chain_id = config.aggregator.chain_id.clone(), l2_sequencer_rpc_url = config.aggregator.witness_url.to_string(), - datastreamer_host = config.aggregator.stream_client.server, l1_rpc_url = config.aggregator.eth_tx_manager.etherman.url, l1_chain_id = config.network_config.l1.l1_chain_id, sequencer_address = config.sequence_sender.l2_coinbase, diff --git a/go.mod b/go.mod index 631e54b7..ae03382e 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,6 @@ require ( github.com/0xPolygon/cdk-data-availability v0.0.10 github.com/0xPolygon/cdk-rpc v0.0.0-20241004114257-6c3cb6eebfb6 github.com/0xPolygon/zkevm-ethtx-manager v0.2.0 - github.com/0xPolygonHermez/zkevm-data-streamer v0.2.7 github.com/0xPolygonHermez/zkevm-synchronizer-l1 v1.0.5 github.com/ethereum/go-ethereum v1.14.8 github.com/golang-collections/collections v0.0.0-20130729185459-604e922904d3 diff --git a/go.sum b/go.sum index 4a0095c8..96f2dc93 100644 --- a/go.sum +++ b/go.sum @@ -6,10 +6,6 @@ github.com/0xPolygon/cdk-rpc v0.0.0-20241004114257-6c3cb6eebfb6 h1:FXL/rcO7/GtZ3 github.com/0xPolygon/cdk-rpc v0.0.0-20241004114257-6c3cb6eebfb6/go.mod h1:2scWqMMufrQXu7TikDgQ3BsyaKoX8qP26D6E262vSOg= github.com/0xPolygon/zkevm-ethtx-manager v0.2.0 h1:QWE6nKBBHkMEiza723hJk0+oZbLSdQZTX4I48jWw15I= github.com/0xPolygon/zkevm-ethtx-manager v0.2.0/go.mod h1:lqQmzSo2OXEZItD0R4Cd+lqKFxphXEWgqHefVcGDZZc= -github.com/0xPolygonHermez/zkevm-data-streamer v0.2.7 h1:73sYxRQ9cOmtYBEyHePgEwrVULR+YruSQxVXCt/SmzU= -github.com/0xPolygonHermez/zkevm-data-streamer v0.2.7/go.mod h1:7nM7Ihk+fTG1TQPwdZoGOYd3wprqqyIyjtS514uHzWE= -github.com/0xPolygonHermez/zkevm-synchronizer-l1 v1.0.4 h1:+ZbyEpaBZu88jWtov/7iBWvwgBMu5cxlvAFDxsPrnGQ= -github.com/0xPolygonHermez/zkevm-synchronizer-l1 v1.0.4/go.mod h1:X4Su/M/+hSISqdl9yomKlRsbTyuZHsRohporyHsP8gg= github.com/0xPolygonHermez/zkevm-synchronizer-l1 v1.0.5 h1:YmnhuCl349MoNASN0fMeGKU1o9HqJhiZkfMsA/1cTRA= github.com/0xPolygonHermez/zkevm-synchronizer-l1 v1.0.5/go.mod h1:X4Su/M/+hSISqdl9yomKlRsbTyuZHsRohporyHsP8gg= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= diff --git a/sequencesender/rpc.go b/rpc/batch.go similarity index 52% rename from sequencesender/rpc.go rename to rpc/batch.go index a604da37..59e10b20 100644 --- a/sequencesender/rpc.go +++ b/rpc/batch.go @@ -1,23 +1,42 @@ -package sequencesender +package rpc import ( "encoding/json" + "errors" "fmt" "math/big" "github.com/0xPolygon/cdk-rpc/rpc" "github.com/0xPolygon/cdk/log" - "github.com/0xPolygon/cdk/sequencesender/seqsendertypes/rpcbatch" + "github.com/0xPolygon/cdk/rpc/types" "github.com/0xPolygon/cdk/state" "github.com/ethereum/go-ethereum/common" ) -func getBatchFromRPC(addr string, batchNumber uint64) (*rpcbatch.RPCBatch, error) { +var ( + // ErrBusy is returned when the witness server is busy + ErrBusy = errors.New("witness server is busy") +) + +const busyResponse = "busy" + +type BatchEndpoints struct { + url string +} + +func NewBatchEndpoints(url string) *BatchEndpoints { + return &BatchEndpoints{url: url} +} + +func (b *BatchEndpoints) GetBatch(batchNumber uint64) (*types.RPCBatch, error) { type zkEVMBatch struct { + AccInputHash string `json:"accInputHash"` Blocks []string `json:"blocks"` BatchL2Data string `json:"batchL2Data"` Coinbase string `json:"coinbase"` GlobalExitRoot string `json:"globalExitRoot"` + LocalExitRoot string `json:"localExitRoot"` + StateRoot string `json:"stateRoot"` Closed bool `json:"closed"` Timestamp string `json:"timestamp"` } @@ -26,7 +45,7 @@ func getBatchFromRPC(addr string, batchNumber uint64) (*rpcbatch.RPCBatch, error log.Infof("Getting batch %d from RPC", batchNumber) - response, err := rpc.JSONRPCCall(addr, "zkevm_getBatchByNumber", batchNumber) + response, err := rpc.JSONRPCCall(b.url, "zkevm_getBatchByNumber", batchNumber) if err != nil { return nil, err } @@ -47,14 +66,13 @@ func getBatchFromRPC(addr string, batchNumber uint64) (*rpcbatch.RPCBatch, error return nil, fmt.Errorf("error unmarshalling the batch from the response calling zkevm_getBatchByNumber: %w", err) } - rpcBatch, err := rpcbatch.New(batchNumber, zkEVMBatchData.Blocks, common.FromHex(zkEVMBatchData.BatchL2Data), - common.HexToHash(zkEVMBatchData.GlobalExitRoot), common.HexToAddress(zkEVMBatchData.Coinbase), zkEVMBatchData.Closed) - if err != nil { - return nil, fmt.Errorf("error creating the rpc batch: %w", err) - } + rpcBatch := types.NewRPCBatch(batchNumber, common.HexToHash(zkEVMBatchData.AccInputHash), zkEVMBatchData.Blocks, + common.FromHex(zkEVMBatchData.BatchL2Data), common.HexToHash(zkEVMBatchData.GlobalExitRoot), + common.HexToHash(zkEVMBatchData.LocalExitRoot), common.HexToHash(zkEVMBatchData.StateRoot), + common.HexToAddress(zkEVMBatchData.Coinbase), zkEVMBatchData.Closed) if len(zkEVMBatchData.Blocks) > 0 { - lastL2BlockTimestamp, err := getL2BlockTimestampFromRPC(addr, zkEVMBatchData.Blocks[len(zkEVMBatchData.Blocks)-1]) + lastL2BlockTimestamp, err := b.GetL2BlockTimestamp(zkEVMBatchData.Blocks[len(zkEVMBatchData.Blocks)-1]) if err != nil { return nil, fmt.Errorf("error getting the last l2 block timestamp from the rpc: %w", err) } @@ -67,14 +85,14 @@ func getBatchFromRPC(addr string, batchNumber uint64) (*rpcbatch.RPCBatch, error return rpcBatch, nil } -func getL2BlockTimestampFromRPC(addr, blockHash string) (uint64, error) { +func (b *BatchEndpoints) GetL2BlockTimestamp(blockHash string) (uint64, error) { type zkeEVML2Block struct { Timestamp string `json:"timestamp"` } log.Infof("Getting l2 block timestamp from RPC. Block hash: %s", blockHash) - response, err := rpc.JSONRPCCall(addr, "eth_getBlockByHash", blockHash, false) + response, err := rpc.JSONRPCCall(b.url, "eth_getBlockByHash", blockHash, false) if err != nil { return 0, err } @@ -93,3 +111,39 @@ func getL2BlockTimestampFromRPC(addr, blockHash string) (uint64, error) { return new(big.Int).SetBytes(common.FromHex(l2Block.Timestamp)).Uint64(), nil } + +func (b *BatchEndpoints) GetWitness(batchNumber uint64, fullWitness bool) ([]byte, error) { + var ( + witness string + response rpc.Response + err error + ) + + witnessType := "trimmed" + if fullWitness { + witnessType = "full" + } + + log.Infof("Requesting witness for batch %d of type %s", batchNumber, witnessType) + + response, err = rpc.JSONRPCCall(b.url, "zkevm_getBatchWitness", batchNumber, witnessType) + if err != nil { + return nil, err + } + + // Check if the response is an error + if response.Error != nil { + if response.Error.Message == busyResponse { + return nil, ErrBusy + } + + return nil, fmt.Errorf("error from witness for batch %d: %v", batchNumber, response.Error) + } + + err = json.Unmarshal(response.Result, &witness) + if err != nil { + return nil, err + } + + return common.FromHex(witness), nil +} diff --git a/rpc/batch_test.go b/rpc/batch_test.go new file mode 100644 index 00000000..d6940bf3 --- /dev/null +++ b/rpc/batch_test.go @@ -0,0 +1,265 @@ +package rpc + +import ( + "encoding/json" + "errors" + "fmt" + "net/http" + "net/http/httptest" + "testing" + + "github.com/0xPolygon/cdk-rpc/rpc" + "github.com/ethereum/go-ethereum/common" + "github.com/stretchr/testify/require" +) + +func Test_getBatchFromRPC(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + batch uint64 + getBatchByNumberResp string + getBlockByHasResp string + getBatchByNumberErr error + getBlockByHashErr error + expectBlocks int + expectData []byte + expectTimestamp uint64 + expectErr error + }{ + { + name: "successfully fetched", + getBatchByNumberResp: `{"jsonrpc":"2.0","id":1,"result":{"blocks":["1", "2", "3"],"batchL2Data":"0x1234567"}}`, + getBlockByHasResp: `{"jsonrpc":"2.0","id":1,"result":{"timestamp":"0x123456"}}`, + batch: 0, + expectBlocks: 3, + expectData: common.FromHex("0x1234567"), + expectTimestamp: 1193046, + expectErr: nil, + }, + { + name: "invalid json", + getBatchByNumberResp: `{"jsonrpc":"2.0","id":1,"result":{"blocks":invalid,"batchL2Data":"test"}}`, + batch: 0, + expectBlocks: 3, + expectData: nil, + expectErr: errors.New("invalid character 'i' looking for beginning of value"), + }, + { + name: "wrong json", + getBatchByNumberResp: `{"jsonrpc":"2.0","id":1,"result":{"blocks":"invalid","batchL2Data":"test"}}`, + batch: 0, + expectBlocks: 3, + expectData: nil, + expectErr: errors.New("error unmarshalling the batch from the response calling zkevm_getBatchByNumber: json: cannot unmarshal string into Go struct field zkEVMBatch.blocks of type []string"), + }, + { + name: "error in the response", + getBatchByNumberResp: `{"jsonrpc":"2.0","id":1,"result":null,"error":{"code":-32602,"message":"Invalid params"}}`, + batch: 0, + expectBlocks: 0, + expectData: nil, + expectErr: errors.New("error in the response calling zkevm_getBatchByNumber: &{-32602 Invalid params }"), + }, + { + name: "http failed", + getBatchByNumberErr: errors.New("failed to fetch"), + batch: 0, + expectBlocks: 0, + expectData: nil, + expectErr: errors.New("invalid status code, expected: 200, found: 500"), + }, + } + + for _, tt := range tests { + tt := tt + + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + var req rpc.Request + err := json.NewDecoder(r.Body).Decode(&req) + require.NoError(t, err) + + switch req.Method { + case "zkevm_getBatchByNumber": + if tt.getBatchByNumberErr != nil { + http.Error(w, tt.getBatchByNumberErr.Error(), http.StatusInternalServerError) + return + } + + _, _ = w.Write([]byte(tt.getBatchByNumberResp)) + case "eth_getBlockByHash": + if tt.getBlockByHashErr != nil { + http.Error(w, tt.getBlockByHashErr.Error(), http.StatusInternalServerError) + return + } + _, _ = w.Write([]byte(tt.getBlockByHasResp)) + default: + http.Error(w, "method not found", http.StatusNotFound) + } + })) + defer srv.Close() + + rcpBatchClient := NewBatchEndpoints(srv.URL) + rpcBatch, err := rcpBatchClient.GetBatch(tt.batch) + if rpcBatch != nil { + copiedrpcBatch := rpcBatch.DeepCopy() + require.NotNil(t, copiedrpcBatch) + str := copiedrpcBatch.String() + require.NotEmpty(t, str) + } + if tt.expectErr != nil { + require.Equal(t, tt.expectErr.Error(), err.Error()) + } else { + require.NoError(t, err) + require.Equal(t, tt.expectTimestamp, rpcBatch.LastL2BLockTimestamp()) + require.Equal(t, tt.expectData, rpcBatch.L2Data()) + } + }) + } +} + +func Test_getBatchWitnessRPC(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + batch uint64 + getBatchWitnessResp string + getBatchWitnessErr error + expectData []byte + expectErr error + full bool + }{ + { + name: "get batch trimmed witness success", + batch: 1, + getBatchWitnessResp: `{"jsonrpc":"2.0","id":1,"result":"0x0123456"}`, + getBatchWitnessErr: nil, + expectData: common.FromHex("0x0123456"), + expectErr: nil, + full: false, + }, + { + name: "get batch full witness success", + batch: 1, + getBatchWitnessResp: `{"jsonrpc":"2.0","id":1,"result":"0x0123456"}`, + getBatchWitnessErr: nil, + expectData: common.FromHex("0x0123456"), + expectErr: nil, + full: true, + }, + { + name: "get batch witness busy", + batch: 1, + getBatchWitnessResp: `{"jsonrpc":"2.0","id":1,"result":"", "error":{"code":-32000,"message":"busy"}}`, + getBatchWitnessErr: nil, + expectData: []byte{}, + expectErr: ErrBusy, + full: false, + }, + } + + for _, tt := range tests { + tt := tt + + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + var req rpc.Request + err := json.NewDecoder(r.Body).Decode(&req) + require.NoError(t, err) + + switch req.Method { + case "zkevm_getBatchWitness": + if tt.getBatchWitnessErr != nil { + http.Error(w, tt.getBatchWitnessErr.Error(), http.StatusInternalServerError) + return + } + _, _ = w.Write([]byte(tt.getBatchWitnessResp)) + default: + http.Error(w, "method not found", http.StatusNotFound) + } + })) + defer srv.Close() + + rcpBatchClient := NewBatchEndpoints(srv.URL) + witness, err := rcpBatchClient.GetWitness(tt.batch, false) + if tt.expectErr != nil { + require.Equal(t, tt.expectErr.Error(), err.Error()) + } else { + require.NoError(t, err) + require.Equal(t, tt.expectData, witness) + } + }) + } +} + +func Test_getGetL2BlockTimestamp(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + blockHash []byte + response string + error error + expectData uint64 + expectErr error + }{ + { + name: "success", + blockHash: []byte{1}, + response: `{"jsonrpc":"2.0","id":1,"result":{"timestamp":"0x123456"}}`, + error: nil, + expectData: uint64(0x123456), + expectErr: nil, + }, + { + name: "fail", + blockHash: []byte{2}, + response: `{"jsonrpc":"2.0","id":1,"result":{"timestamp":"0x123456"}}`, + error: fmt.Errorf("error"), + expectData: 0, + expectErr: fmt.Errorf("invalid status code, expected: 200, found: 500"), + }, + } + + for _, tt := range tests { + tt := tt + + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + var req rpc.Request + err := json.NewDecoder(r.Body).Decode(&req) + require.NoError(t, err) + + switch req.Method { + case "eth_getBlockByHash": + if tt.error != nil { + http.Error(w, tt.error.Error(), http.StatusInternalServerError) + return + } + _, _ = w.Write([]byte(tt.response)) + default: + http.Error(w, "method not found", http.StatusNotFound) + } + })) + defer srv.Close() + + rcpBatchClient := NewBatchEndpoints(srv.URL) + timestamp, err := rcpBatchClient.GetL2BlockTimestamp(string(tt.blockHash)) + if tt.expectErr != nil { + require.Equal(t, tt.expectErr.Error(), err.Error()) + } else { + require.NoError(t, err) + require.Equal(t, tt.expectData, timestamp) + } + }) + } +} diff --git a/sequencesender/seqsendertypes/rpcbatch/rpcbatch.go b/rpc/types/rpcbatch.go similarity index 70% rename from sequencesender/seqsendertypes/rpcbatch/rpcbatch.go rename to rpc/types/rpcbatch.go index fafc1841..93e1158d 100644 --- a/sequencesender/seqsendertypes/rpcbatch/rpcbatch.go +++ b/rpc/types/rpcbatch.go @@ -1,4 +1,4 @@ -package rpcbatch +package types import ( "fmt" @@ -8,35 +8,45 @@ import ( ) type RPCBatch struct { - batchNumber uint64 `json:"batchNumber"` - blockHashes []string `json:"blocks"` - batchL2Data []byte `json:"batchL2Data"` - globalExitRoot common.Hash `json:"globalExitRoot"` - coinbase common.Address `json:"coinbase"` - closed bool `json:"closed"` - lastL2BlockTimestamp uint64 `json:"lastL2BlockTimestamp"` - l1InfoTreeIndex uint32 `json:"l1InfoTreeIndex"` -} - -func New(batchNumber uint64, blockHashes []string, batchL2Data []byte, globalExitRoot common.Hash, - coinbase common.Address, closed bool) (*RPCBatch, error) { + batchNumber uint64 + accInputHash common.Hash + blockHashes []string + batchL2Data []byte + globalExitRoot common.Hash + localExitRoot common.Hash + stateRoot common.Hash + coinbase common.Address + closed bool + lastL2BlockTimestamp uint64 + l1InfoTreeIndex uint32 +} + +func NewRPCBatch(batchNumber uint64, accInputHash common.Hash, blockHashes []string, batchL2Data []byte, + globalExitRoot common.Hash, localExitRoot common.Hash, stateRoot common.Hash, + coinbase common.Address, closed bool) *RPCBatch { return &RPCBatch{ batchNumber: batchNumber, + accInputHash: accInputHash, blockHashes: blockHashes, batchL2Data: batchL2Data, globalExitRoot: globalExitRoot, + localExitRoot: localExitRoot, + stateRoot: stateRoot, coinbase: coinbase, closed: closed, - }, nil + } } // DeepCopy func (b *RPCBatch) DeepCopy() seqsendertypes.Batch { return &RPCBatch{ + accInputHash: b.accInputHash, batchNumber: b.batchNumber, blockHashes: b.blockHashes, batchL2Data: b.batchL2Data, globalExitRoot: b.globalExitRoot, + localExitRoot: b.localExitRoot, + stateRoot: b.stateRoot, coinbase: b.coinbase, closed: b.closed, lastL2BlockTimestamp: b.lastL2BlockTimestamp, @@ -84,6 +94,21 @@ func (b *RPCBatch) GlobalExitRoot() common.Hash { return b.globalExitRoot } +// LocalExitRoot +func (b *RPCBatch) LocalExitRoot() common.Hash { + return b.localExitRoot +} + +// StateRoot +func (b *RPCBatch) StateRoot() common.Hash { + return b.stateRoot +} + +// AccInputHash +func (b *RPCBatch) AccInputHash() common.Hash { + return b.accInputHash +} + // L1InfoTreeIndex func (b *RPCBatch) L1InfoTreeIndex() uint32 { return b.l1InfoTreeIndex diff --git a/scripts/local_config b/scripts/local_config index 9a1f55cf..6922f15e 100755 --- a/scripts/local_config +++ b/scripts/local_config @@ -313,7 +313,7 @@ echo " " echo "- Add next configuration to vscode launch.json" cat << EOF { - "name": "Debug cdk"", + "name": "Debug cdk", "type": "go", "request": "launch", "mode": "auto", diff --git a/sequencesender.json b/sequencesender.json deleted file mode 100644 index 0967ef42..00000000 --- a/sequencesender.json +++ /dev/null @@ -1 +0,0 @@ -{} diff --git a/sequencesender/config.go b/sequencesender/config.go index f264f904..4f77500b 100644 --- a/sequencesender/config.go +++ b/sequencesender/config.go @@ -71,11 +71,3 @@ type Config struct { // GetBatchWaitInterval is the time to wait to query for a new batch when there are no more batches available GetBatchWaitInterval types.Duration `mapstructure:"GetBatchWaitInterval"` } - -// StreamClientCfg contains the data streamer's configuration properties -type StreamClientCfg struct { - // Datastream server to connect - Server string `mapstructure:"Server"` - // Log is the log configuration - Log log.Config `mapstructure:"Log"` -} diff --git a/sequencesender/mocks/mock_etherman.go b/sequencesender/mocks/mock_etherman.go index 46a70170..539ef9a7 100644 --- a/sequencesender/mocks/mock_etherman.go +++ b/sequencesender/mocks/mock_etherman.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.40.1. DO NOT EDIT. +// Code generated by mockery v2.39.0. DO NOT EDIT. package mocks diff --git a/sequencesender/mocks/mock_ethtxmanager.go b/sequencesender/mocks/mock_ethtxmanager.go index f3b456a4..16de5be6 100644 --- a/sequencesender/mocks/mock_ethtxmanager.go +++ b/sequencesender/mocks/mock_ethtxmanager.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.40.1. DO NOT EDIT. +// Code generated by mockery v2.39.0. DO NOT EDIT. package mocks @@ -6,13 +6,13 @@ import ( context "context" big "math/big" - ethtxtypes "github.com/0xPolygon/zkevm-ethtx-manager/types" - common "github.com/ethereum/go-ethereum/common" mock "github.com/stretchr/testify/mock" types "github.com/ethereum/go-ethereum/core/types" + + zkevm_ethtx_managertypes "github.com/0xPolygon/zkevm-ethtx-manager/types" ) // EthTxManagerMock is an autogenerated mock type for the EthTxManager type @@ -69,22 +69,22 @@ func (_m *EthTxManagerMock) Remove(ctx context.Context, hash common.Hash) error } // Result provides a mock function with given fields: ctx, hash -func (_m *EthTxManagerMock) Result(ctx context.Context, hash common.Hash) (ethtxtypes.MonitoredTxResult, error) { +func (_m *EthTxManagerMock) Result(ctx context.Context, hash common.Hash) (zkevm_ethtx_managertypes.MonitoredTxResult, error) { ret := _m.Called(ctx, hash) if len(ret) == 0 { panic("no return value specified for Result") } - var r0 ethtxtypes.MonitoredTxResult + var r0 zkevm_ethtx_managertypes.MonitoredTxResult var r1 error - if rf, ok := ret.Get(0).(func(context.Context, common.Hash) (ethtxtypes.MonitoredTxResult, error)); ok { + if rf, ok := ret.Get(0).(func(context.Context, common.Hash) (zkevm_ethtx_managertypes.MonitoredTxResult, error)); ok { return rf(ctx, hash) } - if rf, ok := ret.Get(0).(func(context.Context, common.Hash) ethtxtypes.MonitoredTxResult); ok { + if rf, ok := ret.Get(0).(func(context.Context, common.Hash) zkevm_ethtx_managertypes.MonitoredTxResult); ok { r0 = rf(ctx, hash) } else { - r0 = ret.Get(0).(ethtxtypes.MonitoredTxResult) + r0 = ret.Get(0).(zkevm_ethtx_managertypes.MonitoredTxResult) } if rf, ok := ret.Get(1).(func(context.Context, common.Hash) error); ok { @@ -97,27 +97,27 @@ func (_m *EthTxManagerMock) Result(ctx context.Context, hash common.Hash) (ethtx } // ResultsByStatus provides a mock function with given fields: ctx, status -func (_m *EthTxManagerMock) ResultsByStatus(ctx context.Context, status []ethtxtypes.MonitoredTxStatus) ([]ethtxtypes.MonitoredTxResult, error) { +func (_m *EthTxManagerMock) ResultsByStatus(ctx context.Context, status []zkevm_ethtx_managertypes.MonitoredTxStatus) ([]zkevm_ethtx_managertypes.MonitoredTxResult, error) { ret := _m.Called(ctx, status) if len(ret) == 0 { panic("no return value specified for ResultsByStatus") } - var r0 []ethtxtypes.MonitoredTxResult + var r0 []zkevm_ethtx_managertypes.MonitoredTxResult var r1 error - if rf, ok := ret.Get(0).(func(context.Context, []ethtxtypes.MonitoredTxStatus) ([]ethtxtypes.MonitoredTxResult, error)); ok { + if rf, ok := ret.Get(0).(func(context.Context, []zkevm_ethtx_managertypes.MonitoredTxStatus) ([]zkevm_ethtx_managertypes.MonitoredTxResult, error)); ok { return rf(ctx, status) } - if rf, ok := ret.Get(0).(func(context.Context, []ethtxtypes.MonitoredTxStatus) []ethtxtypes.MonitoredTxResult); ok { + if rf, ok := ret.Get(0).(func(context.Context, []zkevm_ethtx_managertypes.MonitoredTxStatus) []zkevm_ethtx_managertypes.MonitoredTxResult); ok { r0 = rf(ctx, status) } else { if ret.Get(0) != nil { - r0 = ret.Get(0).([]ethtxtypes.MonitoredTxResult) + r0 = ret.Get(0).([]zkevm_ethtx_managertypes.MonitoredTxResult) } } - if rf, ok := ret.Get(1).(func(context.Context, []ethtxtypes.MonitoredTxStatus) error); ok { + if rf, ok := ret.Get(1).(func(context.Context, []zkevm_ethtx_managertypes.MonitoredTxStatus) error); ok { r1 = rf(ctx, status) } else { r1 = ret.Error(1) diff --git a/sequencesender/mocks/mock_rpc.go b/sequencesender/mocks/mock_rpc.go new file mode 100644 index 00000000..e06378a6 --- /dev/null +++ b/sequencesender/mocks/mock_rpc.go @@ -0,0 +1,88 @@ +// Code generated by mockery v2.39.0. DO NOT EDIT. + +package mocks + +import ( + mock "github.com/stretchr/testify/mock" + + types "github.com/0xPolygon/cdk/rpc/types" +) + +// RPCInterfaceMock is an autogenerated mock type for the RPCInterface type +type RPCInterfaceMock struct { + mock.Mock +} + +// GetBatch provides a mock function with given fields: batchNumber +func (_m *RPCInterfaceMock) GetBatch(batchNumber uint64) (*types.RPCBatch, error) { + ret := _m.Called(batchNumber) + + if len(ret) == 0 { + panic("no return value specified for GetBatch") + } + + var r0 *types.RPCBatch + var r1 error + if rf, ok := ret.Get(0).(func(uint64) (*types.RPCBatch, error)); ok { + return rf(batchNumber) + } + if rf, ok := ret.Get(0).(func(uint64) *types.RPCBatch); ok { + r0 = rf(batchNumber) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*types.RPCBatch) + } + } + + if rf, ok := ret.Get(1).(func(uint64) error); ok { + r1 = rf(batchNumber) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// GetWitness provides a mock function with given fields: batchNumber, fullWitness +func (_m *RPCInterfaceMock) GetWitness(batchNumber uint64, fullWitness bool) ([]byte, error) { + ret := _m.Called(batchNumber, fullWitness) + + if len(ret) == 0 { + panic("no return value specified for GetWitness") + } + + var r0 []byte + var r1 error + if rf, ok := ret.Get(0).(func(uint64, bool) ([]byte, error)); ok { + return rf(batchNumber, fullWitness) + } + if rf, ok := ret.Get(0).(func(uint64, bool) []byte); ok { + r0 = rf(batchNumber, fullWitness) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]byte) + } + } + + if rf, ok := ret.Get(1).(func(uint64, bool) error); ok { + r1 = rf(batchNumber, fullWitness) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// NewRPCInterfaceMock creates a new instance of RPCInterfaceMock. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewRPCInterfaceMock(t interface { + mock.TestingT + Cleanup(func()) +}) *RPCInterfaceMock { + mock := &RPCInterfaceMock{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/sequencesender/rpc_test.go b/sequencesender/rpc_test.go deleted file mode 100644 index 4774b237..00000000 --- a/sequencesender/rpc_test.go +++ /dev/null @@ -1,115 +0,0 @@ -package sequencesender - -import ( - "encoding/json" - "errors" - "net/http" - "net/http/httptest" - "testing" - - "github.com/0xPolygon/cdk-rpc/rpc" - "github.com/ethereum/go-ethereum/common" - "github.com/stretchr/testify/require" -) - -func Test_getBatchFromRPC(t *testing.T) { - t.Parallel() - - tests := []struct { - name string - batch uint64 - getBatchByNumberResp string - getBlockByHasResp string - getBatchByNumberErr error - getBlockByHashErr error - expectBlocks int - expectData []byte - expectTimestamp uint64 - expectErr error - }{ - { - name: "successfully fetched", - getBatchByNumberResp: `{"jsonrpc":"2.0","id":1,"result":{"blocks":["1", "2", "3"],"batchL2Data":"0x1234567"}}`, - getBlockByHasResp: `{"jsonrpc":"2.0","id":1,"result":{"timestamp":"0x123456"}}`, - batch: 0, - expectBlocks: 3, - expectData: common.FromHex("0x1234567"), - expectTimestamp: 1193046, - expectErr: nil, - }, - { - name: "invalid json", - getBatchByNumberResp: `{"jsonrpc":"2.0","id":1,"result":{"blocks":invalid,"batchL2Data":"test"}}`, - batch: 0, - expectBlocks: 3, - expectData: nil, - expectErr: errors.New("invalid character 'i' looking for beginning of value"), - }, - { - name: "wrong json", - getBatchByNumberResp: `{"jsonrpc":"2.0","id":1,"result":{"blocks":"invalid","batchL2Data":"test"}}`, - batch: 0, - expectBlocks: 3, - expectData: nil, - expectErr: errors.New("error unmarshalling the batch from the response calling zkevm_getBatchByNumber: json: cannot unmarshal string into Go struct field zkEVMBatch.blocks of type []string"), - }, - { - name: "error in the response", - getBatchByNumberResp: `{"jsonrpc":"2.0","id":1,"result":null,"error":{"code":-32602,"message":"Invalid params"}}`, - batch: 0, - expectBlocks: 0, - expectData: nil, - expectErr: errors.New("error in the response calling zkevm_getBatchByNumber: &{-32602 Invalid params }"), - }, - { - name: "http failed", - getBatchByNumberErr: errors.New("failed to fetch"), - batch: 0, - expectBlocks: 0, - expectData: nil, - expectErr: errors.New("invalid status code, expected: 200, found: 500"), - }, - } - - for _, tt := range tests { - tt := tt - - t.Run(tt.name, func(t *testing.T) { - t.Parallel() - - srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - var req rpc.Request - err := json.NewDecoder(r.Body).Decode(&req) - require.NoError(t, err) - - switch req.Method { - case "zkevm_getBatchByNumber": - if tt.getBatchByNumberErr != nil { - http.Error(w, tt.getBatchByNumberErr.Error(), http.StatusInternalServerError) - return - } - - _, _ = w.Write([]byte(tt.getBatchByNumberResp)) - case "eth_getBlockByHash": - if tt.getBlockByHashErr != nil { - http.Error(w, tt.getBlockByHashErr.Error(), http.StatusInternalServerError) - return - } - _, _ = w.Write([]byte(tt.getBlockByHasResp)) - default: - http.Error(w, "method not found", http.StatusNotFound) - } - })) - defer srv.Close() - - rpcBatch, err := getBatchFromRPC(srv.URL, tt.batch) - if tt.expectErr != nil { - require.Equal(t, tt.expectErr.Error(), err.Error()) - } else { - require.NoError(t, err) - require.Equal(t, tt.expectTimestamp, rpcBatch.LastL2BLockTimestamp()) - require.Equal(t, tt.expectData, rpcBatch.L2Data()) - } - }) - } -} diff --git a/sequencesender/sequencesender.go b/sequencesender/sequencesender.go index 0a044356..432b3777 100644 --- a/sequencesender/sequencesender.go +++ b/sequencesender/sequencesender.go @@ -12,15 +12,16 @@ import ( "github.com/0xPolygon/cdk/etherman" "github.com/0xPolygon/cdk/log" + "github.com/0xPolygon/cdk/rpc" + "github.com/0xPolygon/cdk/rpc/types" "github.com/0xPolygon/cdk/sequencesender/seqsendertypes" - "github.com/0xPolygon/cdk/sequencesender/seqsendertypes/rpcbatch" "github.com/0xPolygon/cdk/sequencesender/txbuilder" "github.com/0xPolygon/cdk/state" "github.com/0xPolygon/zkevm-ethtx-manager/ethtxmanager" ethtxlog "github.com/0xPolygon/zkevm-ethtx-manager/log" ethtxtypes "github.com/0xPolygon/zkevm-ethtx-manager/types" "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/core/types" + ethtypes "github.com/ethereum/go-ethereum/core/types" ) const ten = 10 @@ -34,7 +35,7 @@ type EthTxManager interface { value *big.Int, data []byte, gasOffset uint64, - sidecar *types.BlobTxSidecar, + sidecar *ethtypes.BlobTxSidecar, gas uint64, ) (common.Hash, error) Remove(ctx context.Context, hash common.Hash) error @@ -45,11 +46,17 @@ type EthTxManager interface { // Etherman represents the etherman behaviour type Etherman interface { CurrentNonce(ctx context.Context, address common.Address) (uint64, error) - GetLatestBlockHeader(ctx context.Context) (*types.Header, error) + GetLatestBlockHeader(ctx context.Context) (*ethtypes.Header, error) EstimateGas(ctx context.Context, from common.Address, to *common.Address, value *big.Int, data []byte) (uint64, error) GetLatestBatchNumber() (uint64, error) } +// RPCInterface represents the RPC interface +type RPCInterface interface { + GetBatch(batchNumber uint64) (*types.RPCBatch, error) + GetWitness(batchNumber uint64, fullWitness bool) ([]byte, error) +} + // SequenceSender represents a sequence sender type SequenceSender struct { cfg Config @@ -69,6 +76,7 @@ type SequenceSender struct { validStream bool // Not valid while receiving data before the desired batch seqSendingStopped uint32 // If there is a critical error TxBuilder txbuilder.TxBuilder + rpcClient RPCInterface } type sequenceData struct { @@ -90,6 +98,7 @@ func New(cfg Config, logger *log.Logger, sequenceData: make(map[uint64]*sequenceData), validStream: false, TxBuilder: txBuilder, + rpcClient: rpc.NewBatchEndpoints(cfg.RPCURL), } logger.Infof("TxBuilder configuration: %s", txBuilder.String()) @@ -162,7 +171,7 @@ func (s *SequenceSender) batchRetrieval(ctx context.Context) error { return ctx.Err() default: // Try to retrieve batch from RPC - rpcBatch, err := getBatchFromRPC(s.cfg.RPCURL, currentBatchNumber) + rpcBatch, err := s.rpcClient.GetBatch(currentBatchNumber) if err != nil { if errors.Is(err, ethtxmanager.ErrNotFound) { s.logger.Infof("batch %d not found in RPC", currentBatchNumber) @@ -191,7 +200,7 @@ func (s *SequenceSender) batchRetrieval(ctx context.Context) error { } } -func (s *SequenceSender) populateSequenceData(rpcBatch *rpcbatch.RPCBatch, batchNumber uint64) error { +func (s *SequenceSender) populateSequenceData(rpcBatch *types.RPCBatch, batchNumber uint64) error { s.mutexSequence.Lock() defer s.mutexSequence.Unlock() diff --git a/sequencesender/sequencesender_test.go b/sequencesender/sequencesender_test.go index 3db4a803..e1d694e5 100644 --- a/sequencesender/sequencesender_test.go +++ b/sequencesender/sequencesender_test.go @@ -10,6 +10,7 @@ import ( types2 "github.com/0xPolygon/cdk/config/types" "github.com/0xPolygon/cdk/etherman" "github.com/0xPolygon/cdk/log" + rpctypes "github.com/0xPolygon/cdk/rpc/types" "github.com/0xPolygon/cdk/sequencesender/mocks" "github.com/0xPolygon/cdk/sequencesender/seqsendertypes" "github.com/0xPolygon/cdk/sequencesender/txbuilder" @@ -98,6 +99,7 @@ func Test_Start(t *testing.T) { name string getEthTxManager func(t *testing.T) *mocks.EthTxManagerMock getEtherman func(t *testing.T) *mocks.EthermanMock + getRPC func(t *testing.T) *mocks.RPCInterfaceMock batchWaitDuration types2.Duration expectNonce uint64 expectLastVirtualBatch uint64 @@ -122,6 +124,14 @@ func Test_Start(t *testing.T) { mngr.On("ResultsByStatus", mock.Anything, []ethtxtypes.MonitoredTxStatus(nil)).Return(nil, nil) return mngr }, + getRPC: func(t *testing.T) *mocks.RPCInterfaceMock { + t.Helper() + + mngr := mocks.NewRPCInterfaceMock(t) + mngr.On("GetBatch", mock.Anything).Return(&rpctypes.RPCBatch{}, nil) + return mngr + }, + batchWaitDuration: types2.NewDuration(time.Millisecond), expectNonce: 3, expectLastVirtualBatch: 1, @@ -149,7 +159,8 @@ func Test_Start(t *testing.T) { GetBatchWaitInterval: tt.batchWaitDuration, WaitPeriodSendSequence: types2.NewDuration(1 * time.Millisecond), }, - logger: log.GetDefaultLogger(), + logger: log.GetDefaultLogger(), + rpcClient: tt.getRPC(t), } ctx, cancel := context.WithCancel(context.Background()) diff --git a/state/datastream.go b/state/datastream.go deleted file mode 100644 index 7687dba7..00000000 --- a/state/datastream.go +++ /dev/null @@ -1,12 +0,0 @@ -package state - -import ( - "github.com/0xPolygonHermez/zkevm-data-streamer/datastreamer" -) - -const ( - // StreamTypeSequencer represents a Sequencer stream - StreamTypeSequencer datastreamer.StreamType = 1 - // EntryTypeBookMark represents a bookmark entry - EntryTypeBookMark datastreamer.EntryType = datastreamer.EtBookmark -) diff --git a/state/interfaces.go b/state/interfaces.go index ce825685..fc4eb495 100644 --- a/state/interfaces.go +++ b/state/interfaces.go @@ -23,8 +23,4 @@ type storage interface { CleanupGeneratedProofs(ctx context.Context, batchNumber uint64, dbTx pgx.Tx) error CleanupLockedProofs(ctx context.Context, duration string, dbTx pgx.Tx) (int64, error) CheckProofExistsForBatch(ctx context.Context, batchNumber uint64, dbTx pgx.Tx) (bool, error) - AddBatch(ctx context.Context, dbBatch *DBBatch, dbTx pgx.Tx) error - GetBatch(ctx context.Context, batchNumber uint64, dbTx pgx.Tx) (*DBBatch, error) - DeleteBatchesOlderThanBatchNumber(ctx context.Context, batchNumber uint64, dbTx pgx.Tx) error - DeleteBatchesNewerThanBatchNumber(ctx context.Context, batchNumber uint64, dbTx pgx.Tx) error } diff --git a/state/pgstatestorage/batch.go b/state/pgstatestorage/batch.go deleted file mode 100644 index 6273f064..00000000 --- a/state/pgstatestorage/batch.go +++ /dev/null @@ -1,66 +0,0 @@ -package pgstatestorage - -import ( - "context" - "errors" - - "github.com/0xPolygon/cdk/state" - "github.com/ethereum/go-ethereum/common" - "github.com/jackc/pgx/v4" -) - -// AddBatch stores a batch -func (p *PostgresStorage) AddBatch(ctx context.Context, dbBatch *state.DBBatch, dbTx pgx.Tx) error { - const addInputHashSQL = ` - INSERT INTO aggregator.batch (batch_num, batch, datastream, witness) - VALUES ($1, $2, $3, $4) - ON CONFLICT (batch_num) DO UPDATE - SET batch = $2, datastream = $3, witness = $4 - ` - e := p.getExecQuerier(dbTx) - _, err := e.Exec( - ctx, addInputHashSQL, dbBatch.Batch.BatchNumber, &dbBatch.Batch, - common.Bytes2Hex(dbBatch.Datastream), common.Bytes2Hex(dbBatch.Witness), - ) - return err -} - -// GetBatch gets a batch by a given batch number -func (p *PostgresStorage) GetBatch(ctx context.Context, batchNumber uint64, dbTx pgx.Tx) (*state.DBBatch, error) { - const getInputHashSQL = "SELECT batch, datastream, witness FROM aggregator.batch WHERE batch_num = $1" - e := p.getExecQuerier(dbTx) - var batch state.Batch - var streamStr string - var witnessStr string - err := e.QueryRow(ctx, getInputHashSQL, batchNumber).Scan(&batch, &streamStr, &witnessStr) - if errors.Is(err, pgx.ErrNoRows) { - return nil, state.ErrNotFound - } else if err != nil { - return nil, err - } - return &state.DBBatch{ - Batch: batch, - Datastream: common.Hex2Bytes(streamStr), - Witness: common.Hex2Bytes(witnessStr), - }, nil -} - -// DeleteBatchesOlderThanBatchNumber deletes batches previous to the given batch number -func (p *PostgresStorage) DeleteBatchesOlderThanBatchNumber( - ctx context.Context, batchNumber uint64, dbTx pgx.Tx, -) error { - const deleteBatchesSQL = "DELETE FROM aggregator.batch WHERE batch_num < $1" - e := p.getExecQuerier(dbTx) - _, err := e.Exec(ctx, deleteBatchesSQL, batchNumber) - return err -} - -// DeleteBatchesNewerThanBatchNumber deletes batches previous to the given batch number -func (p *PostgresStorage) DeleteBatchesNewerThanBatchNumber( - ctx context.Context, batchNumber uint64, dbTx pgx.Tx, -) error { - const deleteBatchesSQL = "DELETE FROM aggregator.batch WHERE batch_num > $1" - e := p.getExecQuerier(dbTx) - _, err := e.Exec(ctx, deleteBatchesSQL, batchNumber) - return err -} diff --git a/test/Makefile b/test/Makefile index a1b51bb1..d173c423 100644 --- a/test/Makefile +++ b/test/Makefile @@ -17,9 +17,9 @@ COMMON_MOCKERY_PARAMS=--disable-version-string --with-expecter --exported generate-mocks-sequencesender: ## Generates mocks for sequencesender, using mockery tool rm -Rf ../sequencesender/txbuilder/mocks_txbuilder export "GOROOT=$$(go env GOROOT)" && $$(go env GOPATH)/bin/mockery --all --case snake --dir ../sequencesender/txbuilder --output ../sequencesender/txbuilder/mocks_txbuilder --outpkg mocks_txbuilder ${COMMON_MOCKERY_PARAMS} - export "GOROOT=$$(go env GOROOT)" && $$(go env GOPATH)/bin/mockery --name=EthTxManager --dir=../sequencesender --output=../sequencesender/mocks --outpkg=mocks --structname=EthTxMngrMock --filename=mock_ethtxmanager.go + export "GOROOT=$$(go env GOROOT)" && $$(go env GOPATH)/bin/mockery --name=EthTxManager --dir=../sequencesender --output=../sequencesender/mocks --outpkg=mocks --structname=EthTxManagerMock --filename=mock_ethtxmanager.go export "GOROOT=$$(go env GOROOT)" && $$(go env GOPATH)/bin/mockery --name=Etherman --dir=../sequencesender --output=../sequencesender/mocks --outpkg=mocks --structname=EthermanMock --filename=mock_etherman.go - export "GOROOT=$$(go env GOROOT)" && $$(go env GOPATH)/bin/mockery --name=StreamClient --dir=../sequencesender --output=../sequencesender/mocks --outpkg=mocks --structname=StreamClientMock --filename=mock_streamclient.go + export "GOROOT=$$(go env GOROOT)" && $$(go env GOPATH)/bin/mockery --name=RPCInterface --dir=../sequencesender --output=../sequencesender/mocks --outpkg=mocks --structname=RPCInterfaceMock --filename=mock_rpc.go .PHONY: generate-mocks-da generate-mocks-da: ## Generates mocks for dataavailability, using mockery tool @@ -55,9 +55,10 @@ generate-mocks-aggregator: ## Generates mocks for aggregator, using mockery tool export "GOROOT=$$(go env GOROOT)" && $$(go env GOPATH)/bin/mockery --name=StateInterface --dir=../aggregator --output=../aggregator/mocks --outpkg=mocks --structname=StateInterfaceMock --filename=mock_state.go export "GOROOT=$$(go env GOROOT)" && $$(go env GOPATH)/bin/mockery --name=AgglayerClientInterface --dir=../aggregator/agglayer --output=../aggregator/mocks --outpkg=mocks --structname=AgglayerClientInterfaceMock --filename=mock_agglayer_client.go export "GOROOT=$$(go env GOROOT)" && $$(go env GOPATH)/bin/mockery --name=Synchronizer --srcpkg=github.com/0xPolygonHermez/zkevm-synchronizer-l1/synchronizer --output=../aggregator/mocks --outpkg=mocks --structname=SynchronizerInterfaceMock --filename=mock_synchronizer.go - export "GOROOT=$$(go env GOROOT)" && $$(go env GOPATH)/bin/mockery --name=StreamClient --dir=../aggregator --output=../aggregator/mocks --outpkg=mocks --structname=StreamClientMock --filename=mock_streamclient.go export "GOROOT=$$(go env GOROOT)" && $$(go env GOPATH)/bin/mockery --name=EthTxManagerClient --dir=../aggregator --output=../aggregator/mocks --outpkg=mocks --structname=EthTxManagerClientMock --filename=mock_eth_tx_manager.go export "GOROOT=$$(go env GOROOT)" && $$(go env GOPATH)/bin/mockery --name=Tx --srcpkg=github.com/jackc/pgx/v4 --output=../aggregator/mocks --outpkg=mocks --structname=DbTxMock --filename=mock_dbtx.go + export "GOROOT=$$(go env GOROOT)" && $$(go env GOPATH)/bin/mockery --name=RPCInterface --dir=../aggregator --output=../aggregator/mocks --outpkg=mocks --structname=RPCInterfaceMock --filename=mock_rpc.go + .PHONY: test-e2e-fork9-validium test-e2e-fork9-validium: stop diff --git a/test/config/kurtosis-cdk-node-config.toml.template b/test/config/kurtosis-cdk-node-config.toml.template index 8fd9e82b..fa01b528 100644 --- a/test/config/kurtosis-cdk-node-config.toml.template +++ b/test/config/kurtosis-cdk-node-config.toml.template @@ -22,10 +22,9 @@ SenderProofToL1Addr = "{{.zkevm_l2_agglayer_address}}" - +RPCURL = "http://{{.l2_rpc_name}}{{.deployment_suffix}}:{{.zkevm_rpc_http_port}}" WitnessURL = "http://{{.l2_rpc_name}}{{.deployment_suffix}}:{{.zkevm_rpc_http_port}}" AggLayerURL = "http://agglayer:{{.agglayer_port}}" -StreamServer = "{{.sequencer_name}}{{.deployment_suffix}}:{{.zkevm_data_streamer_port}}" diff --git a/test/config/test.config.toml b/test/config/test.config.toml index 61fd4401..94940469 100644 --- a/test/config/test.config.toml +++ b/test/config/test.config.toml @@ -15,8 +15,6 @@ WaitPeriodPurgeTxFile = "60m" MaxPendingTx = 1 RPCURL = "http://127.0.0.1:8123" GetBatchWaitInterval = "10s" - [SequenceSender.StreamClient] - Server = "127.0.0.1:6900" [SequenceSender.EthTxManager] FrequencyToMonitorTxs = "1s" WaitTxToBeMined = "2m" @@ -52,12 +50,11 @@ GeneratingProofCleanupThreshold = "10m" BatchProofSanityCheckEnabled = true ForkId = 9 GasOffset = 0 -WitnessURL = "http://zkevm-erigon-seq:8123" -UseL1BatchData = true +RPCURL = "http://127.0.0.1:8123" +WitnessURL = "http://127.0.0.1:8123" SettlementBackend = "l1" AggLayerTxTimeout = "5m" AggLayerURL = "" -MaxWitnessRetrievalWorkers = 2 SyncModeOnlyEnabled = false UseFullWitness = false SequencerPrivateKey = {} @@ -73,8 +70,6 @@ SequencerPrivateKey = {} Environment = "development" # "production" or "development" Level = "info" Outputs = ["stderr"] - [Aggregator.StreamClient] - Server = "zkevm-erigon-seq:6900" [Aggregator.EthTxManager] FrequencyToMonitorTxs = "1s" WaitTxToBeMined = "2m"