Skip to content

Commit

Permalink
feat(input-reader): Add input-reader retry policy
Browse files Browse the repository at this point in the history
  • Loading branch information
fmoura committed Jul 9, 2024
1 parent d02a7ec commit 984437c
Show file tree
Hide file tree
Showing 18 changed files with 570 additions and 281 deletions.
14 changes: 14 additions & 0 deletions cmd/cartesi-rollups-cli/root/deps/deps.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ const examples = `# Run all deps:
cartesi-rollups-cli run-deps`

var depsConfig = deps.NewDefaultDepsConfig()
var disablePostgres = false
var disableDevnet = false
var verbose = false

func init() {
Expand Down Expand Up @@ -58,6 +60,10 @@ func init() {
deps.DefaultDevnetNoMining,
"Devnet disable mining")

Cmd.Flags().BoolVar(&disablePostgres, "disable-postgres", false, "Disable Postgres")

Cmd.Flags().BoolVar(&disableDevnet, "disable-devnet", false, "Disable Devnet")

Cmd.Flags().BoolVarP(&verbose, "verbose", "v", false, "verbose logs")
}

Expand All @@ -77,6 +83,14 @@ func run(cmd *cobra.Command, args []string) {
slog.SetDefault(logger)
}

if disablePostgres {
depsConfig.Postgres = nil
}

if disableDevnet {
depsConfig.Devnet = nil
}

depsContainers, err := deps.Run(ctx, *depsConfig)
cobra.CheckErr(err)

Expand Down
325 changes: 325 additions & 0 deletions internal/evmreader/evmreader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,325 @@
// (c) Cartesi and individual authors (see AUTHORS)
// SPDX-License-Identifier: Apache-2.0 (see LICENSE)

package evmreader

import (
"context"
"fmt"
"log/slog"
"math/big"

"github.com/cartesi/rollups-node/internal/node/model"
"github.com/cartesi/rollups-node/pkg/contracts/inputbox"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/rpc"
)

// EvmReader reads inputs from the blockchain
type EvmReader struct {
client EthClient
wsClient EthWsClient
inputSource InputSource
repository InputReaderRepository
inputBoxAddress common.Address
inputBoxBlockNumber uint64
}

// Interface for Input reading
type InputSource interface {
// Wrapper for FilterInputAdded()
RetrieveInputs(
opts *bind.FilterOpts,
appContract []common.Address,
index []*big.Int,
) ([]*inputbox.InputBoxInputAdded, error)
}

// Interface for the node repository
type InputReaderRepository interface {
InsertInputsAndUpdateLastProcessedBlock(
ctx context.Context,
inputs []model.Input,
blockNumber uint64,
appAddress common.Address,
) error
GetAllApplications(
ctx context.Context,
) ([]model.Application, error)
}

// EthClient mimics part of ethclient.Client functions to narrow down the
// interface needed by the InputReader, and must be binded to Http enpoint
type EthClient interface {
HeaderByNumber(
ctx context.Context,
number *big.Int,
) (*types.Header, error)
}

// EthWsClient mimics part of ethclient.Client functions to narrow down the
// interface needed by the InputReader, and must be binded to WS endpoint
type EthWsClient interface {
SubscribeNewHead(
ctx context.Context,
ch chan<- *types.Header,
) (ethereum.Subscription, error)
}

type SubscriptionError struct {
Cause error
}

func (e *SubscriptionError) Error() string {
return fmt.Sprintf("Subscription error : %v", e.Cause)
}

func (r EvmReader) String() string {
return "input-reader"
}

// Creates a new InputReader.
func NewEvmReader(
client EthClient,
wsClient EthWsClient,
inputSource InputSource,
repository InputReaderRepository,
inputBoxAddress common.Address,
inputBoxBlockNumber uint64,
) EvmReader {
return EvmReader{
client: client,
wsClient: wsClient,
inputSource: inputSource,
repository: repository,
inputBoxAddress: inputBoxAddress,
inputBoxBlockNumber: inputBoxBlockNumber,
}
}

type nextInputRangeClassification struct {
lastProcessedBlock uint64
lastDefaultBlockType string
}

func (r *EvmReader) classifyApplicationsByLastProcessedInput(apps []model.Application) map[nextInputRangeClassification][]model.Application {
result := make(map[nextInputRangeClassification][]model.Application)

for _, app := range apps {
var classification = nextInputRangeClassification{
lastProcessedBlock: app.LastProcessedBlock,
lastDefaultBlockType: "finalized",
}
result[classification] = append(result[classification], app)
}

return result

}

func (r *EvmReader) checkForNewInputs(ctx context.Context) error {

// Get All Applications
apps, err := r.repository.GetAllApplications(ctx)
if err != nil {
return err
}

groupedApps := r.classifyApplicationsByLastProcessedInput(apps)

for classification, apps := range groupedApps {

// Safeguard: Only check blocks after InputBox was deployed
if classification.lastProcessedBlock < r.inputBoxBlockNumber {
classification.lastProcessedBlock = r.inputBoxBlockNumber
}

currentMostRecentFinalizedHeader, err := r.fetchMostRecentdHeader(ctx, classification.lastDefaultBlockType)
if err != nil {
slog.Error("Error fetching most recent block",
"last default block",
classification.lastDefaultBlockType,
"error",
err)
continue
}
currentMostRecentFinalizedBlockNumber := currentMostRecentFinalizedHeader.Number.Uint64()

if currentMostRecentFinalizedBlockNumber > classification.lastProcessedBlock {

err = r.readInputs(ctx,
classification.lastProcessedBlock+1,
currentMostRecentFinalizedBlockNumber,
apps,
)
if err != nil {
return err
}
} else if classification.lastProcessedBlock < currentMostRecentFinalizedBlockNumber {
slog.Warn(
"current most recent block is lower than the last processed one",
"most recent block",
currentMostRecentFinalizedBlockNumber,
"last processed",
classification.lastProcessedBlock,
)
}
}
}

Check failure on line 171 in internal/evmreader/evmreader.go

View workflow job for this annotation

GitHub Actions / test-go

missing return) (typecheck)

Check failure on line 171 in internal/evmreader/evmreader.go

View workflow job for this annotation

GitHub Actions / test-go

missing return

func (r *EvmReader) Start(
ctx context.Context,
ready chan<- struct{},
) error {

r.checkForNewInputs(ctx)

for {
watchForNewInputsError := r.watchForNewInputs(ctx, ready)
if _, ok := watchForNewInputsError.(*SubscriptionError); !ok {
return watchForNewInputsError
}
slog.Debug(watchForNewInputsError.Error())
slog.Debug("Reconnecting...")
}
}

// Fetch the most recent `finalized` header, up to what all inputs should be
// considered finalized in L1
func (r *EvmReader) fetchMostRecentdHeader(
ctx context.Context,
defaultBlockType string,
) (*types.Header, error) {

var defaultBlockTypeNumber int64
switch defaultBlockType {
case "latest":
defaultBlockTypeNumber = rpc.LatestBlockNumber.Int64()
break
case "safe":
defaultBlockTypeNumber = rpc.SafeBlockNumber.Int64()
break
case "pending":
defaultBlockTypeNumber = rpc.PendingBlockNumber.Int64()
break
case "finalized":
defaultBlockTypeNumber = rpc.FinalizedBlockNumber.Int64()
break
default:
return nil, fmt.Errorf("Default block type not supported", "type", defaultBlockType)
}

header, err :=
r.client.HeaderByNumber(
ctx,
new(big.Int).SetInt64(defaultBlockTypeNumber))
if err != nil {
return nil, fmt.Errorf("Failed to retrieve header. %v", err)
}

if header == nil {
return nil, fmt.Errorf("Returned header is nil")
}
return header, nil
}

// Read inputs from the InputSource given specific filter options.
func (r *EvmReader) readInputs(
ctx context.Context,
startBlock uint64,
endBlock uint64,
apps []model.Application,
) error {
filter := []common.Address{}

for _, app := range apps {
filter = append(filter, app.AppAddress)
}

opts := bind.FilterOpts{
Context: ctx,
Start: startBlock,
End: &endBlock,
}

inputsEvents, err := r.inputSource.RetrieveInputs(&opts, filter, nil)
if err != nil {
return fmt.Errorf("Failed to read inputs from block %v to block %v. %v",
startBlock,
endBlock,
err)
}

var inputs = make(map[common.Address][]model.Input)
for _, event := range inputsEvents {
slog.Debug("received input ", "app", event.AppContract, "index", event.Index)
input := model.Input{
Index: event.Index.Uint64(),
CompletionStatus: model.InputStatusNone,
Blob: event.Input,
BlockNumber: event.Raw.BlockNumber,
AppAddress: event.AppContract,
}
inputs[event.AppContract] = append(inputs[event.AppContract], input)
}

for address, inputs := range inputs {
err = r.repository.InsertInputsAndUpdateLastProcessedBlock(
ctx,
inputs,
endBlock,
address,
)
if err != nil {
slog.Error("Error inserting inputs",
"app",
address,
"error",
err,
)
continue
}
if len(inputs) > 0 {
slog.Debug("all inputs stored successfully", "app", address)
}
}

return nil
}

// Watch for new blocks and reads new inputs from finalized blocks which have not
// been processed yet.
func (r *EvmReader) watchForNewInputs(
ctx context.Context,
ready chan<- struct{},
) error {
headers := make(chan *types.Header)
sub, err := r.wsClient.SubscribeNewHead(ctx, headers)
if err != nil {
return fmt.Errorf("could not start subscription: %v", err)
}
ready <- struct{}{}
defer sub.Unsubscribe()

for {
select {
case <-ctx.Done():
return ctx.Err()
case err := <-sub.Err():
return &SubscriptionError{Cause: err}
case <-headers:

err = r.checkForNewInputs(ctx)
if err != nil {
slog.Error("Error checking got new inputs",
"error",
err,
)
}

}
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// (c) Cartesi and individual authors (see AUTHORS)
// SPDX-License-Identifier: Apache-2.0 (see LICENSE)

package inputreader
package evmreader

import (
"context"
Expand Down Expand Up @@ -71,7 +71,7 @@ type InputReaderSuite struct {
wsClient *MockEthClient
inputBox *MockInputBox
repository *MockRepository
inputReader *InputReader
inputReader *EvmReader
}

func TestInputReaderSuite(t *testing.T) {
Expand Down
Loading

0 comments on commit 984437c

Please sign in to comment.