Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ingester main loop #5

Merged
merged 2 commits into from
Jun 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 1 addition & 28 deletions client/duneapi/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package duneapi

import (
"bytes"
"context"
"encoding/json"
"fmt"
"log/slog"
Expand All @@ -19,18 +18,9 @@ const (
)

type BlockchainIngester interface {
// Sync pushes to DuneAPI the RPCBlock Payloads as they are received in an endless loop
// it will block until:
// - the context is cancelled
// - channel is closed
// - a fatal error occurs
Sync(ctx context.Context, blocksCh <-chan models.RPCBlock) error

// SendBlock sends a block to DuneAPI
SendBlock(payload models.RPCBlock) error

// TODO:
// - Batching multiple blocks in a single request
// - API to discover the latest block number ingested
// this can also provide "next block ranges" to push to DuneAPI
// - log/metrics on catching up/falling behind, distance from tip of chain
Expand Down Expand Up @@ -69,25 +59,8 @@ func New(log *slog.Logger, cfg Config) (*client, error) { // revive:disable-line
}, nil
}

// Sync drains blocksCh and forwards every received block to DuneAPI via SendBlock.
//
// It blocks until one of the following happens:
//   - ctx is cancelled: the context's error is returned
//   - blocksCh is closed: nil is returned
//
// A failing SendBlock is only logged; the loop moves on to the next block.
func (c *client) Sync(ctx context.Context, blocksCh <-chan models.RPCBlock) error {
	for {
		var (
			block models.RPCBlock
			open  bool
		)
		// Wait for whichever comes first: cancellation or the next block.
		select {
		case <-ctx.Done():
			return ctx.Err()
		case block, open = <-blocksCh:
		}
		if !open {
			// channel closed, nothing left to send
			return nil
		}
		if sendErr := c.SendBlock(block); sendErr != nil {
			// TODO: implement DeadLetterQueue
			// this will leave a "block gap" in DuneAPI, TODO: implement a way to fill this gap
			c.log.Error("SendBlock failed, continuing..", "blockNumber", block.BlockNumber, "error", sendErr)
		}
	}
}

// SendBlock sends a block to DuneAPI
// TODO: support batching multiple blocks in a single request
func (c *client) SendBlock(payload models.RPCBlock) error {
start := time.Now()
buffer := c.bufPool.Get().(*bytes.Buffer)
Expand Down
16 changes: 8 additions & 8 deletions client/jsonrpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,6 @@ import (
type BlockchainClient interface {
LatestBlockNumber() (int64, error)
BlockByNumber(ctx context.Context, blockNumber int64) (models.RPCBlock, error)

// SendBlocks sends blocks from startBlockNumber to endBlockNumber to outChan, inclusive.
// If endBlockNumber is -1, it sends blocks from startBlockNumber to the tip of the chain
// it will run continuously until the context is cancelled
SendBlocks(ctx context.Context, outChan chan models.RPCBlock, startBlockNumber, endBlockNumber int64) error

Close() error
}

Expand All @@ -37,13 +31,13 @@ type rpcClient struct {
bufPool *sync.Pool
}

func NewRPCClient(cfg Config, log *slog.Logger) *rpcClient { // revive:disable-line:unexported-return
func NewClient(log *slog.Logger, cfg Config) (*rpcClient, error) { // revive:disable-line:unexported-return
client := retryablehttp.NewClient()
client.RetryMax = MaxRetries
client.Logger = log
client.CheckRetry = retryablehttp.DefaultRetryPolicy
client.Backoff = retryablehttp.LinearJitterBackoff
return &rpcClient{
rpc := &rpcClient{
client: client,
cfg: cfg,
log: log,
Expand All @@ -53,6 +47,12 @@ func NewRPCClient(cfg Config, log *slog.Logger) *rpcClient { // revive:disable-l
},
},
}
// lets validate RPC node is up & reachable
_, err := rpc.LatestBlockNumber()
if err != nil {
return nil, fmt.Errorf("failed to connect to jsonrpc: %w", err)
}
return rpc, nil
}

func (c *rpcClient) LatestBlockNumber() (int64, error) {
Expand Down
39 changes: 14 additions & 25 deletions client/jsonrpc/opstack.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"fmt"
"log/slog"
"time"

"github.com/duneanalytics/blockchain-ingester/models"
"golang.org/x/sync/errgroup"
Expand All @@ -16,9 +17,12 @@ type OpStackClient struct {

var _ BlockchainClient = &OpStackClient{}

func NewOpStackClient(cfg Config, log *slog.Logger) *OpStackClient {
rpcClient := NewRPCClient(cfg, log)
return &OpStackClient{*rpcClient}
func NewOpStackClient(log *slog.Logger, cfg Config) (*OpStackClient, error) {
rpcClient, err := NewClient(log, cfg)
if err != nil {
return nil, err
}
return &OpStackClient{*rpcClient}, nil
}

// BlockByNumber returns the block with the given blockNumber.
Expand All @@ -31,6 +35,13 @@ func NewOpStackClient(cfg Config, log *slog.Logger) *OpStackClient {
//
// we should handle the case where it is not available
func (c *OpStackClient) BlockByNumber(ctx context.Context, blockNumber int64) (models.RPCBlock, error) {
tStart := time.Now()
defer func() {
c.log.Info("BlockByNumber",
"blockNumber", blockNumber,
"duration", time.Since(tStart),
)
}()
blockNumberHex := fmt.Sprintf("0x%x", blockNumber)

// TODO: split this into mandatory and optional methods
Expand Down Expand Up @@ -79,25 +90,3 @@ func (c *OpStackClient) BlockByNumber(ctx context.Context, blockNumber int64) (m
Payload: buffer.Bytes(),
}, nil
}

// SendBlocks fetches blocks one at a time with BlockByNumber, starting at
// startBlockNumber, and pushes each one to outChan.
//
// When endBlockNumber <= startBlockNumber (e.g. -1) the loop has no upper bound
// and only ends on context cancellation or on a fetch error. Otherwise it stops
// after endBlockNumber has been sent (inclusive).
//
// It returns the BlockByNumber error if a fetch fails, and nil when the range is
// exhausted or ctx is cancelled while waiting to hand a block to outChan.
func (c *OpStackClient) SendBlocks(
	ctx context.Context, outChan chan models.RPCBlock, startBlockNumber, endBlockNumber int64,
) error {
	// NOTE(review): with this test a single-block range (start == end) is also
	// treated as "never stop" — confirm that is intended.
	dontStop := endBlockNumber <= startBlockNumber
	// The bound must be checked against the advancing blockNumber; comparing the
	// constant startBlockNumber made every bounded range loop forever.
	for blockNumber := startBlockNumber; dontStop || blockNumber <= endBlockNumber; blockNumber++ {
		block, err := c.BlockByNumber(ctx, blockNumber)
		if err != nil {
			c.log.Error("Failed to get block by number",
				"blockNumber", blockNumber,
				"error", err,
			)
			return err
		}
		// Hand the block over, but do not block forever if the consumer is gone.
		select {
		case <-ctx.Done():
			return nil
		case outChan <- block:
		}
	}
	return nil
}
40 changes: 28 additions & 12 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@ import (
"time"

"github.com/duneanalytics/blockchain-ingester/client/duneapi"
"github.com/duneanalytics/blockchain-ingester/client/jsonrpc"
"github.com/duneanalytics/blockchain-ingester/config"
"github.com/duneanalytics/blockchain-ingester/ingester"
"github.com/duneanalytics/blockchain-ingester/models"
)

func init() {
Expand Down Expand Up @@ -42,21 +45,34 @@ func main() {
defer duneClient.Close()

var wg stdsync.WaitGroup
var rpcClient jsonrpc.BlockchainClient

// rpcClient, err := jsonrpc.NewClient(&rpc.Config{
// NodeURL: cfg.BlockchainNodeURL,
// PoolInterval: cfg.PoolInterval,
// Stack: cfg.BlockchainStack,
// })
switch cfg.RPCStack {
case models.OpStack:
rpcClient, err = jsonrpc.NewOpStackClient(logger, jsonrpc.Config{
URL: cfg.RPCNode.NodeURL,
})
default:
stdlog.Fatalf("unsupported RPC stack: %s", cfg.RPCStack)
}
if err != nil {
stdlog.Fatal(err)
}

ingester := ingester.New(
logger,
rpcClient,
duneClient,
ingester.Config{
PollInterval: cfg.PollInterval,
StartBlockHeight: cfg.BlockHeight,
},
)

// harvester, err := harvester.New(&harvester.Config{
// Logger: logger,
// DuneClient: duneClient,
// RPCClient: rpcClient,
// })
wg.Add(1)
ingester.Run(context.Background(), &wg)

// wg.Add(1)
// harvester.Run(ctx, &wg)
// TODO: add a metrics exporter or healthcheck http endpoint ?

_, cancelFn := context.WithCancel(context.Background())
quit := make(chan os.Signal, 1)
Expand Down
11 changes: 5 additions & 6 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@ func (d DuneClient) HasError() error {
}

type RPCClient struct {
NodeURL string `long:"rpc-node-url" env:"RPC_NODE_URL" description:"URL for the blockchain node"`
PoolInterval time.Duration `long:"rpc-pool-interval" env:"RPC_POOL_INTERVAL" description:"Interval to pool the blockchain node" default:"500millis"` // nolint:lll
NodeURL string `long:"rpc-node-url" env:"RPC_NODE_URL" description:"URL for the blockchain node"`
}

func (r RPCClient) HasError() error {
Expand All @@ -33,12 +32,12 @@ func (r RPCClient) HasError() error {
}

type Config struct {
BlockHeight int64 `long:"block-height" env:"BLOCK_HEIGHT" description:"block height to start from" default:"-1"` // nolint:lll
BlockchainName string `long:"blockchain-name" env:"BLOCKCHAIN_NAME" description:"name of the blockchain" required:"true"` // nolint:lll
Dune DuneClient
PollInterval time.Duration `long:"rpc-poll-interval" env:"RPC_POLL_INTERVAL" description:"Interval to poll the blockchain node" default:"500millis"` // nolint:lll
RPCNode RPCClient
BlockchainName string `long:"blockchain-name" env:"BLOCKCHAIN_NAME" description:"name of the blockchain" required:"true"` // nolint:lll
RPCStack models.EVMStack `long:"rpc-stack" env:"RPC_STACK" description:"Stack for the RPC client" default:"opstack"` // nolint:lll
BatchSize int `long:"batch-size" env:"BATCH_SIZE" description:"number of blocks to submit to Dune" default:"3"` // nolint:lll
BlockHeight int64 `long:"block-height" env:"BLOCK_HEIGHT" description:"block height to start from" default:"-1"` // nolint:lll
RPCStack models.EVMStack `long:"rpc-stack" env:"RPC_STACK" description:"Stack for the RPC client" default:"opstack"` // nolint:lll
}

func (c Config) HasError() error {
Expand Down
82 changes: 82 additions & 0 deletions ingester/ingester.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package ingester

import (
"context"
"log/slog"
"sync"
"time"

"github.com/duneanalytics/blockchain-ingester/client/duneapi"
"github.com/duneanalytics/blockchain-ingester/client/jsonrpc"
"github.com/duneanalytics/blockchain-ingester/models"
)

// Ingester is the interface returned by New. It groups the main loop (Run), the
// block producer (ConsumeBlocks), the DuneAPI sender (SendBlocks) and a status
// accessor (Info).
type Ingester interface {
// Run starts the ingester with the given context and WaitGroup.
// NOTE(review): presumably it calls wg.Done when it returns and blocks until
// ctx is cancelled — confirm against the implementation.
Run(ctx context.Context, wg *sync.WaitGroup) error

// ConsumeBlocks sends blocks from startBlockNumber to endBlockNumber to outChan, inclusive.
// If endBlockNumber is -1, it sends blocks from startBlockNumber to the tip of the chain;
// in that case it runs continuously until the context is cancelled.
ConsumeBlocks(ctx context.Context, outChan chan models.RPCBlock, startBlockNumber, endBlockNumber int64) error

// SendBlocks pushes the RPCBlock payloads to DuneAPI as they are received, in an endless loop.
// It will block until:
// - the context is cancelled
// - the channel is closed
// - a fatal error occurs
SendBlocks(ctx context.Context, blocksCh <-chan models.RPCBlock) error

// Info returns a snapshot of the ingester's status.
// This is just a placeholder for now.
Info() Info
}

// defaultMaxBatchSize is the value New assigns to Config.MaxBatchSize when the
// caller leaves it at zero.
const defaultMaxBatchSize = 1

// Config holds the settings passed to New.
type Config struct {
	// MaxBatchSize is the batch size limit; New replaces a zero value with
	// defaultMaxBatchSize.
	MaxBatchSize int
	// StartBlockHeight is the block height to start from.
	// NOTE(review): the command-line default for this value is -1 — confirm how
	// a negative height is interpreted by the run loop.
	StartBlockHeight int64
	// PollInterval is the interval at which the blockchain node is polled.
	PollInterval time.Duration
}

// Info is the status snapshot type returned by Ingester.Info.
// NOTE(review): the field meanings below are inferred from their names only —
// confirm against the code that updates them.
type Info struct {
// presumably the most recent block number reported by the blockchain node
LatestBlockNumber int64
// presumably the last block number successfully sent to DuneAPI
IngestedBlockNumber int64
// presumably the last block number fetched from the blockchain node
ConsumedBlockNumber int64
// RPCErrors is initialised by New to an empty, non-nil slice.
RPCErrors []ErrorInfo
// DuneErrors is initialised by New to an empty, non-nil slice.
DuneErrors []ErrorInfo
}

// ErrorInfo is the element type of Info.RPCErrors and Info.DuneErrors: one
// recorded error together with a timestamp and a block number.
// NOTE(review): presumably Timestamp is when the error occurred and BlockNumber
// is the block being processed at that time — confirm where entries are created.
type ErrorInfo struct {
Timestamp time.Time
BlockNumber int64
Error error
}

// ingester is the concrete type New returns behind the Ingester interface.
// All fields are populated by New.
type ingester struct {
// log is the structured logger supplied by the caller.
log *slog.Logger
// node is the JSON-RPC client for the blockchain node.
node jsonrpc.BlockchainClient
// dune is the DuneAPI client blocks are sent to.
dune duneapi.BlockchainIngester
// cfg is the caller's Config after New has applied defaults.
cfg Config
// info holds status bookkeeping; New starts it with empty error slices.
info Info
}

// New builds an Ingester from a logger, a blockchain node client, a DuneAPI
// client and a Config.
//
// A zero cfg.MaxBatchSize is replaced with defaultMaxBatchSize. The returned
// ingester starts with empty (non-nil) RPC and Dune error lists.
func New(log *slog.Logger, node jsonrpc.BlockchainClient, dune duneapi.BlockchainIngester, cfg Config) Ingester {
	// cfg was passed by value, so defaulting it here never touches the caller's copy.
	if cfg.MaxBatchSize == 0 {
		cfg.MaxBatchSize = defaultMaxBatchSize
	}

	return &ingester{
		log:  log,
		node: node,
		dune: dune,
		cfg:  cfg,
		info: Info{
			RPCErrors:  make([]ErrorInfo, 0),
			DuneErrors: make([]ErrorInfo, 0),
		},
	}
}

// Info returns the ingester's status snapshot.
// Placeholder: it currently always returns a zero-valued Info and does not read
// the receiver's state.
func (i *ingester) Info() Info {
	var placeholder Info
	return placeholder
}
Loading
Loading