From 790e4aa43596403854e8ad36bb00ea89767c4c95 Mon Sep 17 00:00:00 2001 From: Francisco Moura Date: Mon, 2 Sep 2024 21:32:12 -0300 Subject: [PATCH] fixup! feat(evm-reader): Read claim acceptance --- .../evmreader/{claimreader.go => claim.go} | 32 +- .../{claimreader_test.go => claim_test.go} | 59 ++- internal/evmreader/evmreader.go | 364 ++------------ internal/evmreader/evmreader_test.go | 403 +-------------- internal/evmreader/input.go | 307 ++++++++++++ internal/evmreader/input_test.go | 461 ++++++++++++++++++ internal/evmreader/inputreader.go | 4 - 7 files changed, 893 insertions(+), 737 deletions(-) rename internal/evmreader/{claimreader.go => claim.go} (90%) rename internal/evmreader/{claimreader_test.go => claim_test.go} (84%) create mode 100644 internal/evmreader/input.go create mode 100644 internal/evmreader/input_test.go delete mode 100644 internal/evmreader/inputreader.go diff --git a/internal/evmreader/claimreader.go b/internal/evmreader/claim.go similarity index 90% rename from internal/evmreader/claimreader.go rename to internal/evmreader/claim.go index 96ebc2354..7a24f1df8 100644 --- a/internal/evmreader/claimreader.go +++ b/internal/evmreader/claim.go @@ -16,7 +16,7 @@ import ( func (r *EvmReader) checkForClaimStatus( ctx context.Context, - apps []Application, + apps []application, mostRecentBlockNumber uint64, ) { @@ -66,7 +66,7 @@ func (r *EvmReader) checkForClaimStatus( func (r *EvmReader) readAndUpdateClaims( ctx context.Context, - apps []Application, + apps []application, lastClaimCheck, mostRecentBlockNumber uint64, ) { @@ -80,14 +80,12 @@ func (r *EvmReader) readAndUpdateClaims( appAddresses := appToAddresses(apps) - consensusContract, err := r.contractFactory.NewIConsensus(iConsensusAddress) - if err != nil { - slog.Error("Error instantiating IConsensus", - "apps", apps, - "IConsensus", iConsensusAddress, - "error", err) + // All these apps shares the same IConsensus + // So we can grab the first one + if len(apps) == 0 { continue } + consensusContract := apps[0].consensusContract // Retrieve Claim Acceptance Events from blockchain appClaimAcceptanceEventMap, err := r.readClaimAcceptanceFromBlockchain( @@ -204,27 +202,13 @@ func (r *EvmReader) readClaimAcceptanceFromBlockchain( return appClaimAcceptanceMap, nil } -// Index applications given a key extractor function -func indexApps[K comparable]( - keyExtractor func(Application) K, - apps []Application, -) map[K][]Application { - - result := make(map[K][]Application) - for _, item := range apps { - key := keyExtractor(item) - result[key] = append(result[key], item) - } - return result -} - // LastClaimCheck key extractor function intended to be used with `indexApps` function -func keyByLastClaimCheck(app Application) uint64 { +func keyByLastClaimCheck(app application) uint64 { return app.LastClaimCheckBlock } // IConsensus address key extractor function intended to be used with `indexApps` function -func keyByIConsensus(app Application) Address { +func keyByIConsensus(app application) Address { return app.IConsensusAddress } diff --git a/internal/evmreader/claimreader_test.go b/internal/evmreader/claim_test.go similarity index 84% rename from internal/evmreader/claimreader_test.go rename to internal/evmreader/claim_test.go index e420400d5..fac6728e5 100644 --- a/internal/evmreader/claimreader_test.go +++ b/internal/evmreader/claim_test.go @@ -48,6 +48,53 @@ func (s *EvmReaderSuite) TestNoClaimsAcceptance() { LastClaimCheckBlock: 0x11, }}, nil).Once() + s.repository.Unset("StoreClaimsTransaction") + s.repository.On("StoreClaimsTransaction", + mock.Anything, + mock.Anything, + mock.Anything, + ).Once().Run(func(arguments mock.Arguments) { + obj := arguments.Get(1) + claims, ok := obj.([]*Epoch) + s.Require().True(ok) + s.Require().Equal(0, len(claims)) + + }).Return(nil) + + s.repository.Unset("StoreClaimsTransaction") + s.repository.On("StoreClaimsTransaction", + mock.Anything, + mock.Anything, + mock.Anything, + ).Once().Run(func(arguments mock.Arguments) { + obj := arguments.Get(1) + claims, ok := obj.([]*Epoch) + s.Require().True(ok) + s.Require().Equal(0, len(claims)) + + obj = arguments.Get(2) + lastClaimCheck, ok := obj.(uint64) + s.Require().True(ok) + s.Require().Equal(uint64(17), lastClaimCheck) + + }).Return(nil) + s.repository.On("StoreClaimsTransaction", + mock.Anything, + mock.Anything, + mock.Anything, + ).Once().Run(func(arguments mock.Arguments) { + obj := arguments.Get(1) + claims, ok := obj.([]*Epoch) + s.Require().True(ok) + s.Require().Equal(0, len(claims)) + + obj = arguments.Get(2) + lastClaimCheck, ok := obj.(uint64) + s.Require().True(ok) + s.Require().Equal(uint64(18), lastClaimCheck) + + }).Return(nil) + //No Inputs s.inputBox.Unset("RetrieveInputs") s.inputBox.On("RetrieveInputs", @@ -101,7 +148,7 @@ func (s *EvmReaderSuite) TestNoClaimsAcceptance() { } -func (s *EvmReaderSuite) TestReadClaimsAcceptance() { +func (s *EvmReaderSuite) TestReadClaimAcceptance() { appAddress := common.HexToAddress("0x2E663fe9aE92275242406A185AA4fC8174339D3E") @@ -240,16 +287,6 @@ func (s *EvmReaderSuite) TestReadClaimsAcceptance() { mock.Anything, mock.Anything, ).Return(&header0, nil).Once() - s.client.On( - "HeaderByNumber", - mock.Anything, - mock.Anything, - ).Return(&header1, nil).Once() - s.client.On( - "HeaderByNumber", - mock.Anything, - mock.Anything, - ).Return(&header2, nil).Once() // Start service ready := make(chan struct{}, 1) diff --git a/internal/evmreader/evmreader.go b/internal/evmreader/evmreader.go index 08412e99a..77ef3c06a 100644 --- a/internal/evmreader/evmreader.go +++ b/internal/evmreader/evmreader.go @@ -83,6 +83,12 @@ func (e *SubscriptionError) Error() string { return fmt.Sprintf("Subscription error : %v", e.Cause) } +type application struct { + Application + applicationContract ApplicationContract + consensusContract ConsensusContract +} + // EvmReader reads inputs from the blockchain type EvmReader struct { client EthClient @@ -161,7 +167,7 @@ func (r *EvmReader) watchForNewBlocks(ctx context.Context, ready chan<- struct{} // Every time a new block arrives // Get All Applications - apps, err := r.repository.GetAllRunningApplications(ctx) + runningApps, err := r.repository.GetAllRunningApplications(ctx) if err != nil { slog.Error("Error retrieving running applications for new inputs", "error", @@ -170,8 +176,21 @@ func (r *EvmReader) watchForNewBlocks(ctx context.Context, ready chan<- struct{} continue } + // Build Contracts + var apps []application + for _, app := range runningApps { + applicationContract, consensusContract, err := r.getAppContracts(app) + if err != nil { + slog.Error("Error retrieving application contracts", "app", app, "error", err) + continue + } + apps = append(apps, application{Application: app, + applicationContract: applicationContract, + consensusContract: consensusContract}) + } + if len(apps) == 0 { - slog.Info("No running applications") + slog.Info("No running consistent applications") continue } @@ -195,78 +214,6 @@ func (r *EvmReader) watchForNewBlocks(ctx context.Context, ready chan<- struct{} } } -// Check if is there new Inputs for all running Applications -func (r *EvmReader) checkForNewInputs( - ctx context.Context, - apps []Application, - mostRecentBlockNumber uint64, -) { - - slog.Debug("Checking for new inputs") - - groupedApps := r.classifyApplicationsByLastProcessedInput(apps) - - for lastProcessedBlock, apps := range groupedApps { - - appAddresses := appToAddresses(apps) - - // Safeguard: Only check blocks starting from the block where the InputBox - // contract was deployed as Inputs can be added to that same block - if lastProcessedBlock < r.inputBoxDeploymentBlock { - lastProcessedBlock = r.inputBoxDeploymentBlock - 1 - } - - if mostRecentBlockNumber > lastProcessedBlock { - - slog.Info("Checking inputs for applications", - "apps", appAddresses, - "last processed block", lastProcessedBlock, - "most recent block", mostRecentBlockNumber, - ) - - err := r.readAndStoreInputs(ctx, - lastProcessedBlock+1, - mostRecentBlockNumber, - apps, - ) - if err != nil { - slog.Error("Error reading inputs", - "apps", appAddresses, - "last processed block", lastProcessedBlock, - "most recent block", mostRecentBlockNumber, - "error", err, - ) - continue - } - } else if mostRecentBlockNumber < lastProcessedBlock { - slog.Warn( - "Not reading inputs: most recent block is lower than the last processed one", - "apps", appAddresses, - "last processed block", lastProcessedBlock, - "most recent block", mostRecentBlockNumber, - ) - } else { - slog.Info("Not reading inputs: already checked the most recent blocks", - "apps", appAddresses, - "last processed block", lastProcessedBlock, - "most recent block", mostRecentBlockNumber, - ) - } - } -} - -// Group Applications that have processed til the same block height -func (r *EvmReader) classifyApplicationsByLastProcessedInput( - apps []Application, -) map[uint64][]Application { - result := make(map[uint64][]Application) - for _, app := range apps { - result[app.LastProcessedBlock] = append(result[app.LastProcessedBlock], app) - } - - return result -} - // Fetch the most recent header up till the // given default block func (r *EvmReader) fetchMostRecentHeader( @@ -302,202 +249,13 @@ func (r *EvmReader) fetchMostRecentHeader( return header, nil } -// Read and store inputs from the InputSource given specific filter options. -func (r *EvmReader) readAndStoreInputs( - ctx context.Context, - startBlock uint64, - endBlock uint64, - apps []Application, -) error { - appsToProcess := []common.Address{} - - for _, app := range apps { - - // Get App EpochLength - err := r.addAppEpochLengthIntoCache(app) - if err != nil { - slog.Error("Error adding epoch length into cache", - "app", app.ContractAddress, - "error", err) - continue - } - - appsToProcess = append(appsToProcess, app.ContractAddress) - - } - - if len(appsToProcess) == 0 { - slog.Warn("No valid running applications") - return nil - } - - // Retrieve Inputs from blockchain - appInputsMap, err := r.readInputsFromBlockchain(ctx, appsToProcess, startBlock, endBlock) - if err != nil { - return fmt.Errorf("failed to read inputs from block %v to block %v. %w", - startBlock, - endBlock, - err) - } - - // Index Inputs into epochs and handle epoch finalization - for address, inputs := range appInputsMap { - - epochLength := r.epochLengthCache[address] - - // Retrieves last open epoch from DB - currentEpoch, err := r.repository.GetEpoch(ctx, - calculateEpochIndex(epochLength, startBlock), address) - if err != nil { - slog.Error("Error retrieving existing current epoch", - "app", address, - "error", err, - ) - continue - } - - // Check current epoch status - if currentEpoch != nil && currentEpoch.Status != EpochStatusOpen { - slog.Error("Current epoch is not open", - "app", address, - "epoch-index", currentEpoch.Index, - "status", currentEpoch.Status, - ) - continue - } - - // Initialize epochs inputs map - var epochInputMap = make(map[*Epoch][]Input) - - // Index Inputs into epochs - for _, input := range inputs { - - inputEpochIndex := calculateEpochIndex(epochLength, input.BlockNumber) - - // If input belongs into a new epoch, close the previous known one - if currentEpoch != nil && currentEpoch.Index != inputEpochIndex { - currentEpoch.Status = EpochStatusClosed - slog.Info("Closing epoch", - "app", currentEpoch.AppAddress, - "epoch-index", currentEpoch.Index, - "start", currentEpoch.FirstBlock, - "end", currentEpoch.LastBlock) - // Add it to inputMap, so it will be stored - epochInputMap[currentEpoch] = []Input{} - currentEpoch = nil - } - if currentEpoch == nil { - currentEpoch = &Epoch{ - Index: inputEpochIndex, - FirstBlock: inputEpochIndex * epochLength, - LastBlock: (inputEpochIndex * epochLength) + epochLength - 1, - Status: EpochStatusOpen, - AppAddress: address, - } - } - - slog.Info("Indexing new Input into epoch", - "app", address, - "index", input.Index, - "block", input.BlockNumber, - "epoch-index", inputEpochIndex) - - currentInputs, ok := epochInputMap[currentEpoch] - if !ok { - currentInputs = []Input{} - } - epochInputMap[currentEpoch] = append(currentInputs, *input) - - } - - // Indexed all inputs. Check if it is time to close this epoch - if currentEpoch != nil && endBlock >= currentEpoch.LastBlock { - currentEpoch.Status = EpochStatusClosed - slog.Info("Closing epoch", - "app", currentEpoch.AppAddress, - "epoch-index", currentEpoch.Index, - "start", currentEpoch.FirstBlock, - "end", currentEpoch.LastBlock) - // Add to inputMap so it is stored - _, ok := epochInputMap[currentEpoch] - if !ok { - epochInputMap[currentEpoch] = []Input{} - } - } - - _, _, err = r.repository.StoreEpochAndInputsTransaction( - ctx, - epochInputMap, - endBlock, - address, - ) - if err != nil { - slog.Error("Error storing inputs and epochs", - "app", address, - "error", err, - ) - continue - } - - // Store everything - if len(epochInputMap) > 0 { - - slog.Debug("Inputs and epochs stored successfully", - "app", address, - "start-block", startBlock, - "end-block", endBlock, - "total epochs", len(epochInputMap), - "total inputs", len(inputs), - ) - } else { - slog.Debug("No inputs or epochs to store") - } - - } - - return nil -} - -// Checks the epoch length cache and read epoch length from IConsensus -// and add it to the cache if needed -func (r *EvmReader) addAppEpochLengthIntoCache(app Application) error { - - epochLength, ok := r.epochLengthCache[app.ContractAddress] - if !ok { - - consensus, err := r.getIConsensus(app) - if err != nil { - return errors.Join( - fmt.Errorf("error retrieving IConsensus contract for app: %s", - app.ContractAddress), - err) - } - - epochLength, err = r.getEpochLengthFromContract(consensus) - if err != nil { - return errors.Join( - fmt.Errorf("error retrieving epoch length from contracts for app %s", - app.ContractAddress), - err) - } - r.epochLengthCache[app.ContractAddress] = epochLength - slog.Info("Got epoch length from IConsensus", - "app", app.ContractAddress, - "epoch length", epochLength) - } else { - slog.Debug("Got epoch length from cache", - "app", app.ContractAddress, - "epoch length", epochLength) - } - - return nil -} - -// Retrieve ConsensusContract for a given Application -func (r *EvmReader) getIConsensus(app Application) (ConsensusContract, error) { +// Retrieve ApplicationContract and ConsensusContract for a given Application. +// Also validates if IConsensus configuration matches the ApplicationContract one +func (r *EvmReader) getAppContracts(app Application, +) (ApplicationContract, ConsensusContract, error) { applicationContract, err := r.contractFactory.NewApplication(app.ContractAddress) if err != nil { - return nil, errors.Join( + return nil, nil, errors.Join( fmt.Errorf("error building application contract"), err, ) @@ -505,14 +263,14 @@ func (r *EvmReader) getIConsensus(app Application) (ConsensusContract, error) { } consensusAddress, err := applicationContract.GetConsensus(nil) if err != nil { - return nil, errors.Join( + return nil, nil, errors.Join( fmt.Errorf("error retrieving application consensus"), err, ) } if app.IConsensusAddress != consensusAddress { - return nil, + return nil, nil, fmt.Errorf("IConsensus addresses do not match. Deployed: %s. Configured: %s", consensusAddress, app.IConsensusAddress) @@ -520,17 +278,17 @@ func (r *EvmReader) getIConsensus(app Application) (ConsensusContract, error) { consensus, err := r.contractFactory.NewIConsensus(consensusAddress) if err != nil { - return nil, errors.Join( + return nil, nil, errors.Join( fmt.Errorf("error building consensus contract"), err, ) } - return consensus, nil + return applicationContract, consensus, nil } // Reads the application epoch length given it's consesus contract -func (r *EvmReader) getEpochLengthFromContract(consensus ConsensusContract) (uint64, error) { +func getEpochLengthFromContract(consensus ConsensusContract) (uint64, error) { epochLengthRaw, err := consensus.GetEpochLength(nil) if err != nil { @@ -543,50 +301,6 @@ func (r *EvmReader) getEpochLengthFromContract(consensus ConsensusContract) (uin return epochLengthRaw.Uint64(), nil } -// Read inputs from the blockchain ordered by Input index -func (r *EvmReader) readInputsFromBlockchain( - ctx context.Context, - appsAddresses []Address, - startBlock, endBlock uint64, -) (map[Address][]*Input, error) { - - // Initialize app input map - var appInputsMap = make(map[Address][]*Input) - for _, appsAddress := range appsAddresses { - appInputsMap[appsAddress] = []*Input{} - } - - opts := bind.FilterOpts{ - Context: ctx, - Start: startBlock, - End: &endBlock, - } - inputsEvents, err := r.inputSource.RetrieveInputs(&opts, appsAddresses, nil) - if err != nil { - return nil, err - } - - // Order inputs as order is not enforced by RetrieveInputs method nor the APIs - for _, event := range inputsEvents { - slog.Debug("Received input", - "app", event.AppContract, - "index", event.Index, - "block", event.Raw.BlockNumber) - input := &Input{ - Index: event.Index.Uint64(), - CompletionStatus: InputStatusNone, - RawData: event.Input, - BlockNumber: event.Raw.BlockNumber, - AppAddress: event.AppContract, - } - - // Insert Sorted - appInputsMap[event.AppContract] = insertSorted( - sortByInputIndex, appInputsMap[event.AppContract], input) - } - return appInputsMap, nil -} - // Util functions // Calculates the epoch index given the input block number @@ -594,7 +308,7 @@ func calculateEpochIndex(epochLength uint64, blockNumber uint64) uint64 { return blockNumber / epochLength } -func appToAddresses(apps []Application) []Address { +func appToAddresses(apps []application) []Address { var addresses []Address for _, app := range apps { addresses = append(addresses, app.ContractAddress) @@ -618,3 +332,17 @@ func insertSorted[T any](compare func(a, b *T) int, slice []*T, item *T) []*T { compare) return slices.Insert(slice, i, item) } + +// Index applications given a key extractor function +func indexApps[K comparable]( + keyExtractor func(application) K, + apps []application, +) map[K][]application { + + result := make(map[K][]application) + for _, item := range apps { + key := keyExtractor(item) + result[key] = append(result[key], item) + } + return result +} diff --git a/internal/evmreader/evmreader_test.go b/internal/evmreader/evmreader_test.go index 23786c402..6976b9ce4 100644 --- a/internal/evmreader/evmreader_test.go +++ b/internal/evmreader/evmreader_test.go @@ -167,115 +167,16 @@ func (s *EvmReaderSuite) TestItFailsToSubscribeForNewInputsOnStart() { s.client.AssertNumberOfCalls(s.T(), "SubscribeNewHead", 1) } -func (s *EvmReaderSuite) TestItReadsInputsFromNewBlocks() { +func (s *EvmReaderSuite) TestItWrongIConsensus() { - wsClient := FakeWSEhtClient{} - - evmReader := NewEvmReader( - s.client, - &wsClient, - s.inputBox, - s.repository, - 0x10, - DefaultBlockStatusLatest, - s.contractFactory, - ) - - // Prepare repository - s.repository.Unset("GetAllRunningApplications") - s.repository.On( - "GetAllRunningApplications", - mock.Anything, - ).Return([]Application{{ - ContractAddress: common.HexToAddress("0x2E663fe9aE92275242406A185AA4fC8174339D3E"), - IConsensusAddress: common.HexToAddress("0xdeadbeef"), - LastProcessedBlock: 0x00, - }}, nil).Once() - s.repository.On( - "GetAllRunningApplications", - mock.Anything, - ).Return([]Application{{ - ContractAddress: common.HexToAddress("0x2E663fe9aE92275242406A185AA4fC8174339D3E"), - IConsensusAddress: common.HexToAddress("0xdeadbeef"), - LastProcessedBlock: 0x11, - }}, nil).Once() - - // Prepare Client - s.client.Unset("HeaderByNumber") - s.client.On( - "HeaderByNumber", - mock.Anything, - mock.Anything, - ).Return(&header0, nil).Once() - s.client.On( - "HeaderByNumber", - mock.Anything, - mock.Anything, - ).Return(&header1, nil).Once() - s.client.On( - "HeaderByNumber", - mock.Anything, - mock.Anything, - ).Return(&header2, nil).Once() + consensusContract := &MockIConsensusContract{} - // Prepare sequence of inputs - s.inputBox.Unset("RetrieveInputs") - events_0 := []inputbox.InputBoxInputAdded{inputAddedEvent0} - mostRecentBlockNumber_0 := uint64(0x11) - retrieveInputsOpts_0 := bind.FilterOpts{ - Context: s.ctx, - Start: 0x10, - End: &mostRecentBlockNumber_0, - } - s.inputBox.On( - "RetrieveInputs", - &retrieveInputsOpts_0, - mock.Anything, - mock.Anything, - ).Return(events_0, nil) + contractFactory := newEmvReaderContractFactory() - events_1 := []inputbox.InputBoxInputAdded{inputAddedEvent1} - mostRecentBlockNumber_1 := uint64(0x12) - retrieveInputsOpts_1 := bind.FilterOpts{ - Context: s.ctx, - Start: 0x12, - End: &mostRecentBlockNumber_1, - } - s.inputBox.On( - "RetrieveInputs", - &retrieveInputsOpts_1, - mock.Anything, + contractFactory.Unset("NewIConsensus") + contractFactory.On("NewIConsensus", mock.Anything, - ).Return(events_1, nil) - - // Start service - ready := make(chan struct{}, 1) - errChannel := make(chan error, 1) - - go func() { - errChannel <- evmReader.Run(s.ctx, ready) - }() - - select { - case <-ready: - break - case err := <-errChannel: - s.FailNow("unexpected error signal", err) - } - - wsClient.fireNewHead(&header0) - wsClient.fireNewHead(&header1) - time.Sleep(time.Second) - - s.inputBox.AssertNumberOfCalls(s.T(), "RetrieveInputs", 2) - s.repository.AssertNumberOfCalls( - s.T(), - "StoreEpochAndInputsTransaction", - 2, - ) -} - -func (s *EvmReaderSuite) TestItReadsInputsFromNewBlocksWrongIConsensus() { + ).Return(consensusContract, nil) wsClient := FakeWSEhtClient{} @@ -286,86 +187,21 @@ func (s *EvmReaderSuite) TestItReadsInputsFromNewBlocksWrongIConsensus() { s.repository, 0x10, DefaultBlockStatusLatest, - s.contractFactory, + contractFactory, ) - // Prepare repository - s.repository.Unset("GetAllRunningApplications") - s.repository.On( - "GetAllRunningApplications", - mock.Anything, - ).Return([]Application{{ - ContractAddress: common.HexToAddress("0x2E663fe9aE92275242406A185AA4fC8174339D3E"), - IConsensusAddress: common.HexToAddress("0xFFFFFFFF"), - LastProcessedBlock: 0x00, - }}, nil).Once() - s.repository.On( - "GetAllRunningApplications", - mock.Anything, - ).Return([]Application{{ - ContractAddress: common.HexToAddress("0x2E663fe9aE92275242406A185AA4fC8174339D3E"), - IConsensusAddress: common.HexToAddress("0xFFFFFFFF"), - LastProcessedBlock: 0x11, - }}, nil).Once() + // Prepare consensus + claimEvent0 := &iconsensus.IConsensusClaimAcceptance{ + AppContract: common.HexToAddress("0x2E663fe9aE92275242406A185AA4fC8174339D3E"), + LastProcessedBlockNumber: big.NewInt(3), + Claim: common.HexToHash("0xdeadbeef"), + } - // Prepare Client - s.client.Unset("HeaderByNumber") - s.client.On( - "HeaderByNumber", - mock.Anything, - mock.Anything, - ).Return(&header0, nil).Once() - s.client.On( - "HeaderByNumber", - mock.Anything, - mock.Anything, - ).Return(&header1, nil).Once() - s.client.On( - "HeaderByNumber", + claimEvents := []*iconsensus.IConsensusClaimAcceptance{claimEvent0} + consensusContract.On("RetrieveClaimAcceptanceEvents", mock.Anything, mock.Anything, - ).Return(&header2, nil).Once() - - // Start service - ready := make(chan struct{}, 1) - errChannel := make(chan error, 1) - - go func() { - errChannel <- evmReader.Run(s.ctx, ready) - }() - - select { - case <-ready: - break - case err := <-errChannel: - s.FailNow("unexpected error signal", err) - } - - wsClient.fireNewHead(&header0) - wsClient.fireNewHead(&header1) - time.Sleep(time.Second) - - s.inputBox.AssertNumberOfCalls(s.T(), "RetrieveInputs", 0) - s.repository.AssertNumberOfCalls( - s.T(), - "StoreEpochAndInputsTransaction", - 0, - ) -} - -func (s *EvmReaderSuite) TestItUpdatesLastProcessedBlockWhenThereIsNoInputs() { - - wsClient := FakeWSEhtClient{} - - evmReader := NewEvmReader( - s.client, - &wsClient, - s.inputBox, - s.repository, - 0x10, - DefaultBlockStatusLatest, - s.contractFactory, - ) + ).Return(claimEvents, nil).Once() // Prepare repository s.repository.Unset("GetAllRunningApplications") @@ -374,17 +210,9 @@ func (s *EvmReaderSuite) TestItUpdatesLastProcessedBlockWhenThereIsNoInputs() { mock.Anything, ).Return([]Application{{ ContractAddress: common.HexToAddress("0x2E663fe9aE92275242406A185AA4fC8174339D3E"), - IConsensusAddress: common.HexToAddress("0xdeadbeef"), + IConsensusAddress: common.HexToAddress("0xFFFFFFFF"), LastProcessedBlock: 0x00, }}, nil).Once() - s.repository.On( - "GetAllRunningApplications", - mock.Anything, - ).Return([]Application{{ - ContractAddress: common.HexToAddress("0x2E663fe9aE92275242406A185AA4fC8174339D3E"), - IConsensusAddress: common.HexToAddress("0xdeadbeef"), - LastProcessedBlock: 0x11, - }}, nil).Once() // Prepare Client s.client.Unset("HeaderByNumber") @@ -393,46 +221,6 @@ func (s *EvmReaderSuite) TestItUpdatesLastProcessedBlockWhenThereIsNoInputs() { mock.Anything, mock.Anything, ).Return(&header0, nil).Once() - s.client.On( - "HeaderByNumber", - mock.Anything, - mock.Anything, - ).Return(&header1, nil).Once() - s.client.On( - "HeaderByNumber", - mock.Anything, - mock.Anything, - ).Return(&header2, nil).Once() - - // Prepare sequence of inputs - s.inputBox.Unset("RetrieveInputs") - events_0 := []inputbox.InputBoxInputAdded{} - mostRecentBlockNumber_0 := uint64(0x11) - retrieveInputsOpts_0 := bind.FilterOpts{ - Context: s.ctx, - Start: 0x10, - End: &mostRecentBlockNumber_0, - } - s.inputBox.On( - "RetrieveInputs", - &retrieveInputsOpts_0, - mock.Anything, - mock.Anything, - ).Return(events_0, nil) - - events_1 := []inputbox.InputBoxInputAdded{} - mostRecentBlockNumber_1 := uint64(0x12) - retrieveInputsOpts_1 := bind.FilterOpts{ - Context: s.ctx, - Start: 0x12, - End: &mostRecentBlockNumber_1, - } - s.inputBox.On( - "RetrieveInputs", - &retrieveInputsOpts_1, - mock.Anything, - mock.Anything, - ).Return(events_1, nil) // Start service ready := make(chan struct{}, 1) @@ -450,166 +238,21 @@ func (s *EvmReaderSuite) TestItUpdatesLastProcessedBlockWhenThereIsNoInputs() { } wsClient.fireNewHead(&header0) - wsClient.fireNewHead(&header1) time.Sleep(time.Second) - s.inputBox.AssertNumberOfCalls(s.T(), "RetrieveInputs", 2) - s.repository.AssertNumberOfCalls( - s.T(), - "StoreEpochAndInputsTransaction", - 2, - ) -} - -func (s *EvmReaderSuite) TestItReadsMultipleInputsFromSingleNewBlock() { - - wsClient := FakeWSEhtClient{} - - inputReader := NewEvmReader( - s.client, - &wsClient, - s.inputBox, - s.repository, - 0x10, - DefaultBlockStatusLatest, - s.contractFactory, - ) - - // Prepare Client - s.client.Unset("HeaderByNumber") - s.client.On( - "HeaderByNumber", - mock.Anything, - mock.Anything, - ).Return(&header2, nil).Once() - - // Prepare sequence of inputs - s.inputBox.Unset("RetrieveInputs") - events_2 := []inputbox.InputBoxInputAdded{inputAddedEvent2, inputAddedEvent3} - mostRecentBlockNumber_2 := uint64(0x13) - retrieveInputsOpts_2 := bind.FilterOpts{ - Context: s.ctx, - Start: 0x13, - End: &mostRecentBlockNumber_2, - } - s.inputBox.On( - "RetrieveInputs", - &retrieveInputsOpts_2, - mock.Anything, - mock.Anything, - ).Return(events_2, nil) - - // Prepare Repo - s.repository.Unset("GetAllRunningApplications") - s.repository.On( - "GetAllRunningApplications", - mock.Anything, - ).Return([]Application{{ - ContractAddress: common.HexToAddress("0x2E663fe9aE92275242406A185AA4fC8174339D3E"), - IConsensusAddress: common.HexToAddress("0xdeadbeef"), - LastProcessedBlock: 0x12, - }}, nil).Once() - s.repository.Unset("StoreEpochAndInputsTransaction") - s.repository.On( - "StoreEpochAndInputsTransaction", - mock.Anything, - mock.Anything, - mock.Anything, - mock.Anything, - ).Once().Run(func(arguments mock.Arguments) { - var epochInputMap map[*Epoch][]Input - obj := arguments.Get(1) - epochInputMap, ok := obj.(map[*Epoch][]Input) - s.Require().True(ok) - s.Require().Equal(1, len(epochInputMap)) - for _, inputs := range epochInputMap { - s.Require().Equal(2, len(inputs)) - break - } - - }).Return(make(map[uint64]uint64), make(map[uint64][]uint64), nil) - - // Start service - ready := make(chan struct{}, 1) - errChannel := make(chan error, 1) - - go func() { - errChannel <- inputReader.Run(s.ctx, ready) - }() - - select { - case <-ready: - break - case err := <-errChannel: - s.FailNow("unexpected error signal", err) - } - - wsClient.fireNewHead(&header2) - // Give a time for - time.Sleep(1 * time.Second) - - s.inputBox.AssertNumberOfCalls(s.T(), "RetrieveInputs", 1) + // Do not process inputs + s.inputBox.AssertNumberOfCalls(s.T(), "RetrieveInputs", 0) s.repository.AssertNumberOfCalls( s.T(), "StoreEpochAndInputsTransaction", - 1, - ) -} - -func (s *EvmReaderSuite) TestItStartsWhenLasProcessedBlockIsTheMostRecentBlock() { - - wsClient := FakeWSEhtClient{} - inputReader := NewEvmReader( - s.client, - &wsClient, - s.inputBox, - s.repository, - 0x10, - DefaultBlockStatusLatest, - s.contractFactory, + 0, ) - // Prepare Client - s.client.Unset("HeaderByNumber") - s.client.On( - "HeaderByNumber", - mock.Anything, - mock.Anything, - ).Return(&header0, nil).Once() - - // Prepare Repo - s.repository.Unset("GetAllRunningApplications") - s.repository.On( - "GetAllRunningApplications", - mock.Anything, - ).Return([]Application{{ - ContractAddress: common.HexToAddress("0x2E663fe9aE92275242406A185AA4fC8174339D3E"), - IConsensusAddress: common.HexToAddress("0xdeadbeef"), - LastProcessedBlock: 0x11, - }}, nil).Once() - - // Start service - ready := make(chan struct{}, 1) - errChannel := make(chan error, 1) - - go func() { - errChannel <- inputReader.Run(s.ctx, ready) - }() - - select { - case <-ready: - break - case err := <-errChannel: - s.FailNow("unexpected error signal", err) - } - - wsClient.fireNewHead(&header2) - time.Sleep(1 * time.Second) - - s.inputBox.AssertNumberOfCalls(s.T(), "RetrieveInputs", 0) + // Do not process claims + s.inputBox.AssertNumberOfCalls(s.T(), "RetrieveClaimAcceptanceEvents", 0) s.repository.AssertNumberOfCalls( s.T(), - "StoreEpochAndInputsTransaction", + "StoreClaimsTransaction", 0, ) } diff --git a/internal/evmreader/input.go b/internal/evmreader/input.go new file mode 100644 index 000000000..c8881cdbc --- /dev/null +++ b/internal/evmreader/input.go @@ -0,0 +1,307 @@ +// (c) Cartesi and individual authors (see AUTHORS) +// SPDX-License-Identifier: Apache-2.0 (see LICENSE) + +package evmreader + +import ( + "context" + "errors" + "fmt" + "log/slog" + + . "github.com/cartesi/rollups-node/internal/node/model" + "github.com/ethereum/go-ethereum/accounts/abi/bind" + "github.com/ethereum/go-ethereum/common" +) + +// Check if is there new Inputs for all running Applications +func (r *EvmReader) checkForNewInputs( + ctx context.Context, + apps []application, + mostRecentBlockNumber uint64, +) { + + slog.Debug("Checking for new inputs") + + groupedApps := indexApps(byLastProcessedBlock, apps) + + for lastProcessedBlock, apps := range groupedApps { + + appAddresses := appToAddresses(apps) + + // Safeguard: Only check blocks starting from the block where the InputBox + // contract was deployed as Inputs can be added to that same block + if lastProcessedBlock < r.inputBoxDeploymentBlock { + lastProcessedBlock = r.inputBoxDeploymentBlock - 1 + } + + if mostRecentBlockNumber > lastProcessedBlock { + + slog.Info("Checking inputs for applications", + "apps", appAddresses, + "last processed block", lastProcessedBlock, + "most recent block", mostRecentBlockNumber, + ) + + err := r.readAndStoreInputs(ctx, + lastProcessedBlock+1, + mostRecentBlockNumber, + apps, + ) + if err != nil { + slog.Error("Error reading inputs", + "apps", appAddresses, + "last processed block", lastProcessedBlock, + "most recent block", mostRecentBlockNumber, + "error", err, + ) + continue + } + } else if mostRecentBlockNumber < lastProcessedBlock { + slog.Warn( + "Not reading inputs: most recent block is lower than the last processed one", + "apps", appAddresses, + "last processed block", lastProcessedBlock, + "most recent block", mostRecentBlockNumber, + ) + } else { + slog.Info("Not reading inputs: already checked the most recent blocks", + "apps", appAddresses, + "last processed block", lastProcessedBlock, + "most recent block", mostRecentBlockNumber, + ) + } + } +} + +// Read and store inputs from the InputSource given specific filter options. +func (r *EvmReader) readAndStoreInputs( + ctx context.Context, + startBlock uint64, + endBlock uint64, + apps []application, +) error { + appsToProcess := []common.Address{} + + for _, app := range apps { + + // Get App EpochLength + err := r.addAppEpochLengthIntoCache(app) + if err != nil { + slog.Error("Error adding epoch length into cache", + "app", app.ContractAddress, + "error", err) + continue + } + + appsToProcess = append(appsToProcess, app.ContractAddress) + + } + + if len(appsToProcess) == 0 { + slog.Warn("No valid running applications") + return nil + } + + // Retrieve Inputs from blockchain + appInputsMap, err := r.readInputsFromBlockchain(ctx, appsToProcess, startBlock, endBlock) + if err != nil { + return fmt.Errorf("failed to read inputs from block %v to block %v. %w", + startBlock, + endBlock, + err) + } + + // Index Inputs into epochs and handle epoch finalization + for address, inputs := range appInputsMap { + + epochLength := r.epochLengthCache[address] + + // Retrieves last open epoch from DB + currentEpoch, err := r.repository.GetEpoch(ctx, + calculateEpochIndex(epochLength, startBlock), address) + if err != nil { + slog.Error("Error retrieving existing current epoch", + "app", address, + "error", err, + ) + continue + } + + // Check current epoch status + if currentEpoch != nil && currentEpoch.Status != EpochStatusOpen { + slog.Error("Current epoch is not open", + "app", address, + "epoch-index", currentEpoch.Index, + "status", currentEpoch.Status, + ) + continue + } + + // Initialize epochs inputs map + var epochInputMap = make(map[*Epoch][]Input) + + // Index Inputs into epochs + for _, input := range inputs { + + inputEpochIndex := calculateEpochIndex(epochLength, input.BlockNumber) + + // If input belongs into a new epoch, close the previous known one + if currentEpoch != nil && currentEpoch.Index != inputEpochIndex { + currentEpoch.Status = EpochStatusClosed + slog.Info("Closing epoch", + "app", currentEpoch.AppAddress, + "epoch-index", currentEpoch.Index, + "start", currentEpoch.FirstBlock, + "end", currentEpoch.LastBlock) + // Add it to inputMap, so it will be stored + epochInputMap[currentEpoch] = []Input{} + currentEpoch = nil + } + if currentEpoch == nil { + currentEpoch = &Epoch{ + Index: inputEpochIndex, + FirstBlock: inputEpochIndex * epochLength, + LastBlock: (inputEpochIndex * epochLength) + epochLength - 1, + Status: EpochStatusOpen, + AppAddress: address, + } + } + + slog.Info("Indexing new Input into epoch", + "app", address, + "index", input.Index, + "block", input.BlockNumber, + "epoch-index", inputEpochIndex) + + currentInputs, ok := epochInputMap[currentEpoch] + if !ok { + currentInputs = []Input{} + } + epochInputMap[currentEpoch] = append(currentInputs, *input) + + } + + // Indexed all inputs. Check if it is time to close this epoch + if currentEpoch != nil && endBlock >= currentEpoch.LastBlock { + currentEpoch.Status = EpochStatusClosed + slog.Info("Closing epoch", + "app", currentEpoch.AppAddress, + "epoch-index", currentEpoch.Index, + "start", currentEpoch.FirstBlock, + "end", currentEpoch.LastBlock) + // Add to inputMap so it is stored + _, ok := epochInputMap[currentEpoch] + if !ok { + epochInputMap[currentEpoch] = []Input{} + } + } + + _, _, err = r.repository.StoreEpochAndInputsTransaction( + ctx, + epochInputMap, + endBlock, + address, + ) + if err != nil { + slog.Error("Error storing inputs and epochs", + "app", address, + "error", err, + ) + continue + } + + // Store everything + if len(epochInputMap) > 0 { + + slog.Debug("Inputs and epochs stored successfully", + "app", address, + "start-block", startBlock, + "end-block", endBlock, + "total epochs", len(epochInputMap), + "total inputs", len(inputs), + ) + } else { + slog.Debug("No inputs or epochs to store") + } + + } + + return nil +} + +// Checks the epoch length cache and read epoch length from IConsensus +// and add it to the cache if needed +func (r *EvmReader) addAppEpochLengthIntoCache(app application) error { + + epochLength, ok := r.epochLengthCache[app.ContractAddress] + if !ok { + + epochLength, err := getEpochLengthFromContract(app.consensusContract) + if err != nil { + return errors.Join( + fmt.Errorf("error retrieving epoch length from contracts for app %s", + app.ContractAddress), + err) + } + r.epochLengthCache[app.ContractAddress] = epochLength + slog.Info("Got epoch length from IConsensus", + "app", app.ContractAddress, + "epoch length", epochLength) + } else { + slog.Debug("Got epoch length from cache", + "app", app.ContractAddress, + "epoch length", epochLength) + } + + return nil +} + +// Read inputs from the blockchain ordered by Input index +func (r *EvmReader) readInputsFromBlockchain( + ctx context.Context, + appsAddresses []Address, + startBlock, endBlock uint64, +) (map[Address][]*Input, error) { + + // Initialize app input map + var appInputsMap = make(map[Address][]*Input) + for _, appsAddress := range appsAddresses { + appInputsMap[appsAddress] = []*Input{} + } + + opts := bind.FilterOpts{ + Context: ctx, + Start: startBlock, + End: &endBlock, + } + inputsEvents, err := r.inputSource.RetrieveInputs(&opts, appsAddresses, nil) + if err != nil { + return nil, err + } + + // Order inputs as order is not enforced by RetrieveInputs method nor the APIs + for _, event := range inputsEvents { + slog.Debug("Received input", + "app", event.AppContract, + "index", event.Index, + "block", event.Raw.BlockNumber) + input := &Input{ + Index: event.Index.Uint64(), + CompletionStatus: InputStatusNone, + RawData: event.Input, + BlockNumber: event.Raw.BlockNumber, + AppAddress: event.AppContract, + } + + // Insert Sorted + appInputsMap[event.AppContract] = insertSorted( + sortByInputIndex, appInputsMap[event.AppContract], input) + } + return appInputsMap, nil +} + +// // byLastProcessedBlock key extractor function intended to be used with `indexApps` function +func byLastProcessedBlock(app application) uint64 { + return app.LastProcessedBlock +} diff --git a/internal/evmreader/input_test.go b/internal/evmreader/input_test.go new file mode 100644 index 000000000..a5776c99d --- /dev/null +++ b/internal/evmreader/input_test.go @@ -0,0 +1,461 @@ +// (c) Cartesi and individual authors (see AUTHORS) +// SPDX-License-Identifier: Apache-2.0 (see LICENSE) + +package evmreader + +import ( + "time" + + . "github.com/cartesi/rollups-node/internal/node/model" + "github.com/cartesi/rollups-node/pkg/contracts/inputbox" + "github.com/ethereum/go-ethereum/accounts/abi/bind" + "github.com/ethereum/go-ethereum/common" + "github.com/stretchr/testify/mock" +) + +func (s *EvmReaderSuite) TestItReadsInputsFromNewBlocks() { + + wsClient := FakeWSEhtClient{} + + evmReader := NewEvmReader( + s.client, + &wsClient, + s.inputBox, + s.repository, + 0x10, + DefaultBlockStatusLatest, + s.contractFactory, + ) + + // Prepare repository + s.repository.Unset("GetAllRunningApplications") + s.repository.On( + "GetAllRunningApplications", + mock.Anything, + ).Return([]Application{{ + ContractAddress: common.HexToAddress("0x2E663fe9aE92275242406A185AA4fC8174339D3E"), + IConsensusAddress: common.HexToAddress("0xdeadbeef"), + LastProcessedBlock: 0x00, + }}, nil).Once() + s.repository.On( + "GetAllRunningApplications", + mock.Anything, + ).Return([]Application{{ + ContractAddress: common.HexToAddress("0x2E663fe9aE92275242406A185AA4fC8174339D3E"), + IConsensusAddress: common.HexToAddress("0xdeadbeef"), + LastProcessedBlock: 0x11, + }}, nil).Once() + + // Prepare Client + s.client.Unset("HeaderByNumber") + s.client.On( + "HeaderByNumber", + mock.Anything, + mock.Anything, + ).Return(&header0, nil).Once() + s.client.On( + "HeaderByNumber", + mock.Anything, + mock.Anything, + ).Return(&header1, nil).Once() + s.client.On( + "HeaderByNumber", + mock.Anything, + mock.Anything, + ).Return(&header2, nil).Once() + + // Prepare sequence of inputs + s.inputBox.Unset("RetrieveInputs") + events_0 := []inputbox.InputBoxInputAdded{inputAddedEvent0} + mostRecentBlockNumber_0 := uint64(0x11) + retrieveInputsOpts_0 := bind.FilterOpts{ + Context: s.ctx, + Start: 0x10, + End: &mostRecentBlockNumber_0, + } + s.inputBox.On( + "RetrieveInputs", + &retrieveInputsOpts_0, + mock.Anything, + mock.Anything, + ).Return(events_0, nil) + + events_1 := []inputbox.InputBoxInputAdded{inputAddedEvent1} + mostRecentBlockNumber_1 := uint64(0x12) + retrieveInputsOpts_1 := bind.FilterOpts{ + Context: s.ctx, + Start: 0x12, + End: &mostRecentBlockNumber_1, + } + s.inputBox.On( + "RetrieveInputs", + &retrieveInputsOpts_1, + mock.Anything, + mock.Anything, + ).Return(events_1, nil) + + // Start service + ready := make(chan struct{}, 1) + errChannel := make(chan error, 1) + + go func() { + errChannel <- evmReader.Run(s.ctx, ready) + }() + + select { + case <-ready: + break + case err := <-errChannel: + s.FailNow("unexpected error signal", err) + } + + wsClient.fireNewHead(&header0) + wsClient.fireNewHead(&header1) + time.Sleep(time.Second) + + s.inputBox.AssertNumberOfCalls(s.T(), "RetrieveInputs", 2) + s.repository.AssertNumberOfCalls( + s.T(), + "StoreEpochAndInputsTransaction", + 2, + ) +} + +func (s *EvmReaderSuite) TestItReadsInputsFromNewBlocksWrongIConsensus() { + + wsClient := FakeWSEhtClient{} + + evmReader := NewEvmReader( + s.client, + &wsClient, + s.inputBox, + s.repository, + 0x10, + DefaultBlockStatusLatest, + s.contractFactory, + ) + + // Prepare repository + s.repository.Unset("GetAllRunningApplications") + s.repository.On( + "GetAllRunningApplications", + mock.Anything, + ).Return([]Application{{ + ContractAddress: common.HexToAddress("0x2E663fe9aE92275242406A185AA4fC8174339D3E"), + IConsensusAddress: common.HexToAddress("0xFFFFFFFF"), + LastProcessedBlock: 0x00, + }}, nil).Once() + s.repository.On( + "GetAllRunningApplications", + mock.Anything, + ).Return([]Application{{ + ContractAddress: common.HexToAddress("0x2E663fe9aE92275242406A185AA4fC8174339D3E"), + IConsensusAddress: common.HexToAddress("0xFFFFFFFF"), + LastProcessedBlock: 0x11, + }}, nil).Once() + + // Prepare Client + s.client.Unset("HeaderByNumber") + s.client.On( + "HeaderByNumber", + mock.Anything, + mock.Anything, + ).Return(&header0, nil).Once() + s.client.On( + "HeaderByNumber", + mock.Anything, + mock.Anything, + ).Return(&header1, nil).Once() + s.client.On( + "HeaderByNumber", + mock.Anything, + mock.Anything, + ).Return(&header2, nil).Once() + + // Start service + ready := make(chan struct{}, 1) + errChannel := make(chan error, 1) + + go func() { + errChannel <- evmReader.Run(s.ctx, ready) + }() + + select { + case <-ready: + break + case err := <-errChannel: + s.FailNow("unexpected error signal", err) + } + + wsClient.fireNewHead(&header0) + wsClient.fireNewHead(&header1) + time.Sleep(time.Second) + + s.inputBox.AssertNumberOfCalls(s.T(), "RetrieveInputs", 0) + s.repository.AssertNumberOfCalls( + s.T(), + "StoreEpochAndInputsTransaction", + 0, + ) +} + +func (s *EvmReaderSuite) TestItUpdatesLastProcessedBlockWhenThereIsNoInputs() { + + wsClient := FakeWSEhtClient{} + + evmReader := NewEvmReader( + s.client, + &wsClient, + s.inputBox, + s.repository, + 0x10, + DefaultBlockStatusLatest, + s.contractFactory, + ) + + // Prepare repository + s.repository.Unset("GetAllRunningApplications") + s.repository.On( + "GetAllRunningApplications", + mock.Anything, + ).Return([]Application{{ + ContractAddress: common.HexToAddress("0x2E663fe9aE92275242406A185AA4fC8174339D3E"), + IConsensusAddress: common.HexToAddress("0xdeadbeef"), + LastProcessedBlock: 0x00, + }}, nil).Once() + s.repository.On( + "GetAllRunningApplications", + mock.Anything, + ).Return([]Application{{ + ContractAddress: common.HexToAddress("0x2E663fe9aE92275242406A185AA4fC8174339D3E"), + IConsensusAddress: common.HexToAddress("0xdeadbeef"), + LastProcessedBlock: 0x11, + }}, nil).Once() + + // Prepare Client + s.client.Unset("HeaderByNumber") + s.client.On( + "HeaderByNumber", + mock.Anything, + mock.Anything, + ).Return(&header0, nil).Once() + s.client.On( + "HeaderByNumber", + mock.Anything, + mock.Anything, + ).Return(&header1, nil).Once() + s.client.On( + "HeaderByNumber", + mock.Anything, + mock.Anything, + ).Return(&header2, nil).Once() + + // Prepare sequence of inputs + s.inputBox.Unset("RetrieveInputs") + events_0 := []inputbox.InputBoxInputAdded{} + mostRecentBlockNumber_0 := uint64(0x11) + retrieveInputsOpts_0 := bind.FilterOpts{ + Context: s.ctx, + Start: 0x10, + End: &mostRecentBlockNumber_0, + } + s.inputBox.On( + "RetrieveInputs", + &retrieveInputsOpts_0, + mock.Anything, + mock.Anything, + ).Return(events_0, nil) + + events_1 := []inputbox.InputBoxInputAdded{} + mostRecentBlockNumber_1 := uint64(0x12) + retrieveInputsOpts_1 := bind.FilterOpts{ + Context: s.ctx, + Start: 0x12, + End: &mostRecentBlockNumber_1, + } + s.inputBox.On( + "RetrieveInputs", + &retrieveInputsOpts_1, + mock.Anything, + mock.Anything, + ).Return(events_1, nil) + + // Start service + ready := make(chan struct{}, 1) + errChannel := make(chan error, 1) + + go func() { + errChannel <- evmReader.Run(s.ctx, ready) + }() + + select { + case <-ready: + break + case err := <-errChannel: + s.FailNow("unexpected error signal", err) + } + + wsClient.fireNewHead(&header0) + wsClient.fireNewHead(&header1) + time.Sleep(time.Second) + + s.inputBox.AssertNumberOfCalls(s.T(), "RetrieveInputs", 2) + s.repository.AssertNumberOfCalls( + s.T(), + "StoreEpochAndInputsTransaction", + 2, + ) +} + +func (s *EvmReaderSuite) TestItReadsMultipleInputsFromSingleNewBlock() { + + wsClient := FakeWSEhtClient{} + + inputReader := NewEvmReader( + s.client, + &wsClient, + s.inputBox, + s.repository, + 0x10, + DefaultBlockStatusLatest, + s.contractFactory, + ) + + // Prepare Client + s.client.Unset("HeaderByNumber") + s.client.On( + "HeaderByNumber", + mock.Anything, + mock.Anything, + ).Return(&header2, nil).Once() + + // Prepare sequence of inputs + s.inputBox.Unset("RetrieveInputs") + events_2 := []inputbox.InputBoxInputAdded{inputAddedEvent2, inputAddedEvent3} + mostRecentBlockNumber_2 := uint64(0x13) + retrieveInputsOpts_2 := bind.FilterOpts{ + Context: s.ctx, + Start: 0x13, + End: &mostRecentBlockNumber_2, + } + s.inputBox.On( + "RetrieveInputs", + &retrieveInputsOpts_2, + mock.Anything, + mock.Anything, + ).Return(events_2, nil) + + // Prepare Repo + s.repository.Unset("GetAllRunningApplications") + s.repository.On( + "GetAllRunningApplications", + mock.Anything, + ).Return([]Application{{ + ContractAddress: common.HexToAddress("0x2E663fe9aE92275242406A185AA4fC8174339D3E"), + IConsensusAddress: common.HexToAddress("0xdeadbeef"), + LastProcessedBlock: 0x12, + }}, nil).Once() + s.repository.Unset("StoreEpochAndInputsTransaction") + s.repository.On( + "StoreEpochAndInputsTransaction", + mock.Anything, + mock.Anything, + mock.Anything, + mock.Anything, + ).Once().Run(func(arguments mock.Arguments) { + var epochInputMap map[*Epoch][]Input + obj := arguments.Get(1) + epochInputMap, ok := obj.(map[*Epoch][]Input) + s.Require().True(ok) + s.Require().Equal(1, len(epochInputMap)) + for _, inputs := range epochInputMap { + s.Require().Equal(2, len(inputs)) + break + } + + }).Return(make(map[uint64]uint64), make(map[uint64][]uint64), nil) + + // Start service + ready := make(chan struct{}, 1) + errChannel := make(chan error, 1) + + go func() { + errChannel <- inputReader.Run(s.ctx, ready) + }() + + select { + case <-ready: + break + case err := <-errChannel: + s.FailNow("unexpected error signal", err) + } + + wsClient.fireNewHead(&header2) + // Give a time for + time.Sleep(1 * time.Second) + + s.inputBox.AssertNumberOfCalls(s.T(), "RetrieveInputs", 1) + s.repository.AssertNumberOfCalls( + s.T(), + "StoreEpochAndInputsTransaction", + 1, + ) +} + +func (s *EvmReaderSuite) TestItStartsWhenLasProcessedBlockIsTheMostRecentBlock() { + + wsClient := FakeWSEhtClient{} + inputReader := NewEvmReader( + s.client, + &wsClient, + s.inputBox, + s.repository, + 0x10, + DefaultBlockStatusLatest, + s.contractFactory, + ) + + // Prepare Client + s.client.Unset("HeaderByNumber") + s.client.On( + "HeaderByNumber", + mock.Anything, + mock.Anything, + ).Return(&header0, nil).Once() + + // Prepare Repo + s.repository.Unset("GetAllRunningApplications") + s.repository.On( + "GetAllRunningApplications", + mock.Anything, + ).Return([]Application{{ + ContractAddress: common.HexToAddress("0x2E663fe9aE92275242406A185AA4fC8174339D3E"), + IConsensusAddress: common.HexToAddress("0xdeadbeef"), + LastProcessedBlock: 0x11, + }}, nil).Once() + + // Start service + ready := make(chan struct{}, 1) + errChannel := make(chan error, 1) + + go func() { + errChannel <- inputReader.Run(s.ctx, ready) + }() + + select { + case <-ready: + break + case err := <-errChannel: + s.FailNow("unexpected error signal", err) + } + + wsClient.fireNewHead(&header2) + time.Sleep(1 * time.Second) + + s.inputBox.AssertNumberOfCalls(s.T(), "RetrieveInputs", 0) + s.repository.AssertNumberOfCalls( + s.T(), + "StoreEpochAndInputsTransaction", + 0, + ) +} diff --git a/internal/evmreader/inputreader.go b/internal/evmreader/inputreader.go deleted file mode 100644 index cd256f677..000000000 --- a/internal/evmreader/inputreader.go +++ /dev/null @@ -1,4 +0,0 @@ -// (c) Cartesi and individual authors (see AUTHORS) -// SPDX-License-Identifier: Apache-2.0 (see LICENSE) - -package evmreader